Source code for pymavryk.michelson.types.base

from collections.abc import Iterable
from copy import copy
from copy import deepcopy
from typing import Any
from typing import Callable
from typing import List
from typing import Optional
from typing import Tuple
from typing import Type
from typing import Union
from typing import cast

from pymavryk.context.abstract import AbstractContext
from pymavryk.michelson.forge import forge_micheline
from pymavryk.michelson.forge import unforge_micheline
from pymavryk.michelson.micheline import Micheline

type_mappings = {
    'nat': 'int  /* Natural number */',
    'unit': 'None || Unit /* `from pymavryk import Unit` for resolving None ambiguity */',
    'bytes': 'str  /* Hex string */ ||\n\tbytes  /* Python byte string */',
    'timestamp': 'int  /* Unix time in seconds */ ||\n\tstring  /* Formatted datetime `%Y-%m-%dT%H:%M:%SZ` */',
    'mumav': 'int  /* Amount in `utz` (10^-6) */ ||\n\tDecimal  /* Amount in `tz` */',
    'contract': 'str  /* Base58 encoded `KT` address with optional entrypoint */'
    ' ||\n\tNone  /* when you need to avoid type checking */'
    ' ||\n\tUndefined  /* `from pymavryk import Undefined` for resolving None ambiguity  */',
    'address': 'str  /* Base58 encoded `tz` or `KT` address */',
    'key': 'str  /* Base58 encoded public key */',
    'key_hash': 'str  /* Base58 encoded public key hash */',
    'signature': 'str  /* Base58 encoded signature */',
    'lambda': 'str  /* Michelson source code */',
    'chain_id': 'str  /* Base58 encoded chain ID */',
    'ticket': '/* no literal form, tickets can only be created by another contract */',
    'operation': '/* no literal form, operations can only be spawned by another contract or lambda */',
}


[docs]class undefined: def __repr__(self): return '_' def __eq__(self, other): return isinstance(other, undefined) def __lt__(self, other): return False def __hash__(self): return hash(None)
Undefined = undefined()
[docs]def parse_name(annots: Optional[List[str]], prefix: str) -> Optional[str]: if not annots: return None sub_annots = [x[1:] for x in annots if x.startswith(prefix)] assert len(sub_annots) <= 1, f'multiple "{prefix}" annotations are not allowed: {sub_annots}' return sub_annots[0] if sub_annots else None
[docs]class MichelsonType(Micheline): field_name: Optional[str] = None type_name: Optional[str] = None args: List[Union[Type['MichelsonType'], Any]] = [] # NOTE: for sorting def __lt__(self, other: 'MichelsonType'): assert not self.is_comparable(), f'must be implemented for comparable types' # NOTE: for contains def __eq__(self, other: 'MichelsonType'): # type: ignore assert not self.is_comparable(), f'must be implemented for comparable types' def __getitem__(self, key): raise AssertionError(f'forbidden')
[docs] @staticmethod def match(expr) -> Type['MichelsonType']: return cast(Type['MichelsonType'], Micheline.match(expr))
[docs] @classmethod def create_type( cls, args: List[Type['Micheline']], annots: Optional[list] = None, **kwargs, ) -> Type['MichelsonType']: type_args = [arg for arg in args if issubclass(arg, MichelsonType)] if cls.prim in ['list', 'set', 'map', 'big_map', 'option', 'contract', 'lambda']: for arg in type_args: assert arg.field_name is None, f'{cls.prim} argument type cannot be annotated: %{arg.field_name}' if cls.prim in ['set', 'map', 'big_map', 'ticket']: assert type_args[0].is_comparable(), f'{cls.prim} key type has to be comparable (not {type_args[0].prim})' if cls.prim == 'big_map': assert type_args[0].is_big_map_friendly(), f'impossible big_map value type' res = type( cls.__name__, (cls,), dict( field_name=parse_name(annots, '%'), # type: ignore type_name=parse_name(annots, ':'), # type: ignore args=args, **kwargs, ), ) return cast(Type['MichelsonType'], res)
[docs] @classmethod def get_anon_type(cls) -> Type['MichelsonType']: return cls.create_type(args=cls.args)
[docs] @classmethod def as_micheline_expr(cls) -> dict: annots = [] if cls.type_name is not None: annots.append(f':{cls.type_name}') if cls.field_name is not None: annots.append(f'%{cls.field_name}') args = [arg.as_micheline_expr() for arg in cls.args] expr = {'prim': cls.prim, 'annots': annots, 'args': args} return {k: v for k, v in expr.items() if v}
[docs] @classmethod def generate_pydoc(cls, definitions: List[Tuple[str, str]], inferred_name=None, comparable=False) -> str: assert len(cls.args) == 0 or cls.prim in [ 'contract', 'lambda', 'ticket', 'sapling_state', 'sapling_transaction', 'sapling_transaction_deprecated', ], 'defined for simple types only' if cls.prim in type_mappings: if all(x != cls.prim for x, _ in definitions): definitions.append((cls.prim, type_mappings[cls.prim])) return cls.prim # type: ignore
[docs] @classmethod def is_comparable(cls): if cls.prim in [ 'bls12_381_fr', 'bls12_381_g1', 'bls12_381_g2', 'sapling_state', 'sapling_transaction', 'sapling_transaction_deprecated', 'big_map', 'contract', 'lambda', 'list', 'map', 'set', 'operation', 'ticket', ]: return False return all(map(lambda x: x.is_comparable(), cls.args))
[docs] @classmethod def is_passable(cls): if cls.prim in ['operation']: return False elif cls.prim == 'lambda': return True return all(map(lambda x: x.is_passable(), cls.args))
[docs] @classmethod def is_storable(cls): if cls.prim in ['contract', 'operation']: return False elif cls.prim == 'lambda': return True return all(map(lambda x: x.is_storable(), cls.args))
[docs] @classmethod def is_pushable(cls): if cls.prim in ['big_map', 'operation', 'sapling_state', 'ticket']: return False elif cls.prim == 'lambda': return True return all(map(lambda x: x.is_pushable(), cls.args))
[docs] @classmethod def is_packable(cls): if cls.prim in ['big_map', 'operation', 'sapling_state', 'ticket']: return False elif cls.prim == 'lambda': return True return all(map(lambda x: x.is_packable(), cls.args))
[docs] @classmethod def is_duplicable(cls): if cls.prim in ['ticket']: return False elif cls.prim == 'lambda': return True return all(map(lambda x: x.is_duplicable(), cls.args))
[docs] @classmethod def is_big_map_friendly(cls): if cls.prim in ['big_map', 'operation', 'sapling_state']: return False elif cls.prim == 'lambda': return True return all(map(lambda x: x.is_big_map_friendly(), cls.args))
[docs] @classmethod def unpack(cls, data: bytes) -> 'MichelsonType': assert cls.is_packable(), f'{cls.prim} cannot be packed' assert data.startswith(b'\x05'), f'packed data should start with 05' val_expr = unforge_micheline(data[1:]) return cls.from_micheline_value(val_expr)
[docs] @classmethod def from_literal(cls, literal: Type[Micheline]) -> 'MichelsonType': expr = literal.as_micheline_expr() return cls.from_micheline_value(expr)
[docs] @classmethod def dummy(cls, context: AbstractContext) -> 'MichelsonType': raise NotImplementedError
[docs] @classmethod def from_micheline_value(cls, val_expr) -> 'MichelsonType': raise NotImplementedError
[docs] @classmethod def from_python_object(cls, py_obj) -> 'MichelsonType': raise NotImplementedError
[docs] def to_micheline_value(self, mode='readable', lazy_diff: Optional[bool] = False): raise NotImplementedError
[docs] def to_python_object(self, try_unpack=False, lazy_diff: Optional[bool] = False, comparable=False): raise NotImplementedError
[docs] def to_literal(self) -> Type[Micheline]: raise NotImplementedError
[docs] def attach_context(self, context: AbstractContext, big_map_copy=False): # NOTE: mutation assert len(self.args) == 0 or self.prim in ['contract', 'lambda', 'ticket', 'set']
[docs] def merge_lazy_diff(self, lazy_diff: List[dict]) -> 'MichelsonType': assert len(self.args) == 0 or self.prim in ['contract', 'lambda', 'ticket', 'set'] return copy(self)
[docs] def aggregate_lazy_diff(self, lazy_diff: List[dict], mode='readable') -> 'MichelsonType': assert len(self.args) == 0 or self.prim in ['contract', 'lambda', 'ticket', 'set'] return copy(self)
[docs] def find(self, predicate: Callable[['MichelsonType'], bool]) -> Optional['MichelsonType']: if predicate(self): return self if isinstance(self, Iterable): for item in self: if isinstance(item, MichelsonType): res = item.find(predicate) if res is not None: return res return None
[docs] def forge(self, mode='readable') -> bytes: val_expr = self.to_micheline_value(mode=mode) return forge_micheline(val_expr)
[docs] def pack(self, legacy=False) -> bytes: assert self.is_packable(), f'{self.prim} cannot be packed' data = self.forge(mode='legacy_optimized' if legacy else 'optimized') return b'\x05' + data
[docs] def duplicate(self): assert self.is_duplicable(), f'{self.prim} is not duplicable' return deepcopy(self)
[docs]def generate_pydoc(ty: Type[MichelsonType], title=None): definitions = [] # type: ignore top_def = ty.generate_pydoc(definitions, inferred_name=title) if not top_def.startswith('$'): definitions.insert(0, (title or ty.field_name or ty.type_name or 'element', top_def)) return '\n'.join(map(lambda x: f'${x[0]}:\n\t{x[1]}\n', definitions))