from pprint import pformat
from typing import Any
from typing import Dict
from typing import List
from typing import Optional
from typing import Union
from deprecation import deprecated # type: ignore
from pymavryk.context.impl import ExecutionContext
from pymavryk.context.mixin import ContextMixin
from pymavryk.crypto.encoding import base58_decode
from pymavryk.crypto.encoding import base58_encode
from pymavryk.crypto.encoding import is_bh
from pymavryk.crypto.key import blake2b_32
from pymavryk.jupyter import get_class_docstring
from pymavryk.logging import logger
from pymavryk.michelson.forge import forge_base58
from pymavryk.operation import DEFAULT_BURN_RESERVE
from pymavryk.operation import DEFAULT_GAS_RESERVE
from pymavryk.operation import MAX_OPERATIONS_TTL
from pymavryk.operation.content import ContentMixin
from pymavryk.operation.fees import calculate_fee
from pymavryk.operation.fees import default_fee
from pymavryk.operation.fees import default_gas_limit
from pymavryk.operation.fees import default_storage_limit
from pymavryk.operation.forge import forge_operation_group
from pymavryk.operation.result import OperationResult
from pymavryk.rpc.errors import RpcError
from pymavryk.rpc.kind import validation_passes
[docs]class OperationGroup(ContextMixin, ContentMixin):
"""Operation group representation: contents (single or multiple), signature, other fields,
and also useful helpers for filling with precise fees, signing, forging, and injecting.
"""
def __init__(
self,
context: ExecutionContext,
contents: Optional[List[Dict[str, Any]]] = None,
protocol: Optional[str] = None,
chain_id: Optional[str] = None,
branch: Optional[str] = None,
signature: Optional[str] = None,
opg_hash: Optional[str] = None,
opg_result: Optional[Dict[str, Any]] = None,
) -> None:
super().__init__(context=context)
self.contents = contents or []
self.protocol = protocol
self.chain_id = chain_id
self.branch = branch
self.signature = signature
self.opg_hash = opg_hash
self.opg_result = opg_result
def __repr__(self) -> str:
res = [
super().__repr__(),
'\nPayload',
pformat(self.json_payload()),
'\nHash',
self.opg_hash or '(Not sent)' '\nHelpers',
get_class_docstring(self.__class__),
]
return '\n'.join(res)
def _spawn(self, **kwargs) -> 'OperationGroup':
return OperationGroup(
context=self.context,
contents=kwargs.get('contents', self.contents.copy()),
protocol=kwargs.get('protocol', self.protocol),
chain_id=kwargs.get('chain_id', self.chain_id),
branch=kwargs.get('branch', self.branch),
signature=kwargs.get('signature', self.signature),
opg_hash=kwargs.get('opg_hash', self.opg_hash),
opg_result=kwargs.get('opg_result', self.opg_result),
)
[docs] def json_payload(self) -> Dict[str, Any]:
"""Get JSON payload used for the injection."""
return {
'protocol': self.protocol,
'branch': self.branch,
'contents': self.contents,
'signature': self.signature,
}
[docs] def binary_payload(self) -> bytes:
"""Get binary payload used for injection/hash calculation."""
if not self.signature:
raise ValueError('Not signed')
return bytes.fromhex(self.forge()) + forge_base58(self.signature)
[docs] def operation(self, content: Dict[str, Any]) -> 'OperationGroup':
"""Create new operation group with extra content added.
:param content: Kind-specific operation body
:rtype: OperationGroup
"""
return self._spawn(contents=self.contents + [content])
[docs] def fill(
self,
counter: Optional[int] = None,
ttl: Optional[int] = None,
gas_limit: Optional[int] = None,
storage_limit: Optional[int] = None,
minimal_nanomav_per_gas_unit: Optional[int] = None,
**kwargs,
) -> 'OperationGroup':
"""Try to fill all fields left unfilled, use approximate fees
(not optimal, use `autofill` to simulate operation and get precise values).
:param counter: Override counter value (for manual handling)
:param ttl: Number of blocks to wait in the mempool before removal (default is 5 for public network, MAX for sandbox),
-1 for MAX (if you have a private network and issues with block RPC queries)
:param gas_limit: Override gas_limit value (for manual handling)
:param storage_limit: Override storage_limit value (for manual handling)
:param minimal_nanomav_per_gas_unit: Override minimal_nanomav_per_gas_unit constant (for manual handling)
:rtype: OperationGroup
"""
if kwargs.get('branch_offset') is not None:
logger.warning('`branch_offset` argument is deprecated, use `ttl` instead')
ttl = MAX_OPERATIONS_TTL - kwargs['branch_offset']
if ttl is None:
ttl = self.context.get_operations_ttl()
elif ttl == -1:
ttl = MAX_OPERATIONS_TTL
if not 0 < ttl <= MAX_OPERATIONS_TTL:
raise Exception(f'`ttl` has to be in range (0, {MAX_OPERATIONS_TTL}]')
chain_id = self.chain_id or self.context.get_chain_id()
protocol = self.protocol or self.context.get_protocol()
branch = self.branch or self.shell.blocks[f'head~{MAX_OPERATIONS_TTL - ttl}'].hash()
source = self.key.public_key_hash()
constants = self.shell.head.context.constants()
if counter is not None:
self.context.set_counter(counter - 1) # which is supposedly the current state (head)
if gas_limit is None:
hard_gas_limit_per_content = int(constants['hard_gas_limit_per_operation']) // len(self.contents)
else:
hard_gas_limit_per_content = gas_limit // len(self.contents)
if storage_limit is None:
hard_storage_limit_per_content = int(constants['hard_storage_limit_per_operation']) // len(self.contents)
else:
hard_storage_limit_per_content = storage_limit // len(self.contents)
replace_map = {
'pkh': source,
'source': source,
'delegate': source, # self registration
'counter': lambda i, x: str(self.context.get_counter()),
'secret': lambda i, x: self.key.activation_code,
'period': lambda i, x: str(self.shell.head.voting_period()),
'public_key': lambda i, x: self.key.public_key(),
'gas_limit': lambda i, x: str(
min(
hard_gas_limit_per_content,
gas_limit if gas_limit is not None else default_gas_limit(x, constants),
)
),
'storage_limit': lambda i, x: str(
min(
hard_storage_limit_per_content,
storage_limit if storage_limit is not None else default_storage_limit(x, constants),
)
),
'fee': lambda i, x: str(default_fee(x, gas_limit, minimal_nanomav_per_gas_unit) if i == 0 else 0),
}
def fill_content(idx, content):
content = content.copy()
for k, v in replace_map.items():
if content.get(k) in ['', '0']:
content[k] = v(idx, content) if callable(v) else v
return content
return self._spawn(
contents=[fill_content(idx=i, content=x) for i, x in enumerate(self.contents)],
protocol=protocol,
chain_id=chain_id,
branch=branch,
)
[docs] def run(self, block_id: str = 'head'):
"""Simulate operation without signature checks.
:param block_id: Specify a level at which this operation should be applied (default is head)
:returns: RPC response from `run_operation`
"""
return self.shell.blocks[block_id].helpers.scripts.run_operation.post(
{
'operation': {
'branch': self.branch,
'contents': self.contents,
'signature': base58_encode(b'0' * 64, b'sig').decode(),
},
'chain_id': self.chain_id,
}
)
[docs] def forge(self, validate=False) -> str:
"""Convert json representation of the operation group into bytes.
:param validate: Forge remotely also and compare results, default is False
:returns: Hex string
"""
payload = {
'branch': self.branch,
'contents': self.contents,
}
local_data = forge_operation_group(payload).hex()
if validate:
remote_data = self.shell.blocks[self.branch].helpers.forge.operations.post(payload)
if local_data != remote_data:
raise ValueError(f'Local forge result differs from remote one:\n\n{local_data}\n\n{remote_data}')
return local_data
[docs] def message(self, block: Union[str, int] = 'genesis') -> bytes:
"""Get payload for the failing noop operation
:param block: Specify operation branch (default is genesis)
:returns: Message bytes
"""
if len(self.contents) != 1 or self.contents[0]['kind'] != 'failing_noop':
raise NotImplementedError('Use for signing messages only')
branch = block if is_bh(str(block)) else self.shell.blocks[block].hash()
return b'\x03' + bytes.fromhex(self._spawn(branch=branch).forge())
[docs] def autofill(
self,
gas_reserve: int = DEFAULT_GAS_RESERVE,
burn_reserve: int = DEFAULT_BURN_RESERVE,
counter: Optional[int] = None,
ttl: Optional[int] = None,
fee: Optional[int] = None,
gas_limit: Optional[int] = None,
storage_limit: Optional[int] = None,
**kwargs,
) -> 'OperationGroup':
"""Fill the gaps and then simulate the operation in order to calculate fee, gas/storage limits.
:param gas_reserve: Add a safe reserve for dynamically calculated gas limit (default is 100).
:param burn_reserve: Add a safe reserve for dynamically calculated storage limit (default is 100).
:param counter: Override counter value (for manual handling)
:param ttl: Number of blocks to wait in the mempool before removal (default is 5 for public network, 60 for sandbox)
:param fee: Explicitly set fee for operation. If not set fee will be calculated depending on results of operation dry-run.
:param gas_limit: Explicitly set gas limit for operation. If not set gas limit will be calculated depending on results of
operation dry-run. In case of batch will be evenly split between operations.
:param storage_limit: Explicitly set storage limit for operation. If not set storage limit will be calculated depending on
results of operation dry-run. In case of batch will be evenly split between operations.
:rtype: OperationGroup
"""
if kwargs.get('branch_offset') is not None:
logger.warning('`branch_offset` argument is deprecated, use `ttl` instead')
ttl = MAX_OPERATIONS_TTL - kwargs['branch_offset']
opg = self.fill(counter=counter, ttl=ttl)
opg_with_metadata = opg.run()
if not OperationResult.is_applied(opg_with_metadata):
raise RpcError.from_errors(OperationResult.errors(opg_with_metadata))
fee_acc = 0
extra_size = 32 + 64 # size of serialized branch and signature + safe reserve
num_contents = len(opg_with_metadata['contents'])
counter_offset = self.context.get_counter_offset()
opg.contents.clear()
for content in opg_with_metadata['contents']:
if validation_passes[content['kind']] == 3:
if gas_limit is not None:
gas_limit_new = gas_limit // num_contents
else:
gas_limit_new = OperationResult.consumed_gas(content)
if content['kind'] in ['origination', 'transaction']:
gas_limit_new += gas_reserve
if storage_limit is not None:
storage_limit_new = storage_limit // num_contents
else:
paid_storage_size_diff = OperationResult.paid_storage_size_diff(content)
burned = OperationResult.burned(content)
storage_limit_new = paid_storage_size_diff + burned
if content['kind'] in ['origination', 'transaction']:
storage_limit_new += burn_reserve
current_counter = int(content['counter'])
content.update(
counter=str(current_counter + counter_offset),
gas_limit=str(gas_limit_new),
storage_limit=str(storage_limit_new),
fee='0',
)
fee_acc += calculate_fee(content, gas_limit_new, extra_size=1 + extra_size // num_contents)
content.pop('metadata')
logger.debug("autofilled transaction content: %s" % content)
opg.contents.append(content)
if fee or fee_acc:
opg.contents[0]['fee'] = str(fee if fee is not None else fee_acc)
return opg
[docs] def sign(self) -> 'OperationGroup':
"""Sign the operation group with the key specified by `using`.
:rtype: OperationGroup
"""
validation_pass = validation_passes[self.contents[0]['kind']]
if any(map(lambda x: validation_passes[x['kind']] != validation_pass, self.contents)):
raise ValueError('Mixed validation passes')
if validation_pass == 0:
if self.chain_id is None:
raise ValueError('Chain ID is undefined, run .fill first')
watermark = b'\x02' + base58_decode(self.chain_id.encode())
else:
watermark = b'\x03'
message = watermark + bytes.fromhex(self.forge())
signature = self.key.sign(message=message, generic=True)
return self._spawn(signature=signature)
[docs] def hash(self) -> str:
"""Calculate the Base58 encoded operation group hash."""
hash_digest = blake2b_32(self.binary_payload()).digest()
return base58_encode(hash_digest, b'o').decode()
[docs] def run_operation(self, block_id: str = 'head'):
"""Simulate operation without signature checks.
:param block_id: Specify a level at which this operation should be applied (default is head)
:returns: RPC response from `run_operation`
"""
return self.run(block_id)
[docs] @deprecated(deprecated_in='3.1.0', removed_in='4.0.0', details='use `run_operation()` instead')
def preapply(self):
"""Preapply signed operation group.
:returns: RPC response from `preapply`
"""
if not self.signature:
raise ValueError('Not signed')
return self.run_operation()
[docs] def send(
self,
gas_reserve: int = DEFAULT_GAS_RESERVE,
burn_reserve: int = DEFAULT_BURN_RESERVE,
min_confirmations: int = 0,
ttl: Optional[int] = None,
) -> 'OperationGroup':
"""
:param gas_reserve: Add a safe reserve for dynamically calculated gas limit (default is 100).
:param burn_reserve: Add a safe reserve for dynamically calculated storage limit (default is 100).
:param min_confirmations: number of block injections to wait for before returning (default is 0, i.e. async mode)
:param ttl: Number of blocks to wait in the mempool before removal (default is 5 for public network, 60 for sandbox)
:return: OperationGroup with hash filled
"""
if ttl is None:
ttl = self.context.get_operations_ttl()
opg = self.autofill(gas_reserve=gas_reserve, burn_reserve=burn_reserve, ttl=ttl).sign()
res = opg.inject(min_confirmations=min_confirmations, num_blocks_wait=ttl)
return opg._spawn(opg_hash=res['hash'], opg_result=res)
[docs] def send_async(
self,
ttl: int,
counter: int,
gas_limit: int,
storage_limit: int,
minimal_nanomav_per_gas_unit: Optional[int] = None,
) -> 'OperationGroup':
"""
Send operation without simulation or pre-validation
:param ttl: Number of blocks to wait in the mempool before removal (default is 5 for public network, 60 for sandbox)
:param counter: Set counter value
:param gas_limit: Set gas_limit value
:param storage_limit: Set storage_limit value
:param minimal_nanomav_per_gas_unit: Override minimal_nanomav_per_gas_unit constant
:rtype: OperationGroup
"""
opg = self.fill(
counter=counter,
ttl=ttl,
gas_limit=gas_limit,
storage_limit=storage_limit,
minimal_nanomav_per_gas_unit=minimal_nanomav_per_gas_unit,
).sign()
res = opg.inject(prevalidate=False)
return opg._spawn(opg_hash=res['hash'])
[docs] def inject(
self,
check_result: bool = True,
num_blocks_wait: int = 5,
time_between_blocks: Optional[int] = None,
block_timeout: Optional[int] = None,
min_confirmations: int = 0,
prevalidate: bool = True,
**kwargs,
):
"""Inject the signed operation group.
:param check_result: raise RpcError in case operation is applied but has runtime errors
:param num_blocks_wait: number of blocks to wait for injection
:param time_between_blocks: override the corresponding parameter from constants
:param block_timeout: set block timeout (by default PyMavryk will wait for a long time)
:param min_confirmations: number of block injections to wait for before returning
:param prevalidate: ask node to pre-validate the operation before the injection (True by default)
:returns: operation group with metadata (raw RPC response)
"""
self.context.reset() # reset counter
opg_hash = self.shell.injection.operation.post(
operation=self.binary_payload(),
_async=not prevalidate,
)
if min_confirmations == 0:
return {
'chain_id': self.chain_id,
'hash': opg_hash,
**self.json_payload(),
}
operations = self.shell.wait_operations(
opg_hashes=[opg_hash],
ttl=num_blocks_wait,
min_confirmations=min_confirmations,
time_between_blocks=time_between_blocks,
block_timeout=block_timeout,
)
assert len(operations) == 1
if check_result:
if not OperationResult.is_applied(operations[0]):
raise RpcError.from_errors(OperationResult.errors(operations[0]))
return operations[0]
[docs] @deprecated(deprecated_in='3.1.0', removed_in='4.0.0', details='use `run_operation()` instead')
def result(self) -> List[OperationResult]:
"""Parse the preapply result.
:rtype: List[OperationResult]
"""
return OperationResult.from_operation_group(self.preapply())