Source code for pymavryk.michelson.types.domain

from decimal import Decimal
from typing import Type
from typing import cast

from pymavryk.context.abstract import AbstractContext
from pymavryk.context.abstract import get_originated_address
from pymavryk.crypto.encoding import base58_decode
from pymavryk.crypto.encoding import is_address
from pymavryk.crypto.encoding import is_chain_id
from pymavryk.crypto.encoding import is_kt
from pymavryk.crypto.encoding import is_pkh
from pymavryk.crypto.encoding import is_public_key
from pymavryk.crypto.encoding import is_sig
from pymavryk.crypto.encoding import is_txr_address
from pymavryk.michelson.forge import forge_address
from pymavryk.michelson.forge import forge_base58
from pymavryk.michelson.forge import forge_contract
from pymavryk.michelson.forge import forge_public_key
from pymavryk.michelson.forge import optimize_timestamp
from pymavryk.michelson.forge import unforge_address
from pymavryk.michelson.forge import unforge_chain_id
from pymavryk.michelson.forge import unforge_contract
from pymavryk.michelson.forge import unforge_public_key
from pymavryk.michelson.forge import unforge_signature
from pymavryk.michelson.format import format_timestamp
from pymavryk.michelson.format import micheline_to_michelson
from pymavryk.michelson.micheline import Micheline
from pymavryk.michelson.micheline import parse_micheline_literal
from pymavryk.michelson.parse import michelson_to_micheline
from pymavryk.michelson.types.base import Undefined
from pymavryk.michelson.types.core import IntType
from pymavryk.michelson.types.core import MichelsonType
from pymavryk.michelson.types.core import NatType
from pymavryk.michelson.types.core import StringType


[docs]class TimestampType(IntType, prim='timestamp'): # type: ignore
[docs] @classmethod def from_value(cls, value: int) -> 'TimestampType': return cls(value)
[docs] @classmethod def from_micheline_value(cls, val_expr) -> 'TimestampType': value = parse_micheline_literal(val_expr, {'int': int, 'string': optimize_timestamp}) return cls.from_value(value)
[docs] @classmethod def from_python_object(cls, py_obj) -> 'TimestampType': if isinstance(py_obj, int): value = py_obj elif isinstance(py_obj, str): value = optimize_timestamp(py_obj) else: raise AssertionError(f'unexpected value type {py_obj}') return cls.from_value(value)
[docs] def to_micheline_value(self, mode='readable', lazy_diff=False): if mode in ['optimized', 'legacy_optimized']: return {'int': str(self.value)} elif mode == 'readable': return {'string': format_timestamp(self.value)} else: raise AssertionError(f'unsupported mode {mode}')
[docs] def to_python_object(self, try_unpack=False, lazy_diff=False, comparable=False): return self.value
[docs]class MumavType(NatType, prim='mumav'): def __repr__(self): return str(Decimal(self.value) / 10**6)
[docs] @classmethod def from_value(cls, value: int) -> 'MumavType': assert value >= 0, f'expected natural number, got {value}' if value.bit_length() > 63: raise OverflowError(f'mumav overflow, got {value.bit_length()} bits, should not exceed 63') return cls(value)
[docs] @classmethod def from_python_object(cls, py_obj) -> 'MumavType': if isinstance(py_obj, int): value = py_obj elif isinstance(py_obj, Decimal): value = int(py_obj * (10**6)) elif isinstance(py_obj, str): value = int(Decimal(py_obj) * (10**6)) else: raise AssertionError(f'unexpected value type {py_obj}') return cls.from_value(value)
[docs]class AddressType(StringType, prim='address'): def __repr__(self): return f'{self.value[:6]}{self.value[-3:]}' def __lt__(self, other: 'AddressType') -> bool: # type: ignore if is_pkh(self.value) and is_kt(other.value): return True elif is_kt(self.value) and is_pkh(other.value): return False else: return self.value < other.value
[docs] @classmethod def dummy(cls, context: AbstractContext) -> 'AddressType': return cls.from_value(context.get_dummy_address())
[docs] @classmethod def from_value(cls, value: str) -> 'AddressType': if value.endswith('%default'): value = value.split('%')[0] assert is_address(value), f'expected mv/KT/sr address, got {value}' return cls(value)
[docs] @classmethod def from_micheline_value(cls, val_expr) -> 'AddressType': value = parse_micheline_literal( val_expr, { 'bytes': lambda x: unforge_contract(bytes.fromhex(x)), 'string': lambda x: x, }, ) return cls.from_value(value)
[docs] @classmethod def from_python_object(cls, py_obj) -> 'AddressType': return cls.from_value(py_obj)
[docs] def to_micheline_value(self, mode='readable', lazy_diff=False): if mode in ['optimized', 'legacy_optimized']: return {'bytes': forge_contract(self.value).hex()} # because address can also have an entrypoint elif mode == 'readable': return {'string': self.value} else: raise AssertionError(f'unsupported mode {mode}')
[docs] def to_python_object(self, try_unpack=False, lazy_diff=False, comparable=False): return self.value
[docs]class TXRAddress(StringType, prim='tx_rollup_l2_address'): def __repr__(self): return f'{self.value[:6]}{self.value[-3:]}' def __lt__(self, other: 'TXRAddress') -> bool: # type: ignore if is_kt(other.value): return True elif is_pkh(other.value): return False else: return self.value < other.value
[docs] @classmethod def dummy(cls, context: AbstractContext) -> 'TXRAddress': return cls.from_value(context.get_dummy_txr_address())
[docs] @classmethod def from_value(cls, value: str) -> 'TXRAddress': if value.endswith('%default'): value = value.split('%')[0] assert is_txr_address(value), f'expected txr1 address, got {value}' return cls(value)
[docs] @classmethod def from_micheline_value(cls, val_expr) -> 'TXRAddress': value = parse_micheline_literal( val_expr, { 'bytes': lambda x: unforge_contract(bytes.fromhex(x)), 'string': lambda x: x, }, ) return cls.from_value(value)
[docs] @classmethod def from_python_object(cls, py_obj) -> 'TXRAddress': return cls.from_value(py_obj)
[docs] def to_micheline_value(self, mode='readable', lazy_diff=False): if mode in ['optimized', 'legacy_optimized']: return {'bytes': forge_contract(self.value).hex()} # because address can also have an entrypoint elif mode == 'readable': return {'string': self.value} else: raise AssertionError(f'unsupported mode {mode}')
[docs] def to_python_object(self, try_unpack=False, lazy_diff=False, comparable=False): return self.value
[docs]class KeyType(StringType, prim='key'): @property def raw(self) -> bytes: return base58_decode(self.value.encode()) @property def prefix(self) -> str: return self.value[:4] def __lt__(self, other: 'KeyType') -> bool: # type: ignore """ Keys are ordered as follows: edpk < sppk < p2pk All keys are in compressed form in Mavryk (flag | X) where flag specifies if Y is odd or even https://crypto.stackexchange.com/questions/70754/ec-key-compression For secp256r1 (aka p256) we need to cut the first byte (for unknown reason) """ curves = { 'edpk': (0, 0), 'sppk': (1, 0), 'p2pk': (2, 1), } res = curves[self.prefix][0] - curves[other.prefix][0] if res < 0: return True elif res > 0: return False else: offset = curves[self.prefix][1] return self.raw[offset:] < other.raw[offset:]
[docs] @classmethod def dummy(cls, context: AbstractContext) -> 'KeyType': return cls.from_value(context.get_dummy_public_key())
[docs] @classmethod def from_value(cls, value: str) -> 'KeyType': assert is_public_key(value), f'expected ed/sp/p2 public key, got {value}' return cls(value)
[docs] @classmethod def from_micheline_value(cls, val_expr) -> 'KeyType': value = parse_micheline_literal( val_expr, { 'bytes': lambda x: unforge_public_key(bytes.fromhex(x)), 'string': lambda x: x, }, ) return cls.from_value(value)
[docs] @classmethod def parse_python_object(cls, py_obj) -> 'KeyType': return cls.from_value(py_obj)
[docs] def to_micheline_value(self, mode='readable', lazy_diff=False): if mode in ['optimized', 'legacy_optimized']: return {'bytes': forge_public_key(self.value).hex()} elif mode == 'readable': return {'string': self.value} else: raise AssertionError(f'unsupported mode {mode}')
[docs] def to_python_object(self, try_unpack=False, lazy_diff=False, comparable=False): return self.value
[docs]class KeyHashType(StringType, prim='key_hash'):
[docs] @classmethod def dummy(cls, context: AbstractContext) -> 'KeyHashType': return cls.from_value(context.get_dummy_key_hash())
[docs] @classmethod def from_value(cls, value: str) -> 'KeyHashType': assert is_pkh(value), f'expected mv1/mv2/mv3 key hash, got {value}' return cls(value)
[docs] @classmethod def from_micheline_value(cls, val_expr) -> 'KeyHashType': value = parse_micheline_literal( val_expr, {'bytes': lambda x: unforge_address(bytes.fromhex(x)), 'string': lambda x: x} ) return cls.from_value(value)
[docs] @classmethod def parse_python_object(cls, py_obj): return cls.from_value(py_obj)
[docs] def to_micheline_value(self, mode='readable', lazy_diff=False): if mode in ['optimized', 'legacy_optimized']: return {'bytes': forge_address(self.value, tz_only=True).hex()} elif mode == 'readable': return {'string': self.value} else: raise AssertionError(f'unsupported mode {mode}')
[docs] def to_python_object(self, try_unpack=False, lazy_diff=False, comparable=False): return self.value
[docs]class SignatureType(StringType, prim='signature'):
[docs] @classmethod def dummy(cls, context: AbstractContext) -> 'SignatureType': return cls.from_value(context.get_dummy_signature())
[docs] @classmethod def from_value(cls, value: str) -> 'SignatureType': assert is_sig(value), f'expected signature, got {value}' return cls(value)
[docs] @classmethod def from_micheline_value(cls, val_expr) -> 'SignatureType': value = parse_micheline_literal( val_expr, { 'bytes': lambda x: unforge_signature(bytes.fromhex(x)), 'string': lambda x: x, }, ) return cls.from_value(value)
[docs] @classmethod def from_python_object(cls, py_obj) -> 'SignatureType': return cls.from_value(py_obj)
[docs] def to_micheline_value(self, mode='readable', lazy_diff=False): if mode in ['optimized', 'legacy_optimized']: return {'bytes': forge_base58(self.value).hex()} elif mode == 'readable': return {'string': self.value} else: raise AssertionError(f'unsupported mode {mode}')
[docs] def to_python_object(self, try_unpack=False, lazy_diff=False, comparable=False): return self.value
[docs]class ChainIdType(StringType, prim='chain_id'):
[docs] @classmethod def dummy(cls, context: AbstractContext) -> 'ChainIdType': return cls.from_value(context.get_dummy_chain_id())
[docs] @classmethod def from_value(cls, value: str) -> 'ChainIdType': assert is_chain_id(value), f'expected chain id, got {value}' return cls(value)
[docs] @classmethod def from_micheline_value(cls, val_expr) -> 'ChainIdType': value = parse_micheline_literal( val_expr, {'bytes': lambda x: unforge_chain_id(bytes.fromhex(x)), 'string': lambda x: x} ) return cls.from_value(value)
[docs] @classmethod def from_python_object(cls, py_obj) -> 'ChainIdType': return cls.from_value(py_obj)
[docs] def to_micheline_value(self, mode='readable', lazy_diff=False): if mode in ['optimized', 'legacy_optimized']: return {'bytes': forge_base58(self.value).hex()} elif mode == 'readable': return {'string': self.value} else: raise AssertionError(f'unsupported mode {mode}')
[docs] def to_python_object(self, try_unpack=False, lazy_diff=False, comparable=False): return self.value
[docs]class ContractType(AddressType, prim='contract', args_len=1): def __repr__(self): address, entrypoint = self.get_address(), self.get_entrypoint() return f'{address[:6]}{address[-3:]}%{entrypoint}'
[docs] @classmethod def generate_pydoc(cls, definitions: list, inferred_name=None, comparable=False): super(ContractType, cls).generate_pydoc(definitions) param_expr = micheline_to_michelson(cls.args[0].as_micheline_expr()) if cls.args[0].args: name = cls.field_name or cls.type_name or inferred_name or f'{cls.prim}_{len(definitions)}' param_name = f'{name}_param' definitions.insert(0, (param_name, param_expr)) return f'contract (${param_name})' else: return f'contract ({param_expr})'
[docs] @classmethod def from_python_object(cls, py_obj) -> 'ContractType': if py_obj is None or py_obj is Undefined: py_obj = get_originated_address(0) res = super(ContractType, cls).from_python_object(py_obj) return cast(ContractType, res)
[docs] def get_address(self) -> str: return self.value.split('%')[0]
[docs] def get_entrypoint(self) -> str: res = self.value.split('%') return res[1] if len(res) == 2 else 'default'
[docs] def to_python_object(self, try_unpack=False, lazy_diff=False, comparable=False): assert not comparable, f'{self.prim} is not comparable' return super(ContractType, self).to_python_object()
[docs]class LambdaType(MichelsonType, prim='lambda', args_len=2): # type: ignore def __init__(self, value: Type[Micheline]): super(LambdaType, self).__init__() self.value = value def __repr__(self): return 'Lambda' def __eq__(self, other) -> bool: if not isinstance(other, LambdaType): return False # FIXME: That seems ugly return self.value.as_micheline_expr() == other.value.as_micheline_expr()
[docs] @classmethod def generate_pydoc(cls, definitions: list, inferred_name=None, comparable=False): super(LambdaType, cls).generate_pydoc(definitions) name = cls.field_name or cls.type_name or inferred_name or f'{cls.prim}_{len(definitions)}' expr = {} for i, suffix in enumerate(['return', 'param']): arg_expr = micheline_to_michelson(cls.args[i].as_micheline_expr()) if cls.args[i].args: arg_name = f'{name}_{suffix}' definitions.insert(0, (arg_name, arg_expr)) expr[suffix] = f'${arg_name}' else: expr[suffix] = arg_expr return f'lambda ({expr["param"]} -> {expr["return"]})'
[docs] @classmethod def dummy(cls, context: AbstractContext) -> 'LambdaType': return cls(Micheline.match(context.get_dummy_lambda()))
[docs] @classmethod def from_micheline_value(cls, val_expr) -> 'LambdaType': assert isinstance(val_expr, list), f'expected list, got {type(val_expr).__name__}' return cls(Micheline.match(val_expr))
[docs] @classmethod def from_python_object(cls, py_obj) -> 'LambdaType': assert isinstance(py_obj, str), f'expected string, got {type(py_obj).__name__}' value = michelson_to_micheline(py_obj) return cls.from_micheline_value(value)
[docs] def to_literal(self) -> Type[Micheline]: return self.value
[docs] def to_micheline_value(self, mode='readable', lazy_diff=False): # TODO: optimized mode -> harcoded values in the code return self.value.as_micheline_expr()
[docs] def to_python_object(self, try_unpack=False, lazy_diff=False, comparable=False): assert not comparable, f'{self.prim} is not comparable' return micheline_to_michelson(self.to_micheline_value())