Source code for pymavryk.sandbox.node

import atexit
import logging
import unittest
from concurrent.futures import FIRST_EXCEPTION
from concurrent.futures import Future
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import wait
from contextlib import suppress
from pprint import pprint
from threading import Event
from time import sleep
from typing import List
from typing import Optional

import requests.exceptions
from testcontainers.core.container import Container  # type: ignore
from testcontainers.core.docker_client import DockerClient  # type: ignore
from testcontainers.core.generic import DockerContainer  # type: ignore

from pymavryk.client import PyMavrykClient
from pymavryk.operation.group import OperationGroup
from pymavryk.sandbox.parameters import LATEST
from pymavryk.sandbox.parameters import sandbox_addresses

DOCKER_IMAGE = 'bakingbad/sandboxed-node:v17.0-1'
MAX_ATTEMPTS = 100
ATTEMPT_DELAY = 0.1
TEZOS_NODE_PORT = 8732


def kill_existing_containers():
    docker = DockerClient()
    running_containers: List[Container] = docker.client.containers.list(
        filters={
            'status': 'running',
            'ancestor': DOCKER_IMAGE,
        }
    )
    for container in running_containers:
        with suppress(Exception):
            container.stop(timeout=1)


atexit.register(kill_existing_containers)


def worker_callback(f):
    e = f.exception()

    if e is None:
        return

    trace = []
    tb = e.__traceback__
    while tb is not None:
        trace.append(
            {
                "filename": tb.tb_frame.f_code.co_filename,
                "name": tb.tb_frame.f_code.co_name,
                "lineno": tb.tb_lineno,
            }
        )
        tb = tb.tb_next
    pprint(
        {
            'type': type(e).__name__,
            'message': str(e),
            'trace': trace,
        }
    )


def get_next_baker_key(client: PyMavrykClient) -> str:
    baking_rights = client.shell.head.helpers.baking_rights()
    delegate = next(br['delegate'] for br in baking_rights if br['round'] == 0)
    return next(k for k, v in sandbox_addresses.items() if v == delegate)


class SandboxedNodeContainer(DockerContainer):
    def __init__(self, image=DOCKER_IMAGE, port=TEZOS_NODE_PORT):
        super().__init__(image)
        self.with_bind_ports(TEZOS_NODE_PORT, port)
        self.url = f'http://localhost:{port}'
        self.client = PyMavrykClient().using(shell=self.url)

    def start(self):
        super().start()
        if self.get_wrapped_container() is None:
            raise RuntimeError('Failed to create a container')

    def wait_for_connection(self, max_attempts=MAX_ATTEMPTS, attempt_delay=ATTEMPT_DELAY) -> bool:
        attempts = max_attempts
        while attempts > 0:
            try:
                self.client.shell.node.get("/version/")
                return True
            except requests.exceptions.ConnectionError:
                sleep(attempt_delay)
                attempts -= 1
        return False

    def activate(self, protocol=LATEST):
        return self.client.using(key='dictator').activate_protocol(protocol).fill().sign().inject()

    def bake(self, key: str, min_fee: int = 0):
        return self.client.using(key=key).bake_block(min_fee).fill().work().sign().inject()

    def get_client(self, key: str):
        return self.client.using(key=key)


[docs]class SandboxedNodeTestCase(unittest.TestCase): """Perform tests with sanboxed node in Docker container.""" IMAGE: str = DOCKER_IMAGE "Docker image to use" PORT: int = TEZOS_NODE_PORT "Port to expose to host machine" PROTOCOL: str = LATEST "Hash of protocol to activate" node_container: Optional['SandboxedNodeContainer'] = None executor: Optional[ThreadPoolExecutor] = None
[docs] @classmethod def setUpClass(cls) -> None: """Spin up sandboxed node container and activate protocol.""" kill_existing_containers() cls.node_container = SandboxedNodeContainer(image=cls.IMAGE, port=cls.PORT) cls.node_container.start() if not cls.node_container.wait_for_connection(): logging.error('failed to connect to %s', cls.node_container.url) return cls.node_container.activate(cls.PROTOCOL)
[docs] @classmethod def tearDownClass(cls) -> None: cls._get_node_container().stop(force=True, delete_volume=True)
@classmethod def _get_node_container(cls) -> SandboxedNodeContainer: if cls.node_container is None: raise RuntimeError('Sandboxed node container is not running') return cls.node_container
[docs] @classmethod def activate(cls, protocol_alias: str) -> OperationGroup: """Activate protocol.""" return cls._get_node_container().activate(protocol=protocol_alias)
@classmethod def get_client(cls, key='bootstrap2') -> PyMavrykClient: return cls._get_node_container().get_client(key)
[docs] @classmethod def bake_block(cls, min_fee: int = 0) -> OperationGroup: """Bake new block. :param min_fee: minimum fee of operation to be included in block """ key = get_next_baker_key(cls.get_client()) return cls._get_node_container().bake(key=key, min_fee=min_fee)
@property def client(self) -> PyMavrykClient: """PyMavryk client to interact with sandboxed node.""" return self._get_node_container().get_client(key='bootstrap1')
class SandboxedNodeAutoBakeTestCase(SandboxedNodeTestCase): exit_event: Optional[Event] = None baker: Optional[Future] = None min_fee = 0 TIME_BETWEEN_BLOCKS = 3 "Time delay between bake attempts, in seconds" @staticmethod def autobake(time_between_blocks: int, node_url: str, exit_event: Event, min_fee=0): logging.info("Baker thread started") client = PyMavrykClient().using(shell=node_url) ptr = 0 while not exit_event.is_set(): if ptr % time_between_blocks == 0: key = get_next_baker_key(client) client.using(key=key).bake_block(min_fee=min_fee).fill().work().sign().inject() sleep(1) ptr += 1 logging.info("Baker thread stopped") @classmethod def setUpClass(cls) -> None: super().setUpClass() if cls.executor is None: cls.executor = ThreadPoolExecutor(1) if cls.node_container is None: raise RuntimeError('sandboxed node container is not created') cls.exit_event = Event() cls.baker = cls.executor.submit( cls.autobake, cls.TIME_BETWEEN_BLOCKS, cls.node_container.url, cls.exit_event, cls.min_fee, ) cls.baker.add_done_callback(worker_callback) @classmethod def tearDownClass(cls) -> None: assert cls.exit_event assert cls.baker cls.exit_event.set() wait([cls.baker], return_when=FIRST_EXCEPTION)