Source code for pymavryk.crypto.encoding

from typing import Union

import base58

tb = bytes

base58_encodings = [
    #    Encoded   |               Decoded             |
    # prefix | len | prefix                      | len | Data type
    (b'B', 51, tb([1, 52]), 32, 'block hash'),
    (b'o', 51, tb([5, 116]), 32, 'operation hash'),
    (b'Lo', 52, tb([133, 233]), 32, 'operation list hash'),
    (b'LLo', 53, tb([29, 159, 109]), 32, 'operation list list hash'),
    (b'P', 51, tb([2, 170]), 32, 'protocol hash'),
    (b'Co', 52, tb([79, 199]), 32, 'context hash'),
    (b'mv1', 36, tb([5, 186, 196]), 20, 'ed25519 public key hash'),
    (b'mv2', 36, tb([5, 186, 199]), 20, 'secp256k1 public key hash'),
    (b'mv3', 36, tb([5, 186, 201]), 20, 'p256 public key hash'),
    (b'mv4', 36, tb([5, 186, 204]), 20, 'BLS12-381 (MinPk) public key hash'),
    (b'KT1', 36, tb([2, 90, 121]), 20, 'originated smart contract address'),
    (b'txr1', 37, tb([1, 128, 120, 31]), 20, 'tx_rollup_l2_address'),
    (b'sr1', 36, tb([6, 124, 117]), 20, 'originated smart rollup address'),
    (b'src1', 54, tb([17, 165, 134, 138]), 32, 'smart rollup commitment hash'),
    (b'srs1', 54, tb([17, 165, 235, 240]), 32, 'smart rollup state hash'),
    (b'srib1', 55, tb([3, 255, 138, 145, 110]), 32, 'smart rollup inbox'),
    (b'srib2', 55, tb([3, 255, 138, 145, 140]), 32, 'smart rollup merkelized payload'),
    (b'id', 30, tb([153, 103]), 16, 'cryptobox public key hash'),
    (b'expr', 54, tb([13, 44, 64, 27]), 32, 'script expression'),
    (b'edsk', 54, tb([13, 15, 58, 7]), 32, 'ed25519 seed'),
    (b'edpk', 54, tb([13, 15, 37, 217]), 32, 'ed25519 public key'),
    (b'spsk', 54, tb([17, 162, 224, 201]), 32, 'secp256k1 secret key'),
    (b'p2sk', 54, tb([16, 81, 238, 189]), 32, 'p256 secret key'),
    (b'edesk', 88, tb([7, 90, 60, 179, 41]), 56, 'ed25519 encrypted seed'),
    (b'spesk', 88, tb([9, 237, 241, 174, 150]), 56, 'secp256k1 encrypted secret key'),
    (b'p2esk', 88, tb([9, 48, 57, 115, 171]), 56, 'p256_encrypted_secret_key'),
    (b'sppk', 55, tb([3, 254, 226, 86]), 33, 'secp256k1 public key'),
    (b'p2pk', 55, tb([3, 178, 139, 127]), 33, 'p256 public key'),
    (b'SSp', 53, tb([38, 248, 136]), 33, 'secp256k1 scalar'),
    (b'GSp', 53, tb([5, 92, 0]), 33, 'secp256k1 element'),
    (b'edsk', 98, tb([43, 246, 78, 7]), 64, 'ed25519 secret key'),
    (b'edsig', 99, tb([9, 245, 205, 134, 18]), 64, 'ed25519 signature'),
    (b'spsig', 99, tb([13, 115, 101, 19, 63]), 64, 'secp256k1 signature'),
    (b'p2sig', 98, tb([54, 240, 44, 52]), 64, 'p256 signature'),
    (b'sig', 96, tb([4, 130, 43]), 64, 'generic signature'),
    (b'Net', 15, tb([87, 82, 0]), 4, 'chain id'),
    (b'nce', 53, tb([69, 220, 169]), 32, 'seed nonce hash'),
    (b'bmv1', 37, tb([1, 1, 75, 4]), 20, 'blinded public key hash'),
    (b'vh', 52, tb([1, 106, 242]), 32, 'block_payload_hash'),
    (b'BLsig', 142, tb([40, 171, 64, 207]), 96, 'bls12_381_min_pk signature'),
    (b'BLpk', 76, tb([6, 149, 135, 204]), 48, 'bls12_381_min_pk public_key'),
    (b'BLsk', 54, tb([3, 150, 192, 40]), 32, 'bls12_381 secret_key'),
    (b'BLesk', 88, tb([2, 5, 30, 53, 25]), 56, 'bls12_381 encrypted_secret_key'),
]

operation_tags = {
    'endorsement': 0,
    'seed_nonce_revelation': 1,
    'double_endorsement_evidence': 2,
    'double_baking_evidence': 3,
    'account_activation': 4,
    'proposals': 5,
    'ballot': 6,
    'reveal': 7,
    'transaction': 8,
    'origination': 9,
    'delegation': 10,
}


def scrub_input(v: Union[str, bytes]) -> bytes:
    if isinstance(v, bytes):
        return v
    elif isinstance(v, str):
        try:
            return bytes.fromhex(v.removeprefix('0x'))
        except ValueError:
            return v.encode('ascii')
    else:
        raise TypeError('A bytes-like object is required (also str), not `%s`' % type(v).__name__)


[docs] def base58_decode(v: bytes) -> bytes: """Decode data using Base58 with checksum + validate binary prefix against known kinds and cut in the end. :param v: Array of bytes (use string.encode()) :returns: bytes """ try: prefix_len = next( len(encoding[2]) for encoding in base58_encodings if len(v) == encoding[1] and v.startswith(encoding[0]) ) except StopIteration as e: raise ValueError('Invalid encoding, prefix or length mismatch.') from e return base58.b58decode_check(v)[prefix_len:]
[docs] def base58_encode(v: bytes, prefix: bytes) -> bytes: """Encode data using Base58 with checksum and add an according binary prefix in the end. :param v: Array of bytes :param prefix: Human-readable prefix (use b'') e.g. b'tz', b'KT', etc :returns: bytes (use string.decode()) """ try: encoding = next(encoding for encoding in base58_encodings if len(v) == encoding[3] and prefix == encoding[0]) except StopIteration as e: raise ValueError('Invalid encoding, prefix or length mismatch.') from e return base58.b58encode_check(encoding[2] + v)
def _validate(v: Union[str, bytes], prefixes: list): if isinstance(v, str): v = v.encode() v = scrub_input(v) if any(map(v.startswith, prefixes)): base58_decode(v) else: raise ValueError('Unknown prefix.')
[docs] def validate_pkh(v: Union[str, bytes]): """Ensure parameter is a public key hash (starts with b'mv1', b'mv2', b'mv3', b'mv4') :param v: string or bytes :raises ValueError: if parameter is not a public key hash """ return _validate(v, prefixes=[b'mv1', b'mv2', b'mv3', b'mv4'])
[docs] def validate_l2_pkh(v: Union[str, bytes]): """Ensure parameter is a L2 public key hash (starts with b'txr1') :param v: string or bytes :raises ValueError: if parameter is not a public key hash """ return _validate(v, prefixes=[b'txr1'])
[docs] def validate_sig(v: Union[str, bytes]): """Ensure parameter is a signature (starts with b'edsig', b'spsig', b'p2sig', b'BLsig', b'sig') :param v: string or bytes :raises ValueError: if parameter is not a signature """ return _validate(v, prefixes=[b'edsig', b'spsig', b'p2sig', b'BLsig', b'sig'])
[docs] def is_pkh(v: Union[str, bytes]) -> bool: """Check if value is a public key hash.""" try: validate_pkh(v) except (ValueError, TypeError): return False return True
[docs] def is_l2_pkh(v: Union[str, bytes]) -> bool: """Check if value is an L2 public key hash.""" try: validate_l2_pkh(v) except (ValueError, TypeError): return False return True
[docs] def is_sig(v: Union[str, bytes]) -> bool: """Check if value is a signature.""" try: validate_sig(v) except (ValueError, TypeError): return False return True
[docs] def is_bh(v: Union[str, bytes]) -> bool: """Check if value is a block hash.""" try: _validate(v, prefixes=[b'B']) except (ValueError, TypeError): return False return True
[docs] def is_ogh(v) -> bool: """Check if value is an operation group hash.""" try: _validate(v, prefixes=[b'o']) except (ValueError, TypeError): return False return True
[docs] def is_kt(v: Union[str, bytes]) -> bool: """Check if value is a KT address.""" try: _validate(v, prefixes=[b'KT1']) except (ValueError, TypeError): return False return True
[docs] def is_sr(v: Union[str, bytes]) -> bool: """Check if value is a smart rollup address.""" try: _validate(v, prefixes=[b'sr1']) except (ValueError, TypeError): return False return True
[docs] def is_public_key(v: Union[str, bytes]) -> bool: """Check if value is a public key.""" try: _validate( v, prefixes=[ # ed25519 b'edsk', b'edpk', # secp256k1 b'spsk', b'sppk', # p256 b'p2sk', b'p2pk', # bls12_381 b'BLsk', b'BLpk', ], ) except (ValueError, TypeError): return False return True
[docs] def is_chain_id(v: Union[str, bytes]) -> bool: """Check if value is a chain id.""" try: _validate(v, prefixes=[b'Net']) except (ValueError, TypeError): return False return True
[docs] def is_address(v: Union[str, bytes]) -> bool: """Check if value is a tz/KT address""" if isinstance(v, bytes): v = v.decode() address = v.split('%')[0] return is_kt(address) or is_pkh(address) or is_sr(address)
[docs] def is_txr_address(v: Union[str, bytes]) -> bool: """Check if value is a txr1 address""" if isinstance(v, bytes): v = v.decode() address = v.split('%')[0] return is_l2_pkh(address)