from hashlib import sha3_256
from hashlib import sha256
from hashlib import sha512
from typing import Callable
from typing import List
from typing import Tuple
from typing import cast
from py_ecc import optimized_bls12_381 as bls12_381
from py_ecc.fields import optimized_bls12_381_FQ12 as FQ12
from pymavryk.context.abstract import AbstractContext
from pymavryk.crypto.keccak import Keccak256
from pymavryk.crypto.key import Key
from pymavryk.crypto.key import blake2b_32
from pymavryk.michelson.instructions.base import MichelsonInstruction
from pymavryk.michelson.instructions.base import format_stdout
from pymavryk.michelson.stack import MichelsonStack
from pymavryk.michelson.types import BLS12_381_G1Type
from pymavryk.michelson.types import BLS12_381_G2Type
from pymavryk.michelson.types import BoolType
from pymavryk.michelson.types import BytesType
from pymavryk.michelson.types import KeyHashType
from pymavryk.michelson.types import KeyType
from pymavryk.michelson.types import ListType
from pymavryk.michelson.types import PairType
from pymavryk.michelson.types import SaplingStateType
from pymavryk.michelson.types import SignatureType
[docs]def execute_hash(prim: str, stack: MichelsonStack, stdout: List[str], hash_digest: Callable[[bytes], bytes]):
a = cast(BytesType, stack.pop1())
a.assert_type_equal(BytesType)
res = BytesType.from_value(hash_digest(bytes(a)))
stack.push(res)
stdout.append(format_stdout(prim, [a], [res]))
[docs]class Blake2bInstruction(MichelsonInstruction, prim='BLAKE2B'):
[docs] @classmethod
def execute(cls, stack: MichelsonStack, stdout: List[str], context: AbstractContext):
execute_hash(cls.prim, stack, stdout, lambda x: blake2b_32(bytes(x)).digest()) # type: ignore
return cls(stack_items_added=1)
[docs]class Sha256Instruction(MichelsonInstruction, prim='SHA256'):
[docs] @classmethod
def execute(cls, stack: MichelsonStack, stdout: List[str], context: AbstractContext):
execute_hash(cls.prim, stack, stdout, lambda x: sha256(bytes(x)).digest()) # type: ignore
return cls(stack_items_added=1)
[docs]class Sha512Instruction(MichelsonInstruction, prim='SHA512'):
[docs] @classmethod
def execute(cls, stack: MichelsonStack, stdout: List[str], context: AbstractContext):
execute_hash(cls.prim, stack, stdout, lambda x: sha512(bytes(x)).digest()) # type: ignore
return cls(stack_items_added=1)
[docs]class Sha3Instruction(MichelsonInstruction, prim='SHA3'):
[docs] @classmethod
def execute(cls, stack: MichelsonStack, stdout: List[str], context: AbstractContext):
execute_hash(cls.prim, stack, stdout, lambda x: sha3_256(bytes(x)).digest()) # type: ignore
return cls(stack_items_added=1)
[docs]class KeccakInstruction(MichelsonInstruction, prim='KECCAK'):
[docs] @classmethod
def execute(cls, stack: MichelsonStack, stdout: List[str], context: AbstractContext):
execute_hash(cls.prim, stack, stdout, lambda x: Keccak256(bytes(x)).digest()) # type: ignore
return cls(stack_items_added=1)
[docs]class CheckSignatureInstruction(MichelsonInstruction, prim='CHECK_SIGNATURE'):
[docs] @classmethod
def execute(cls, stack: MichelsonStack, stdout: List[str], context: AbstractContext):
pk, sig, msg = cast(Tuple[KeyType, SignatureType, BytesType], stack.pop3())
pk.assert_type_equal(KeyType)
sig.assert_type_equal(SignatureType)
msg.assert_type_equal(BytesType)
key = Key.from_encoded_key(str(pk))
# TODO: verify BLS signatures
try:
key.verify(signature=str(sig), message=bytes(msg))
except ValueError:
res = BoolType(False)
else:
res = BoolType(True)
stack.push(res)
stdout.append(format_stdout(cls.prim, [pk, sig, msg], [res])) # type: ignore
return cls(stack_items_added=1)
[docs]class HashKeyInstruction(MichelsonInstruction, prim='HASH_KEY'):
[docs] @classmethod
def execute(cls, stack: MichelsonStack, stdout: List[str], context: AbstractContext):
a = cast(KeyType, stack.pop1())
a.assert_type_equal(KeyType)
key = Key.from_encoded_key(str(a))
res = KeyHashType.from_value(key.public_key_hash())
stack.push(res)
stdout.append(format_stdout(cls.prim, [a], [res])) # type: ignore
return cls(stack_items_added=1)
[docs]class PairingCheckInstruction(MichelsonInstruction, prim='PAIRING_CHECK'):
[docs] @classmethod
def execute(cls, stack: 'MichelsonStack', stdout: List[str], context: AbstractContext):
points = cast(ListType, stack.pop1())
points.assert_type_equal(
ListType.create_type(
args=[
PairType.create_type(
args=[
BLS12_381_G1Type,
BLS12_381_G2Type,
]
)
]
)
)
prod = FQ12.one()
for pair in points:
g1, g2 = tuple(iter(pair)) # type: Tuple[BLS12_381_G1Type, BLS12_381_G2Type]
prod = prod * bls12_381.pairing(g2.to_point(), g1.to_point())
res = BoolType.from_value(FQ12.one() == prod)
stack.push(res)
stdout.append(format_stdout(cls.prim, [points], [res])) # type: ignore
return cls(stack_items_added=1)
[docs]class SaplingEmptyStateInstruction(MichelsonInstruction, prim='SAPLING_EMPTY_STATE', args_len=1):
[docs] @classmethod
def execute(cls, stack: MichelsonStack, stdout: List[str], context: AbstractContext):
memo_size = cls.args[0].get_int() # type: ignore
res = SaplingStateType.empty(memo_size)
res.attach_context(context)
stack.push(res)
stdout.append(format_stdout(cls.prim, [], [res], memo_size)) # type: ignore
return cls(stack_items_added=1)
[docs]class SaplingVerifyUpdateInstruction(MichelsonInstruction, prim='SAPLING_VERIFY_UPDATE'):
pass