from typing import Any
from typing import Dict
from typing import List
from typing import Optional
from typing import Tuple
from typing import Type
from typing import cast
from pymavryk.context.impl import ExecutionContext
from pymavryk.crypto.encoding import base58_encode
from pymavryk.michelson.instructions.base import MichelsonInstruction
from pymavryk.michelson.instructions.base import format_stdout
from pymavryk.michelson.instructions.tzt import BigMapInstruction
from pymavryk.michelson.instructions.tzt import StackEltInstruction
from pymavryk.michelson.micheline import MichelineSequence
from pymavryk.michelson.micheline import get_script_section
from pymavryk.michelson.micheline import get_script_sections
from pymavryk.michelson.micheline import try_catch
from pymavryk.michelson.micheline import validate_sections
from pymavryk.michelson.sections.code import CodeSection
from pymavryk.michelson.sections.parameter import ParameterSection
from pymavryk.michelson.sections.storage import StorageSection
from pymavryk.michelson.sections.tzt import AmountSection
from pymavryk.michelson.sections.tzt import BalanceSection
from pymavryk.michelson.sections.tzt import BigMapsSection
from pymavryk.michelson.sections.tzt import ChainIdSection
from pymavryk.michelson.sections.tzt import InputSection
from pymavryk.michelson.sections.tzt import NowSection
from pymavryk.michelson.sections.tzt import OutputSection
from pymavryk.michelson.sections.tzt import SelfSection
from pymavryk.michelson.sections.tzt import SenderSection
from pymavryk.michelson.sections.tzt import SourceSection
from pymavryk.michelson.sections.view import ViewSection
from pymavryk.michelson.stack import MichelsonStack
from pymavryk.michelson.types import ListType
from pymavryk.michelson.types import MichelsonType
from pymavryk.michelson.types import OperationType
from pymavryk.michelson.types import PairType
class MichelsonProgram:
    """Michelson .tz contract interpreter interface

    The section attributes below are declared here only as annotations; `load` and
    `create` build a dynamic subclass (via `type(...)`) that binds them to the
    concrete section types of a particular script.
    """

    parameter: Type[ParameterSection]
    storage: Type[StorageSection]
    code: Type[CodeSection]
    views: List[Type[ViewSection]]

    def __init__(self, name: str, parameter: ParameterSection, storage: StorageSection) -> None:
        """Bind a parameter value and a storage value to the program.

        :param name: entrypoint name (see `instantiate`) or view name (see `instantiate_view`);
            it is used in stdout labels and for view lookup in `execute_view` / `ret`
        :param parameter: parameter section instance
        :param storage: storage section instance
        """
        self.name = name
        self.parameter_value = parameter
        self.storage_value = storage

    @staticmethod
    def load(context: ExecutionContext, with_code: bool = False) -> Type['MichelsonProgram']:
        """Create MichelsonProgram type from filled context

        :param context: execution context providing parameter/storage/code/views expressions
        :param with_code: when False the code section is matched against an empty sequence
            and the views list is left empty, so the context's code and views are not read
        :returns: dynamically created MichelsonProgram subclass
        """
        cls = type(
            MichelsonProgram.__name__,
            (MichelsonProgram,),
            {
                'parameter': ParameterSection.match(context.get_parameter_expr()),
                'storage': StorageSection.match(context.get_storage_expr()),
                'code': CodeSection.match(context.get_code_expr() if with_code else []),
                'views': [ViewSection.match(expr) for expr in context.get_views_expr()] if with_code else [],
            },
        )
        return cast(Type['MichelsonProgram'], cls)

    @staticmethod
    def create(sequence: Type[MichelineSequence]) -> Type['MichelsonProgram']:
        """Create MichelsonProgram type from micheline

        :param sequence: parsed top-level script sequence; it is passed through
            `validate_sections` for the `parameter`, `storage` and `code` sections first
        :returns: dynamically created MichelsonProgram subclass
        """
        validate_sections(
            sequence,
            (
                'parameter',
                'storage',
                'code',
            ),
        )
        cls = type(
            MichelsonProgram.__name__,
            (MichelsonProgram,),
            {
                'parameter': get_script_section(sequence, cls=ParameterSection, required=True),  # type: ignore
                'storage': get_script_section(sequence, cls=StorageSection, required=True),  # type: ignore
                'code': get_script_section(sequence, cls=CodeSection, required=True),  # type: ignore
                'views': get_script_sections(sequence, cls=ViewSection),  # type: ignore
            },
        )
        return cast(Type['MichelsonProgram'], cls)

    @staticmethod
    def match(expr) -> Type['MichelsonProgram']:
        """Parse a Micheline expression and create a MichelsonProgram type from it

        :param expr: Micheline expression of the whole script
        :raises Exception: if the parsed expression is not a `MichelineSequence` subclass
        """
        seq = cast(Type[MichelineSequence], MichelineSequence.match(expr))
        if not issubclass(seq, MichelineSequence):
            raise Exception(f'Expected sequence, got {seq.prim}')
        return MichelsonProgram.create(seq)

    @classmethod
    def as_micheline_expr(cls) -> List[Dict[str, Any]]:
        """Serialize the program: parameter, storage, code, then every view, in that order"""
        return [
            cls.parameter.as_micheline_expr(),
            cls.storage.as_micheline_expr(),
            cls.code.as_micheline_expr(),
            *[view.as_micheline_expr() for view in cls.views],
        ]

    @classmethod
    def get_view(cls, name: str) -> Type[ViewSection]:
        """Return the first view section whose `name` equals the given one

        :param name: view name to look up
        :raises StopIteration: if the program has no view with that name
            (`next` is called without a default)
        """
        return next(view for view in cls.views if view.name == name)

    @classmethod
    def instantiate(cls, entrypoint: str, parameter, storage) -> 'MichelsonProgram':
        """Create a program instance for calling an entrypoint

        :param entrypoint: entrypoint name; also becomes the instance `name`
        :param parameter: value passed to `from_parameters` under the `value` key
        :param storage: value passed to the storage section's `from_micheline_value`
        """
        parameter_value = cls.parameter.from_parameters({'entrypoint': entrypoint, 'value': parameter})
        storage_value = cls.storage.from_micheline_value(storage)
        return cls(entrypoint, parameter_value, storage_value)

    @classmethod
    def instantiate_view(cls, name: str, parameter, storage) -> 'MichelsonProgram':
        """Create a program instance for calling a view

        :param name: view name; also becomes the instance `name`
        :param parameter: Micheline value of the view argument
        :param storage: Micheline value of the storage
        :raises StopIteration: if there is no view with that name (see `get_view`)
        """
        view = cls.get_view(name)
        # NOTE(review): args[1] is presumably the view's input type -- confirm against ViewSection
        parameter_ty = ParameterSection.create_type(args=[view.args[1]])
        parameter_value = parameter_ty.from_micheline_value(parameter)
        storage_value = cls.storage.from_micheline_value(storage)
        return cls(name, parameter_value, storage_value)

    @try_catch('BEGIN')
    def begin(self, stack: MichelsonStack, stdout: List[str], context: ExecutionContext) -> None:
        """Prepare stack for contract execution

        Attaches the context to the parameter and storage values, pushes a pair built from
        their items onto the stack and appends a `BEGIN` line to stdout.
        """
        self.parameter_value.attach_context(context)
        self.storage_value.attach_context(context)
        res = PairType.from_comb([self.parameter_value.item, self.storage_value.item])
        stack.push(res)
        stdout.append(format_stdout(f'BEGIN %{self.name}', [], [res]))

    def execute(self, stack: MichelsonStack, stdout: List[str], context: ExecutionContext) -> MichelsonInstruction:
        """Execute contract in interpreter"""
        return cast(MichelsonInstruction, self.code.args[0].execute(stack, stdout, context))

    def execute_view(self, stack: MichelsonStack, stdout: List[str], context: ExecutionContext) -> MichelsonInstruction:
        """Execute view in interpreter

        The view is looked up by the instance `name`; its `args[3]` is what gets executed.

        :raises StopIteration: if there is no view with that name (see `get_view`)
        """
        view = self.get_view(self.name)
        return cast(MichelsonInstruction, view.args[3].execute(stack, stdout, context))

    @try_catch('END')
    def end(
        self,
        stack: MichelsonStack,
        stdout: List[str],
        output_mode: str = 'readable',
    ) -> Tuple[List[dict], Any, List[dict], PairType]:
        """Finish contract execution

        :param stack: interpreter stack; exactly one item is expected to be left on it
        :param stdout: output log, receives an `END` line
        :param output_mode: mode passed to `to_micheline_value` for the resulting storage
        :returns: tuple of (operation contents, storage value, lazy diff, raw result pair)
        :raises Exception: if the stack is not empty after popping the result
        """
        res = cast(PairType, stack.pop1())
        if len(stack):
            raise Exception(f'Stack is not empty: {repr(stack)}')
        # The result must be a pair of an operation list and a value of the storage type
        res.assert_type_equal(
            PairType.create_type(
                args=[ListType.create_type(args=[OperationType]), self.storage.args[0]],
            ),
            message='list of operations + resulting storage',
        )
        operations = [op.content for op in res.items[0]]  # type: ignore
        # Passed to `aggregate_lazy_diff` below and returned to the caller afterwards
        lazy_diff = []  # type: ignore
        storage = res.items[1].aggregate_lazy_diff(lazy_diff).to_micheline_value(mode=output_mode)
        stdout.append(format_stdout(f'END %{self.name}', [res], []))
        return operations, storage, lazy_diff, res

    @try_catch('RET')
    def ret(
        self,
        stack: MichelsonStack,
        stdout: List[str],
        output_mode: str = 'readable',
    ) -> MichelsonType:
        """Finish view execution

        :param stack: interpreter stack; exactly one item is expected to be left on it
        :param stdout: output log, receives a `RET` line
        :param output_mode: mode passed to `to_micheline_value` when converting the result
        :returns: the result converted to Micheline and re-read through `view.args[2]`,
            the same type the result is checked against
        :raises Exception: if the stack is not empty after popping the result
        """
        view = self.get_view(self.name)
        res = stack.pop1()
        if len(stack):
            raise Exception(f'Stack is not empty: {repr(stack)}')
        res.assert_type_equal(view.args[2], message='view return type')
        stdout.append(format_stdout(f'RET %{self.name}', [res], []))
        return view.args[2].from_micheline_value(res.to_micheline_value(mode=output_mode))
class TztMichelsonProgram:
    """Michelson .tzt contract interpreter interface

    The section attributes below are declared here only as annotations; `load` and
    `create` build a dynamic subclass (via `type(...)`) that binds them to the
    concrete section types of a particular test script.
    """

    code: Type[CodeSection]
    input: Type[InputSection]
    output: Type[OutputSection]
    big_maps: Optional[Type[BigMapsSection]]

    @staticmethod
    def load(context: ExecutionContext, with_code: bool = False) -> Type['TztMichelsonProgram']:
        """Create TztMichelsonProgram type from filled context

        :param context: execution context providing input/output/code/big_maps expressions
        :param with_code: when False the code section is matched against an empty sequence
            and the context's code is not read
        :returns: dynamically created TztMichelsonProgram subclass; its `big_maps` is None
            when the context returns a falsy big_maps expression
        """
        attrs: Dict[str, Any] = {
            'input': InputSection.match(context.get_input_expr()),
            'output': OutputSection.match(context.get_output_expr()),
            'code': CodeSection.match(context.get_code_expr() if with_code else []),
        }
        # Fetch the optional section once instead of once for the check and once for the match
        big_maps_expr = context.get_big_maps_expr()
        attrs['big_maps'] = BigMapsSection.match(big_maps_expr) if big_maps_expr else None
        cls = type(
            TztMichelsonProgram.__name__,
            (TztMichelsonProgram,),
            attrs,
        )
        return cast(Type['TztMichelsonProgram'], cls)

    @staticmethod
    def create(sequence: Type[MichelineSequence]) -> Type['TztMichelsonProgram']:
        """Create TztMichelsonProgram type from micheline

        :param sequence: parsed top-level script sequence; it is passed through
            `validate_sections` for the `input`, `output` and `code` sections first
        :returns: dynamically created TztMichelsonProgram subclass; `big_maps` is looked up
            with `required=False`
        """
        validate_sections(sequence, ('input', 'output', 'code'))
        cls = type(
            TztMichelsonProgram.__name__,
            (TztMichelsonProgram,),
            {
                'input': get_script_section(sequence, cls=InputSection, required=True),  # type: ignore
                'output': get_script_section(sequence, cls=OutputSection, required=True),  # type: ignore
                'code': get_script_section(sequence, cls=CodeSection, required=True),  # type: ignore
                'big_maps': get_script_section(sequence, cls=BigMapsSection, required=False),  # type: ignore
            },
        )
        return cast(Type['TztMichelsonProgram'], cls)

    @staticmethod
    def match(expr) -> Type['TztMichelsonProgram']:
        """Parse a Micheline expression and create a TztMichelsonProgram type from it

        :param expr: Micheline expression of the whole test script
        :raises Exception: if the parsed expression is not a `MichelineSequence` subclass
        """
        seq = cast(Type[MichelineSequence], MichelineSequence.match(expr))
        if not issubclass(seq, MichelineSequence):
            raise Exception(f'expected sequence, got {seq.prim}')
        return TztMichelsonProgram.create(seq)

    @classmethod
    def as_micheline_expr(cls) -> List[Dict[str, Any]]:
        """Serialize the code, input and output sections, in that order"""
        # TODO: Serialize all sections
        return [
            cls.code.as_micheline_expr(),
            cls.input.as_micheline_expr(),
            cls.output.as_micheline_expr(),
        ]

    @classmethod
    def instantiate(cls) -> 'TztMichelsonProgram':
        """Create a program instance (takes no arguments)"""
        return cls()

    def fill_context(self, script, context: ExecutionContext) -> None:
        """Copy optional environment sections into the execution context

        For each of sender, amount, balance, self, now, source and chain_id the matching
        expression is requested from the context; when it is truthy, it is matched with the
        corresponding section class and the first argument's value is assigned to the
        context attribute (`sender`, `amount`, `balance`, `address`, `now`, `source`,
        `chain_id`).

        :param script: not used by this method; kept in the signature for callers
        :param context: execution context that is both read from and written to
        """
        sender = context.get_sender_expr()
        if sender:
            context.sender = SenderSection.match(sender).args[0].get_string()  # type: ignore

        amount = context.get_amount_expr()
        if amount:
            context.amount = AmountSection.match(amount).args[0].get_int()  # type: ignore

        balance = context.get_balance_expr()
        if balance:
            context.balance = BalanceSection.match(balance).args[0].get_int()  # type: ignore

        _self = context.get_self_expr()
        if _self:
            context.address = SelfSection.match(_self).args[0].get_string()  # type: ignore

        now = context.get_now_expr()
        if now:
            context.now = NowSection.match(now).args[0].get_int()  # type: ignore

        source = context.get_source_expr()
        if source:
            context.source = SourceSection.match(source).args[0].get_string()  # type: ignore

        chain_id = context.get_chain_id_expr()
        if chain_id:
            # The section's literal is base58-encoded with the `Net` prefix and stored as a string
            # FIXME: Move to some common place
            context.chain_id = base58_encode(
                cast(bytes, ChainIdSection.match(chain_id).args[0].literal),
                prefix=b'Net',
            ).decode()

    def register_bigmaps(self, stack: MichelsonStack, stdout: List[str], context: ExecutionContext) -> None:
        """Call `add` on every item of the `big_maps` section, in reverse order

        Does nothing when the program's `big_maps` is falsy.

        :raises Exception: if an item is not a `BigMapInstruction` subclass
        """
        if self.big_maps:
            for item in self.big_maps.args[0].args[::-1]:
                if not issubclass(item, BigMapInstruction):
                    raise Exception('Only `Big_map` instructions can be used in `big_maps` section')
                item.add(stack, stdout, context)

    def begin(self, stack: MichelsonStack, stdout: List[str], context: ExecutionContext) -> None:
        """Prepare stack for contract execution

        Calls `push` on every item of the `input` section, iterating in reverse order.

        :raises Exception: if an item is not a `StackEltInstruction` subclass
        """
        for item in self.input.args[0].args[::-1]:
            if issubclass(item, StackEltInstruction):
                item.push(stack, stdout, context)
            else:
                raise Exception('Only `Stack_elt` instructions can be used in `input` section', item)

    def execute(self, stack: MichelsonStack, stdout: List[str], context: ExecutionContext) -> MichelsonInstruction:
        """Execute contract in interpreter"""
        return cast(MichelsonInstruction, self.code.args[0].execute(stack, stdout, context))

    def end(self, stack: MichelsonStack, stdout: List[str], context: ExecutionContext) -> None:
        """Finish contract execution

        Calls `pull` on every item of the `output` section, in order, then requires the
        stack to be empty.

        :raises Exception: if an item is not a `StackEltInstruction` subclass, or if the
            stack is not empty once every item has been processed
        """
        for item in self.output.args[0].args:
            if not issubclass(item, StackEltInstruction):
                raise Exception('Only `Stack_elt` instructions can be used in `output` section')
            item.pull(stack, stdout, context)

        # Anything left over means the output section did not account for the whole stack
        if len(stack):
            raise Exception('Stack is not empty after processing `output` section')