from decimal import Decimal
from typing import Type
from typing import cast
from pymavryk.context.abstract import AbstractContext
from pymavryk.context.abstract import get_originated_address
from pymavryk.crypto.encoding import base58_decode
from pymavryk.crypto.encoding import is_address
from pymavryk.crypto.encoding import is_chain_id
from pymavryk.crypto.encoding import is_kt
from pymavryk.crypto.encoding import is_pkh
from pymavryk.crypto.encoding import is_public_key
from pymavryk.crypto.encoding import is_sig
from pymavryk.crypto.encoding import is_txr_address
from pymavryk.michelson.forge import forge_address
from pymavryk.michelson.forge import forge_base58
from pymavryk.michelson.forge import forge_contract
from pymavryk.michelson.forge import forge_public_key
from pymavryk.michelson.forge import optimize_timestamp
from pymavryk.michelson.forge import unforge_address
from pymavryk.michelson.forge import unforge_chain_id
from pymavryk.michelson.forge import unforge_contract
from pymavryk.michelson.forge import unforge_public_key
from pymavryk.michelson.forge import unforge_signature
from pymavryk.michelson.format import format_timestamp
from pymavryk.michelson.format import micheline_to_michelson
from pymavryk.michelson.micheline import Micheline
from pymavryk.michelson.micheline import parse_micheline_literal
from pymavryk.michelson.parse import michelson_to_micheline
from pymavryk.michelson.types.base import Undefined
from pymavryk.michelson.types.core import IntType
from pymavryk.michelson.types.core import MichelsonType
from pymavryk.michelson.types.core import NatType
from pymavryk.michelson.types.core import StringType
class TimestampType(IntType, prim='timestamp'):  # type: ignore
    """Michelson `timestamp` type built on top of the integer type.

    Values are held as integers. String inputs are passed through
    `optimize_timestamp` before being stored, and the readable Micheline form
    is produced with `format_timestamp`.
    """

    @classmethod
    def from_value(cls, value: int) -> 'TimestampType':
        """Wrap an integer timestamp in a new instance."""
        return cls(value)

    @classmethod
    def from_micheline_value(cls, val_expr) -> 'TimestampType':
        """Build an instance from a Micheline literal (`int` or `string`)."""
        # Handlers are keyed by the Micheline literal kind.
        handlers = {'int': int, 'string': optimize_timestamp}
        return cls.from_value(parse_micheline_literal(val_expr, handlers))

    @classmethod
    def from_python_object(cls, py_obj) -> 'TimestampType':
        """Build an instance from a Python `int` or `str`.

        :raises AssertionError: if `py_obj` is neither an `int` nor a `str`
        """
        if isinstance(py_obj, int):
            return cls.from_value(py_obj)
        if isinstance(py_obj, str):
            return cls.from_value(optimize_timestamp(py_obj))
        raise AssertionError(f'unexpected value type {py_obj}')

    def to_micheline_value(self, mode='readable', lazy_diff=False):
        """Render the value as a Micheline literal.

        :param mode: `readable` yields a formatted string literal,
            `optimized`/`legacy_optimized` yield an int literal
        :param lazy_diff: accepted for interface compatibility, not used here
        :raises AssertionError: if `mode` is not one of the supported modes
        """
        if mode == 'readable':
            return {'string': format_timestamp(self.value)}
        if mode in ('optimized', 'legacy_optimized'):
            return {'int': str(self.value)}
        raise AssertionError(f'unsupported mode {mode}')

    def to_python_object(self, try_unpack=False, lazy_diff=False, comparable=False):
        """Return the stored integer value; the flags are not used here."""
        return self.value
class MumavType(NatType, prim='mumav'):
    """Michelson `mumav` type: a natural number limited to 63 bits.

    `repr()` shows the stored integer divided by 10**6.
    """

    def __repr__(self):
        scaled = Decimal(self.value) / 10**6
        return str(scaled)

    @classmethod
    def from_value(cls, value: int) -> 'MumavType':
        """Wrap an integer amount after range validation.

        :raises AssertionError: if `value` is negative
        :raises OverflowError: if `value` needs more than 63 bits
        """
        assert value >= 0, f'expected natural number, got {value}'
        bits = value.bit_length()
        if bits > 63:
            raise OverflowError(f'mumav overflow, got {bits} bits, should not exceed 63')
        return cls(value)

    @classmethod
    def from_python_object(cls, py_obj) -> 'MumavType':
        """Build an instance from an `int`, `Decimal` or `str`.

        Integers are taken as-is. `Decimal` and `str` inputs are multiplied by
        10**6 and truncated with `int()`.

        :raises AssertionError: if `py_obj` has any other type
        """
        if isinstance(py_obj, int):
            return cls.from_value(py_obj)
        if isinstance(py_obj, Decimal):
            return cls.from_value(int(py_obj * (10**6)))
        if isinstance(py_obj, str):
            return cls.from_value(int(Decimal(py_obj) * (10**6)))
        raise AssertionError(f'unexpected value type {py_obj}')
class AddressType(StringType, prim='address'):
    """Michelson `address` type holding a base58 address string."""

    def __repr__(self):
        head, tail = self.value[:6], self.value[-3:]
        return f'{head}…{tail}'

    def __lt__(self, other: 'AddressType') -> bool:  # type: ignore
        """Order key hashes before KT addresses; otherwise compare the strings."""
        mine, theirs = self.value, other.value
        if is_pkh(mine) and is_kt(theirs):
            return True
        if is_kt(mine) and is_pkh(theirs):
            return False
        return mine < theirs

    @classmethod
    def dummy(cls, context: AbstractContext) -> 'AddressType':
        """Build an instance from the context's dummy address."""
        return cls.from_value(context.get_dummy_address())

    @classmethod
    def from_value(cls, value: str) -> 'AddressType':
        """Validate and wrap an address string.

        A trailing `%default` entrypoint suffix is dropped before validation.

        :raises AssertionError: if the value is rejected by `is_address`
        """
        if value.endswith('%default'):
            # Keep only the part before the first '%'.
            value = value.partition('%')[0]
        assert is_address(value), f'expected mv/KT/sr address, got {value}'
        return cls(value)

    @classmethod
    def from_micheline_value(cls, val_expr) -> 'AddressType':
        """Build an instance from a Micheline `bytes` or `string` literal."""

        def decode_bytes(hex_str):
            return unforge_contract(bytes.fromhex(hex_str))

        handlers = {
            'bytes': decode_bytes,
            'string': lambda text: text,
        }
        return cls.from_value(parse_micheline_literal(val_expr, handlers))

    @classmethod
    def from_python_object(cls, py_obj) -> 'AddressType':
        """Build an instance from a Python address string."""
        return cls.from_value(py_obj)

    def to_micheline_value(self, mode='readable', lazy_diff=False):
        """Render the value as a Micheline literal.

        :param mode: `readable` yields a string literal,
            `optimized`/`legacy_optimized` yield hex-encoded bytes
        :param lazy_diff: accepted for interface compatibility, not used here
        :raises AssertionError: if `mode` is not one of the supported modes
        """
        if mode == 'readable':
            return {'string': self.value}
        if mode in ('optimized', 'legacy_optimized'):
            # forge_contract is used because address can also have an entrypoint
            return {'bytes': forge_contract(self.value).hex()}
        raise AssertionError(f'unsupported mode {mode}')

    def to_python_object(self, try_unpack=False, lazy_diff=False, comparable=False):
        """Return the stored address string; the flags are not used here."""
        return self.value
class TXRAddress(StringType, prim='tx_rollup_l2_address'):
    """Michelson `tx_rollup_l2_address` type holding an address string."""

    def __repr__(self):
        head, tail = self.value[:6], self.value[-3:]
        return f'{head}…{tail}'

    def __lt__(self, other: 'TXRAddress') -> bool:  # type: ignore
        """Sort before any KT value and after any key hash; else compare strings."""
        theirs = other.value
        if is_kt(theirs):
            return True
        if is_pkh(theirs):
            return False
        return self.value < theirs

    @classmethod
    def dummy(cls, context: AbstractContext) -> 'TXRAddress':
        """Build an instance from the context's dummy rollup address."""
        return cls.from_value(context.get_dummy_txr_address())

    @classmethod
    def from_value(cls, value: str) -> 'TXRAddress':
        """Validate and wrap a rollup address string.

        A trailing `%default` entrypoint suffix is dropped before validation.

        :raises AssertionError: if the value is rejected by `is_txr_address`
        """
        if value.endswith('%default'):
            # Keep only the part before the first '%'.
            value = value.partition('%')[0]
        assert is_txr_address(value), f'expected txr1 address, got {value}'
        return cls(value)

    @classmethod
    def from_micheline_value(cls, val_expr) -> 'TXRAddress':
        """Build an instance from a Micheline `bytes` or `string` literal."""

        def decode_bytes(hex_str):
            return unforge_contract(bytes.fromhex(hex_str))

        handlers = {
            'bytes': decode_bytes,
            'string': lambda text: text,
        }
        return cls.from_value(parse_micheline_literal(val_expr, handlers))

    @classmethod
    def from_python_object(cls, py_obj) -> 'TXRAddress':
        """Build an instance from a Python address string."""
        return cls.from_value(py_obj)

    def to_micheline_value(self, mode='readable', lazy_diff=False):
        """Render the value as a Micheline literal.

        :param mode: `readable` yields a string literal,
            `optimized`/`legacy_optimized` yield hex-encoded bytes
        :param lazy_diff: accepted for interface compatibility, not used here
        :raises AssertionError: if `mode` is not one of the supported modes
        """
        if mode == 'readable':
            return {'string': self.value}
        if mode in ('optimized', 'legacy_optimized'):
            # forge_contract is used because address can also have an entrypoint
            return {'bytes': forge_contract(self.value).hex()}
        raise AssertionError(f'unsupported mode {mode}')

    def to_python_object(self, try_unpack=False, lazy_diff=False, comparable=False):
        """Return the stored address string; the flags are not used here."""
        return self.value
class KeyType(StringType, prim='key'):
    """Michelson `key` type holding a base58-encoded public key string."""

    @property
    def raw(self) -> bytes:
        """Base58-decoded form of the key."""
        encoded = self.value.encode()
        return base58_decode(encoded)

    @property
    def prefix(self) -> str:
        """First four characters of the key, identifying the curve."""
        return self.value[:4]

    def __lt__(self, other: 'KeyType') -> bool:  # type: ignore
        """
        Keys are ordered by curve first: edpk < sppk < p2pk.
        Keys on the same curve are ordered by their decoded bytes.
        All keys are in compressed form in Mavryk (flag | X) where flag specifies if Y is odd or even
        https://crypto.stackexchange.com/questions/70754/ec-key-compression
        For secp256r1 (aka p256) the first byte is cut before comparing (for unknown reason)
        """
        # prefix -> (curve rank, number of leading bytes skipped when comparing)
        curves = {
            'edpk': (0, 0),
            'sppk': (1, 0),
            'p2pk': (2, 1),
        }
        own_rank, own_offset = curves[self.prefix]
        other_rank = curves[other.prefix][0]
        if own_rank != other_rank:
            return own_rank < other_rank
        return self.raw[own_offset:] < other.raw[own_offset:]

    @classmethod
    def dummy(cls, context: AbstractContext) -> 'KeyType':
        """Build an instance from the context's dummy public key."""
        return cls.from_value(context.get_dummy_public_key())

    @classmethod
    def from_value(cls, value: str) -> 'KeyType':
        """Validate and wrap a public key string.

        :raises AssertionError: if the value is rejected by `is_public_key`
        """
        assert is_public_key(value), f'expected ed/sp/p2 public key, got {value}'
        return cls(value)

    @classmethod
    def from_micheline_value(cls, val_expr) -> 'KeyType':
        """Build an instance from a Micheline `bytes` or `string` literal."""

        def decode_bytes(hex_str):
            return unforge_public_key(bytes.fromhex(hex_str))

        handlers = {
            'bytes': decode_bytes,
            'string': lambda text: text,
        }
        return cls.from_value(parse_micheline_literal(val_expr, handlers))

    # NOTE(review): sibling types in this module name this hook
    # `from_python_object`; confirm whether `parse_python_object` is intentional.
    @classmethod
    def parse_python_object(cls, py_obj) -> 'KeyType':
        """Build an instance from a Python public key string."""
        return cls.from_value(py_obj)

    def to_micheline_value(self, mode='readable', lazy_diff=False):
        """Render the value as a Micheline literal.

        :param mode: `readable` yields a string literal,
            `optimized`/`legacy_optimized` yield hex-encoded bytes
        :param lazy_diff: accepted for interface compatibility, not used here
        :raises AssertionError: if `mode` is not one of the supported modes
        """
        if mode == 'readable':
            return {'string': self.value}
        if mode in ('optimized', 'legacy_optimized'):
            return {'bytes': forge_public_key(self.value).hex()}
        raise AssertionError(f'unsupported mode {mode}')

    def to_python_object(self, try_unpack=False, lazy_diff=False, comparable=False):
        """Return the stored key string; the flags are not used here."""
        return self.value
class KeyHashType(StringType, prim='key_hash'):
    """Michelson `key_hash` type holding a public key hash string."""

    @classmethod
    def dummy(cls, context: AbstractContext) -> 'KeyHashType':
        """Build an instance from the context's dummy key hash."""
        return cls.from_value(context.get_dummy_key_hash())

    @classmethod
    def from_value(cls, value: str) -> 'KeyHashType':
        """Validate and wrap a key hash string.

        :raises AssertionError: if the value is rejected by `is_pkh`
        """
        assert is_pkh(value), f'expected mv1/mv2/mv3 key hash, got {value}'
        return cls(value)

    @classmethod
    def from_micheline_value(cls, val_expr) -> 'KeyHashType':
        """Build an instance from a Micheline `bytes` or `string` literal."""

        def decode_bytes(hex_str):
            return unforge_address(bytes.fromhex(hex_str))

        handlers = {'bytes': decode_bytes, 'string': lambda text: text}
        return cls.from_value(parse_micheline_literal(val_expr, handlers))

    # NOTE(review): sibling types in this module name this hook
    # `from_python_object`; confirm whether `parse_python_object` is intentional.
    @classmethod
    def parse_python_object(cls, py_obj):
        """Build an instance from a Python key hash string."""
        return cls.from_value(py_obj)

    def to_micheline_value(self, mode='readable', lazy_diff=False):
        """Render the value as a Micheline literal.

        :param mode: `readable` yields a string literal,
            `optimized`/`legacy_optimized` yield hex-encoded bytes
        :param lazy_diff: accepted for interface compatibility, not used here
        :raises AssertionError: if `mode` is not one of the supported modes
        """
        if mode == 'readable':
            return {'string': self.value}
        if mode in ('optimized', 'legacy_optimized'):
            return {'bytes': forge_address(self.value, tz_only=True).hex()}
        raise AssertionError(f'unsupported mode {mode}')

    def to_python_object(self, try_unpack=False, lazy_diff=False, comparable=False):
        """Return the stored key hash string; the flags are not used here."""
        return self.value
class SignatureType(StringType, prim='signature'):
    """Michelson `signature` type holding a base58 signature string."""

    @classmethod
    def dummy(cls, context: AbstractContext) -> 'SignatureType':
        """Build an instance from the context's dummy signature."""
        return cls.from_value(context.get_dummy_signature())

    @classmethod
    def from_value(cls, value: str) -> 'SignatureType':
        """Validate and wrap a signature string.

        :raises AssertionError: if the value is rejected by `is_sig`
        """
        assert is_sig(value), f'expected signature, got {value}'
        return cls(value)

    @classmethod
    def from_micheline_value(cls, val_expr) -> 'SignatureType':
        """Build an instance from a Micheline `bytes` or `string` literal."""

        def decode_bytes(hex_str):
            return unforge_signature(bytes.fromhex(hex_str))

        handlers = {
            'bytes': decode_bytes,
            'string': lambda text: text,
        }
        return cls.from_value(parse_micheline_literal(val_expr, handlers))

    @classmethod
    def from_python_object(cls, py_obj) -> 'SignatureType':
        """Build an instance from a Python signature string."""
        return cls.from_value(py_obj)

    def to_micheline_value(self, mode='readable', lazy_diff=False):
        """Render the value as a Micheline literal.

        :param mode: `readable` yields a string literal,
            `optimized`/`legacy_optimized` yield hex-encoded bytes
        :param lazy_diff: accepted for interface compatibility, not used here
        :raises AssertionError: if `mode` is not one of the supported modes
        """
        if mode == 'readable':
            return {'string': self.value}
        if mode in ('optimized', 'legacy_optimized'):
            return {'bytes': forge_base58(self.value).hex()}
        raise AssertionError(f'unsupported mode {mode}')

    def to_python_object(self, try_unpack=False, lazy_diff=False, comparable=False):
        """Return the stored signature string; the flags are not used here."""
        return self.value
class ChainIdType(StringType, prim='chain_id'):
    """Michelson `chain_id` type holding a base58 chain identifier string."""

    @classmethod
    def dummy(cls, context: AbstractContext) -> 'ChainIdType':
        """Build an instance from the context's dummy chain id."""
        return cls.from_value(context.get_dummy_chain_id())

    @classmethod
    def from_value(cls, value: str) -> 'ChainIdType':
        """Validate and wrap a chain id string.

        :raises AssertionError: if the value is rejected by `is_chain_id`
        """
        assert is_chain_id(value), f'expected chain id, got {value}'
        return cls(value)

    @classmethod
    def from_micheline_value(cls, val_expr) -> 'ChainIdType':
        """Build an instance from a Micheline `bytes` or `string` literal."""

        def decode_bytes(hex_str):
            return unforge_chain_id(bytes.fromhex(hex_str))

        handlers = {'bytes': decode_bytes, 'string': lambda text: text}
        return cls.from_value(parse_micheline_literal(val_expr, handlers))

    @classmethod
    def from_python_object(cls, py_obj) -> 'ChainIdType':
        """Build an instance from a Python chain id string."""
        return cls.from_value(py_obj)

    def to_micheline_value(self, mode='readable', lazy_diff=False):
        """Render the value as a Micheline literal.

        :param mode: `readable` yields a string literal,
            `optimized`/`legacy_optimized` yield hex-encoded bytes
        :param lazy_diff: accepted for interface compatibility, not used here
        :raises AssertionError: if `mode` is not one of the supported modes
        """
        if mode == 'readable':
            return {'string': self.value}
        if mode in ('optimized', 'legacy_optimized'):
            return {'bytes': forge_base58(self.value).hex()}
        raise AssertionError(f'unsupported mode {mode}')

    def to_python_object(self, try_unpack=False, lazy_diff=False, comparable=False):
        """Return the stored chain id string; the flags are not used here."""
        return self.value
class ContractType(AddressType, prim='contract', args_len=1):
    """Michelson `contract` type: an address with an optional `%entrypoint`.

    The single type argument (``args[0]``) is the contract's parameter type.
    """

    def __repr__(self):
        address = self.get_address()
        entrypoint = self.get_entrypoint()
        return f'{address[:6]}…{address[-3:]}%{entrypoint}'

    @classmethod
    def generate_pydoc(cls, definitions: list, inferred_name=None, comparable=False):
        """Return a one-line pydoc description of this contract type.

        When the parameter type itself has type arguments, its Michelson
        expression is inserted at the front of `definitions` under
        ``<name>_param`` and referenced as ``$<name>_param``; otherwise the
        expression is inlined.

        :param definitions: list of ``(name, expression)`` pairs, mutated in place
        :param inferred_name: fallback name when the type has no field or type name
        :param comparable: accepted for interface compatibility, not used here
        """
        super(ContractType, cls).generate_pydoc(definitions)
        param_type = cls.args[0]
        param_expr = micheline_to_michelson(param_type.as_micheline_expr())
        if not param_type.args:
            return f'contract ({param_expr})'
        name = cls.field_name or cls.type_name or inferred_name or f'{cls.prim}_{len(definitions)}'
        param_name = f'{name}_param'
        definitions.insert(0, (param_name, param_expr))
        return f'contract (${param_name})'

    @classmethod
    def from_python_object(cls, py_obj) -> 'ContractType':
        """Build an instance from an address string.

        `None` and `Undefined` are replaced with `get_originated_address(0)`.
        """
        if py_obj is None or py_obj is Undefined:
            py_obj = get_originated_address(0)
        instance = super(ContractType, cls).from_python_object(py_obj)
        return cast(ContractType, instance)

    def get_address(self) -> str:
        """Return the part of the value before the first `%`."""
        return self.value.partition('%')[0]

    def get_entrypoint(self) -> str:
        """Return the entrypoint name, or `default` unless exactly one `%` is present."""
        parts = self.value.split('%')
        if len(parts) != 2:
            return 'default'
        return parts[1]

    def to_python_object(self, try_unpack=False, lazy_diff=False, comparable=False):
        """Return the stored address string.

        :raises AssertionError: if `comparable` is requested
        """
        assert not comparable, f'{self.prim} is not comparable'
        return super(ContractType, self).to_python_object()
class LambdaType(MichelsonType, prim='lambda', args_len=2):  # type: ignore
    """Michelson `lambda` type whose value is a Micheline code tree.

    The two type arguments follow the Michelson convention ``lambda a b``:
    ``args[0]`` is the parameter type and ``args[1]`` is the return type.
    """

    def __init__(self, value: Type[Micheline]):
        super(LambdaType, self).__init__()
        self.value = value

    def __repr__(self):
        return 'Lambda'

    def __eq__(self, other) -> bool:
        """Compare two lambdas by their Micheline expressions."""
        if not isinstance(other, LambdaType):
            return False
        # FIXME: That seems ugly
        return self.value.as_micheline_expr() == other.value.as_micheline_expr()

    @classmethod
    def generate_pydoc(cls, definitions: list, inferred_name=None, comparable=False):
        """Return a one-line pydoc description ``lambda (<param> -> <return>)``.

        An argument type that itself has type arguments is inserted at the
        front of `definitions` under ``<name>_param`` / ``<name>_return`` and
        referenced with a ``$`` prefix; simple types are inlined.

        :param definitions: list of ``(name, expression)`` pairs, mutated in place
        :param inferred_name: fallback name when the type has no field or type name
        :param comparable: accepted for interface compatibility, not used here
        """
        super(LambdaType, cls).generate_pydoc(definitions)
        name = cls.field_name or cls.type_name or inferred_name or f'{cls.prim}_{len(definitions)}'
        expr = {}
        # args[0] is the parameter type and args[1] the return type. The return
        # type is handled first so that, after both insert(0, ...) calls, the
        # param definition precedes the return definition.
        for index, suffix in ((1, 'return'), (0, 'param')):
            arg_type = cls.args[index]
            arg_expr = micheline_to_michelson(arg_type.as_micheline_expr())
            if arg_type.args:
                arg_name = f'{name}_{suffix}'
                definitions.insert(0, (arg_name, arg_expr))
                expr[suffix] = f'${arg_name}'
            else:
                expr[suffix] = arg_expr
        return f'lambda ({expr["param"]} -> {expr["return"]})'

    @classmethod
    def dummy(cls, context: AbstractContext) -> 'LambdaType':
        """Build an instance from the context's dummy lambda expression."""
        return cls(Micheline.match(context.get_dummy_lambda()))

    @classmethod
    def from_micheline_value(cls, val_expr) -> 'LambdaType':
        """Build an instance from a Micheline sequence.

        :raises AssertionError: if `val_expr` is not a list
        """
        assert isinstance(val_expr, list), f'expected list, got {type(val_expr).__name__}'
        return cls(Micheline.match(val_expr))

    @classmethod
    def from_python_object(cls, py_obj) -> 'LambdaType':
        """Build an instance from Michelson source code given as a string.

        :raises AssertionError: if `py_obj` is not a string
        """
        assert isinstance(py_obj, str), f'expected string, got {type(py_obj).__name__}'
        value = michelson_to_micheline(py_obj)
        return cls.from_micheline_value(value)

    def to_literal(self) -> Type[Micheline]:
        """Return the wrapped Micheline value."""
        return self.value

    def to_micheline_value(self, mode='readable', lazy_diff=False):
        """Return the Micheline expression of the lambda body; `mode` is not applied."""
        # TODO: optimized mode -> hardcoded values in the code
        return self.value.as_micheline_expr()

    def to_python_object(self, try_unpack=False, lazy_diff=False, comparable=False):
        """Return the lambda body as Michelson source text.

        :raises AssertionError: if `comparable` is requested
        """
        assert not comparable, f'{self.prim} is not comparable'
        return micheline_to_michelson(self.to_micheline_value())