Source code for pymavryk.contract.interface

import json
import logging
from decimal import Decimal
from functools import cached_property
from functools import lru_cache
from os.path import exists
from os.path import expanduser
from typing import Any
from typing import Callable
from typing import Dict
from typing import List
from typing import Optional
from typing import Type
from typing import Union
from typing import cast
from urllib.parse import urlparse

import requests
from deprecation import deprecated  # type: ignore

from pymavryk.context.mixin import ContextMixin
from pymavryk.context.mixin import ExecutionContext
from pymavryk.contract.data import ContractData
from pymavryk.contract.entrypoint import ContractEntrypoint
from pymavryk.contract.metadata import ContractMetadata
from pymavryk.contract.result import ContractCallResult
from pymavryk.contract.token_metadata import ContractTokenMetadata
from pymavryk.contract.view import ContractView
from pymavryk.crypto.key import Key
from pymavryk.jupyter import get_class_docstring
from pymavryk.logging import logger
from pymavryk.michelson.format import micheline_to_michelson
from pymavryk.michelson.micheline import MichelsonRuntimeError
from pymavryk.michelson.parse import michelson_to_micheline
from pymavryk.michelson.program import MichelsonProgram
from pymavryk.michelson.sections import ViewSection
from pymavryk.michelson.types import BigMapType
from pymavryk.michelson.types import BytesType
from pymavryk.michelson.types.base import generate_pydoc
from pymavryk.operation.group import OperationGroup
from pymavryk.rpc import ShellQuery


class ContractTokenMetadataProxy:
    """Subscript-style accessor for TZIP-21 contract token metadata.

    Wraps a single-argument callable so that ``proxy[token_id]`` forwards to
    ``fn(token_id)`` and returns whatever that callable returns.
    """

    def __init__(self, fn: Callable) -> None:
        # The callable is stored untouched and invoked on every subscript access.
        self._fn = fn

    def __getitem__(self, item):
        lookup = self._fn
        return lookup(item)


class ContractInterface(ContextMixin):
    """Proxy class for interacting with a contract."""

    program: MichelsonProgram

    def __init__(self, context: ExecutionContext) -> None:
        """Bind the interface to a context and expose entrypoints and views as attributes.

        :param context: execution context the contract is attached to
        """
        super().__init__(context=context)
        self._logger = logging.getLogger(__name__)
        self._storage: Optional[ContractData] = None
        self.entrypoints = self.program.parameter.list_entrypoints()
        self.views = {view.name: view for view in self.program.views}  # type: Dict[str, Type[ViewSection]]

        for entrypoint, ty in self.entrypoints.items():
            # This class defines its own `token_metadata` property, so an entrypoint
            # with that name is not installed as an instance attribute.
            if entrypoint == 'token_metadata':
                continue

            attr = ContractEntrypoint(context=context, entrypoint=entrypoint)
            attr.__doc__ = generate_pydoc(ty, entrypoint)
            assert not hasattr(self, entrypoint), f'Entrypoint name collision {entrypoint}'
            setattr(self, entrypoint, attr)

        for view_name, view_ty in self.views.items():
            view_attr = ContractView(
                context=context,
                name=view_name,
                parameter=view_ty.args[1].as_micheline_expr(),
                return_type=view_ty.args[2].as_micheline_expr(),
                code=view_ty.args[3].as_micheline_expr(),  # type: ignore
            )
            view_attr.__doc__ = view_ty.generate_pydoc()  # type: ignore
            assert not hasattr(self, view_name), f'View name collision {view_name}'
            setattr(self, view_name, view_attr)

    def __repr__(self) -> str:
        res = [
            super().__repr__(),
            '.storage\t# access storage data at block `block_id`',
            '.parameter\t# root entrypoint',
            '\nEntrypoints',
            *(f'.{name}()' for name in self.entrypoints),
            '\nViews',
            *(f'.{name}()' for name in self.views),
            '\nHelpers',
            get_class_docstring(self.__class__, attr_filter=lambda x: x not in self.entrypoints),
        ]
        return '\n'.join(res)

    def __getattr__(self, item: str) -> ContractEntrypoint:
        # Only reached when regular attribute lookup fails, i.e. the name is neither
        # a class member nor an entrypoint/view installed by `__init__`.
        raise AttributeError(f'unexpected entrypoint {item}')

    @staticmethod
    def from_url(url: str, context: Optional[ExecutionContext] = None) -> 'ContractInterface':
        """Create contract from michelson source code available via URL

        :param url: link to the Michelson file
        :param context: optional execution context
        :raises ValueError: if the response status code is not 200
        :rtype: ContractInterface
        """
        # NOTE(review): no `timeout` is passed to `requests.get` — presumably a stalled
        # server can block this call; confirm whether a default timeout is acceptable.
        res = requests.get(url)
        if res.status_code != 200:
            raise ValueError(f'cannot fetch `{url} {res.status_code}`', res.text)
        return ContractInterface.from_michelson(res.text, context)

    @staticmethod
    def from_file(path: str, context: Optional[ExecutionContext] = None) -> 'ContractInterface':
        """Create contract from michelson source code stored in a file.

        :param path: Path to the `.tz` file (a leading `~` is expanded)
        :param context: optional execution context
        :rtype: ContractInterface
        """
        with open(expanduser(path)) as f:
            return ContractInterface.from_michelson(f.read(), context)

    @staticmethod
    def from_michelson(source: str, context: Optional[ExecutionContext] = None) -> 'ContractInterface':
        """Create contract from michelson source code.

        :param source: Michelson source code
        :param context: optional execution context
        :rtype: ContractInterface
        """
        return ContractInterface.from_micheline(michelson_to_micheline(source), context)

    @staticmethod
    def from_micheline(
        expression: List[Dict[str, Any]],
        context: Optional[ExecutionContext] = None,
    ) -> 'ContractInterface':
        """Create contract from micheline expression.

        :param expression: [{'prim': 'parameter'}, {'prim': 'storage'}, {'prim': 'code'}]
        :param context: optional execution context
        :rtype: ContractInterface
        """
        if context is not None:
            code_expr = context.resolve_global_constants(expression)
        else:
            code_expr = expression

        program = MichelsonProgram.match(code_expr)
        # Each contract gets its own subclass carrying `program` as a class attribute.
        cls = type(ContractInterface.__name__, (ContractInterface,), {'program': program})
        # A fresh context is built around the script; only shell, key and global
        # constants are carried over from the supplied context.
        context = ExecutionContext(
            shell=context.shell if context else None,
            key=context.key if context else None,
            script={'code': code_expr},
            global_constants=context.global_constants if context else None,
        )
        return cls(context)

    @staticmethod
    def from_context(context: ExecutionContext) -> 'ContractInterface':
        """Create contract from the previously loaded context data.

        :param context: execution context
        :return: ContractInterface
        """
        program = MichelsonProgram.load(context, with_code=True)
        cls = type(ContractInterface.__name__, (ContractInterface,), {'program': program})
        return cls(context)

    @classmethod
    @deprecated(
        deprecated_in='3.0.0',
        removed_in='4.0.0',
        details='use one of `from_file`, `from_michelson`, `from_micheline`, `from_url`',
    )
    def create_from(cls, source):
        """Create contract interface from its code.

        :param source: Michelson code, filename, or Micheline JSON
        :rtype: ContractInterface
        """
        if isinstance(source, str):
            # A string naming an existing file is treated as a path, otherwise as source code.
            if exists(expanduser(source)):
                return ContractInterface.from_file(source)
            return ContractInterface.from_michelson(source)
        return ContractInterface.from_micheline(source)

    def to_micheline(self) -> List[Dict[str, Any]]:
        """Get contract script in Micheline JSON

        :return: [{'prim': 'parameter'}, {'prim': 'storage'}, {'prim': 'code'}]
        """
        return self.program.as_micheline_expr()

    def to_michelson(self) -> str:
        """Get contract listing in formatted Michelson

        :return: string
        """
        return micheline_to_michelson(self.to_micheline())

    def to_file(self, path: str) -> None:
        """Write contract source to a .tz file

        :param path: path to the file (a leading `~` is expanded, as in `from_file`)
        """
        with open(expanduser(path), 'w+') as f:
            f.write(self.to_michelson())

    @deprecated(deprecated_in='3.0.0', removed_in='4.0.0', details='use `.storage[path][to][big_map][key]()` instead')
    def big_map_get(self, path):
        """Get BigMap entry as Python object by plain key and block height.

        :param path: JSON path to the key (or just key to access default BigMap location).
            Use `/` to separate nodes and `::` to separate tuple args.
            In any other case you'd need to escape those symbols.
        :returns: object
        """
        node = self.storage
        for item in path.split('/'):
            if not item:
                # Empty segments (leading, trailing or doubled `/`) are skipped.
                continue
            parts = item.split('::')
            key = tuple(parts) if len(parts) > 1 else item
            node = node[key]
        return node() if node else None

    def using(
        self,
        shell: Optional[Union[ShellQuery, str]] = None,
        key: Optional[Union[Key, str]] = None,
        block_id: Optional[Union[str, int]] = None,
        mode: Optional[str] = None,
        ipfs_gateway: Optional[str] = None,
    ) -> 'ContractInterface':
        """Change the block at which the current contract is inspected.
        Also, if address is undefined you can specify RPC endpoint, and private key.

        :param shell: one of 'mainnet', '***net', or RPC node uri, or instance of :class:`pymavryk.rpc.shell.ShellQuery`
        :param key: base58 encoded key, path to the faucet file, alias from octez-client, or instance of `Key`
        :param block_id: block height / hash / offset to use, default is `head`
        :param mode: whether to use `readable` or `optimized` encoding for parameters/storage/other
        :param ipfs_gateway: override IPFS gateway URI
        :rtype: ContractInterface
        """
        # When the contract already has an address, `shell` and `key` are ignored.
        has_address = self.context.address is not None
        return type(self)(
            self._spawn_context(
                shell=None if has_address else shell,
                key=None if has_address else key,
                address=self.context.address,
                block_id=block_id,
                mode=mode,
                ipfs_gateway=ipfs_gateway,
            )
        )

    @property
    def storage(self) -> ContractData:
        """Get contract storage.

        Returns the storage assigned through the setter if there is one; otherwise
        fetches it from the node when the contract has an address, or builds a dummy
        value from the storage type.

        :rtype: ContractData
        """
        if self._storage:
            return self._storage
        elif self.address:
            expr = self.shell.blocks[self.context.block_id].context.contracts[self.address].storage()
            storage = self.program.storage.from_micheline_value(expr)
            storage.attach_context(self.context)
        else:
            storage = self.program.storage.dummy(self.context)
        return ContractData(self.context, storage.item, title="storage")

    @storage.setter
    def storage(self, storage: ContractData) -> None:
        if self.address:
            raise Exception('Can\'t set storage of deployed contract')
        self._storage = storage

    def storage_from_file(self, path: str) -> None:
        """Load contract storage from file

        :param path: path to .tz file (a leading `~` is expanded, as in `from_file`)
        """
        with open(expanduser(path)) as file:
            expr = michelson_to_micheline(file.read())
        self.storage_from_micheline(expr)

    def storage_from_micheline(self, expression) -> None:
        """Load contract storage from Micheline expression

        :param expression: Micheline expression
        """
        storage = self.program.storage.from_micheline_value(expression)
        storage.attach_context(self.context)
        self.storage = ContractData(self.context, storage.item, title="storage")

    def storage_from_michelson(self, source: str) -> None:
        """Load contract storage from Michelson code

        :param source: Michelson code
        """
        expr = michelson_to_micheline(source)
        self.storage_from_micheline(expr)

    def _load_metadata_json_from_storage(self, parsed_url: Any) -> Any:
        """Read and parse a JSON document referenced by a `mavryk-storage` URI.

        A single-part path is looked up in this contract's own `metadata` big map;
        a two-part path is looked up in the contract whose address is the URI netloc.

        :param parsed_url: result of `urlparse` applied to the `mavryk-storage` URI
        :returns: parsed JSON document
        :raises NotImplementedError: if the URI path has more than two `/`-separated parts
        """
        parts = parsed_url.path.split('/')
        if len(parts) == 1:
            # NOTE: KT1JBThDEqyqrEHimhxoUBCSnsKAqFcuHMkP
            storage = self.storage
        elif len(parts) == 2:
            # NOTE: KT1REEb5VxWRjcHm5GzDMwErMmNFftsE5Gpf
            context = self._spawn_context(address=parsed_url.netloc)
            storage = ContractInterface.from_context(context).storage
        else:
            raise NotImplementedError('Unknown metadata URL scheme')
        return json.loads(storage['metadata'][parts[-1]]().decode())

    @cached_property
    def metadata(self) -> Optional[ContractMetadata]:
        """Get TZIP-016 contract metadata, if exists

        :raises NotImplementedError: for `sha256` and any unrecognised URI scheme
        :rtype: ContractMetadata
        """
        metadata_url = self.metadata_url
        if metadata_url is None:
            return None

        logger.info('Trying to fetch contract metadata from `%s`', metadata_url)
        parsed_url = urlparse(metadata_url)

        if parsed_url.scheme in ('http', 'https'):
            # NOTE: KT1B34qXVRfQrScEaqjjt6cJ5G8LtVFZ7fSc
            metadata = ContractMetadata.from_url(metadata_url, self.context)

        elif parsed_url.scheme == 'ipfs':
            # NOTE: KT1AFA2mwNUMNd4SsujE1YYp29vd8BZejyKW
            metadata = ContractMetadata.from_ipfs(parsed_url.netloc, self.context)

        elif parsed_url.scheme == 'mavryk-storage':
            metadata_json = self._load_metadata_json_from_storage(parsed_url)
            metadata = ContractMetadata.from_json(metadata_json, self.context)

        elif parsed_url.scheme == 'sha256':
            raise NotImplementedError

        else:
            raise NotImplementedError('Unknown metadata URL scheme')

        return metadata

    @property
    def token_metadata(self) -> ContractTokenMetadataProxy:
        """Get TZIP-021 contract token metadata proxy

        :rtype: ContractTokenMetadataProxy
        """
        return ContractTokenMetadataProxy(self._get_token_metadata)  # type: ignore

    @lru_cache(maxsize=1000)  # noqa: B019
    def _get_token_metadata(self, token_id: int) -> Optional[ContractTokenMetadata]:
        """Look up token metadata, trying the off-chain view first and storage second."""
        token_metadata = self._get_token_metadata_from_view(token_id)
        if token_metadata is None:
            token_metadata = self._get_token_metadata_from_storage(token_id)
        return token_metadata

    def _get_token_metadata_from_storage(self, token_id: int) -> Optional[ContractTokenMetadata]:
        """Resolve token metadata through the URI stored in the `token_metadata` big map.

        :param token_id: token identifier
        :returns: token metadata, or None when storage holds no URI for the token
        :raises NotImplementedError: for `sha256` and any unrecognised URI scheme
        """
        self._logger.info('Trying to fetch token %s metadata from storage', token_id)
        try:
            token_metadata_url = self.storage['token_metadata'][token_id]['token_info']['']().decode()
        # FIXME: Dirty
        except (KeyError, AssertionError):
            self._logger.info('Storage doesn\'t contain metadata URL for token %s', token_id)
            return None

        self._logger.info('Trying to fetch contract token metadata from `%s`', token_metadata_url)
        parsed_url = urlparse(token_metadata_url)

        if parsed_url.scheme in ('http', 'https'):
            token_metadata = ContractTokenMetadata.from_url(token_metadata_url, self.context)

        elif parsed_url.scheme == 'ipfs':
            token_metadata = ContractTokenMetadata.from_ipfs(parsed_url.netloc, self.context)

        elif parsed_url.scheme == 'mavryk-storage':
            token_metadata_json = self._load_metadata_json_from_storage(parsed_url)
            token_metadata = ContractTokenMetadata.from_json(token_metadata_json, self.context)

        elif parsed_url.scheme == 'sha256':
            raise NotImplementedError

        else:
            raise NotImplementedError('Unknown metadata URL scheme')

        return token_metadata

    def _get_token_metadata_from_view(self, token_id: int) -> Optional[ContractTokenMetadata]:
        """Resolve token metadata through the off-chain view of the contract metadata.

        :param token_id: token identifier
        :returns: token metadata, or None when the contract has no metadata, the view
            is missing, or the view fails for this token
        """
        self._logger.info('Trying to fetch token %s metadata from off-chain view', token_id)
        try:
            # Without contract metadata there are no off-chain views to call; return
            # None so that the caller can fall back to storage instead of failing
            # with AttributeError on None.
            if self.metadata is None:
                self._logger.info('Contract metadata is not available, skipping off-chain view')
                return None
            token_metadata_json = cast(ContractMetadata, self.metadata).tokenMetadata(token_id).storage_view()[1]
            return ContractTokenMetadata.from_json(token_metadata_json)
        except KeyError:
            self._logger.info('There\'s no off-chain view named `token_metadata`')
            return None
        except MichelsonRuntimeError:
            self._logger.info('Off-chain view has no token metadata for token_id %s', token_id)
            return None

    @cached_property
    def metadata_url(self) -> Optional[str]:
        """Get the metadata URI stored under the empty key of the `metadata` big map.

        :returns: decoded URI, or None when the big map or the key is not found
        """
        try:
            metadata = self.storage.data.find(lambda x: x.field_name == 'metadata' and isinstance(x, BigMapType))
            if metadata is not None:
                metadata_url = metadata['']
                if isinstance(metadata_url, BytesType):
                    return metadata_url.value.decode()
                else:
                    self._logger.info('Empty string key is not found in metadata big map')
            else:
                self._logger.info('Metadata big map not found')
        # FIXME: Dirty
        except (KeyError, AssertionError):
            self._logger.info('Failed to get metadata URI')

        return None

    @property
    def parameter(self) -> ContractEntrypoint:
        """Get the root entrypoint of the contract.

        :rtype: ContractEntrypoint
        """
        root_name = self.program.parameter.root_name
        assert root_name in self.entrypoints, 'root entrypoint is undefined'
        return getattr(self, root_name)

    @property  # type: ignore
    @deprecated(deprecated_in='3.0.0', removed_in='4.0.0', details='access `ContractInterface` directly')
    def contract(self) -> 'ContractInterface':
        return self

    @property  # type: ignore
    @deprecated(deprecated_in='3.0.0', removed_in='4.0.0', details='use `to_michelson()` instead')
    def text(self) -> str:
        return self.to_michelson()

    @property  # type: ignore
    @deprecated(deprecated_in='3.0.0', removed_in='4.0.0', details='use `to_micheline()` instead')
    def code(self):
        return self.to_micheline()

    @property  # type: ignore
    @deprecated(deprecated_in='3.0.0', removed_in='4.0.0', details='use `default()` instead')
    def call(self) -> ContractEntrypoint:
        return self.parameter

    def operation_result(self, operation_group: Dict[str, Any]) -> List[ContractCallResult]:
        """Get operation parameters, and resulting storage as Python objects.
        Can locate operation inside operation groups with multiple contents and/or internal operations.

        :param operation_group: {'branch', 'protocol', 'contents', 'signature'}
        :rtype: ContractCallResult
        """
        return ContractCallResult.from_run_operation(operation_group, context=self.context)

    def script(self, initial_storage=None, mode: Optional[str] = None) -> Dict[str, Any]:
        """Generate script for contract origination.

        :param initial_storage: Python object, leave None to generate default (attach shell/key for smart fill)
        :param mode: whether to use `readable` or `optimized` (or `legacy_optimized`) encoding for initial storage
        :return: {"code": $Micheline, "storage": $Micheline}
        """
        # Any falsy `initial_storage` (not only None) selects the generated default.
        if initial_storage:
            storage = self.program.storage.from_python_object(initial_storage)
        else:
            storage = self.program.storage.dummy(self.context)
        return {
            'code': self.program.as_micheline_expr(),
            'storage': storage.to_micheline_value(mode=mode or self.context.mode, lazy_diff=True),
        }

    def originate(
        self,
        initial_storage=None,
        mode: Optional[str] = None,
        balance: Union[int, Decimal] = 0,
        delegate: Optional[str] = None,
    ) -> OperationGroup:
        """Create an origination operation

        :param initial_storage: Python object, leave None to generate default
        :param mode: whether to use `readable` or `optimized` (or `legacy_optimized`) encoding for initial storage
        :param balance: initial balance
        :param delegate: initial delegator
        :rtype: OperationGroup
        """
        return OperationGroup(context=self._spawn_context()).origination(
            script=self.script(initial_storage, mode=mode),
            balance=balance,
            delegate=delegate,
        )
# Legacy name for `ContractInterface`; the subclass adds no members of its own.
@deprecated(
    deprecated_in='3.0.0',
    removed_in='4.0.0',
    details='use `ContractInterface` instead',
)
class Contract(ContractInterface):
    pass