from typing import Any
from typing import Callable
from typing import Generator
from typing import Tuple
from pymavryk.crypto.encoding import is_bh
from pymavryk.jupyter import get_attr_docstring
from pymavryk.logging import logger
from pymavryk.rpc.node import RpcError
from pymavryk.rpc.query import RpcQuery
[docs]def find_state_change_intervals(
head: int,
last: int,
get: Callable,
equals: Callable,
step=60,
) -> Generator:
succ_value = get(head)
logger.debug('%s at head %s' % succ_value, head)
for level in range(head - step, last, -step):
value = get(level)
logger.debug('%s at level %s' % value, level)
if not equals(value, succ_value):
logger.debug('%s -> %s at (%s, {level + step})' % value, succ_value, level)
yield level + step, succ_value, level, value
succ_value = value
[docs]def find_state_change(
head: int,
last: int,
get: Callable,
equals: Callable,
pred_value: Any,
) -> (int, Any): # type: ignore
def bisect(start: int, end: int):
if end == start + 1:
return end, get(end)
level = (end + start) // 2
value = get(level)
logger.debug('%s at level %s' % value, level)
if equals(value, pred_value):
return bisect(level, end)
else:
return bisect(start, level)
return bisect(last, head)
[docs]def walk_state_change_interval(
head: int,
last: int,
get: Callable,
equals: Callable,
head_value: Any,
last_value: Any,
) -> Generator:
level = last
value = last_value
while not equals(value, head_value):
level, value = find_state_change(head, level, get, equals, pred_value=value)
logger.debug('%s -> %s at %s' % last_value, value, level)
yield level, value
[docs]def find_state_changes(
head: int,
last: int,
get: Callable,
equals: Callable,
step=60,
) -> Generator:
state_change_intervals = find_state_change_intervals(head, last, get, equals, step)
for int_head, int_head_value, int_tail, int_last_value in state_change_intervals:
yield from walk_state_change_interval(
int_head,
int_tail,
get,
equals,
head_value=int_head_value,
last_value=int_last_value,
)
[docs]class BlockSliceQuery(RpcQuery):
def __init__(self, start: int, stop=None, **kwargs):
super().__init__(**kwargs)
self._start: int = start
self._stop = stop or 'head'
def __repr__(self):
res = [
super().__repr__(),
f'\nBlock range\n`{self._start}` — `{self._stop}`',
f'\n(){get_attr_docstring(BlockSliceQuery, "__call__")}',
]
return '\n'.join(res)
def __getitem__(self, item):
"""Get block by index.
:param item: Index inside given block range
:rtype: BlockQuery
"""
start, stop = self.get_range()
if item >= 0:
return self._getitem(start + item)
else:
return self._getitem(stop + item + 1)
[docs] def __call__(self) -> list: # type: ignore
"""Get block hashes (base58) for this interval."""
header = self._getitem(self._stop).header()
if is_bh(self._stop):
head = self._stop
else:
head = header['hash']
if self._start < 0:
length = abs(self._start)
else:
length = header['level'] - self._start + 1
return super().__call__(length=min(header['level'], length), head=head)
[docs] def get_range(self):
"""Get block level range."""
def get_level(x):
if isinstance(x, int):
if x < 0:
return max(0, self.head.level() + x)
elif x > 0:
return x
else:
return 1
else:
return self._getitem(x).header()['level']
return get_level(self._start), get_level(self._stop)
[docs] def find_proposal_injection(self, proposal_id):
"""Find proposal injection.
:param proposal_id: Proposal hash (base58)
"""
last, head = self.get_range()
level, _ = find_state_change(
head=head - 1, # proposals are empty at the last block
last=last,
get=lambda x: self._getitem(x).votes.proposals[proposal_id](),
equals=lambda x, y: x == y,
pred_value=0,
)
votes = self._getitem(level).operations.find_votes(proposal_id)
assert len(votes) == 1
return votes
[docs] def find_upvotes(self, proposal_id) -> Generator:
"""Find upvoting operations for the given proposal.
:param proposal_id: Proposal hash (base58)
:returns: Generator (lazy)
"""
last, head = self.get_range()
state_changes = find_state_changes(
head=head - 1, # proposals are empty at the last block
last=last,
get=lambda x: self._getitem(x).votes.proposals[proposal_id](),
equals=lambda x, y: x == y,
)
for level, _ in state_changes:
yield from self._getitem(level).operations.find_upvotes(proposal_id)
[docs] def find_ballots(self) -> Generator:
"""Find ballot operations for the current period.
:returns: Generator (lazy)
"""
last, head = self.get_range()
state_changes = find_state_changes(
head=head - 1, # ballots are empty at the last block
last=last,
get=lambda x: self._getitem(x).votes.ballots(),
equals=lambda x, y: x == y,
)
for level, _ in state_changes:
yield from self._getitem(level).operations.find_ballots()
[docs] def find_origination(self, contract_id):
"""Find contract origination.
:param contract_id: Contract ID (KT-address)
"""
def get_counter(x):
try:
return self._getitem(x).context.contracts[contract_id].counter()
except RpcError:
return None
level, _ = find_state_change(
head=self.head.level(),
last=0,
get=get_counter,
equals=lambda x, y: x == y,
pred_value=None,
)
return self._getitem(level).operations.find_origination(contract_id)
[docs] def find_operation(self, operation_group_hash) -> dict:
"""Find operation by hash.
:param operation_group_hash: base58
:raises: StopIteration if not found
"""
last, head = self.get_range()
if self._start < 0:
levels = range(head, max(1, last - 1), -1)
else:
levels = range(last, head + 1, 1)
for block_level in levels:
logger.debug(f'Looking for operation %s in block %s...' % (operation_group_hash, block_level))
try:
return self._getitem(block_level).operations[operation_group_hash]()
except StopIteration:
continue
raise StopIteration(operation_group_hash)
[docs]class PeriodQuery(RpcQuery):
__pos_key__ = ''
__val_key__ = ''
def _get_item(self, item) -> BlockSliceQuery:
lvl = self.head.metadata()['level_info']
blocks_per_period = int((lvl['level'] - lvl[self.__pos_key__] - 1) / lvl[self.__val_key__])
def get_range(x):
if x >= 0:
start_lvl = (max(1, x) - 1) * blocks_per_period + 1
else:
start_lvl = (lvl[self.__val_key__] + x + 1) * blocks_per_period + 1
stop_lvl = start_lvl + blocks_per_period - 1
if stop_lvl > lvl['level']:
stop_lvl = None
return start_lvl, stop_lvl
if isinstance(item, slice):
start, _ = get_range(item.start or 1)
_, stop = get_range(item.stop or -1)
elif isinstance(item, int):
start, stop = get_range(item)
else:
raise NotImplementedError(item)
return BlockSliceQuery(
start=start,
stop=stop,
node=self.node,
path=self._wild_path,
params=self._params,
)
[docs]class CyclesQuery(PeriodQuery):
__pos_key__ = 'cycle_position'
__val_key__ = 'cycle'
[docs] def __call__(self, **params):
"""Get current cycle."""
return self.head.cycle()
def __getitem__(self, item) -> BlockSliceQuery:
"""Get block range by cycle/cycle range.
:param item: Cycle number or range (slice), range start/stop can be empty or negative
:rtype: BlockSliceQuery
"""
return self._get_item(item)
[docs]class VotingPeriodsQuery(PeriodQuery):
__pos_key__ = 'voting_period_position'
__val_key__ = 'voting_period'
[docs] def __call__(self, **params):
"""Get current voting period."""
return self.head.voting_period()
def __getitem__(self, item) -> BlockSliceQuery:
"""Get block range by voting_period/voting_period range.
:param item: Voting_period number or range (slice), range start/stop can be empty or negative
:rtype: BlockSliceQuery
"""
return self._get_item(item)