Source code for pymavryk.crypto.encoding

from typing import Union

import base58


def tb(l):
    """Pack an iterable of integers into a bytes object, one byte per integer.

    Each element is converted with ``int.to_bytes(1, 'big')``, so every value
    has to fit into a single unsigned byte.

    :param l: iterable of integers
    :returns: bytes
    """
    return b''.join(value.to_bytes(1, 'big') for value in l)


# Table of the known Base58Check encodings. Each entry is a 5-tuple:
#   [0] human-readable prefix of the encoded (Base58) form,
#   [1] total length of the encoded form,
#   [2] binary prefix placed in front of the payload before encoding,
#   [3] length of the raw payload in bytes,
#   [4] description of the data type.
# base58_decode / base58_encode scan the list in order and use the first entry
# that matches, so an encoded prefix may appear more than once as long as the
# lengths differ (see the two b"edsk" entries).
base58_encodings = [
    #    Encoded   |               Decoded             |
    # prefix | len | prefix                      | len | Data type
    (b"B", 51, tb([1, 52]), 32, "block hash"),
    (b"o", 51, tb([5, 116]), 32, "operation hash"),
    (b"Lo", 52, tb([133, 233]), 32, "operation list hash"),
    (b"LLo", 53, tb([29, 159, 109]), 32, "operation list list hash"),
    (b"P", 51, tb([2, 170]), 32, "protocol hash"),
    (b"Co", 52, tb([79, 199]), 32, "context hash"),
    (b"mv1", 36, tb([5, 186, 196]), 20, "ed25519 public key hash"),
    (b"mv2", 36, tb([5, 186, 199]), 20, "secp256k1 public key hash"),
    (b"mv3", 36, tb([5, 186, 201]), 20, "p256 public key hash"),
    (b"mv4", 36, tb([5, 186, 204]), 20, "BLS-MinPk"),
    (b"KT1", 36, tb([2, 90, 121]), 20, "originated address"),
    (b"txr1", 37, tb([1, 128, 120, 31]), 20, "tx_rollup_l2_address"),
    (b"sr1", 36, tb([6, 124, 117]), 20, "address prefix for originated smart rollup"),
    (b"src1", 54, tb([17, 165, 134, 138]), 32, "address prefix for smart rollup commitment"),
    (b"id", 30, tb([153, 103]), 16, "cryptobox public key hash"),
    (b'expr', 54, tb([13, 44, 64, 27]), 32, u'script expression'),
    (b"edsk", 54, tb([13, 15, 58, 7]), 32, "ed25519 seed"),
    (b"edpk", 54, tb([13, 15, 37, 217]), 32, "ed25519 public key"),
    (b"spsk", 54, tb([17, 162, 224, 201]), 32, "secp256k1 secret key"),
    (b"p2sk", 54, tb([16, 81, 238, 189]), 32, "p256 secret key"),
    (b"edesk", 88, tb([7, 90, 60, 179, 41]), 56, "ed25519 encrypted seed"),
    (b"spesk", 88, tb([9, 237, 241, 174, 150]), 56, "secp256k1 encrypted secret key"),
    (b"p2esk", 88, tb([9, 48, 57, 115, 171]), 56, "p256_encrypted_secret_key"),
    (b"sppk", 55, tb([3, 254, 226, 86]), 33, "secp256k1 public key"),
    (b"p2pk", 55, tb([3, 178, 139, 127]), 33, "p256 public key"),
    (b"SSp", 53, tb([38, 248, 136]), 33, "secp256k1 scalar"),
    (b"GSp", 53, tb([5, 92, 0]), 33, "secp256k1 element"),
    (b"edsk", 98, tb([43, 246, 78, 7]), 64, "ed25519 secret key"),
    (b"edsig", 99, tb([9, 245, 205, 134, 18]), 64, "ed25519 signature"),
    (b"spsig", 99, tb([13, 115, 101, 19, 63]), 64, "secp256k1 signature"),
    (b"p2sig", 98, tb([54, 240, 44, 52]), 64, "p256 signature"),
    (b"sig", 96, tb([4, 130, 43]), 64, "generic signature"),
    (b'Net', 15, tb([87, 82, 0]), 4, "chain id"),
    (b'nce', 53, tb([69, 220, 169]), 32, 'seed nonce hash'),
    (b'bmv1', 37, tb([1, 1, 75, 4]), 20, 'blinded public key hash'),
    (b'vh', 52, tb([1, 106, 242]), 32, 'block_payload_hash'),
]

# Integer tag assigned to each operation kind, keyed by the kind's name.
operation_tags = dict(
    endorsement=0,
    seed_nonce_revelation=1,
    double_endorsement_evidence=2,
    double_baking_evidence=3,
    account_activation=4,
    proposals=5,
    ballot=6,
    reveal=7,
    transaction=8,
    origination=9,
    delegation=10,
)


def scrub_input(v: Union[str, bytes]) -> bytes:
    """Normalize a string or bytes value to bytes.

    Bytes are returned unchanged. A string that ``int(v, 16)`` accepts is
    treated as hexadecimal (an optional ``0x``/``0X`` prefix is dropped) and
    decoded with ``bytes.fromhex``; any other string is encoded as ASCII.

    :param v: bytes, hex string or plain ASCII string
    :returns: bytes
    :raises TypeError: if ``v`` is neither ``str`` nor ``bytes``
    :raises ValueError: if a string looks like a hex number but is rejected by
        ``bytes.fromhex`` (e.g. odd number of digits), or is not hex and
        contains non-ASCII characters
    """
    if isinstance(v, bytes):
        return v
    if not isinstance(v, str):
        raise TypeError("a bytes-like object is required (also str), not '%s'" % type(v).__name__)

    try:
        int(v, 16)
    except ValueError:
        # Not a hex number: keep the text as is (e.g. a base58 string).
        return v.encode('ascii')

    # int() accepts both '0x' and '0X', so both spellings have to be stripped
    # before bytes.fromhex(), which understands neither.
    hex_digits = v[2:] if v[:2] in ('0x', '0X') else v
    return bytes.fromhex(hex_digits)


def base58_decode(v: bytes) -> bytes:
    """Decode data using Base58 with checksum, validate the binary prefix
    against the known kinds and cut it off.

    :param v: Array of bytes (use string.encode())
    :returns: bytes
    :raises ValueError: if no known encoding matches the human-readable prefix \
        and length of ``v``, if the checksum is invalid, or if the decoded data \
        does not start with the binary prefix of the matched encoding
    """
    try:
        binary_prefix = next(
            encoding[2]
            for encoding in base58_encodings
            if len(v) == encoding[1] and v.startswith(encoding[0])
        )
    except StopIteration as e:
        raise ValueError('Invalid encoding, prefix or length mismatch.') from e

    decoded = base58.b58decode_check(v)
    # The leading Base58 characters do not pin down the binary prefix exactly:
    # a checksum-valid string can start with the expected characters and still
    # carry a different binary prefix, so verify it before slicing it off.
    if not decoded.startswith(binary_prefix):
        raise ValueError('Invalid encoding, binary prefix mismatch.')
    return decoded[len(binary_prefix):]
def base58_encode(v: bytes, prefix: bytes) -> bytes:
    """Encode data using Base58 with checksum, prepending the binary prefix
    that corresponds to the given human-readable prefix.

    :param v: Array of bytes
    :param prefix: Human-readable prefix as listed in ``base58_encodings``, e.g. b'mv1', b'KT1', etc
    :returns: bytes (use string.decode())
    :raises ValueError: if no known encoding has this prefix together with a payload of ``len(v)`` bytes
    """
    candidates = (
        entry
        for entry in base58_encodings
        if len(v) == entry[3] and prefix == entry[0]
    )
    try:
        matched = next(candidates)
    except StopIteration as e:
        raise ValueError('Invalid encoding, prefix or length mismatch.') from e

    binary_prefix = matched[2]
    return base58.b58encode_check(binary_prefix + v)
def _validate(v: Union[str, bytes], prefixes: list):
    """Ensure the value starts with one of the given prefixes and decodes as Base58.

    :param v: string or bytes
    :param prefixes: list of accepted human-readable prefixes (bytes)
    :raises ValueError: if the prefix is not accepted or decoding fails
    :raises TypeError: if ``v`` is neither ``str`` nor ``bytes`` (raised by ``scrub_input``)
    """
    raw = v.encode() if isinstance(v, str) else v
    raw = scrub_input(raw)

    if not any(raw.startswith(candidate) for candidate in prefixes):
        raise ValueError('Unknown prefix.')

    # Called for its validation side effect only; the decoded payload is dropped.
    base58_decode(raw)
def validate_pkh(v: Union[str, bytes]):
    """Ensure parameter is a public key hash (starts with b'mv1', b'mv2', b'mv3' or b'mv4')

    :param v: string or bytes
    :raises ValueError: if parameter is not a public key hash
    """
    pkh_prefixes = [b'mv1', b'mv2', b'mv3', b'mv4']
    return _validate(v, prefixes=pkh_prefixes)
def validate_l2_pkh(v: Union[str, bytes]):
    """Ensure parameter is a L2 public key hash (starts with b'txr1')

    :param v: string or bytes
    :raises ValueError: if parameter is not a L2 public key hash
    """
    l2_prefixes = [b'txr1']
    return _validate(v, prefixes=l2_prefixes)
def validate_sig(v: Union[str, bytes]):
    """Ensure parameter is a signature (starts with b'edsig', b'spsig', b'p2sig', b'sig')

    :param v: string or bytes
    :raises ValueError: if parameter is not a signature
    """
    signature_prefixes = [b'edsig', b'spsig', b'p2sig', b'sig']
    return _validate(v, prefixes=signature_prefixes)
def is_pkh(v: Union[str, bytes]) -> bool:
    """Check if value is a public key hash.

    :param v: string or bytes
    :returns: True if ``validate_pkh`` accepts the value, False if it raises ValueError or TypeError
    """
    try:
        validate_pkh(v)
    except (ValueError, TypeError):
        return False
    else:
        return True
def is_l2_pkh(v: Union[str, bytes]) -> bool:
    """Check if value is an L2 public key hash.

    :param v: string or bytes
    :returns: True if ``validate_l2_pkh`` accepts the value, False if it raises ValueError or TypeError
    """
    try:
        validate_l2_pkh(v)
    except (ValueError, TypeError):
        return False
    else:
        return True
def is_sig(v: Union[str, bytes]) -> bool:
    """Check if value is a signature.

    :param v: string or bytes
    :returns: True if ``validate_sig`` accepts the value, False if it raises ValueError or TypeError
    """
    try:
        validate_sig(v)
    except (ValueError, TypeError):
        return False
    else:
        return True
def is_bh(v: Union[str, bytes]) -> bool:
    """Check if value is a block hash (Base58 value starting with b'B').

    :param v: string or bytes
    :returns: True if validation succeeds, False if it raises ValueError or TypeError
    """
    block_hash_prefixes = [b'B']
    try:
        _validate(v, prefixes=block_hash_prefixes)
    except (ValueError, TypeError):
        return False
    else:
        return True
def is_ogh(v: Union[str, bytes]) -> bool:
    """Check if value is an operation group hash (Base58 value starting with b'o').

    :param v: string or bytes
    :returns: True if validation succeeds, False if it raises ValueError or TypeError
    """
    try:
        _validate(v, prefixes=[b'o'])
    except (ValueError, TypeError):
        return False
    return True
def is_kt(v: Union[str, bytes]) -> bool:
    """Check if value is a KT address (Base58 value starting with b'KT1').

    :param v: string or bytes
    :returns: True if validation succeeds, False if it raises ValueError or TypeError
    """
    kt_prefixes = [b'KT1']
    try:
        _validate(v, prefixes=kt_prefixes)
    except (ValueError, TypeError):
        return False
    else:
        return True
def is_sr(v: Union[str, bytes]) -> bool:
    """Check if value is a smart rollup address (Base58 value starting with b'sr1').

    :param v: string or bytes
    :returns: True if validation succeeds, False if it raises ValueError or TypeError
    """
    rollup_prefixes = [b'sr1']
    try:
        _validate(v, prefixes=rollup_prefixes)
    except (ValueError, TypeError):
        return False
    else:
        return True
def is_public_key(v: Union[str, bytes]) -> bool:
    """Check if value is a public key.

    :param v: string or bytes
    :returns: True if validation succeeds, False if it raises ValueError or TypeError
    """
    # NOTE(review): besides the public key prefixes (edpk, sppk, p2pk) this
    # list also holds edsk, spsk and p2sk, which base58_encodings describes as
    # seed / secret keys, so such values pass this check too. Left as is
    # because callers may rely on it -- confirm whether that is intended.
    key_prefixes = [b"edsk", b"edpk", b"spsk", b"p2sk", b"sppk", b"p2pk"]
    try:
        _validate(v, prefixes=key_prefixes)
    except (ValueError, TypeError):
        return False
    else:
        return True
def is_chain_id(v: Union[str, bytes]) -> bool:
    """Check if value is a chain id (Base58 value starting with b'Net').

    :param v: string or bytes
    :returns: True if validation succeeds, False if it raises ValueError or TypeError
    """
    chain_id_prefixes = [b'Net']
    try:
        _validate(v, prefixes=chain_id_prefixes)
    except (ValueError, TypeError):
        return False
    else:
        return True
def is_address(v: Union[str, bytes]) -> bool:
    """Check if value is a KT1, mv1-mv4 (public key hash) or sr1 address.

    Everything from the first ``%`` onwards is ignored before the check
    (presumably an entrypoint suffix -- confirm against callers).

    :param v: string or bytes
    :returns: True if the part before ``%`` passes ``is_kt``, ``is_pkh`` or \
        ``is_sr``; False otherwise, including for values that are neither \
        ``str`` nor decodable ``bytes``
    """
    if isinstance(v, bytes):
        try:
            v = v.decode()
        except UnicodeDecodeError:
            # Undecodable bytes cannot be an address; answer instead of raising.
            return False
    if not isinstance(v, str):
        # Same contract as the other is_* predicates: wrong types yield False.
        return False

    address = v.split('%')[0]
    return is_kt(address) or is_pkh(address) or is_sr(address)
def is_txr_address(v: Union[str, bytes]) -> bool:
    """Check if value is a txr1 address.

    Everything from the first ``%`` onwards is ignored before the check
    (presumably an entrypoint suffix -- confirm against callers).

    :param v: string or bytes
    :returns: True if the part before ``%`` passes ``is_l2_pkh``; False \
        otherwise, including for values that are neither ``str`` nor \
        decodable ``bytes``
    """
    if isinstance(v, bytes):
        try:
            v = v.decode()
        except UnicodeDecodeError:
            # Undecodable bytes cannot be an address; answer instead of raising.
            return False
    if not isinstance(v, str):
        # Same contract as the other is_* predicates: wrong types yield False.
        return False

    address = v.split('%')[0]
    return is_l2_pkh(address)