from copy import deepcopy
from typing import Any
from typing import List
from typing import Optional
from typing import Tuple
from typing import cast
from attr import dataclass
from pymavryk.context.impl import ExecutionContext
from pymavryk.michelson.micheline import MichelineSequence
from pymavryk.michelson.micheline import MichelsonRuntimeError
from pymavryk.michelson.parse import MichelsonParser
from pymavryk.michelson.parse import MichelsonParserError
from pymavryk.michelson.parse import michelson_to_micheline
from pymavryk.michelson.program import MichelsonProgram
from pymavryk.michelson.program import TztMichelsonProgram
from pymavryk.michelson.sections import CodeSection
from pymavryk.michelson.stack import MichelsonStack
from pymavryk.michelson.types import OperationType
@dataclass(kw_only=True)
class InterpreterResult:
    """Outcome of a single interpreter run.

    Instances are created with keyword arguments only; ``stdout`` is the one
    required argument, every other field defaults to ``None``.
    """

    # These three names carry no annotation, so they stay plain class-level
    # defaults (``None``) rather than generated constructor fields.
    operations = None
    storage = None
    lazy_diff = None

    # Output lines collected while the code was running (required).
    stdout: List[str]

    # Exception caught during the run, or None if nothing was caught.
    error: Optional[Exception] = None

    # Instructions that were executed, wrapped in a Micheline sequence.
    instructions: Optional[MichelineSequence] = None

    # Interpreter stack as left by a successful run.
    stack: Optional[MichelsonStack] = None
class Interpreter:
    """Michelson interpreter reimplemented in Python.

    Based on the following reference: https://tezos.gitlab.io/michelson-reference/
    """

    def __init__(
        self,
        extra_primitives: Optional[List[str]] = None,
        debug: bool = False,
    ) -> None:
        """Create an interpreter with an empty stack and a fresh execution context

        :param extra_primitives: extra primitive names forwarded to the Michelson parser
        :param debug: stored on the context and forwarded to the parser; when set,
            :meth:`execute` re-raises errors instead of reporting them in the result
        """
        self.stack = MichelsonStack()
        self.context = ExecutionContext()
        self.context.debug = debug
        self.parser = MichelsonParser(debug=debug, extra_primitives=extra_primitives)

    def execute(self, code: str) -> InterpreterResult:
        """Execute some code preserving current context and stack

        If a parser or runtime error occurs, the stack and context are rolled back
        to their state before the call and the error is reported through the
        result's ``error`` and ``stdout``. In debug mode the error is re-raised
        instead, without any rollback.

        :param code: Michelson code
        :returns: result holding stdout plus either the executed instructions and
            the stack, or the caught error
        """
        result = InterpreterResult(stdout=[])

        # Snapshots taken up front so a failed run can be undone completely.
        stack_backup = deepcopy(self.stack)
        context_backup = deepcopy(self.context)

        try:
            code_section = CodeSection.match(michelson_to_micheline(code))
            instructions = code_section.args[0].execute(self.stack, result.stdout, self.context)
            result.instructions = MichelineSequence([instructions])
            result.stack = self.stack
        except (MichelsonParserError, MichelsonRuntimeError) as e:
            if self.context.debug:
                raise

            self.stack = stack_backup
            self.context = context_backup
            result.stdout.append(e.format_stdout())
            result.error = e

        return result

    def reset(self) -> None:
        """Reset interpreter's stack and context

        The debug flag of the current context is carried over to the new one.
        """
        # The flag was requested at construction time; without this it would be
        # lost together with the old context and `execute` would change behavior.
        debug = self.context.debug
        self.stack = MichelsonStack()
        self.context = ExecutionContext()
        self.context.debug = debug

    @staticmethod
    def run_code(
        parameter,
        storage,
        script: str,
        entrypoint='default',
        output_mode='readable',
        amount=None,
        chain_id=None,
        source=None,
        sender=None,
        balance=None,
        block_id=None,
        **kwargs,
    ) -> Tuple[List[dict], Any, List[dict], List[str], Optional[Exception]]:
        """Execute contract in interpreter

        :param parameter: parameter expression
        :param storage: storage expression
        :param script: contract's Michelson code
        :param entrypoint: contract entrypoint
        :param output_mode: one of readable/optimized/legacy_optimized
        :param amount: patch AMOUNT
        :param chain_id: patch CHAIN_ID
        :param source: patch SOURCE
        :param sender: patch SENDER
        :param balance: patch BALANCE
        :param block_id: set block ID
        :param kwargs: extra keyword arguments passed through to the execution context
        :returns: [operations, storage, lazy_diff, stdout, error]; on a runtime error
            this is [[], None, [], stdout, error]
        """
        context = ExecutionContext(
            amount=amount,
            chain_id=chain_id,
            source=source,
            sender=sender,
            balance=balance,
            block_id=block_id,
            script={'code': script, 'storage': storage},
            **kwargs,
        )
        stack = MichelsonStack()
        stdout: List[str] = []
        try:
            program = MichelsonProgram.load(context, with_code=True)
            res = program.instantiate(
                entrypoint=entrypoint,
                parameter=parameter,
                storage=storage,
            )
            res.begin(stack, stdout, context)
            res.execute(stack, stdout, context)
            operations, storage, lazy_diff, _ = res.end(stack, stdout, output_mode=output_mode)
            return operations, storage, lazy_diff, stdout, None
        except MichelsonRuntimeError as e:
            stdout.append(e.format_stdout())
            return [], None, [], stdout, e

    @staticmethod
    def run_callback(
        entrypoint: str,
        parameter,
        storage,
        context: ExecutionContext,
    ) -> Tuple[Any, Any, List[str], Optional[Exception]]:
        """Execute view entrypoint of the contract loaded into the context

        :param entrypoint: contract entrypoint
        :param parameter: parameter section
        :param storage: storage section
        :param context: execution context
        :returns: [operations, storage, stdout, error]; operations and storage are
            None when a runtime error occurred
        """
        # Reduced copy of the caller's context, used for loading the program.
        ctx = ExecutionContext(
            shell=context.shell,
            key=context.key,
            block_id=context.block_id,
            script=context.script,
            address=context.address,
        )
        stack = MichelsonStack()
        stdout: List[str] = []
        try:
            program = MichelsonProgram.load(ctx, with_code=True)
            res = program.instantiate(entrypoint=entrypoint, parameter=parameter, storage=storage)
            # NOTE(review): execution runs against the caller's `context`, not `ctx` -
            # kept as is; confirm this is intended.
            res.begin(stack, stdout, context)
            res.execute(stack, stdout, context)
            _, _, _, pair = res.end(stack, stdout)
            operations = cast(List[OperationType], list(pair.items[0]))
            storage = pair.items[1]
            # Note: the `storage` returned by the Michelson interpreter above is not
            # required to include the full annotations specified in the contract's storage.
            # The lack of annotations affects calls to `to_python_object()`, causing the storage
            # you get back from the view to not always be converted to the same object
            # as if you called ContractInterface.storage() directly.
            # Re-parsing using the contract's storage section here to recover the annotations.
            storage = program.storage.from_micheline_value(storage.to_micheline_value())
            return [op.to_python_object() for op in operations], storage.to_python_object(), stdout, None
        except MichelsonRuntimeError as e:
            stdout.append(e.format_stdout())
            return None, None, stdout, e

    @staticmethod
    def run_view(
        name: str,
        parameter,
        storage,
        context: ExecutionContext,
    ) -> Tuple[Any, List[str], Optional[Exception]]:
        """Execute a named view of the contract loaded into the context

        :param name: view name
        :param parameter: parameter expression passed to the view
        :param storage: storage expression
        :param context: execution context
        :returns: [value, stdout, error]; value is None when a runtime error occurred
        """
        # Reduced copy of the caller's context (view results included), used for
        # loading the program.
        ctx = ExecutionContext(
            shell=context.shell,
            key=context.key,
            block_id=context.block_id,
            script=context.script,
            address=context.address,
            view_results=context.view_results,
        )
        stack = MichelsonStack()
        stdout: List[str] = []
        try:
            program = MichelsonProgram.load(ctx, with_code=True)
            res = program.instantiate_view(name=name, parameter=parameter, storage=storage)
            # NOTE(review): execution runs against the caller's `context`, not `ctx` -
            # kept as is; confirm this is intended.
            res.begin(stack, stdout, context)
            res.execute_view(stack, stdout, context)
            ret_value = res.ret(stack, stdout)
            return ret_value.to_python_object(), stdout, None
        except MichelsonRuntimeError as e:
            stdout.append(e.format_stdout())
            return None, stdout, e

    @staticmethod
    def run_tzt(
        script: str,
        amount=None,
        chain_id=None,
        source=None,
        sender=None,
        balance=None,
        block_id=None,
        **kwargs,
    ) -> None:
        """Execute TZT test suite code

        Nothing is returned and no error is caught here: any exception raised
        while loading or running the test propagates to the caller.

        :param script: test contract's Michelson code
        :param amount: patch AMOUNT
        :param chain_id: patch CHAIN_ID
        :param source: patch SOURCE
        :param sender: patch SENDER
        :param balance: patch BALANCE
        :param block_id: set block ID
        :param kwargs: extra keyword arguments passed through to the execution context
        """
        context = ExecutionContext(
            amount=amount,
            chain_id=chain_id,
            source=source,
            sender=sender,
            balance=balance,
            block_id=block_id,
            script={'code': script},
            tzt=True,
            **kwargs,
        )
        stack = MichelsonStack()
        stdout: List[str] = []
        program = TztMichelsonProgram.load(context, with_code=True)
        res = program.instantiate()
        res.fill_context(script, context)
        res.register_bigmaps(stack, stdout, context)
        res.begin(stack, stdout, context)
        res.execute(stack, stdout, context)
        res.end(stack, stdout, context)