Source code for pymavryk.michelson.program

from typing import Any
from typing import Dict
from typing import List
from typing import Optional
from typing import Tuple
from typing import Type
from typing import cast

from pymavryk.context.impl import ExecutionContext
from pymavryk.crypto.encoding import base58_encode
from pymavryk.michelson.instructions.base import MichelsonInstruction
from pymavryk.michelson.instructions.base import format_stdout
from pymavryk.michelson.instructions.tzt import BigMapInstruction
from pymavryk.michelson.instructions.tzt import StackEltInstruction
from pymavryk.michelson.micheline import MichelineSequence
from pymavryk.michelson.micheline import get_script_section
from pymavryk.michelson.micheline import get_script_sections
from pymavryk.michelson.micheline import try_catch
from pymavryk.michelson.micheline import validate_sections
from pymavryk.michelson.sections.code import CodeSection
from pymavryk.michelson.sections.parameter import ParameterSection
from pymavryk.michelson.sections.storage import StorageSection
from pymavryk.michelson.sections.tzt import AmountSection
from pymavryk.michelson.sections.tzt import BalanceSection
from pymavryk.michelson.sections.tzt import BigMapsSection
from pymavryk.michelson.sections.tzt import ChainIdSection
from pymavryk.michelson.sections.tzt import InputSection
from pymavryk.michelson.sections.tzt import NowSection
from pymavryk.michelson.sections.tzt import OutputSection
from pymavryk.michelson.sections.tzt import SelfSection
from pymavryk.michelson.sections.tzt import SenderSection
from pymavryk.michelson.sections.tzt import SourceSection
from pymavryk.michelson.sections.view import ViewSection
from pymavryk.michelson.stack import MichelsonStack
from pymavryk.michelson.types import ListType
from pymavryk.michelson.types import MichelsonType
from pymavryk.michelson.types import OperationType
from pymavryk.michelson.types import PairType


class MichelsonProgram:
    """Michelson .tz contract interpreter interface

    The script sections are stored as class attributes on subclasses that are
    built dynamically by :meth:`load`, :meth:`create` or :meth:`match`.
    Instances (see :meth:`instantiate` and :meth:`instantiate_view`) carry the
    parameter and storage values of a single execution.
    """

    # Script sections; populated on the dynamically created subclasses.
    parameter: Type[ParameterSection]
    storage: Type[StorageSection]
    code: Type[CodeSection]
    views: List[Type[ViewSection]]

    def __init__(self, name: str, parameter: ParameterSection, storage: StorageSection) -> None:
        """Bind a program type to concrete parameter and storage values

        :param name: entrypoint or view name; used in the stdout log lines and to look up the view
        :param parameter: parameter value pushed on the stack by :meth:`begin`
        :param storage: storage value pushed on the stack by :meth:`begin`
        """
        self.name = name
        self.parameter_value = parameter
        self.storage_value = storage

    @staticmethod
    def load(context: ExecutionContext, with_code=False) -> Type['MichelsonProgram']:
        """Create MichelsonProgram type from filled context

        :param context: execution context providing parameter, storage, code and views expressions
        :param with_code: when falsy the code section is matched against an empty sequence
            and no views are loaded
        :returns: a new subclass of MichelsonProgram with the sections set as class attributes
        """
        cls = type(
            MichelsonProgram.__name__,
            (MichelsonProgram,),
            {
                'parameter': ParameterSection.match(context.get_parameter_expr()),
                'storage': StorageSection.match(context.get_storage_expr()),
                'code': CodeSection.match(context.get_code_expr() if with_code else []),
                'views': [ViewSection.match(expr) for expr in context.get_views_expr()] if with_code else [],
            },
        )
        return cast(Type['MichelsonProgram'], cls)

    @staticmethod
    def create(sequence: Type[MichelineSequence]) -> Type['MichelsonProgram']:
        """Create MichelsonProgram type from micheline

        :param sequence: micheline sequence type; validated for the `parameter`, `storage`
            and `code` sections before they are extracted
        :returns: a new subclass of MichelsonProgram with the sections set as class attributes
        """
        validate_sections(
            sequence,
            (
                'parameter',
                'storage',
                'code',
            ),
        )
        cls = type(
            MichelsonProgram.__name__,
            (MichelsonProgram,),
            {
                'parameter': get_script_section(sequence, cls=ParameterSection, required=True),  # type: ignore
                'storage': get_script_section(sequence, cls=StorageSection, required=True),  # type: ignore
                'code': get_script_section(sequence, cls=CodeSection, required=True),  # type: ignore
                'views': get_script_sections(sequence, cls=ViewSection),  # type: ignore
            },
        )
        return cast(Type['MichelsonProgram'], cls)

    @staticmethod
    def match(expr) -> Type['MichelsonProgram']:
        """Create MichelsonProgram type from a micheline expression

        :param expr: micheline expression that must match a sequence
        :raises Exception: if the matched type is not a MichelineSequence
        """
        seq = cast(Type[MichelineSequence], MichelineSequence.match(expr))
        if not issubclass(seq, MichelineSequence):
            raise Exception(f'Expected sequence, got {seq.prim}')
        return MichelsonProgram.create(seq)

    @classmethod
    def as_micheline_expr(cls) -> List[Dict[str, Any]]:
        """Serialize the program: parameter, storage, code, then every view in order"""
        return [
            cls.parameter.as_micheline_expr(),
            cls.storage.as_micheline_expr(),
            cls.code.as_micheline_expr(),
            *[view.as_micheline_expr() for view in cls.views],
        ]

    @classmethod
    def get_view(cls, name: str) -> Type[ViewSection]:
        """Return the first view section whose name equals `name`

        :param name: view name to look up
        :raises LookupError: if the program has no view with that name
        """
        for view in cls.views:
            if view.name == name:
                return view
        # An explicit error instead of the bare StopIteration that `next()` without
        # a default would leak (it carries no message and is swallowed by generators).
        raise LookupError(f'View `{name}` is not defined in the program')

    @classmethod
    def instantiate(cls, entrypoint: str, parameter, storage) -> 'MichelsonProgram':
        """Create a program instance for calling an entrypoint

        :param entrypoint: entrypoint name, also used as the instance name
        :param parameter: entrypoint argument, passed to `from_parameters` as `value`
        :param storage: micheline value of the storage
        """
        parameter_value = cls.parameter.from_parameters({'entrypoint': entrypoint, 'value': parameter})
        storage_value = cls.storage.from_micheline_value(storage)
        return cls(entrypoint, parameter_value, storage_value)

    @classmethod
    def instantiate_view(cls, name: str, parameter, storage) -> 'MichelsonProgram':
        """Create a program instance for executing a view

        :param name: view name, also used as the instance name
        :param parameter: micheline value of the view argument
        :param storage: micheline value of the storage
        :raises LookupError: if the program has no view with that name
        """
        view = cls.get_view(name)
        # The second argument of the view section is used as the parameter type.
        parameter_ty = ParameterSection.create_type(args=[view.args[1]])
        parameter_value = parameter_ty.from_micheline_value(parameter)
        storage_value = cls.storage.from_micheline_value(storage)
        return cls(name, parameter_value, storage_value)

    @try_catch('BEGIN')
    def begin(self, stack: MichelsonStack, stdout: List[str], context: ExecutionContext) -> None:
        """Prepare stack for contract execution

        Attaches `context` to the parameter and storage values, pushes their pair
        onto `stack` and appends a `BEGIN` line to `stdout`.
        """
        self.parameter_value.attach_context(context)
        self.storage_value.attach_context(context)
        res = PairType.from_comb([self.parameter_value.item, self.storage_value.item])
        stack.push(res)
        stdout.append(format_stdout(f'BEGIN %{self.name}', [], [res]))

    def execute(self, stack: MichelsonStack, stdout: List[str], context: ExecutionContext) -> MichelsonInstruction:
        """Execute contract in interpreter"""
        return cast(MichelsonInstruction, self.code.args[0].execute(stack, stdout, context))

    def execute_view(
        self,
        stack: MichelsonStack,
        stdout: List[str],
        context: ExecutionContext,
    ) -> MichelsonInstruction:
        """Execute view in interpreter

        The view is looked up by the instance name; its fourth argument is executed.

        :raises LookupError: if the program has no view with that name
        """
        view = self.get_view(self.name)
        return cast(MichelsonInstruction, view.args[3].execute(stack, stdout, context))

    @try_catch('END')
    def end(
        self,
        stack: MichelsonStack,
        stdout: List[str],
        output_mode='readable',
    ) -> Tuple[List[dict], Any, List[dict], PairType]:
        """Finish contract execution

        Pops the result from the stack, requires the stack to be empty afterwards and
        checks the result against `pair (list operation) <storage type>`.

        :param output_mode: mode passed to `to_micheline_value` for the resulting storage
        :returns: tuple of (operation contents, storage micheline value, lazy diff, raw result pair)
        :raises Exception: if the stack still holds items after popping the result
        """
        res = cast(PairType, stack.pop1())
        if len(stack):
            raise Exception(f'Stack is not empty: {repr(stack)}')
        res.assert_type_equal(
            PairType.create_type(
                args=[ListType.create_type(args=[OperationType]), self.storage.args[0]],
            ),
            message='list of operations + resulting storage',
        )
        operations = [op.content for op in res.items[0]]  # type: ignore
        # Passed to `aggregate_lazy_diff` below and returned to the caller afterwards.
        lazy_diff = []  # type: ignore
        storage = res.items[1].aggregate_lazy_diff(lazy_diff).to_micheline_value(mode=output_mode)
        stdout.append(format_stdout(f'END %{self.name}', [res], []))
        return operations, storage, lazy_diff, res

    @try_catch('RET')
    def ret(
        self,
        stack: MichelsonStack,
        stdout: List[str],
        output_mode='readable',
    ) -> MichelsonType:
        """Finish view execution

        Pops the result from the stack, requires the stack to be empty afterwards and
        checks the result against the third argument of the view section.

        :param output_mode: mode passed to `to_micheline_value` before the value is re-parsed
        :returns: the result re-created from its micheline value using the view return type
        :raises Exception: if the stack still holds items after popping the result
        """
        view = self.get_view(self.name)
        res = stack.pop1()
        if len(stack):
            raise Exception(f'Stack is not empty: {repr(stack)}')
        res.assert_type_equal(view.args[2], message='view return type')
        stdout.append(format_stdout(f'RET %{self.name}', [res], []))
        return view.args[2].from_micheline_value(res.to_micheline_value(mode=output_mode))
class TztMichelsonProgram:
    """Interpreter interface for Michelson .tzt scripts

    The sections live as class attributes on subclasses that :meth:`load`,
    :meth:`create` and :meth:`match` build dynamically.
    """

    # Script sections; populated on the dynamically created subclasses.
    code: Type[CodeSection]
    input: Type[InputSection]
    output: Type[OutputSection]
    big_maps: Optional[Type[BigMapsSection]]

    @staticmethod
    def load(context: ExecutionContext, with_code=False):
        """Build a TztMichelsonProgram type from an already filled context

        When `with_code` is falsy the code section is matched against an empty
        sequence. `big_maps` is None if the context has no big maps expression.
        """
        sections: Dict[str, Any] = {}
        sections['input'] = InputSection.match(context.get_input_expr())
        sections['output'] = OutputSection.match(context.get_output_expr())
        sections['code'] = CodeSection.match(context.get_code_expr() if with_code else [])
        if context.get_big_maps_expr():
            sections['big_maps'] = BigMapsSection.match(context.get_big_maps_expr())
        else:
            sections['big_maps'] = None

        program_type = type(TztMichelsonProgram.__name__, (TztMichelsonProgram,), sections)
        return cast(Type['TztMichelsonProgram'], program_type)

    @staticmethod
    def create(sequence: Type[MichelineSequence]) -> Type['TztMichelsonProgram']:
        """Build a TztMichelsonProgram type from a micheline sequence

        `input`, `output` and `code` are required; `big_maps` is optional.
        """
        validate_sections(sequence, ('input', 'output', 'code'))
        sections = {
            'input': get_script_section(sequence, cls=InputSection, required=True),  # type: ignore
            'output': get_script_section(sequence, cls=OutputSection, required=True),  # type: ignore
            'code': get_script_section(sequence, cls=CodeSection, required=True),  # type: ignore
            'big_maps': get_script_section(sequence, cls=BigMapsSection, required=False),  # type: ignore
        }
        program_type = type(TztMichelsonProgram.__name__, (TztMichelsonProgram,), sections)
        return cast(Type['TztMichelsonProgram'], program_type)

    @staticmethod
    def match(expr) -> Type['TztMichelsonProgram']:
        """Build a TztMichelsonProgram type from a micheline expression

        Raises an Exception when the expression does not match a sequence.
        """
        sequence = cast(Type[MichelineSequence], MichelineSequence.match(expr))
        if issubclass(sequence, MichelineSequence):
            return TztMichelsonProgram.create(sequence)
        raise Exception(f'expected sequence, got {sequence.prim}')

    @classmethod
    def as_micheline_expr(cls) -> List[Dict[str, Any]]:
        """Serialize the code, input and output sections, in that order"""
        # TODO: Serialize all sections
        code_expr = cls.code.as_micheline_expr()
        input_expr = cls.input.as_micheline_expr()
        output_expr = cls.output.as_micheline_expr()
        return [code_expr, input_expr, output_expr]

    @classmethod
    def instantiate(cls) -> 'TztMichelsonProgram':
        """Return a new instance of this program type"""
        return cls()

    def fill_context(self, script, context: ExecutionContext) -> None:
        """Copy the optional sections found in the context onto its attributes

        For each of sender, amount, balance, self, now, source and chain id the
        matching expression is read from `context`; when it is truthy its first
        argument is decoded and assigned to the corresponding context attribute.
        `script` is not used by this method.
        """
        sender_expr = context.get_sender_expr()
        if sender_expr:
            sender_section = SenderSection.match(sender_expr)
            context.sender = sender_section.args[0].get_string()  # type: ignore

        amount_expr = context.get_amount_expr()
        if amount_expr:
            amount_section = AmountSection.match(amount_expr)
            context.amount = amount_section.args[0].get_int()  # type: ignore

        balance_expr = context.get_balance_expr()
        if balance_expr:
            balance_section = BalanceSection.match(balance_expr)
            context.balance = balance_section.args[0].get_int()  # type: ignore

        self_expr = context.get_self_expr()
        if self_expr:
            self_section = SelfSection.match(self_expr)
            context.address = self_section.args[0].get_string()  # type: ignore

        now_expr = context.get_now_expr()
        if now_expr:
            now_section = NowSection.match(now_expr)
            context.now = now_section.args[0].get_int()  # type: ignore

        source_expr = context.get_source_expr()
        if source_expr:
            source_section = SourceSection.match(source_expr)
            context.source = source_section.args[0].get_string()  # type: ignore

        chain_id_expr = context.get_chain_id_expr()
        if chain_id_expr:
            # FIXME: Move to some common place
            raw_chain_id = cast(bytes, ChainIdSection.match(chain_id_expr).args[0].literal)
            context.chain_id = base58_encode(raw_chain_id, prefix=b'Net').decode()

    def register_bigmaps(self, stack: MichelsonStack, stdout: List[str], context: ExecutionContext) -> None:
        """Add every `Big_map` item of the `big_maps` section, last item first

        Does nothing when the program has no `big_maps` section.
        """
        if not self.big_maps:
            return
        for instruction in self.big_maps.args[0].args[::-1]:
            if not issubclass(instruction, BigMapInstruction):
                raise Exception('Only `Big_map` instructions can be used in `big_maps` section')
            instruction.add(stack, stdout, context)

    def begin(self, stack: MichelsonStack, stdout: List[str], context: ExecutionContext) -> None:
        """Push the `Stack_elt` items of the `input` section, last item first"""
        for element in self.input.args[0].args[::-1]:
            if not issubclass(element, StackEltInstruction):
                raise Exception('Only `Stack_elt` instructions can be used in `input` section', element)
            element.push(stack, stdout, context)

    def execute(self, stack: MichelsonStack, stdout: List[str], context: ExecutionContext) -> MichelsonInstruction:
        """Run the first argument of the code section in the interpreter"""
        code_body = self.code.args[0]
        return cast(MichelsonInstruction, code_body.execute(stack, stdout, context))

    def end(self, stack: MichelsonStack, stdout: List[str], context: ExecutionContext) -> None:
        """Pull the `Stack_elt` items of the `output` section and require an empty stack"""
        for expected in self.output.args[0].args:
            if not issubclass(expected, StackEltInstruction):
                raise Exception('Only `Stack_elt` instructions can be used in `output` section')
            expected.pull(stack, stdout, context)
        if len(stack) > 0:
            raise Exception('Stack is not empty after processing `output` section')