Source code for pymavryk.protocol.protocol

import io
import os
import tarfile
from binascii import hexlify
from collections import OrderedDict
from tempfile import TemporaryDirectory
from typing import List
from typing import Tuple

import netstruct  # type: ignore
import requests
import simplejson as json
from tqdm import tqdm  # type: ignore

from pymavryk.crypto.encoding import base58_encode
from pymavryk.crypto.key import blake2b_32
from pymavryk.jupyter import InlineDocstring
from pymavryk.jupyter import get_class_docstring
from pymavryk.protocol.diff import apply_patch
from pymavryk.protocol.diff import generate_unidiff_html
from pymavryk.protocol.diff import make_patch


[docs]def dir_to_files(path) -> List[Tuple[str, str]]: files = [] with open(os.path.join(path, 'TEZOS_PROTOCOL')) as f: index = json.load(f) for module in index['modules']: for ext in ['mli', 'ml']: name = f'{module.lower()}.{ext}' filename = os.path.join(path, name) if not os.path.exists(filename): continue with open(filename, 'r') as file: text = file.read() files.append((name, text)) return files
[docs]def tar_to_files(path=None, raw=None) -> List[Tuple[str, str]]: assert path or raw fileobj = io.BytesIO(raw) if raw else None with tarfile.open(name=path, fileobj=fileobj) as tar, TemporaryDirectory() as tmp_dir: tar.extractall(tmp_dir) files = dir_to_files(tmp_dir) return files
[docs]def url_to_files(url) -> List[Tuple[str, str]]: res = requests.get(url, stream=True) raw = b'' for data in tqdm(res.iter_content()): raw += data return tar_to_files(raw=raw)
[docs]def files_to_proto(files: List[Tuple[str, str]]) -> dict: components = OrderedDict() # type: ignore for filename, text in files: name, ext = filename.split('.') key = {'mli': 'interface', 'ml': 'implementation'}[ext] name = name.capitalize() data = hexlify(text.encode()).decode() if name in components: components[name][key] = data else: components[name] = {'name': name, key: data} proto = { 'expected_env_version': 0, # TODO: this is V1 'components': list(components.values()), } return proto
[docs]def files_to_tar(files: List[Tuple[str, str]], output_path=None): fileobj = io.BytesIO() if output_path is None else None nameparts = os.path.basename(output_path).split('.') mode = 'w' if len(nameparts) == 3: mode = f'w:{nameparts[-1]}' with tarfile.open(name=output_path, fileobj=fileobj, mode=mode) as tar: for filename, text in files: file = io.BytesIO(text.encode()) ti = tarfile.TarInfo(filename) ti.size = len(file.getvalue()) tar.addfile(ti, file) if fileobj: return fileobj.getvalue() return None
[docs]def proto_to_files(proto: dict) -> List[Tuple[str, str]]: files = [] extensions = {'interface': 'mli', 'implementation': 'ml'} for component in proto.get('components', []): for key, ext in extensions.items(): if key in component: filename = f'{component["name"].lower()}.{ext}' text = bytes.fromhex(component[key]).decode() files.append((filename, text)) return files
[docs]def proto_to_bytes(proto: dict) -> bytes: res = b'' for component in proto.get('components', []): res += netstruct.pack(b'I$', component['name'].encode()) if component.get('interface'): res += b'\xff' + netstruct.pack(b'I$', bytes.fromhex(component['interface'])) else: res += b'\x00' # we should also handle patch case res += netstruct.pack(b'I$', bytes.fromhex(component.get('implementation', ''))) res = netstruct.pack(b'hI$', proto['expected_env_version'], res) return res
[docs]class Protocol(metaclass=InlineDocstring): def __init__(self, proto): self._proto = proto def __repr__(self): res = [ super().__repr__(), '\nHelpers', get_class_docstring(self.__class__), ] return '\n'.join(res) def __iter__(self): return iter(proto_to_files(self._proto))
[docs] @classmethod def from_uri(cls, uri): """Loads protocol implementation from various sources and converts it to the RPC-like format. :param uri: link/path to a tar archive or path to a folder with extracted contents :returns: Protocol instance """ if uri.startswith('http'): files = url_to_files(uri) elif os.path.exists(os.path.expanduser(uri)): files = tar_to_files(uri) elif os.path.isdir(uri): files = dir_to_files(uri) else: raise ValueError(uri) return Protocol(files_to_proto(files))
[docs] def index(self) -> dict: """Generates TEZOS_PROTOCOL file. :returns: dict with protocol hash and modules """ data = { 'hash': self.hash(), 'modules': list(map(lambda x: x['name'], self._proto.get('components', []))), } return data
[docs] def export_tar(self, output_path=None): """Creates a tarball and dumps to a file or returns bytes. :param output_path: Path to the tarball [optional]. You can add .bz2 or .gz extension to make it compressed :returns: bytes if path is None or nothing """ files = proto_to_files(self._proto) files.append(('TEZOS_PROTOCOL', json.dumps(self.index()))) return files_to_tar(files, output_path)
[docs] def export_html(self, output_path=None): """Generates github-like side-by-side diff viewe, powered by diff2html.js :param output_path: will write to this file if specified :returns: html string if path is not specified """ diffs = [text for filename, text in self if text] return generate_unidiff_html(diffs, output_path=output_path)
[docs] def diff(self, proto, context_size=3): """Calculates file diff between two protocol versions. :param proto: an instance of Protocol :param context_size: number of context lines before and after the change :returns: patch in proto format """ files = [] yours = dict(iter(self)) theirs = proto_to_files(proto()) for filename, their_text in theirs: patch = make_patch( a=yours.get(filename, ''), b=their_text, filename=filename, context_size=context_size, ) files.append((filename, patch)) return Protocol(files_to_proto(files))
[docs] def patch(self, patch): """Applies unified diff and returns full-fledged protocol. :param patch: an instance of Protocol containing diff of files :returns: Protocol instance """ files = [] yours = dict(iter(self)) diff = proto_to_files(patch()) for filename, diff_text in diff: text = yours.get(filename, '') if diff_text: text = apply_patch(text, diff_text) files.append((filename, text)) return Protocol(files_to_proto(files))
[docs] def hash(self): hash_digest = blake2b_32(proto_to_bytes(self._proto)).digest() return base58_encode(hash_digest, b'P').decode()