Source code for pymavryk.michelson.types.big_map

from copy import copy
from copy import deepcopy
from typing import Callable
from typing import Generator
from typing import List
from typing import Optional
from typing import Tuple
from typing import Type
from typing import Union

from pymavryk.context.abstract import AbstractContext
from pymavryk.michelson.forge import forge_script_expr
from pymavryk.michelson.micheline import Micheline
from pymavryk.michelson.micheline import MichelineLiteral
from pymavryk.michelson.micheline import MichelineSequence
from pymavryk.michelson.micheline import parse_micheline_literal
from pymavryk.michelson.types.base import MichelsonType
from pymavryk.michelson.types.base import Undefined
from pymavryk.michelson.types.map import EltLiteral
from pymavryk.michelson.types.map import MapType


class BigMapType(MapType, prim='big_map', args_len=2):
    """Michelson ``big_map`` value.

    An instance holds the entries known locally (``items``), the keys removed
    locally (``removed_keys``) and, optionally, the big_map ID (``ptr``).
    When a context is attached, keys missing from the local data are looked up
    through ``context.get_big_map_value``.
    """

    def __init__(
        self,
        items: List[Tuple[MichelsonType, MichelsonType]],
        ptr: Optional[int] = None,
        removed_keys: Optional[List[MichelsonType]] = None,
    ):
        """Create a big_map value.

        :param items: (key, value) pairs held locally
        :param ptr: big_map ID, or None if not assigned
        :param removed_keys: keys marked as removed; None means no removals
        """
        super(BigMapType, self).__init__(items=items)
        self.ptr = ptr
        self.removed_keys = removed_keys or []
        # Set by `attach_context`; copied over to instances derived from this one.
        self.context: Optional[AbstractContext] = None

    def __len__(self):
        """Return the number of local entries plus the number of removed keys."""
        return len(self.items) + len(self.removed_keys)

    def __iter__(self) -> Generator[Tuple[MichelsonType, Optional[MichelsonType]], None, None]:  # type: ignore
        """Yield local (key, value) pairs, then (key, None) for every removed key."""
        yield from iter(self.items)
        for key in self.removed_keys:
            yield key, None

    def __repr__(self):
        """Return ``<ptr>`` when a context is attached, otherwise the listed entries."""
        if self.context:
            return f'<{self.ptr}>'
        else:
            elements = [f'{repr(k)}: {repr(v)}' for k, v in self]
            return f'{{{", ".join(elements)}}}'

    def __deepcopy__(self, memodict):
        """Delegate deep copying to `duplicate` (the context reference is shared)."""
        return self.duplicate()

    def __getitem__(self, key_obj) -> Optional[MichelsonType]:  # type: ignore
        """Convert a Python object to the key type and look the key up via `get`."""
        key = self.args[0].from_python_object(key_obj)
        return self.get(key, dup=False)

    @staticmethod
    def empty(key_type: Type[MichelsonType], val_type: Type[MichelsonType]) -> 'BigMapType':
        """Create an empty big_map with the given key and value types."""
        cls = BigMapType.create_type(args=[key_type, val_type])
        return cls(items=[])  # type: ignore

    @staticmethod
    def from_items(items: List[Tuple[MichelsonType, MichelsonType]]):
        """Always raise: building a big_map from items is forbidden.

        :raises AssertionError: unconditionally
        """
        raise AssertionError('forbidden')

    @classmethod
    def generate_pydoc(cls, definitions: list, inferred_name=None, comparable=False):
        """Return the parent class pydoc extended with the big_map ID alternative."""
        doc = super(BigMapType, cls).generate_pydoc(definitions, inferred_name=inferred_name)
        return f'{doc} || int /* Big_map ID */'

    @classmethod
    def dummy(cls, context: AbstractContext) -> 'BigMapType':
        """Return an empty instance; `context` is not used here."""
        return cls(items=[])

    @classmethod
    def from_micheline_value(cls, val_expr) -> 'BigMapType':
        """Build an instance from a Micheline expression.

        A dict is parsed as an ``int`` literal holding the big_map ID; anything
        else is parsed by the parent class as a sequence of elements.
        """
        if isinstance(val_expr, dict):
            ptr = parse_micheline_literal(val_expr, {'int': int})
            return cls(items=[], ptr=ptr)
        else:
            items = super(BigMapType, cls).parse_micheline_value(val_expr)
            return cls(items=items)

    @classmethod
    def from_python_object(cls, py_obj: Union[int, dict]) -> 'BigMapType':
        """Build an instance from a Python object.

        An int is taken as the big_map ID; anything else is parsed by the
        parent class into items.
        """
        if isinstance(py_obj, int):
            return cls(items=[], ptr=py_obj)
        else:
            items = super(BigMapType, cls).parse_python_object(py_obj)
            return cls(items=items)

    def to_literal(self) -> Type[Micheline]:
        """Return a literal of the ID if it is set, otherwise a sequence of `Elt` literals."""
        if self.ptr is not None:
            return MichelineLiteral.create(self.ptr)
        else:
            return MichelineSequence.create_type(
                args=[
                    EltLiteral.create_type(
                        args=[
                            k.to_literal(),
                            v.to_literal(),
                        ]
                    )
                    for k, v in self.items
                ]
            )

    def to_micheline_value(self, mode='readable', lazy_diff: Optional[bool] = False):
        """Convert to a Micheline value.

        :param mode: passed through to the parent implementation
        :param lazy_diff: True - emit the elements; False - emit the ID;
            None - emit the elements only if the ID is not set
        :raises AssertionError: if the ID is requested but not defined
        """
        if lazy_diff is None:
            lazy_diff = self.ptr is None
        if lazy_diff:
            return super(BigMapType, self).to_micheline_value(mode=mode)
        else:
            assert self.ptr is not None, 'Big_map id is not defined'
            return {'int': str(self.ptr)}

    def to_python_object(self, try_unpack=False, lazy_diff: Optional[bool] = False, comparable=False):
        """Convert to a Python object.

        :param try_unpack: passed through to the nested conversions
        :param lazy_diff: True - return a dict of entries, with removed keys
            mapped to None; False - return the ID; None - return the dict only
            if the ID is not set
        :param comparable: must be falsy when a dict is produced
        :raises AssertionError: if `comparable` is set for the dict form, or the
            ID is requested but not defined
        """
        if lazy_diff is None:
            lazy_diff = self.ptr is None
        if lazy_diff:
            assert not comparable, 'big_map is not comparable'
            res = super(BigMapType, self).to_python_object(try_unpack=try_unpack)
            removals = {
                key.to_python_object(try_unpack=try_unpack, comparable=True): None
                for key in self.removed_keys
            }
            return {**res, **removals}
        else:
            assert self.ptr is not None, 'Big_map id is not defined'
            return self.ptr

    def merge_lazy_diff(self, lazy_diff: List[dict]) -> 'BigMapType':
        """Return a new instance populated from the lazy diff entry matching this ID.

        The first item with ``kind == 'big_map'`` and ``id == str(self.ptr)`` is
        used. Updates carrying a value become items, updates without one become
        removed keys. If nothing matches, a shallow copy of `self` is returned.

        :param lazy_diff: list of lazy diff records
        :raises AssertionError: if the ID is not defined or `lazy_diff` is not a list
        """
        assert self.ptr is not None, 'Big_map id is not defined'
        assert isinstance(lazy_diff, list), f'expected list, got {type(lazy_diff).__name__}'
        diff = next((item for item in lazy_diff if item['kind'] == 'big_map' and item['id'] == str(self.ptr)), None)
        if diff:
            items: List[Tuple[MichelsonType, MichelsonType]] = []
            removed_keys: List[MichelsonType] = []
            for update in diff['diff'].get('updates', []):
                key = self.args[0].from_micheline_value(update['key'])
                # Compare against None rather than relying on truthiness: an empty
                # Micheline sequence ([]) is a legitimate value, not a removal.
                if update.get('value') is not None:
                    value = self.args[1].from_micheline_value(update['value'])
                    items.append((key, value))
                else:
                    removed_keys.append(key)
            res = type(self)(ptr=self.ptr, items=items, removed_keys=removed_keys)
            res.context = self.context
            return res
        else:
            return copy(self)

    def aggregate_lazy_diff(self, lazy_diff: List[dict], mode='readable') -> 'BigMapType':
        """Append this big_map's diff record to `lazy_diff` and return a pointer-only instance.

        With a context attached, the pointers and the action come from
        ``context.get_big_map_diff``; otherwise the action is ``'update'`` on
        the current ID.

        :param lazy_diff: list that receives the new record (mutated in place)
        :param mode: Micheline conversion mode for keys and values
        :returns: new instance with no items, pointing at the destination ID
        :raises AssertionError: if the ID is not defined
        """
        assert self.ptr is not None, 'Big_map ID is not defined'
        if self.context:
            src_ptr, dst_ptr, action = self.context.get_big_map_diff(self.ptr)
        else:
            src_ptr, dst_ptr, action = self.ptr, self.ptr, 'update'

        def make_update(key: MichelsonType, val: Optional[MichelsonType]) -> dict:
            # A record without 'value' stands for a removed key.
            update = {
                'key': key.to_micheline_value(mode=mode),
                'key_hash': forge_script_expr(key.pack(legacy=True)),
            }
            if val is not None:
                update['value'] = val.to_micheline_value(mode=mode)
            return update

        diff = {
            'action': action,
            'updates': [make_update(key, val) for key, val in self],
        }

        if action == 'alloc':
            key_type, val_type = [arg.as_micheline_expr() for arg in self.args]
            diff['key_type'] = key_type  # type: ignore
            diff['value_type'] = val_type  # type: ignore
        elif action == 'copy':
            pass  # TODO:

        lazy_diff.append(
            {
                'kind': 'big_map',
                'id': str(dst_ptr),
                'diff': diff,
            }
        )
        res = type(self)(items=[], ptr=dst_ptr)
        res.context = self.context
        return res

    def find(self, predicate: Callable[['MichelsonType'], bool]) -> Optional['MichelsonType']:
        """Return `self` if it satisfies `predicate`, otherwise None (entries are not searched)."""
        if predicate(self):
            return self
        return None

    def attach_context(self, context: AbstractContext, big_map_copy=False):
        """Bind the instance to a context and obtain its ID from it.

        Without an ID a temporary one is requested; otherwise the current ID is
        registered (``copy=big_map_copy``) and replaced by the returned one.
        If ``context.tzt`` is truthy, the instance is also stored in
        ``context.tzt_big_maps`` under its ID.

        :raises AssertionError: if a context is already attached
        """
        assert self.context is None, 'context already attached'
        self.context = context
        if self.ptr is None:
            self.ptr = context.get_tmp_big_map_id()
        else:
            self.ptr = context.register_big_map(self.ptr, copy=big_map_copy)

        if context.tzt:  # type: ignore
            context.tzt_big_maps[self.ptr] = self  # type: ignore

    def get(self, key: MichelsonType, dup=True) -> Optional[MichelsonType]:
        """Look a key up in the local data first, then in the attached context.

        :param key: key of the big_map key type
        :param dup: accepted but not used by this implementation
        :returns: the value; None if the key is locally removed or the context
            has no value for it
        :raises AssertionError: if the key is not known locally and no context
            is attached
        """
        self.args[0].assert_type_equal(type(key))
        # Iterating `self` also covers removed keys (value None), so a locally
        # removed key resolves to None without consulting the context.
        val = next((v for k, v in self if k == key), Undefined)  # search in diff
        if val is Undefined:
            assert self.context, 'context is not attached'
            key_hash = forge_script_expr(key.pack(legacy=True))
            val_expr = self.context.get_big_map_value(self.ptr, key_hash)  # type: ignore
            if val_expr is None:
                return None
            else:
                return self.args[1].from_micheline_value(val_expr)
        else:
            return val  # type: ignore

    def update(self, key: MichelsonType, val: Optional[MichelsonType]) -> Tuple[Optional[MichelsonType], MichelsonType]:
        """Return the previous value and a new instance with the key set or removed.

        :param key: key to update
        :param val: new value, or None to remove the key
        :returns: (previous value or None, updated big_map); `self` is not modified
        """
        removed_keys = set(self.removed_keys)
        prev_val = self.get(key, dup=False)
        # Rebuild from `self.items` only: iterating `self` would also yield
        # (key, None) pairs for removed keys and leak them into `items`.
        if val is not None:
            if any(k == key for k, _ in self.items):
                items = [(k, val if k == key else v) for k, v in self.items]
            else:
                # Covers new keys as well as keys whose previous value was
                # found through the context rather than in the local items.
                items = sorted(self.items + [(key, val)], key=lambda x: x[0])
            removed_keys.discard(key)
        elif prev_val is not None:  # remove
            items = [(k, v) for k, v in self.items if k != key]
            removed_keys.add(key)
        else:  # do nothing
            items = self.items  # type: ignore
        res = type(self)(items=items, ptr=self.ptr, removed_keys=list(removed_keys))  # type: ignore
        res.context = self.context
        return prev_val, res

    def get_key_hash(self, key_obj):
        """Return the script expression hash of the packed key built from a Python object."""
        key = self.args[0].from_python_object(key_obj)
        return forge_script_expr(key.pack(legacy=True))

    def duplicate(self):
        """Return a copy with deep-copied items and removed keys, sharing ID and context."""
        res = type(self)(
            items=deepcopy(self.items),
            ptr=self.ptr,
            removed_keys=deepcopy(self.removed_keys),
        )
        res.context = self.context
        return res