Source code for pymavryk.rpc.node

import json
from pprint import pformat
from typing import Any
from typing import Dict
from typing import List
from typing import Optional
from typing import Union

import requests
import requests.exceptions
from simplejson import JSONDecodeError

from pymavryk.logging import logger


def _urljoin(*args: str) -> str:
    return "/".join(map(lambda x: str(x).strip('/'), args))


def _gen_error_variants(error_id: str) -> List[str]:
    chunks = error_id.split('.')
    variants = [error_id]
    if len(chunks) > 1:
        variants.append(chunks[-2])
        if len(chunks) > 2:
            variants.append('.'.join(chunks[2:]))
    return variants


[docs]class RpcError(Exception): __handlers__ = {} # type: ignore @classmethod def __init_subclass__(cls, error_id: Union[str, List[str]]) -> None: super().__init_subclass__() if isinstance(error_id, list): for eid in error_id: cls.__handlers__[eid] = cls else: cls.__handlers__[error_id] = cls
[docs] @classmethod def from_errors(cls, errors: List[Dict[str, Any]]) -> 'RpcError': """Create RpcError from "errors" section of response JSON.""" if not errors: return RpcError('Unspecified error') # FIXME: Only first error is being processed error = errors[-1] for key in _gen_error_variants(error['id']): if key in cls.__handlers__: handler = cls.__handlers__[key] return handler(error) return RpcError(error)
[docs] @classmethod def from_response(cls, res: requests.Response) -> 'RpcError': """Create RpcError from requests Response.""" if res.headers.get('content-type') == 'application/json': try: errors = res.json() except JSONDecodeError: # sometimes rpc returns invalid json return RpcError(res.text) assert isinstance(errors, list) return cls.from_errors(errors) else: return RpcError(res.text)
def __str__(self) -> str: return pformat(self.args)
[docs]class RpcNode: """Request proxy for a single Mavryk node.""" def __init__(self, uri: Union[str, List[str]], headers: Optional[Dict[str, str]] = None) -> None: if not uri: raise RuntimeError() if not isinstance(uri, list): uri = [uri] self.uri = uri self.headers = headers or {} def __repr__(self) -> str: res = [ super().__repr__(), '\nNode address', self.uri[0], ] return '\n'.join(res)
[docs] def request(self, method: str, path: str, **kwargs) -> requests.Response: """Perform HTTP request to node. :param method: one of GET/POST/PUT/DELETE :param path: path to endpoint :param kwargs: requests.request arguments :raises RpcError: node has returned an error :returns: node response """ logger.debug('>>>>> %s %s\n%s', method, path, json.dumps(kwargs, indent=4)) res = requests.request( method=method, url=_urljoin(self.uri[0], path), headers={'content-type': 'application/json', 'user-agent': 'PyMavryk', **self.headers}, **kwargs, ) if res.status_code == 401: logger.debug('<<<<< %s\n%s', res.status_code, res.text) raise RpcError(f'Unauthorized: {path}') if res.status_code == 404: logger.debug('<<<<< %s\n%s', res.status_code, res.text) raise RpcError(f'Not found: {path}') if res.status_code != 200: logger.debug('<<<<< %s\n%s', res.status_code, pformat(res.text, indent=4)) raise RpcError.from_response(res) logger.debug('<<<<< %s\n%s', res.status_code, json.dumps(res.json(), indent=4)) return res
[docs] def get( self, path: str, params: Optional[Dict[str, Any]] = None, timeout: Optional[int] = None, ) -> requests.Response: return self.request('GET', path, params=params, timeout=timeout).json()
[docs] def post(self, path: str, params: Optional[Dict[str, Any]] = None, json=None) -> Union[requests.Response, str]: response = self.request('POST', path, params=params, json=json) try: return response.json() except JSONDecodeError: return response.text
[docs] def delete(self, path: str, params: Optional[Dict[str, Any]] = None) -> requests.Response: return self.request('DELETE', path, params=params).json()
[docs] def put(self, path: str, params: Optional[Dict[str, Any]] = None) -> requests.Response: return self.request('PUT', path, params=params).json()
[docs]class RpcMultiNode(RpcNode): """Request proxy for multiple nodes chosen for each request in round-robin order.""" def __init__(self, uri: Union[str, List[str]]) -> None: super().__init__(uri) self.nodes = list(map(RpcNode, self.uri)) self._next_i = 0 def __repr__(self) -> str: res = [ super().__repr__(), '\nNode addresses', *self.uri, ] return '\n'.join(res)
[docs] def request(self, method: str, path: str, **kwargs) -> requests.Response: assert self._next_i < len(self.nodes) res = self.nodes[self._next_i].request(method, path, **kwargs) self._next_i = (self._next_i + 1) % len(self.nodes) return res