import json
from pprint import pformat
from typing import Any
from typing import Dict
from typing import List
from typing import Optional
from typing import Union
import requests
import requests.exceptions
from simplejson import JSONDecodeError
from pymavryk.logging import logger
def _urljoin(*args: str) -> str:
return "/".join(map(lambda x: str(x).strip('/'), args))
def _gen_error_variants(error_id: str) -> List[str]:
chunks = error_id.split('.')
variants = [error_id]
if len(chunks) > 1:
variants.append(chunks[-2])
if len(chunks) > 2:
variants.append('.'.join(chunks[2:]))
return variants
[docs]class RpcError(Exception):
__handlers__ = {} # type: ignore
@classmethod
def __init_subclass__(cls, error_id: Union[str, List[str]]) -> None:
super().__init_subclass__()
if isinstance(error_id, list):
for eid in error_id:
cls.__handlers__[eid] = cls
else:
cls.__handlers__[error_id] = cls
[docs] @classmethod
def from_errors(cls, errors: List[Dict[str, Any]]) -> 'RpcError':
"""Create RpcError from "errors" section of response JSON."""
if not errors:
return RpcError('Unspecified error')
# FIXME: Only first error is being processed
error = errors[-1]
for key in _gen_error_variants(error['id']):
if key in cls.__handlers__:
handler = cls.__handlers__[key]
return handler(error)
return RpcError(error)
[docs] @classmethod
def from_response(cls, res: requests.Response) -> 'RpcError':
"""Create RpcError from requests Response."""
if res.headers.get('content-type') == 'application/json':
try:
errors = res.json()
except JSONDecodeError:
# sometimes rpc returns invalid json
return RpcError(res.text)
assert isinstance(errors, list)
return cls.from_errors(errors)
else:
return RpcError(res.text)
def __str__(self) -> str:
return pformat(self.args)
[docs]class RpcNode:
"""Request proxy for a single Mavryk node."""
def __init__(self, uri: Union[str, List[str]], headers: Optional[Dict[str, str]] = None) -> None:
if not uri:
raise RuntimeError()
if not isinstance(uri, list):
uri = [uri]
self.uri = uri
self.headers = headers or {}
def __repr__(self) -> str:
res = [
super().__repr__(),
'\nNode address',
self.uri[0],
]
return '\n'.join(res)
[docs] def request(self, method: str, path: str, **kwargs) -> requests.Response:
"""Perform HTTP request to node.
:param method: one of GET/POST/PUT/DELETE
:param path: path to endpoint
:param kwargs: requests.request arguments
:raises RpcError: node has returned an error
:returns: node response
"""
logger.debug('>>>>> %s %s\n%s', method, path, json.dumps(kwargs, indent=4))
res = requests.request(
method=method,
url=_urljoin(self.uri[0], path),
headers={'content-type': 'application/json', 'user-agent': 'PyMavryk', **self.headers},
**kwargs,
)
if res.status_code == 401:
logger.debug('<<<<< %s\n%s', res.status_code, res.text)
raise RpcError(f'Unauthorized: {path}')
if res.status_code == 404:
logger.debug('<<<<< %s\n%s', res.status_code, res.text)
raise RpcError(f'Not found: {path}')
if res.status_code != 200:
logger.debug('<<<<< %s\n%s', res.status_code, pformat(res.text, indent=4))
raise RpcError.from_response(res)
logger.debug('<<<<< %s\n%s', res.status_code, json.dumps(res.json(), indent=4))
return res
[docs] def get(
self,
path: str,
params: Optional[Dict[str, Any]] = None,
timeout: Optional[int] = None,
) -> requests.Response:
return self.request('GET', path, params=params, timeout=timeout).json()
[docs] def post(self, path: str, params: Optional[Dict[str, Any]] = None, json=None) -> Union[requests.Response, str]:
response = self.request('POST', path, params=params, json=json)
try:
return response.json()
except JSONDecodeError:
return response.text
[docs] def delete(self, path: str, params: Optional[Dict[str, Any]] = None) -> requests.Response:
return self.request('DELETE', path, params=params).json()
[docs] def put(self, path: str, params: Optional[Dict[str, Any]] = None) -> requests.Response:
return self.request('PUT', path, params=params).json()
[docs]class RpcMultiNode(RpcNode):
"""Request proxy for multiple nodes chosen for each request in round-robin order."""
def __init__(self, uri: Union[str, List[str]]) -> None:
super().__init__(uri)
self.nodes = list(map(RpcNode, self.uri))
self._next_i = 0
def __repr__(self) -> str:
res = [
super().__repr__(),
'\nNode addresses',
*self.uri,
]
return '\n'.join(res)
[docs] def request(self, method: str, path: str, **kwargs) -> requests.Response:
assert self._next_i < len(self.nodes)
res = self.nodes[self._next_i].request(method, path, **kwargs)
self._next_i = (self._next_i + 1) % len(self.nodes)
return res