Source code for pymavryk.michelson.instructions.control

from typing import List
from typing import Tuple
from typing import Type
from typing import Union
from typing import cast

from pymavryk.context.abstract import AbstractContext
from pymavryk.michelson.instructions.adt import PairInstruction
from pymavryk.michelson.instructions.base import MichelsonInstruction
from pymavryk.michelson.instructions.base import Wildcard
from pymavryk.michelson.instructions.base import format_stdout
from pymavryk.michelson.instructions.stack import PushInstruction
from pymavryk.michelson.micheline import MichelineSequence
from pymavryk.michelson.micheline import MichelsonRuntimeError
from pymavryk.michelson.stack import MichelsonStack
from pymavryk.michelson.types import BoolType
from pymavryk.michelson.types import LambdaType
from pymavryk.michelson.types import ListType
from pymavryk.michelson.types import MapType
from pymavryk.michelson.types import MichelsonType
from pymavryk.michelson.types import OptionType
from pymavryk.michelson.types import OrType
from pymavryk.michelson.types import PairType
from pymavryk.michelson.types import SetType


def execute_dip(
    prim: str,
    stack: MichelsonStack,
    stdout: List[str],
    count: int,
    body: Type[MichelsonInstruction],
    context: AbstractContext,
) -> MichelsonInstruction:
    """Execute ``body`` between ``stack.protect`` and ``stack.restore``.

    Helper used by the ``DIP`` instruction classes in this module.

    :param prim: primitive name used when formatting the log lines
    :param stack: stack the body operates on
    :param stdout: log sink; one line is appended before and one after the body
    :param count: passed to ``stack.protect`` / ``stack.restore`` and used to
        build the wildcard placeholders shown in the log lines
    :param body: instruction type whose ``execute`` is invoked
    :param context: execution context forwarded to ``body.execute``
    :returns: whatever ``body.execute`` returned
    """
    entry_line = format_stdout(prim, [*Wildcard.n(count)], [])
    stdout.append(entry_line)

    stack.protect(count=count)
    executed = body.execute(stack, stdout, context=context)
    stack.restore(count=count)

    exit_line = format_stdout(prim, [], [*Wildcard.n(count)], count)
    stdout.append(exit_line)
    return executed
class DipnInstruction(MichelsonInstruction, prim='DIP', args_len=2):
    """Two-argument ``DIP``: ``args[0]`` supplies the depth, ``args[1]`` the body."""

    def __init__(self, item: MichelsonInstruction):
        """Keep the executed body instruction on ``self.item``."""
        super().__init__()
        self.item = item

    @classmethod
    def execute(cls, stack: MichelsonStack, stdout: List[str], context: AbstractContext):
        """Read the depth from ``args[0]`` and delegate to :func:`execute_dip`."""
        depth = cls.args[0].get_int()  # type: ignore
        body = cls.args[1]
        executed = execute_dip(
            cls.prim,  # type: ignore
            stack,
            stdout,
            count=depth,
            body=body,  # type: ignore
            context=context,
        )
        return cls(executed)
class DipInstruction(MichelsonInstruction, prim='DIP', args_len=1):
    """Single-argument ``DIP``: runs ``args[0]`` via :func:`execute_dip` with ``count=1``."""

    def __init__(self, item: MichelsonInstruction):
        """Keep the executed body instruction on ``self.item``."""
        super().__init__()
        self.item = item

    @classmethod
    def execute(cls, stack: MichelsonStack, stdout: List[str], context: AbstractContext):
        """Delegate to :func:`execute_dip` with a fixed count of one."""
        body = cls.args[0]
        executed = execute_dip(
            cls.prim,  # type: ignore
            stack,
            stdout,
            count=1,
            body=body,  # type: ignore
            context=context,
        )
        return cls(executed)
class LambdaInstruction(MichelsonInstruction, prim='LAMBDA', args_len=3):
    """``LAMBDA``: push a lambda value built from the instruction's three arguments."""

    @classmethod
    def execute(cls, stack: MichelsonStack, stdout: List[str], context: AbstractContext):
        """Build a ``LambdaType`` from ``args[:2]``, wrap ``args[2]`` and push it.

        One log line showing the pushed value is appended to ``stdout``.
        """
        # The first two args parameterise the lambda type, the third is its body.
        pushed = LambdaType.create_type(args=cls.args[:2])(cls.args[2])  # type: ignore
        stack.push(pushed)
        log_line = format_stdout(cls.prim, [], [pushed])  # type: ignore
        stdout.append(log_line)
        return cls(stack_items_added=1)
class LambdaRecInstruction(MichelsonInstruction, prim='LAMBDA_REC', args_len=3):
    """``LAMBDA_REC``: push a lambda whose body starts with a nested ``LAMBDA_REC``."""

    # Nesting level of this instruction type; each nested copy gets ``depth + 1``.
    depth = 0

    @classmethod
    def execute(cls, stack: MichelsonStack, stdout: List[str], context: AbstractContext):
        """Push a lambda whose body is ``[nested LAMBDA_REC, args[2]]``.

        :raises MichelsonRuntimeError: when the nested copy's depth would exceed 256
        """
        lambda_type = LambdaType.create_type(args=cls.args[:2])

        # Nested copy of this instruction carrying an incremented depth counter.
        nested = LambdaRecInstruction.create_type(args=cls.args.copy())
        next_depth = cls.depth + 1
        nested.depth = next_depth  # type: ignore
        if next_depth > 256:
            raise MichelsonRuntimeError("Maximum recursive depth reached")

        body = MichelineSequence.create_type(args=[nested, cls.args[2]])
        pushed = lambda_type(body)  # type: ignore
        stack.push(pushed)
        log_line = format_stdout(cls.prim, [], [pushed])  # type: ignore
        stdout.append(log_line)
        return cls(stack_items_added=1)
class ExecInstruction(MichelsonInstruction, prim='EXEC'):
    """``EXEC``: run a lambda popped from the stack against a popped argument."""

    def __init__(self, item: MichelsonInstruction):
        """Keep the executed lambda body on ``self.item``; one stack item is added."""
        super().__init__(stack_items_added=1)
        self.item = item

    @classmethod
    def execute(cls, stack: MichelsonStack, stdout: List[str], context: AbstractContext):
        """Pop ``(param, lambda)``, run the lambda body on a fresh stack, push the result.

        The parameter is checked against ``lambda_.args[0]`` and the result
        against ``lambda_.args[1]``; the fresh stack must be empty once the
        result has been popped.
        """
        popped = stack.pop2()
        param, lambda_ = cast(Tuple[MichelsonType, LambdaType], popped)
        assert isinstance(lambda_, LambdaType), f'expected lambda, got {lambda_.prim}'
        param.assert_type_equal(lambda_.args[0])
        stdout.append(format_stdout(cls.prim, [param, lambda_], []))  # type: ignore

        # The body runs on its own stack that holds only the argument.
        lambda_stack = MichelsonStack.from_items([param])
        executed = cast(MichelsonInstruction, lambda_.value).execute(
            lambda_stack,
            stdout,
            context=context,
        )

        result = lambda_stack.pop1()
        result.assert_type_equal(lambda_.args[1])
        assert len(lambda_stack) == 0, f'lambda stack is not empty {lambda_stack}'

        stack.push(result)
        return cls(executed)
class ApplyInstruction(MichelsonInstruction, prim='APPLY'):
    """``APPLY``: bind the left component of a lambda's pair parameter."""

    @classmethod
    def execute(cls, stack: MichelsonStack, stdout: List[str], context: AbstractContext):
        """Pop ``(left, lambda)`` and push a lambda that takes only the right component.

        The new lambda's body is the sequence ``[PUSH left, PAIR, original body]``.
        """
        popped = stack.pop2()
        left, lambda_ = cast(Tuple[MichelsonType, LambdaType], popped)
        lambda_.assert_type_in(LambdaType)

        pair_type = lambda_.args[0]
        pair_type.assert_type_in(PairType)
        left_type, right_type = pair_type.args
        left.assert_type_equal(left_type)

        # Prefix the original body with PUSH + PAIR to rebuild the full parameter.
        push_left = PushInstruction.create_type(args=[left_type, left.to_literal()])
        new_value = MichelineSequence.create_type(
            args=[push_left, PairInstruction, lambda_.value],
        )

        applied_type = LambdaType.create_type(args=[right_type, lambda_.args[1]])
        applied = applied_type(new_value)  # type: ignore
        stack.push(applied)
        stdout.append(format_stdout(cls.prim, [left, lambda_], [applied]))  # type: ignore
        return cls(stack_items_added=1)
class FailwithInstruction(MichelsonInstruction, prim='FAILWITH'):
    """``FAILWITH``: abort execution, reporting the value on top of the stack."""

    @classmethod
    def execute(cls, stack: MichelsonStack, stdout: List[str], context: AbstractContext):
        """Pop one item, assert it is packable, and raise with its ``repr``.

        :raises MichelsonRuntimeError: always, once the packable check has passed
        """
        a = stack.pop1()
        assert a.is_packable(), f'expected packable type, got {a.prim}'
        message = repr(a)
        raise MichelsonRuntimeError(message)
class IfInstruction(MichelsonInstruction, prim='IF', args_len=2):
    """``IF``: run ``args[0]`` when the popped boolean is true, else ``args[1]``."""

    def __init__(self, item: MichelsonInstruction):
        """Keep the executed branch on ``self.item``."""
        super().__init__()
        self.item = item

    @classmethod
    def execute(cls, stack: MichelsonStack, stdout: List[str], context: AbstractContext):
        """Pop a ``BoolType`` condition, log it, and execute the selected branch."""
        condition = cast(BoolType, stack.pop1())
        condition.assert_type_equal(BoolType)
        stdout.append(format_stdout(cls.prim, [condition], []))  # type: ignore

        if bool(condition):
            chosen = cls.args[0]
        else:
            chosen = cls.args[1]

        executed = chosen.execute(stack, stdout, context=context)
        return cls(executed)
class IfConsInstruction(MichelsonInstruction, prim='IF_CONS', args_len=2):
    """``IF_CONS``: branch on whether the popped list has any elements."""

    def __init__(self, stack_items_added: int, item: MichelsonInstruction):
        """Record how many items were pushed and keep the executed branch."""
        super().__init__(stack_items_added)
        self.item = item

    @classmethod
    def execute(cls, stack: MichelsonStack, stdout: List[str], context: AbstractContext):
        """Pop a list; if non-empty push tail then head and run ``args[0]``.

        For an empty list nothing is pushed and ``args[1]`` runs instead.
        """
        lst = cast(ListType, stack.pop1())
        lst.assert_type_in(ListType)

        if len(lst) == 0:
            stdout.append(format_stdout(cls.prim, [lst], []))  # type: ignore
            chosen = cls.args[1]
            pushed_count = 0
        else:
            head, tail = lst.split_head()
            # Tail goes first so that the head ends up on top of the stack.
            stack.push(tail)
            stack.push(head)
            stdout.append(format_stdout(cls.prim, [lst], [head, tail]))  # type: ignore
            chosen = cls.args[0]
            pushed_count = 2

        executed = chosen.execute(stack, stdout, context=context)
        return cls(pushed_count, executed)
class IfLeftInstruction(MichelsonInstruction, prim='IF_LEFT', args_len=2):
    """``IF_LEFT``: unwrap the popped ``or`` value and branch on its side."""

    def __init__(self, item: MichelsonInstruction):
        """Keep the executed branch on ``self.item``; one stack item is added."""
        super().__init__(stack_items_added=1)
        self.item = item

    @classmethod
    def execute(cls, stack: MichelsonStack, stdout: List[str], context: AbstractContext):
        """Pop an ``OrType``, push its resolved value and run the matching branch.

        ``args[0]`` runs when ``is_left()`` is true, ``args[1]`` otherwise.
        """
        or_ = cast(OrType, stack.pop1())
        or_.assert_type_in(OrType)

        if or_.is_left():
            chosen = cls.args[0]
        else:
            chosen = cls.args[1]

        inner = or_.resolve()
        stack.push(inner)
        stdout.append(format_stdout(cls.prim, [or_], [inner]))  # type: ignore

        executed = chosen.execute(stack, stdout, context=context)
        return cls(executed)
class IfNoneInstruction(MichelsonInstruction, prim='IF_NONE', args_len=2):
    """``IF_NONE``: branch on whether the popped option holds a value."""

    def __init__(self, stack_items_added: int, item: MichelsonInstruction):
        """Record how many items were pushed and keep the executed branch."""
        super().__init__(stack_items_added)
        self.item = item

    @classmethod
    def execute(cls, stack: MichelsonStack, stdout: List[str], context: AbstractContext):
        """Pop an ``OptionType``; run ``args[0]`` for none, else push the value and run ``args[1]``."""
        opt = cast(OptionType, stack.pop1())
        opt.assert_type_in(OptionType)

        if not opt.is_none():
            unwrapped = opt.get_some()
            stack.push(unwrapped)
            stdout.append(format_stdout(cls.prim, [opt], [unwrapped]))  # type: ignore
            chosen = cls.args[1]
            pushed_count = 1
        else:
            chosen = cls.args[0]
            stdout.append(format_stdout(cls.prim, [opt], []))  # type: ignore
            pushed_count = 0

        executed = chosen.execute(stack, stdout, context=context)
        return cls(pushed_count, executed)
class LoopInstruction(MichelsonInstruction, prim='LOOP', args_len=1):
    """``LOOP``: repeat ``args[0]`` while the boolean popped from the stack is true."""

    def __init__(self, items: List[MichelsonInstruction]):
        """Keep the executed body of every iteration on ``self.items``."""
        super().__init__()
        self.items = items

    @classmethod
    def execute(cls, stack: MichelsonStack, stdout: List[str], context: AbstractContext):
        """Pop a ``BoolType`` before each pass; stop on the first false value.

        Every popped condition is logged, including the final false one.
        """
        iterations = []
        while True:
            condition = cast(BoolType, stack.pop1())
            condition.assert_type_equal(BoolType)
            stdout.append(format_stdout(cls.prim, [condition], []))  # type: ignore
            if not bool(condition):
                break
            executed = cls.args[0].execute(stack, stdout, context=context)
            iterations.append(executed)
        return cls(iterations)
class LoopLeftInstruction(MichelsonInstruction, prim='LOOP_LEFT', args_len=1):
    """``LOOP_LEFT``: repeat ``args[0]`` while the popped ``or`` value is a left."""

    def __init__(self, stack_items_added: int, items: List[MichelsonInstruction]):
        """Record the push counter and the executed body of every iteration."""
        super().__init__(stack_items_added)
        self.items = items

    @classmethod
    def execute(cls, stack: MichelsonStack, stdout: List[str], context: AbstractContext):
        """Pop an ``OrType`` each pass, push its resolved value, loop while it is left.

        The push counter is incremented once per pass, including the final
        pass that sees a right value and exits.
        """
        pushed_count = 0
        iterations = []
        while True:
            or_ = cast(OrType, stack.pop1())
            or_.assert_type_in(OrType)

            inner = or_.resolve()
            stack.push(inner)
            pushed_count += 1
            stdout.append(format_stdout(cls.prim, [or_], [inner]))  # type: ignore

            if not or_.is_left():
                break
            executed = cls.args[0].execute(stack, stdout, context=context)
            iterations.append(executed)
        return cls(pushed_count, iterations)
class MapInstruction(MichelsonInstruction, prim='MAP', args_len=1):
    """``MAP``: apply ``args[0]`` to each element of the popped collection."""

    def __init__(self, stack_items_added: int, items: List[MichelsonInstruction]):
        """Record the push counter and the executed body of every element."""
        super().__init__(stack_items_added)
        self.items = items

    @classmethod
    def execute(cls, stack: MichelsonStack, stdout: List[str], context: AbstractContext):
        """Pop a collection, run the body per element, push the rebuilt collection.

        For a ``MapType`` source each element is pushed as a pair and the
        result keeps the original key (``elt[0]``) with the new value. When
        no elements were produced the source itself is pushed back.
        """
        pushed_count = 0  # noqa: SIM113
        src = cast(Union[ListType, MapType], stack.pop1())
        is_map = isinstance(src, MapType)
        executions = []
        mapped = []
        popped = [src]

        for raw in src:
            elt = PairType.from_comb(list(raw)) if is_map else raw  # type: ignore
            stack.push(elt)  # type: ignore
            pushed_count += 1
            stdout.append(format_stdout(cls.prim, popped, [elt]))  # type: ignore

            executions.append(cls.args[0].execute(stack, stdout, context=context))

            new_elt = stack.pop1()
            if is_map:
                mapped.append((elt[0], new_elt))
            else:
                mapped.append(new_elt)  # type: ignore
            # The next log line shows the value just produced as the consumed item.
            popped = [new_elt]  # type: ignore

        # TODO: need to deduce argument types (applies when falling back to src)
        result = type(src).from_items(mapped) if mapped else src  # type: ignore
        stack.push(result)
        pushed_count += 1
        stdout.append(format_stdout(cls.prim, popped, [result]))  # type: ignore
        return cls(pushed_count, executions)
class IterInstruction(MichelsonInstruction, prim='ITER', args_len=1):
    """``ITER``: run ``args[0]`` once per element of the popped collection."""

    def __init__(self, stack_items_added: int, items: List[MichelsonInstruction]):
        """Record the push counter and the executed body of every element."""
        super().__init__(stack_items_added)
        self.items = items

    @classmethod
    def execute(cls, stack: MichelsonStack, stdout: List[str], context: AbstractContext):
        """Pop a collection and, for each element, push it and execute the body.

        Elements of a ``MapType`` source are pushed as pairs. Only the first
        log line lists the source collection as the consumed item.
        """
        pushed_count = 0  # noqa: SIM113
        src = cast(Union[ListType, MapType, SetType], stack.pop1())
        is_map = isinstance(src, MapType)
        executions = []
        popped = [src]

        for raw in src:
            elt = PairType.from_comb(list(raw)) if is_map else raw  # type: ignore
            pushed_count += 1
            stack.push(elt)  # type: ignore
            stdout.append(format_stdout(cls.prim, popped, [elt]))  # type: ignore
            executions.append(cls.args[0].execute(stack, stdout, context=context))
            popped = []
        return cls(pushed_count, executions)