Source code for pymavryk.michelson.instructions.mavryk

import logging
from typing import List
from typing import Optional
from typing import Tuple
from typing import Type
from typing import cast

from pymavryk.context.abstract import AbstractContext
from pymavryk.michelson.instructions.base import MichelsonInstruction
from pymavryk.michelson.instructions.base import format_stdout
from pymavryk.michelson.micheline import MichelineLiteral
from pymavryk.michelson.micheline import MichelineSequence
from pymavryk.michelson.micheline import MichelsonRuntimeError
from pymavryk.michelson.sections import ParameterSection
from pymavryk.michelson.sections import StorageSection
from pymavryk.michelson.sections import ViewSection
from pymavryk.michelson.stack import MichelsonStack
from pymavryk.michelson.types import AddressType
from pymavryk.michelson.types import ChainIdType
from pymavryk.michelson.types import ContractType
from pymavryk.michelson.types import KeyHashType
from pymavryk.michelson.types import MumavType
from pymavryk.michelson.types import NatType
from pymavryk.michelson.types import OperationType
from pymavryk.michelson.types import OptionType
from pymavryk.michelson.types import PairType
from pymavryk.michelson.types import TimestampType
from pymavryk.michelson.types import UnitType
from pymavryk.michelson.types.base import MichelsonType


class AmountInstruction(MichelsonInstruction, prim='AMOUNT'):
    """``AMOUNT``: push the value of ``context.get_amount()`` as a ``MumavType``."""

    @classmethod
    def execute(cls, stack: MichelsonStack, stdout: List[str], context: AbstractContext):
        """Push the context amount onto ``stack`` and record the step in ``stdout``.

        :param stack: stack that receives the new item
        :param stdout: list of trace lines; one formatted line is appended
        :param context: source of the amount (``get_amount``)
        :returns: an instance created with ``stack_items_added=1``
        """
        pushed = MumavType.from_value(context.get_amount())
        stack.push(pushed)
        trace_line = format_stdout(cls.prim, [], [pushed])  # type: ignore
        stdout.append(trace_line)
        return cls(stack_items_added=1)
class BalanceInstruction(MichelsonInstruction, prim='BALANCE'):
    """``BALANCE``: push the value of ``context.get_balance()`` as a ``MumavType``."""

    @classmethod
    def execute(cls, stack: MichelsonStack, stdout: List[str], context: AbstractContext):
        """Push the context balance onto ``stack`` and record the step in ``stdout``.

        :param stack: stack that receives the new item
        :param stdout: list of trace lines; one formatted line is appended
        :param context: source of the balance (``get_balance``)
        :returns: an instance created with ``stack_items_added=1``
        """
        current_balance = context.get_balance()
        item = MumavType.from_value(current_balance)
        stack.push(item)
        stdout.append(
            format_stdout(cls.prim, [], [item]),  # type: ignore
        )
        return cls(stack_items_added=1)
class ChainIdInstruction(MichelsonInstruction, prim='CHAIN_ID'):
    """``CHAIN_ID``: push the value of ``context.get_chain_id()`` as a ``ChainIdType``."""

    @classmethod
    def execute(cls, stack: MichelsonStack, stdout: List[str], context: AbstractContext):
        """Push the context chain id onto ``stack`` and record the step in ``stdout``.

        :param stack: stack that receives the new item
        :param stdout: list of trace lines; one formatted line is appended
        :param context: source of the chain id (``get_chain_id``)
        :returns: an instance created with ``stack_items_added=1``
        """
        item = ChainIdType.from_value(context.get_chain_id())
        stack.push(item)
        trace_line = format_stdout(cls.prim, [], [item])  # type: ignore
        stdout.append(trace_line)
        return cls(stack_items_added=1)
def get_entrypoint_type(context: AbstractContext, name: str, address=None) -> Optional[Type[MichelsonType]]:
    """Return the type registered for entrypoint ``name`` in a contract's parameter section.

    :param context: provides the parameter expression via ``get_parameter_expr(address)``
    :param name: entrypoint name to look up
    :param address: forwarded as-is to ``context.get_parameter_expr``
    :returns: the entrypoint type, or ``None`` when the context has no parameter expression
    :raises AssertionError: when ``name`` is not among the listed entrypoints
    """
    parameter_expr = context.get_parameter_expr(address)
    if parameter_expr is None:
        return None

    known_entrypoints = ParameterSection.match(parameter_expr).list_entrypoints()
    # Kept as an assertion on purpose: callers in this module catch AssertionError.
    assert name in known_entrypoints, f'unknown entrypoint {name}'
    return known_entrypoints[name]
class SelfInstruction(MichelsonInstruction, prim='SELF'):
    """``SELF``: push a contract value built from the self address and an entrypoint."""

    @classmethod
    def execute(cls, stack: MichelsonStack, stdout: List[str], context: AbstractContext):
        """Push ``<self address>%<entrypoint>`` typed as ``contract <entrypoint type>``.

        The entrypoint is the first of ``cls.field_names``, or ``'default'`` when
        there are none.

        :param stack: stack that receives the new item
        :param stdout: list of trace lines; one formatted line is appended
        :param context: provides the parameter expression and the self address
        :returns: an instance created with ``stack_items_added=1``
        :raises AssertionError: when the entrypoint type is missing/falsy or unknown
        """
        entrypoint = next(iter(cls.field_names), 'default')
        param_type = get_entrypoint_type(context, entrypoint)
        assert param_type, 'parameter type is not defined'

        own_address = context.get_self_address()
        contract_type = ContractType.create_type(args=[param_type])
        item = contract_type.from_value(f'{own_address}%{entrypoint}')  # type: ignore
        stack.push(item)
        stdout.append(format_stdout(cls.prim, [], [item]))  # type: ignore
        return cls(stack_items_added=1)
class SelfAddressInstruction(MichelsonInstruction, prim='SELF_ADDRESS'):
    """``SELF_ADDRESS``: push ``context.get_self_address()`` as an ``AddressType``."""

    @classmethod
    def execute(cls, stack: 'MichelsonStack', stdout: List[str], context: AbstractContext):
        """Push the self address onto ``stack`` and record the step in ``stdout``.

        :param stack: stack that receives the new item
        :param stdout: list of trace lines; one formatted line is appended
        :param context: source of the address (``get_self_address``)
        :returns: an instance created with ``stack_items_added=1``
        """
        own_address = context.get_self_address()
        item = AddressType.from_value(own_address)
        stack.push(item)
        trace_line = format_stdout(cls.prim, [], [item])  # type: ignore
        stdout.append(trace_line)
        return cls(stack_items_added=1)
class SenderInstruction(MichelsonInstruction, prim='SENDER'):
    """``SENDER``: push ``context.get_sender()`` as an ``AddressType``."""

    @classmethod
    def execute(cls, stack: MichelsonStack, stdout: List[str], context: AbstractContext):
        """Push the sender address onto ``stack`` and record the step in ``stdout``.

        :param stack: stack that receives the new item
        :param stdout: list of trace lines; one formatted line is appended
        :param context: source of the address (``get_sender``)
        :returns: an instance created with ``stack_items_added=1``
        """
        item = AddressType.from_value(context.get_sender())
        stack.push(item)
        stdout.append(
            format_stdout(cls.prim, [], [item]),  # type: ignore
        )
        return cls(stack_items_added=1)
class SourceInstruction(MichelsonInstruction, prim='SOURCE'):
    """``SOURCE``: push ``context.get_source()`` as an ``AddressType``."""

    @classmethod
    def execute(cls, stack: MichelsonStack, stdout: List[str], context: AbstractContext):
        """Push the source address onto ``stack`` and record the step in ``stdout``.

        :param stack: stack that receives the new item
        :param stdout: list of trace lines; one formatted line is appended
        :param context: source of the address (``get_source``)
        :returns: an instance created with ``stack_items_added=1``
        """
        origin = context.get_source()
        item = AddressType.from_value(origin)
        stack.push(item)
        trace_line = format_stdout(cls.prim, [], [item])  # type: ignore
        stdout.append(trace_line)
        return cls(stack_items_added=1)
class NowInstruction(MichelsonInstruction, prim='NOW'):
    """``NOW``: push ``context.get_now()`` as a ``TimestampType``."""

    @classmethod
    def execute(cls, stack: MichelsonStack, stdout: List[str], context: AbstractContext):
        """Push the context timestamp onto ``stack`` and record the step in ``stdout``.

        :param stack: stack that receives the new item
        :param stdout: list of trace lines; one formatted line is appended
        :param context: source of the timestamp (``get_now``)
        :returns: an instance created with ``stack_items_added=1``
        """
        item = TimestampType.from_value(context.get_now())
        stack.push(item)
        trace_line = format_stdout(cls.prim, [], [item])  # type: ignore
        stdout.append(trace_line)
        return cls(stack_items_added=1)
class AddressInstruction(MichelsonInstruction, prim='ADDRESS'):
    """``ADDRESS``: replace the contract on top of the stack with its address."""

    @classmethod
    def execute(cls, stack: MichelsonStack, stdout: List[str], context: AbstractContext):
        """Pop a contract value, push ``AddressType`` built from ``get_address()``.

        :param stack: stack to pop the contract from and push the address to
        :param stdout: list of trace lines; one formatted line is appended
        :param context: not used by this instruction
        :returns: an instance created with ``stack_items_added=1``
        """
        top = cast(ContractType, stack.pop1())
        top.assert_type_in(ContractType)

        item = AddressType.from_value(top.get_address())
        stack.push(item)
        trace_line = format_stdout(cls.prim, [top], [item])  # type: ignore
        stdout.append(trace_line)
        return cls(stack_items_added=1)
class ContractInstruction(MichelsonInstruction, prim='CONTRACT', args_len=1):
    """``CONTRACT``: turn an address into ``option (contract <type>)``."""

    @classmethod
    def execute(cls, stack: MichelsonStack, stdout: List[str], context: AbstractContext):
        """Pop an address and push an optional contract handle for it.

        The entrypoint is the first of ``cls.field_names``, or ``'default'`` when
        there are none. The result is ``Some`` when the target's entrypoint type
        equals ``cls.args[0]`` (or when no parameter expression is available and
        type checking is skipped), and ``None`` when an ``AssertionError`` is
        raised while resolving or comparing the entrypoint type.

        :param stack: stack to pop the address from and push the option to
        :param stdout: list of trace lines; receives the skip notice (if any) and
            one formatted line for the step
        :param context: provides the target contract's parameter expression
        :returns: an instance created with ``stack_items_added=1``
        """
        entrypoint = next(iter(cls.field_names), 'default')
        address = cast(AddressType, stack.pop1())
        address.assert_type_in(AddressType)
        contract_type = ContractType.create_type(args=cls.args)
        try:
            # Resolve the entrypoint inside the try block: get_entrypoint_type
            # asserts on an unknown entrypoint, and that case must yield None
            # just like a type mismatch instead of aborting the execution.
            entrypoint_type = get_entrypoint_type(context, entrypoint, address=str(address))
            if entrypoint_type is None:
                stdout.append(f'{cls.prim}: skip type checking for {str(address)}')
            else:
                entrypoint_type.assert_type_equal(cls.args[0])
            res = OptionType.from_some(contract_type.from_value(f'{str(address)}%{entrypoint}'))  # type: ignore
        except AssertionError:
            res = OptionType.none(contract_type)
        stack.push(res)
        stdout.append(format_stdout(cls.prim, [address], [res]))  # type: ignore
        return cls(stack_items_added=1)
class ImplicitAccountInstruction(MichelsonInstruction, prim='IMPLICIT_ACCOUNT'):
    """``IMPLICIT_ACCOUNT``: turn a key hash into a ``contract unit`` value."""

    @classmethod
    def execute(cls, stack: MichelsonStack, stdout: List[str], context: AbstractContext):
        """Pop a key hash and push a ``ContractType`` (arg ``UnitType``) built from it.

        :param stack: stack to pop the key hash from and push the contract to
        :param stdout: list of trace lines; one formatted line is appended
        :param context: not used by this instruction
        :returns: an instance created with ``stack_items_added=1``
        """
        top = cast(KeyHashType, stack.pop1())
        top.assert_type_equal(KeyHashType)

        unit_contract_type = ContractType.create_type(args=[UnitType])
        item = unit_contract_type.from_value(str(top))  # type: ignore
        stack.push(item)
        trace_line = format_stdout(cls.prim, [top], [item])  # type: ignore
        stdout.append(trace_line)
        return cls(stack_items_added=1)
class CreateContractInstruction(MichelsonInstruction, prim='CREATE_CONTRACT', args_len=1):
    """``CREATE_CONTRACT``: build an origination operation from the embedded script."""

    @classmethod
    def execute(cls, stack: MichelsonStack, stdout: List[str], context: AbstractContext):
        """Pop delegate, amount and initial storage; push the new address and the operation.

        The script in ``cls.args[0]`` must have at least three sections and its
        first three must be exactly ``parameter``, ``storage`` and ``code``
        (in any order). The address is pushed first, so the origination
        operation ends up on top of the stack.

        :param stack: stack to pop three items from and push two items to
        :param stdout: list of trace lines; one formatted line is appended
        :param context: provides the originated and self addresses; its
            ``spend_balance`` is called with the popped amount
        :returns: an instance created with ``stack_items_added=2``
        :raises AssertionError: on malformed script sections or operand type mismatch
        """
        script = cast(MichelineSequence, cls.args[0])
        section_count = len(script.args)
        assert section_count >= 3, f'expected more than 2 sections, got {section_count}'
        leading_prims = {section.prim for section in script.args[:3]}
        assert leading_prims == {'parameter', 'storage', 'code'}, 'unexpected sections'
        storage_type = cast(
            Type[MichelsonType],
            next(section.args[0] for section in script.args if section.prim == 'storage'),
        )

        operands = cast(Tuple[OptionType, MumavType, MichelsonType], stack.pop3())
        delegate, amount, initial_storage = operands
        delegate.assert_type_equal(OptionType.create_type(args=[KeyHashType]))
        amount.assert_type_equal(MumavType)
        initial_storage.assert_type_equal(storage_type)

        new_address = AddressType.from_value(context.get_originated_address())
        context.spend_balance(int(amount))
        operation = OperationType.origination(
            source=context.get_self_address(),
            script=cls.args[0],  # type: ignore
            storage=initial_storage,
            balance=int(amount),
            delegate=str(delegate.get_some()) if not delegate.is_none() else None,
        )

        stack.push(new_address)
        stack.push(operation)
        trace_line = format_stdout(  # type: ignore
            cls.prim,
            [delegate, amount, initial_storage],
            [operation, new_address],
        )
        stdout.append(trace_line)
        return cls(stack_items_added=2)
class SetDelegateInstruction(MichelsonInstruction, prim='SET_DELEGATE'):
    """``SET_DELEGATE``: build a delegation operation from an optional key hash."""

    @classmethod
    def execute(cls, stack: MichelsonStack, stdout: List[str], context: AbstractContext):
        """Pop ``option key_hash`` and push the resulting delegation operation.

        The operation's delegate is ``None`` when the popped option is empty,
        otherwise the string form of the wrapped value.

        :param stack: stack to pop the option from and push the operation to
        :param stdout: list of trace lines; one formatted line is appended
        :param context: provides the self address used as the operation source
        :returns: an instance created with ``stack_items_added=1``
        """
        new_delegate = cast(OptionType, stack.pop1())
        expected_type = OptionType.create_type(args=[KeyHashType])
        new_delegate.assert_type_equal(expected_type)

        operation = OperationType.delegation(
            source=context.get_self_address(),
            delegate=str(new_delegate.get_some()) if not new_delegate.is_none() else None,
        )
        stack.push(operation)
        trace_line = format_stdout(cls.prim, [new_delegate], [operation])  # type: ignore
        stdout.append(trace_line)
        return cls(stack_items_added=1)
class TransferTokensInstruction(MichelsonInstruction, prim='TRANSFER_TOKENS'):
    """``TRANSFER_TOKENS``: build a transaction operation to a contract."""

    @classmethod
    def execute(cls, stack: MichelsonStack, stdout: List[str], context: AbstractContext):
        """Pop parameter, amount and destination; push the transaction operation.

        The parameter is checked against the destination contract's declared
        argument type and, when the context can resolve it, against the actual
        entrypoint type of the destination.

        :param stack: stack to pop three items from and push the operation to
        :param stdout: list of trace lines; one formatted line is appended
        :param context: provides the self address and destination parameter expression
        :returns: an instance created with ``stack_items_added=1``
        :raises AssertionError: on operand type mismatch or unknown entrypoint
        """
        operands = cast(Tuple[MichelsonType, MumavType, ContractType], stack.pop3())
        parameter, amount, destination = operands
        amount.assert_type_equal(MumavType)
        assert isinstance(destination, ContractType), f'expected contract, got {destination.prim}'

        declared_type = destination.args[0]
        parameter.assert_type_equal(declared_type)

        actual_type = get_entrypoint_type(
            context,
            destination.get_entrypoint(),
            address=destination.get_address(),
        )
        if actual_type:
            parameter.assert_type_equal(actual_type, message='destination contract parameter')

        operation = OperationType.transaction(
            source=context.get_self_address(),
            destination=destination.get_address(),
            amount=int(amount),
            entrypoint=destination.get_entrypoint(),
            value=parameter.to_micheline_value(),
            param_type=declared_type,
        )
        stack.push(operation)
        trace_line = format_stdout(cls.prim, [parameter, amount, destination], [operation])  # type: ignore
        stdout.append(trace_line)
        return cls(stack_items_added=1)
class VotingPowerInstruction(MichelsonInstruction, prim='VOTING_POWER'):
    """``VOTING_POWER``: replace a key hash with ``context.get_voting_power`` for it."""

    @classmethod
    def execute(cls, stack: 'MichelsonStack', stdout: List[str], context: AbstractContext):
        """Pop a key hash and push its voting power as a ``NatType``.

        :param stack: stack to pop the key hash from and push the result to
        :param stdout: list of trace lines; one formatted line is appended
        :param context: queried via ``get_voting_power(str(key_hash))``
        :returns: an instance created with ``stack_items_added=1``
        """
        baker = cast(KeyHashType, stack.pop1())
        baker.assert_type_equal(KeyHashType)

        power = context.get_voting_power(str(baker))
        item = NatType.from_value(power)
        stack.push(item)
        trace_line = format_stdout(cls.prim, [baker], [item])  # type: ignore
        stdout.append(trace_line)
        return cls(stack_items_added=1)
class TotalVotingPowerInstruction(MichelsonInstruction, prim='TOTAL_VOTING_POWER'):
    """``TOTAL_VOTING_POWER``: push ``context.get_total_voting_power()`` as a ``NatType``."""

    @classmethod
    def execute(cls, stack: 'MichelsonStack', stdout: List[str], context: AbstractContext):
        """Push the total voting power onto ``stack`` and record the step in ``stdout``.

        :param stack: stack that receives the new item
        :param stdout: list of trace lines; one formatted line is appended
        :param context: source of the value (``get_total_voting_power``)
        :returns: an instance created with ``stack_items_added=1``
        """
        total = context.get_total_voting_power()
        item = NatType.from_value(total)
        stack.push(item)
        stdout.append(
            format_stdout(cls.prim, [], [item]),  # type: ignore
        )
        return cls(stack_items_added=1)
class LevelInstruction(MichelsonInstruction, prim='LEVEL'):
    """``LEVEL``: push ``context.get_level()`` as a ``NatType``."""

    @classmethod
    def execute(cls, stack: 'MichelsonStack', stdout: List[str], context: AbstractContext):
        """Push the context level onto ``stack`` and record the step in ``stdout``.

        :param stack: stack that receives the new item
        :param stdout: list of trace lines; one formatted line is appended
        :param context: source of the value (``get_level``)
        :returns: an instance created with ``stack_items_added=1``
        """
        current_level = context.get_level()
        item = NatType.from_value(current_level)
        stack.push(item)
        trace_line = format_stdout(cls.prim, [], [item])  # type: ignore
        stdout.append(trace_line)
        return cls(stack_items_added=1)
class ViewInstruction(MichelsonInstruction, prim='VIEW', args_len=2):
    """``VIEW``: call a named view and push its result wrapped in an option."""

    @classmethod
    def execute(cls, stack: 'MichelsonStack', stdout: List[str], context: AbstractContext):
        """Pop the view input and target address, push ``option <return type>``.

        Resolution order:

        1. If ``context.get_view_result`` returns a value, it is used directly.
        2. Otherwise the view expression is loaded and its return type compared
           with ``cls.args[1]``; on failure a ``VIEW: ...`` line is logged to
           ``stdout`` and ``None`` is pushed.
        3. Otherwise the view code runs on a fresh stack holding the pair
           ``(input, storage)`` and its single result is pushed as ``Some``.

        :param stack: stack to pop two items from and push the option to
        :param stdout: list of trace lines shared with the executed view code
        :param context: provides view results/expressions and storage
        :returns: an instance created with ``stack_items_added=1``
        :raises MichelsonRuntimeError: when the view code leaves a stack whose
            length is not 1
        """
        operands = cast(Tuple[MichelsonType, AddressType], stack.pop2())
        input_value, view_address = operands
        name = cast(Type[MichelineLiteral], cls.args[0]).get_string()

        # ``None`` stands for "the currently executing contract" in context lookups.
        target = str(view_address)
        address: Optional[str]
        if target == context.get_self_address():
            address = None
        else:
            # FIXME: spawn new context with patched BALANCE and others
            logging.warning('PyMavryk does not support external views with BALANCE or other context-dependent opcodes')
            address = target

        return_ty = cast(Type[MichelsonType], cls.args[1])
        patched = context.get_view_result(address=address, name=name)
        if patched is not None:
            logging.info('Using patched VIEW result')
            res = OptionType.from_some(return_ty.from_python_object(patched))
        else:
            try:
                view_expr = context.get_view_expr(name, address=address)
                if view_expr is None:
                    raise MichelsonRuntimeError(f'Failed to load view {str(view_address)}%{name}')
                view_ty = ViewSection.match(view_expr)
                return_ty.assert_type_equal(view_ty.args[2], message=f'view {name} return type')
            except (MichelsonRuntimeError, AssertionError) as e:
                # Only lookup and type-check failures degrade to None; errors
                # raised while running the view code below still propagate.
                stdout.append(f'VIEW: {str(e)}')
                res = OptionType.none(return_ty)
            else:
                storage_expr = context.get_storage_value(address)
                # NOTE(review): the storage type is read without ``address`` while the
                # value above is read with it - confirm this is right for external views.
                storage_ty = StorageSection.match(context.get_storage_expr())
                storage_value = storage_ty.from_micheline_value(storage_expr).item
                view_input = PairType.from_comb([input_value, storage_value])
                view_stack = MichelsonStack([view_input])
                # FIXME: need to patch context.balance
                view_code = cast(MichelineSequence, view_ty.args[3])
                view_code.execute(view_stack, stdout, context)
                if len(view_stack) != 1:
                    raise MichelsonRuntimeError('Expected single item on the stack, got', view_stack)
                res = OptionType.from_some(view_stack.pop1())

        stack.push(res)
        stdout.append(format_stdout(cls.prim, [input_value, view_address], [res]))  # type: ignore
        return cls(stack_items_added=1)
class OpenChestInstruction(MichelsonInstruction, prim='OPEN_CHEST'):
    """``OPEN_CHEST``: registered primitive without an implementation."""

    @classmethod
    def execute(cls, stack: MichelsonStack, stdout: List[str], context: AbstractContext):
        """Always fail; none of the arguments are used.

        :raises NotImplementedError: unconditionally
        """
        raise NotImplementedError
class MinBlockTimeInstruction(MichelsonInstruction, prim='MIN_BLOCK_TIME'):
    """``MIN_BLOCK_TIME``: push ``context.get_min_block_time()`` as a ``NatType``."""

    @classmethod
    def execute(cls, stack: MichelsonStack, stdout: List[str], context: AbstractContext):
        """Push the minimal block time onto ``stack`` and record the step in ``stdout``.

        :param stack: stack that receives the new item
        :param stdout: list of trace lines; one formatted line is appended
        :param context: source of the value (``get_min_block_time``)
        :returns: an instance created with ``stack_items_added=1``
        """
        block_time = context.get_min_block_time()
        item = NatType.from_value(block_time)
        stack.push(item)
        trace_line = format_stdout(cls.prim, [], [item])  # type: ignore
        stdout.append(trace_line)
        return cls(stack_items_added=1)
class EmitInstruction(MichelsonInstruction, prim='EMIT', args_len=1):
    """``EMIT``: replace the payload on top of the stack with an event operation."""

    @classmethod
    def execute(cls, stack: MichelsonStack, stdout: List[str], context: AbstractContext):
        """Pop the event payload and push the event operation built from it.

        The tag is ``cls.field_names[0]`` when exactly one field name is
        present, otherwise the empty string.

        :param stack: stack to pop the payload from and push the operation to
        :param stdout: list of trace lines; one formatted line is appended
        :param context: provides the self address used as the event source
        :returns: an instance created with ``stack_items_added=1``
        :raises AssertionError: when the payload type differs from ``cls.args[0]``
        """
        event_type = cast(Type[MichelsonType], cls.args[0])
        payload = stack.pop1()
        payload.assert_type_equal(event_type)
        tag = cls.field_names[0] if len(cls.field_names) == 1 else ''
        res = OperationType.event(
            source=context.get_self_address(),
            event_type=event_type,
            payload=payload.to_micheline_value(),
            tag=tag,
        )
        stack.push(res)
        stdout.append(format_stdout(cls.prim, [payload], [res], arg=f'%{tag}'))  # type: ignore
        # One item (the operation) is pushed, so report 1 like every other
        # single-push instruction in this module rather than 0.
        return cls(stack_items_added=1)