Source code for pymavryk.contract.view

from pprint import pformat
from typing import Any
from typing import Dict
from typing import List
from typing import Optional

from pymavryk.context.impl import ExecutionContext
from pymavryk.context.mixin import ContextMixin
from pymavryk.contract.call import ContractCallResult
from pymavryk.contract.call import skip_nones
from pymavryk.jupyter import get_class_docstring
from pymavryk.logging import logger
from pymavryk.michelson.micheline import MichelsonRuntimeError
from pymavryk.michelson.repl import Interpreter
from pymavryk.michelson.types.base import MichelsonType
from pymavryk.michelson.types.base import generate_pydoc


def format_view_script(param_ty_expr, storage_ty_expr, return_ty_expr, code_expr):
    return {
        'code': [
            {
                'prim': 'parameter',
                'args': [
                    {
                        'prim': 'pair',
                        'args': [
                            param_ty_expr,
                            storage_ty_expr['args'][0],
                        ],
                    },
                ],
            },
            {
                'prim': 'storage',
                'args': [
                    {
                        'prim': 'option',
                        'args': [return_ty_expr],
                    },
                ],
            },
            {
                'prim': 'code',
                'args': [
                    [
                        {'prim': 'CAR'},
                        code_expr,
                        {'prim': 'SOME'},
                        {
                            'prim': 'NIL',
                            'args': [
                                {'prim': 'operation'},
                            ],
                        },
                        {'prim': 'PAIR'},
                    ]
                ],
            },
        ]
    }


def format_view_params(param_expr, storage_expr):
    return {
        'entrypoint': 'default',
        'value': {
            'prim': 'Pair',
            'args': [param_expr, storage_expr],
        },
    }


[docs]class ContractView(ContextMixin): """Proxy class for handling off-chain and on-chain views""" def __init__( self, context: ExecutionContext, name: str, parameter: Optional[Dict[str, Any]], return_type: Dict[str, Any], code: List[Any], ) -> None: super().__init__(context=context) self.name = name self.param_ty_expr = parameter or {'prim': 'unit'} self.return_ty_expr = return_type self.code_expr = code self.__doc__ = generate_pydoc(MichelsonType.match(self.param_ty_expr), title=name) def __repr__(self) -> str: res = [ super().__repr__(), f'.name\t{self.name}', f'\nBuiltin\n(*args, **kwargs) # build view parameters (see typedef)', f'\nTypedef\n{self.__doc__}', '\nHelpers', get_class_docstring(self.__class__), ] return '\n'.join(res) def __call__(self, *args, **kwargs) -> 'ContractViewCall': """Spawn a contract call proxy initialized with the entrypoint name :param args: entrypoint args :param kwargs: entrypoint key-value args :rtype: ContractViewCall """ if args: if len(args) == 1: py_obj = args[0] else: py_obj = args elif kwargs: py_obj = kwargs else: py_obj = None try: param_ty = MichelsonType.match(self.param_ty_expr) param_expr = param_ty.from_python_object(py_obj).to_micheline_value() except MichelsonRuntimeError as e: logger.info(self.__doc__) raise ValueError(f'Unexpected arguments: {pformat(py_obj)}', *e.args) from e return ContractViewCall( context=self.context, param_expr=param_expr, param_ty_expr=self.param_ty_expr, return_ty_expr=self.return_ty_expr, code_expr=self.code_expr, name=self.name, )
class ContractViewCall(ContextMixin): """Proxy class encapsulating a contract call: contract type scheme, contract address, parameters, and amount""" def __init__( self, context: ExecutionContext, param_expr: dict, param_ty_expr: dict, return_ty_expr: dict, code_expr: list, name: str, ) -> None: super().__init__(context=context) self.name = name self.param_expr = param_expr self.param_ty_expr = param_ty_expr self.return_ty_expr = return_ty_expr self.code_expr = code_expr def __repr__(self) -> str: res = [ super().__repr__(), f'.name\t{self.name}', '\nHelpers', get_class_docstring(self.__class__), ] return '\n'.join(res) def _encode_storage(self, storage=None): try: storage_ty = MichelsonType.match(self.context.storage_expr) if storage is None: return storage_ty.dummy(self.context).to_micheline_value(lazy_diff=True) else: return storage_ty.from_python_object(storage).to_micheline_value(lazy_diff=True) except MichelsonRuntimeError as e: logger.info(self.__doc__) raise ValueError(f'Unexpected storage object: {pformat(storage)}', *e.args) from e def _get_storage(self, storage=None): if storage is None: storage_expr = self.context.get_storage_value(self.address) if storage_expr is None: return self._encode_storage() else: return storage_expr else: return self._encode_storage(storage) def _prepare_query( self, parameters=None, source=None, sender=None, balance=None, chain_id=None, gas_limit=None, ): script = format_view_script( param_ty_expr=self.param_ty_expr, storage_ty_expr=self.context.storage_expr, return_ty_expr=self.return_ty_expr, code_expr=self.code_expr, ) return_ty = MichelsonType.match(self.return_ty_expr) initial_storage = return_ty.dummy(self.context).to_micheline_value(lazy_diff=True) return skip_nones( script=script, storage=initial_storage, entrypoint=parameters['entrypoint'], input=parameters['value'], amount='0', chain_id=chain_id or self.context.get_chain_id(), source=sender, payer=source, balance=str(balance or 0), gas=str(gas_limit) if gas_limit is not None else None, ) def run_code( self, storage=None, source=None, sender=None, balance=None, chain_id=None, gas_limit=None, ) -> ContractCallResult: """Execute using RPC interpreter. :param storage: initial storage as Python object, leave None if you want to generate a dummy one :param source: patch SOURCE :param sender: patch SENDER :param balance: patch BALANCE :param chain_id: patch CHAIN_ID :param gas_limit: restrict max consumed gas :rtype: ContractCallResult """ parameters = format_view_params(param_expr=self.param_expr, storage_expr=self._encode_storage(storage)) query = self._prepare_query( source=source, sender=sender, balance=balance, chain_id=chain_id, gas_limit=gas_limit, parameters=parameters ) res = self.shell.blocks[self.block_id].helpers.scripts.run_code.post(query) return ContractCallResult.from_run_code(res, parameters=parameters, context=self.context) def run_view(self, source=None, sender=None, chain_id=None, gas_limit=None, level=None, now=None) -> Any: """Execute using special RPC endpoint for on-chain views. :param source: patch SOURCE :param sender: patch SENDER :param chain_id: patch CHAIN_ID :param gas_limit: restrict max consumed gas :param level: patch LEVEL :param now: patch NOW :returns: decoded view result """ query = skip_nones( contract=self.address, view=self.name, input=self.param_expr, source=sender, payer=source, chain_id=chain_id or self.context.get_chain_id(), unparsing_mode="Readable", unlimited_gas=gas_limit is None, gas=str(gas_limit) if gas_limit is not None else None, level=str(level) if level is not None else None, now=str(now) if now is not None else None, ) res = self.shell.blocks[self.block_id].helpers.scripts.run_script_view.post(query) py_obj = MichelsonType.match(self.return_ty_expr).from_micheline_value(res['data']).to_python_object() return py_obj def storage_view(self, storage=None): """Get return value of an off-chain view. :param storage: override current contract storage (as Python object) :returns: Decoded return value """ parameters = format_view_params( param_expr=self.param_expr, storage_expr=self._get_storage(storage), ) script = format_view_script( param_ty_expr=self.param_ty_expr, storage_ty_expr=self.context.storage_expr, return_ty_expr=self.return_ty_expr, code_expr=self.code_expr, ) _, storage, stdout, error = Interpreter.run_callback( parameter=parameters['value'], entrypoint=parameters['entrypoint'], storage={'prim': 'None'}, context=self._spawn_context(script=script, address=self.address), ) if error: logger.debug('\n'.join(stdout)) raise error return storage # type: ignore def onchain_view(self, storage=None, balance=None, view_results: Optional[Dict[str, Any]] = None): """Get return value of an on-chain view (not supporting external view calls). :param storage: override current contract storage (as Python object) :param balance: patch BALANCE :param view_results: patch VIEW calls (keys must be string "address%view", values => Python objects) :returns: Decoded return value """ ret, stdout, error = Interpreter.run_view( name=self.name, parameter=self.param_expr, storage=self._get_storage(storage), context=self._spawn_context( balance=balance, script={ **self.context.script, # type: ignore 'storage': self._encode_storage(storage), }, view_results=view_results, ), ) if error: logger.debug('\n'.join(stdout)) raise error return ret # type: ignore