import logging
from typing import List
from typing import Optional
from typing import Tuple
from typing import Type
from typing import cast
from pymavryk.context.abstract import AbstractContext
from pymavryk.michelson.instructions.base import MichelsonInstruction
from pymavryk.michelson.instructions.base import format_stdout
from pymavryk.michelson.micheline import MichelineLiteral
from pymavryk.michelson.micheline import MichelineSequence
from pymavryk.michelson.micheline import MichelsonRuntimeError
from pymavryk.michelson.sections import ParameterSection
from pymavryk.michelson.sections import StorageSection
from pymavryk.michelson.sections import ViewSection
from pymavryk.michelson.stack import MichelsonStack
from pymavryk.michelson.types import AddressType
from pymavryk.michelson.types import ChainIdType
from pymavryk.michelson.types import ContractType
from pymavryk.michelson.types import KeyHashType
from pymavryk.michelson.types import MumavType
from pymavryk.michelson.types import NatType
from pymavryk.michelson.types import OperationType
from pymavryk.michelson.types import OptionType
from pymavryk.michelson.types import PairType
from pymavryk.michelson.types import TimestampType
from pymavryk.michelson.types import UnitType
from pymavryk.michelson.types.base import MichelsonType
class AmountInstruction(MichelsonInstruction, prim='AMOUNT'):
    """``AMOUNT``: push the context's amount onto the stack as a ``MumavType``."""

    @classmethod
    def execute(cls, stack: MichelsonStack, stdout: List[str], context: AbstractContext):
        """Push ``context.get_amount()`` and log the step to ``stdout``.

        :param stack: stack that receives the new item
        :param stdout: execution log; one formatted line is appended
        :param context: source of the amount value
        :returns: instruction instance reporting one item added
        """
        pushed = MumavType.from_value(context.get_amount())
        stack.push(pushed)
        trace_line = format_stdout(cls.prim, [], [pushed])  # type: ignore
        stdout.append(trace_line)
        return cls(stack_items_added=1)
class BalanceInstruction(MichelsonInstruction, prim='BALANCE'):
    """``BALANCE``: push the context's balance onto the stack as a ``MumavType``."""

    @classmethod
    def execute(cls, stack: MichelsonStack, stdout: List[str], context: AbstractContext):
        """Push ``context.get_balance()`` and log the step to ``stdout``.

        :param stack: stack that receives the new item
        :param stdout: execution log; one formatted line is appended
        :param context: source of the balance value
        :returns: instruction instance reporting one item added
        """
        pushed = MumavType.from_value(context.get_balance())
        stack.push(pushed)
        trace_line = format_stdout(cls.prim, [], [pushed])  # type: ignore
        stdout.append(trace_line)
        return cls(stack_items_added=1)
class ChainIdInstruction(MichelsonInstruction, prim='CHAIN_ID'):
    """``CHAIN_ID``: push the context's chain identifier as a ``ChainIdType``."""

    @classmethod
    def execute(cls, stack: MichelsonStack, stdout: List[str], context: AbstractContext):
        """Push ``context.get_chain_id()`` and log the step to ``stdout``.

        :param stack: stack that receives the new item
        :param stdout: execution log; one formatted line is appended
        :param context: source of the chain id
        :returns: instruction instance reporting one item added
        """
        pushed = ChainIdType.from_value(context.get_chain_id())
        stack.push(pushed)
        trace_line = format_stdout(cls.prim, [], [pushed])  # type: ignore
        stdout.append(trace_line)
        return cls(stack_items_added=1)
def get_entrypoint_type(context: AbstractContext, name: str, address=None) -> Optional[Type[MichelsonType]]:
    """Look up the parameter type of entrypoint ``name``.

    :param context: provides the parameter expression via ``get_parameter_expr``
    :param name: entrypoint name to resolve
    :param address: forwarded to ``context.get_parameter_expr``
    :returns: the entrypoint's type, or ``None`` when the context has no
        parameter expression for ``address``
    :raises AssertionError: if ``name`` is not among the listed entrypoints
    """
    parameter_expr = context.get_parameter_expr(address)
    if parameter_expr is None:
        return None

    known = ParameterSection.match(parameter_expr).list_entrypoints()
    # Callers catch AssertionError, so the check stays an assert.
    assert name in known, f'unknown entrypoint {name}'
    return known[name]
class SelfInstruction(MichelsonInstruction, prim='SELF'):
    """``SELF``: push a contract handle pointing at the executing contract."""

    @classmethod
    def execute(cls, stack: MichelsonStack, stdout: List[str], context: AbstractContext):
        """Push ``<self address>%<entrypoint>`` typed by that entrypoint's parameter.

        The entrypoint is the first field annotation of the instruction,
        falling back to ``'default'`` when there is none.

        :param stack: stack that receives the contract value
        :param stdout: execution log; one formatted line is appended
        :param context: supplies the parameter expression and self address
        :returns: instruction instance reporting one item added
        :raises AssertionError: if the entrypoint type cannot be resolved
        """
        ep_name = next(iter(cls.field_names), 'default')
        ep_type = get_entrypoint_type(context, ep_name)
        assert ep_type, f'parameter type is not defined'

        own_address = context.get_self_address()
        handle_type = ContractType.create_type(args=[ep_type])
        handle = handle_type.from_value(f'{own_address}%{ep_name}')  # type: ignore

        stack.push(handle)
        trace_line = format_stdout(cls.prim, [], [handle])  # type: ignore
        stdout.append(trace_line)
        return cls(stack_items_added=1)
class SelfAddressInstruction(MichelsonInstruction, prim='SELF_ADDRESS'):
    """``SELF_ADDRESS``: push the executing contract's address."""

    @classmethod
    def execute(cls, stack: 'MichelsonStack', stdout: List[str], context: AbstractContext):
        """Push ``context.get_self_address()`` as an ``AddressType``.

        :param stack: stack that receives the address
        :param stdout: execution log; one formatted line is appended
        :param context: source of the self address
        :returns: instruction instance reporting one item added
        """
        own_address = context.get_self_address()
        pushed = AddressType.from_value(own_address)
        stack.push(pushed)
        trace_line = format_stdout(cls.prim, [], [pushed])  # type: ignore
        stdout.append(trace_line)
        return cls(stack_items_added=1)
class SenderInstruction(MichelsonInstruction, prim='SENDER'):
    """``SENDER``: push the context's sender address."""

    @classmethod
    def execute(cls, stack: MichelsonStack, stdout: List[str], context: AbstractContext):
        """Push ``context.get_sender()`` as an ``AddressType``.

        :param stack: stack that receives the address
        :param stdout: execution log; one formatted line is appended
        :param context: source of the sender address
        :returns: instruction instance reporting one item added
        """
        pushed = AddressType.from_value(context.get_sender())
        stack.push(pushed)
        trace_line = format_stdout(cls.prim, [], [pushed])  # type: ignore
        stdout.append(trace_line)
        return cls(stack_items_added=1)
class SourceInstruction(MichelsonInstruction, prim='SOURCE'):
    """``SOURCE``: push the context's source address."""

    @classmethod
    def execute(cls, stack: MichelsonStack, stdout: List[str], context: AbstractContext):
        """Push ``context.get_source()`` as an ``AddressType``.

        :param stack: stack that receives the address
        :param stdout: execution log; one formatted line is appended
        :param context: source of the address value
        :returns: instruction instance reporting one item added
        """
        pushed = AddressType.from_value(context.get_source())
        stack.push(pushed)
        trace_line = format_stdout(cls.prim, [], [pushed])  # type: ignore
        stdout.append(trace_line)
        return cls(stack_items_added=1)
class NowInstruction(MichelsonInstruction, prim='NOW'):
    """``NOW``: push the context's current time as a ``TimestampType``."""

    @classmethod
    def execute(cls, stack: MichelsonStack, stdout: List[str], context: AbstractContext):
        """Push ``context.get_now()`` and log the step to ``stdout``.

        :param stack: stack that receives the timestamp
        :param stdout: execution log; one formatted line is appended
        :param context: source of the timestamp value
        :returns: instruction instance reporting one item added
        """
        pushed = TimestampType.from_value(context.get_now())
        stack.push(pushed)
        trace_line = format_stdout(cls.prim, [], [pushed])  # type: ignore
        stdout.append(trace_line)
        return cls(stack_items_added=1)
class AddressInstruction(MichelsonInstruction, prim='ADDRESS'):
    """``ADDRESS``: replace the contract on top of the stack with its address."""

    @classmethod
    def execute(cls, stack: MichelsonStack, stdout: List[str], context: AbstractContext):
        """Pop a contract value and push the address it refers to.

        :param stack: stack to pop the contract from and push the address to
        :param stdout: execution log; one formatted line is appended
        :param context: unused by this instruction
        :returns: instruction instance reporting one item added
        :raises AssertionError: presumably, when the popped item fails the
            ``assert_type_in(ContractType)`` check
        """
        popped = cast(ContractType, stack.pop1())
        popped.assert_type_in(ContractType)

        raw_address = popped.get_address()
        pushed = AddressType.from_value(raw_address)
        stack.push(pushed)
        trace_line = format_stdout(cls.prim, [popped], [pushed])  # type: ignore
        stdout.append(trace_line)
        return cls(stack_items_added=1)
class ContractInstruction(MichelsonInstruction, prim='CONTRACT', args_len=1):
    """``CONTRACT 'p``: cast an address to an optional typed contract handle."""

    @classmethod
    def execute(cls, stack: MichelsonStack, stdout: List[str], context: AbstractContext):
        """Pop an address and push ``Some contract`` or ``None``.

        The entrypoint is the first field annotation of the instruction,
        falling back to ``'default'``. When the context has no parameter
        expression for the address, type checking is skipped and the handle
        is produced anyway. When the entrypoint is unknown or its type does
        not equal the instruction argument, ``None`` is pushed.

        :param stack: stack to pop the address from and push the option to
        :param stdout: execution log; receives a formatted line, plus a
            notice when type checking is skipped
        :param context: supplies the target contract's parameter expression
        :returns: instruction instance reporting one item added
        """
        entrypoint = next(iter(cls.field_names), 'default')
        address = cast(AddressType, stack.pop1())
        # A wrongly typed stack item is a program error and must propagate,
        # so this check stays outside the try block.
        address.assert_type_in(AddressType)

        contract_type = ContractType.create_type(args=cls.args)
        try:
            # The lookup asserts that the entrypoint exists. It lives inside
            # the try so that an unknown entrypoint yields None instead of
            # aborting execution with an AssertionError.
            entrypoint_type = get_entrypoint_type(context, entrypoint, address=str(address))
            if entrypoint_type is None:
                stdout.append(f'{cls.prim}: skip type checking for {str(address)}')
            else:
                entrypoint_type.assert_type_equal(cls.args[0])
            res = OptionType.from_some(contract_type.from_value(f'{str(address)}%{entrypoint}'))  # type: ignore
        except AssertionError:
            res = OptionType.none(contract_type)

        stack.push(res)
        stdout.append(format_stdout(cls.prim, [address], [res]))  # type: ignore
        return cls(stack_items_added=1)
class ImplicitAccountInstruction(MichelsonInstruction, prim='IMPLICIT_ACCOUNT'):
    """``IMPLICIT_ACCOUNT``: turn a key hash into a ``contract unit`` value."""

    @classmethod
    def execute(cls, stack: MichelsonStack, stdout: List[str], context: AbstractContext):
        """Pop a key hash and push a unit-parameter contract built from it.

        :param stack: stack to pop the key hash from and push the contract to
        :param stdout: execution log; one formatted line is appended
        :param context: unused by this instruction
        :returns: instruction instance reporting one item added
        """
        popped = cast(KeyHashType, stack.pop1())
        popped.assert_type_equal(KeyHashType)

        unit_contract_type = ContractType.create_type(args=[UnitType])
        pushed = unit_contract_type.from_value(str(popped))  # type: ignore
        stack.push(pushed)
        trace_line = format_stdout(cls.prim, [popped], [pushed])  # type: ignore
        stdout.append(trace_line)
        return cls(stack_items_added=1)
class CreateContractInstruction(MichelsonInstruction, prim='CREATE_CONTRACT', args_len=1):
    """``CREATE_CONTRACT``: build an origination operation for an inline script."""

    @classmethod
    def execute(cls, stack: MichelsonStack, stdout: List[str], context: AbstractContext):
        """Pop delegate, amount and initial storage; push address and operation.

        The instruction argument must be a sequence whose first three sections
        are ``parameter``, ``storage`` and ``code`` in any order. The amount
        is deducted through ``context.spend_balance``. The originated address
        is pushed first, then the origination operation on top of it.

        :param stack: stack to pop three items from and push two items to
        :param stdout: execution log; one formatted line is appended
        :param context: supplies the originated and self addresses, and
            receives the balance deduction
        :returns: instruction instance reporting two items added
        :raises AssertionError: on a malformed script, or presumably when a
            popped item fails its ``assert_type_equal`` check
        """
        script = cast(MichelineSequence, cls.args[0])
        sections = script.args
        assert len(sections) >= 3, f'expected more than 2 sections, got {len(sections)}'
        leading_prims = {section.prim for section in sections[:3]}
        assert leading_prims == {'parameter', 'storage', 'code'}, f'unexpected sections'
        storage_type = cast(
            Type[MichelsonType],
            next(section.args[0] for section in sections if section.prim == 'storage'),
        )

        delegate, amount, initial_storage = cast(Tuple[OptionType, MumavType, MichelsonType], stack.pop3())
        delegate.assert_type_equal(OptionType.create_type(args=[KeyHashType]))
        amount.assert_type_equal(MumavType)
        initial_storage.assert_type_equal(storage_type)

        originated_address = AddressType.from_value(context.get_originated_address())
        balance = int(amount)
        context.spend_balance(balance)

        source = context.get_self_address()
        if delegate.is_none():
            delegate_key = None
        else:
            delegate_key = str(delegate.get_some())
        origination = OperationType.origination(
            source=source,
            script=cls.args[0],  # type: ignore
            storage=initial_storage,
            balance=balance,
            delegate=delegate_key,
        )

        stack.push(originated_address)
        stack.push(origination)
        consumed = [delegate, amount, initial_storage]
        produced = [origination, originated_address]
        stdout.append(format_stdout(cls.prim, consumed, produced))  # type: ignore
        return cls(stack_items_added=2)
class SetDelegateInstruction(MichelsonInstruction, prim='SET_DELEGATE'):
    """``SET_DELEGATE``: build a delegation operation from an optional key hash."""

    @classmethod
    def execute(cls, stack: MichelsonStack, stdout: List[str], context: AbstractContext):
        """Pop ``option key_hash`` and push the matching delegation operation.

        A ``None`` option produces a delegation whose delegate is ``None``.

        :param stack: stack to pop the option from and push the operation to
        :param stdout: execution log; one formatted line is appended
        :param context: supplies the self address used as operation source
        :returns: instruction instance reporting one item added
        """
        popped = cast(OptionType, stack.pop1())
        popped.assert_type_equal(OptionType.create_type(args=[KeyHashType]))

        source = context.get_self_address()
        if popped.is_none():
            delegate_key = None
        else:
            delegate_key = str(popped.get_some())
        operation = OperationType.delegation(source=source, delegate=delegate_key)

        stack.push(operation)
        trace_line = format_stdout(cls.prim, [popped], [operation])  # type: ignore
        stdout.append(trace_line)
        return cls(stack_items_added=1)
class TransferTokensInstruction(MichelsonInstruction, prim='TRANSFER_TOKENS'):
    """``TRANSFER_TOKENS``: build a transaction operation towards a contract."""

    @classmethod
    def execute(cls, stack: MichelsonStack, stdout: List[str], context: AbstractContext):
        """Pop parameter, amount and destination; push a transaction operation.

        The parameter is checked against the destination handle's argument
        type. When the context can resolve the destination's real entrypoint
        type, the parameter is checked against that too.

        :param stack: stack to pop three items from and push the operation to
        :param stdout: execution log; one formatted line is appended
        :param context: supplies the self address and entrypoint lookup
        :returns: instruction instance reporting one item added
        :raises AssertionError: if the destination is not a contract, or
            presumably when a type check fails
        """
        parameter, amount, destination = cast(Tuple[MichelsonType, MumavType, ContractType], stack.pop3())
        amount.assert_type_equal(MumavType)
        assert isinstance(destination, ContractType), f'expected contract, got {destination.prim}'

        param_type = destination.args[0]
        parameter.assert_type_equal(param_type)

        resolved_type = get_entrypoint_type(
            context,
            destination.get_entrypoint(),
            address=destination.get_address(),
        )
        if resolved_type:
            parameter.assert_type_equal(resolved_type, message='destination contract parameter')

        operation = OperationType.transaction(
            source=context.get_self_address(),
            destination=destination.get_address(),
            amount=int(amount),
            entrypoint=destination.get_entrypoint(),
            value=parameter.to_micheline_value(),
            param_type=param_type,
        )

        stack.push(operation)
        consumed = [parameter, amount, destination]
        stdout.append(format_stdout(cls.prim, consumed, [operation]))  # type: ignore
        return cls(stack_items_added=1)
class VotingPowerInstruction(MichelsonInstruction, prim='VOTING_POWER'):
    """``VOTING_POWER``: replace a key hash with its voting power as a ``NatType``."""

    @classmethod
    def execute(cls, stack: 'MichelsonStack', stdout: List[str], context: AbstractContext):
        """Pop a key hash and push ``context.get_voting_power`` for it.

        :param stack: stack to pop the key hash from and push the result to
        :param stdout: execution log; one formatted line is appended
        :param context: source of the voting power value
        :returns: instruction instance reporting one item added
        """
        popped = cast(KeyHashType, stack.pop1())
        popped.assert_type_equal(KeyHashType)

        power = context.get_voting_power(str(popped))
        pushed = NatType.from_value(power)
        stack.push(pushed)
        trace_line = format_stdout(cls.prim, [popped], [pushed])  # type: ignore
        stdout.append(trace_line)
        return cls(stack_items_added=1)
class TotalVotingPowerInstruction(MichelsonInstruction, prim='TOTAL_VOTING_POWER'):
    """``TOTAL_VOTING_POWER``: push the context's total voting power as a ``NatType``."""

    @classmethod
    def execute(cls, stack: 'MichelsonStack', stdout: List[str], context: AbstractContext):
        """Push ``context.get_total_voting_power()`` and log the step.

        :param stack: stack that receives the new item
        :param stdout: execution log; one formatted line is appended
        :param context: source of the total voting power value
        :returns: instruction instance reporting one item added
        """
        total_power = context.get_total_voting_power()
        pushed = NatType.from_value(total_power)
        stack.push(pushed)
        trace_line = format_stdout(cls.prim, [], [pushed])  # type: ignore
        stdout.append(trace_line)
        return cls(stack_items_added=1)
class LevelInstruction(MichelsonInstruction, prim='LEVEL'):
    """``LEVEL``: push the context's level as a ``NatType``."""

    @classmethod
    def execute(cls, stack: 'MichelsonStack', stdout: List[str], context: AbstractContext):
        """Push ``context.get_level()`` and log the step to ``stdout``.

        :param stack: stack that receives the new item
        :param stdout: execution log; one formatted line is appended
        :param context: source of the level value
        :returns: instruction instance reporting one item added
        """
        level = context.get_level()
        pushed = NatType.from_value(level)
        stack.push(pushed)
        trace_line = format_stdout(cls.prim, [], [pushed])  # type: ignore
        stdout.append(trace_line)
        return cls(stack_items_added=1)
class ViewInstruction(MichelsonInstruction, prim='VIEW', args_len=2):
    """``VIEW name 'ret``: call a named view and push its optional result."""

    @classmethod
    def execute(cls, stack: 'MichelsonStack', stdout: List[str], context: AbstractContext):
        """Pop the view input and target address; push ``option 'ret``.

        Resolution order:

        1. If ``context.get_view_result`` returns a value, it is converted
           with the declared return type and wrapped in ``Some``.
        2. Otherwise the view expression is loaded from the context. A
           ``MichelsonRuntimeError`` or ``AssertionError`` raised while
           loading or type checking is logged to ``stdout`` and ``None`` is
           pushed.
        3. Otherwise the view code runs on a fresh stack holding the pair
           ``(input, storage)`` and its single result is wrapped in ``Some``.

        :param stack: stack to pop two items from and push the option to
        :param stdout: execution log shared with the view code
        :param context: supplies view results, view code and storage
        :returns: instruction instance reporting one item added
        :raises MichelsonRuntimeError: if the view code does not leave
            exactly one item on its stack
        """
        input_value, view_address = cast(Tuple[MichelsonType, AddressType], stack.pop2())
        name = cast(Type[MichelineLiteral], cls.args[0]).get_string()

        # ``None`` stands for the currently executing contract.
        address: Optional[str] = str(view_address)
        if address != context.get_self_address():
            # FIXME: spawn new context with patched BALANCE and others
            logging.warning('PyMavryk does not support external views with BALANCE or other context-dependent opcodes')
        else:
            address = None

        return_ty = cast(Type[MichelsonType], cls.args[1])
        patched = context.get_view_result(address=address, name=name)

        if patched is None:
            try:
                view_expr = context.get_view_expr(name, address=address)
                if view_expr is None:
                    raise MichelsonRuntimeError(f'Failed to load view {str(view_address)}%{name}')
                view_ty = ViewSection.match(view_expr)
                return_ty.assert_type_equal(view_ty.args[2], message=f'view {name} return type')
            except (MichelsonRuntimeError, AssertionError) as err:
                # A missing or mistyped view is reported as None.
                stdout.append(f'VIEW: {str(err)}')
                res = OptionType.none(return_ty)
            else:
                storage_expr = context.get_storage_value(address)
                # NOTE(review): the storage type is fetched without `address`
                # while the value is fetched with it. Confirm this is
                # intended for external views.
                storage_ty = StorageSection.match(context.get_storage_expr())
                storage_value = storage_ty.from_micheline_value(storage_expr).item

                view_stack = MichelsonStack([PairType.from_comb([input_value, storage_value])])
                # FIXME: need to patch context.balance
                view_code = cast(MichelineSequence, view_ty.args[3])
                view_code.execute(view_stack, stdout, context)
                if len(view_stack) != 1:
                    raise MichelsonRuntimeError('Expected single item on the stack, got', view_stack)
                res = OptionType.from_some(view_stack.pop1())
        else:
            logging.info('Using patched VIEW result')
            res = OptionType.from_some(return_ty.from_python_object(patched))

        stack.push(res)
        stdout.append(format_stdout(cls.prim, [input_value, view_address], [res]))  # type: ignore
        return cls(stack_items_added=1)
class OpenChestInstruction(MichelsonInstruction, prim='OPEN_CHEST'):
    """``OPEN_CHEST``: placeholder; this instruction is not implemented."""

    @classmethod
    def execute(cls, stack: MichelsonStack, stdout: List[str], context: AbstractContext):
        """Always raise ``NotImplementedError``; no argument is used.

        :raises NotImplementedError: unconditionally
        """
        raise NotImplementedError()
class MinBlockTimeInstruction(MichelsonInstruction, prim='MIN_BLOCK_TIME'):
    """``MIN_BLOCK_TIME``: push the context's minimal block time as a ``NatType``."""

    @classmethod
    def execute(cls, stack: MichelsonStack, stdout: List[str], context: AbstractContext):
        """Push ``context.get_min_block_time()`` and log the step.

        :param stack: stack that receives the new item
        :param stdout: execution log; one formatted line is appended
        :param context: source of the minimal block time value
        :returns: instruction instance reporting one item added
        """
        min_block_time = context.get_min_block_time()
        pushed = NatType.from_value(min_block_time)
        stack.push(pushed)
        trace_line = format_stdout(cls.prim, [], [pushed])  # type: ignore
        stdout.append(trace_line)
        return cls(stack_items_added=1)
class EmitInstruction(MichelsonInstruction, prim='EMIT', args_len=1):
    """``EMIT %tag 'ty``: turn the top stack item into an event operation."""

    @classmethod
    def execute(cls, stack: MichelsonStack, stdout: List[str], context: AbstractContext):
        """Pop the payload, check its type and push the event operation.

        The tag is the instruction's field annotation when exactly one is
        present, otherwise the empty string.

        :param stack: stack to pop the payload from and push the operation to
        :param stdout: execution log; one formatted line is appended
        :param context: supplies the self address used as event source
        :returns: instruction instance reporting one item added
        :raises AssertionError: presumably, when the payload type does not
            equal the declared event type
        """
        event_type = cast(Type[MichelsonType], cls.args[0])
        payload = stack.pop1()
        payload.assert_type_equal(event_type)

        tag = cls.field_names[0] if len(cls.field_names) == 1 else ''
        res = OperationType.event(
            source=context.get_self_address(),
            event_type=event_type,
            payload=payload.to_micheline_value(),
            tag=tag,
        )

        stack.push(res)
        stdout.append(format_stdout(cls.prim, [payload], [res], arg=f'%{tag}'))  # type: ignore
        # One operation was pushed, so report 1. Every other instruction in
        # this module that pops and then pushes also reports its push count,
        # not the net stack delta.
        return cls(stack_items_added=1)