
Commit 1ca5ab0

Defer spawning index datasources, add get_quotes method (#133)
* Defer spawning index datasources
* get_quotes
* Limit
* Logs, RuntimeError fixes
* Docs
* Set index sync levels early
* Fix head check in `process`
* Lint
1 parent f60fdc8 commit 1ca5ab0

File tree

3 files changed: +103 -41 lines changed


src/dipdup/datasources/tzkt/datasource.py

Lines changed: 43 additions & 4 deletions
@@ -537,7 +537,7 @@ async def get_big_maps(
             big_maps.append(self.convert_big_map(bm))
         return big_maps
 
-    async def get_quotes(self, level: int) -> QuoteData:
+    async def get_quote(self, level: int) -> QuoteData:
         """Get quote for block"""
         self._logger.info('Fetching quotes for level %s', level)
         quote_json = await self._http.request(
@@ -548,6 +548,21 @@ async def get_quotes(self, level: int) -> QuoteData:
         )
         return self.convert_quote(quote_json[0])
 
+    async def get_quotes(self, from_level: int, to_level: int) -> List[QuoteData]:
+        """Get quotes for blocks"""
+        self._logger.info('Fetching quotes for levels %s-%s', from_level, to_level)
+        quotes_json = await self._http.request(
+            'get',
+            url='v1/quotes',
+            params={
+                "level.ge": from_level,
+                "level.lt": to_level,
+                "limit": self.request_limit,
+            },
+            cache=False,
+        )
+        return [self.convert_quote(quote) for quote in quotes_json]
+
     async def add_index(self, index_config: ResolvedIndexConfigT) -> None:
         """Register index config in internal mappings and matchers. Find and register subscriptions."""
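A hedged usage sketch for the two methods above; `datasource` is assumed to be an already-connected `TzktDatasource` instance. Note the half-open range: `level.ge`/`level.lt` means `to_level` itself is excluded, and a single call returns at most `request_limit` rows.

    from dipdup.datasources.tzkt.datasource import TzktDatasource

    async def fetch_quotes(datasource: TzktDatasource, first: int, last: int):
        quote = await datasource.get_quote(first)          # quote for a single level
        quotes = await datasource.get_quotes(first, last)  # quotes for [first, last) in one request
        return quotes or [quote]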
@@ -680,9 +695,9 @@ async def _extract_message_data(self, channel: str, message: List[Any]) -> Async
 
             head_level = item['state']
             if self._level and head_level < self._level:
-                raise RuntimeError('Received data message from level lower than current: {head_level} < {self._level}')
+                raise RuntimeError(f'Received data message from level lower than current: {head_level} < {self._level}')
 
-            # NOTE: State messages will be replaced with negotiation some day
+            # NOTE: State messages will be replaced with WS negotiation some day
             if message_type == TzktMessageType.STATE:
                 if self._sync_level != head_level:
                     self._logger.info('Datasource level set to %s', head_level)
@@ -732,7 +747,6 @@ async def _on_head_message(self, message: List[Dict[str, Any]]) -> None:
         created = False
         if self._head is None:
             self._head, created = await Head.get_or_create(
-                # NOTE: It would be better to use datasource name but it's not available
                 name=self._http._url,
                 defaults=dict(
                     level=block.level,
@@ -748,6 +762,31 @@ async def _on_head_message(self, message: List[Dict[str, Any]]) -> None:
 
         self.emit_head(block)
 
+    # FIXME: I don't like this approach, too hacky.
+    async def set_head_from_http(self) -> None:
+        """Set block from `get_head_block` HTTP method for indexes to use the same level during initial sync"""
+        if self._head:
+            self._logger.warning('Attempt to set head twice')
+            return
+        block = await self.get_head_block()
+        self._head, created = await Head.get_or_create(
+            name=self._http._url,
+            defaults=dict(
+                level=block.level,
+                hash=block.hash,
+                timestamp=block.timestamp,
+            ),
+        )
+        if not created:
+            self._head.level = block.level  # type: ignore
+            self._head.hash = block.hash  # type: ignore
+            self._head.timestamp = block.timestamp  # type: ignore
+            await self._head.save()
+
+        self._logger.info('Datasource head set to block with level %s', self._head.level)
+
+        # NOTE: No need to emit?
+
     @classmethod
     def convert_operation(cls, operation_json: Dict[str, Any]) -> OperationData:
         """Convert raw operation message from WS/REST into dataclass"""

src/dipdup/dipdup.py

Lines changed: 42 additions & 25 deletions
@@ -45,41 +45,45 @@ def __init__(self, ctx: DipDupContext) -> None:
         self._indexes: Dict[str, Index] = {}
         self._contracts: Set[ContractConfig] = set()
         self._stopped: bool = False
-        self._synchronized: bool = False
 
-    async def run(self) -> None:
+    async def run(self, spawn_datasources_event: Optional[Event]) -> None:
         self._logger.info('Starting index dispatcher')
-        await self._subscribe_to_datasources()
+        await self._subscribe_to_datasource_events()
+        await self._set_datasource_heads()
         await self._load_index_states()
 
         while not self._stopped:
-            with suppress(IndexError):
-                while index := pending_indexes.pop():
-                    self._indexes[index._config.name] = index
-
             tasks = [index.process() for index in self._indexes.values()]
             async with slowdown(1.0):
                 await gather(*tasks)
 
             await self._check_states()
 
+            indexes_spawned = False
+            with suppress(IndexError):
+                while index := pending_indexes.popleft():
+                    self._indexes[index._config.name] = index
+                    indexes_spawned = True
+            if not indexes_spawned:
+                await self._check_states()
+
+            if spawn_datasources_event:
+                spawn_datasources_event.set()
+
     def stop(self) -> None:
         self._stopped = True
 
     async def _check_states(self) -> None:
         statuses = [i.state.status for i in self._indexes.values()]
 
-        def _have_no(status: IndexStatus) -> bool:
+        def _every_index_is(status: IndexStatus) -> bool:
             nonlocal statuses
-            return bool(tuple(filter(partial(ne, status), statuses)))
+            return bool(statuses) and not bool(tuple(filter(partial(ne, status), statuses)))
 
-        synching_indexes = _have_no(IndexStatus.REALTIME)
-        if not synching_indexes:
-            # TODO: `on_synchronized` hook? Not sure if we need it.
-            ...
+        # TODO: `on_synchronized` hook? Not sure if we need it.
+        # if _every_index_is(IndexStatus.REALTIME): ...
 
-        pending_oneshot_indexes = _have_no(IndexStatus.ONESHOT)
-        if not pending_oneshot_indexes:
+        if _every_index_is(IndexStatus.ONESHOT):
             self.stop()
 
     async def _fetch_contracts(self) -> None:
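The rewritten predicate also fixes an edge case: with zero registered indexes, the old `not _have_no(IndexStatus.ONESHOT)` check was true and the dispatcher stopped immediately. Since indexes are now spawned after the dispatcher starts, that empty case actually occurs, hence the `bool(statuses)` guard. A standalone sketch of the new `_every_index_is` semantics, with plain strings standing in for `IndexStatus` members:

    from functools import partial
    from operator import ne

    def _every_index_is(status: str) -> bool:
        # filter(partial(ne, status), statuses) yields elements that differ from
        # `status`; an empty result means every element matches. bool(statuses)
        # keeps the predicate false for an empty list.
        return bool(statuses) and not bool(tuple(filter(partial(ne, status), statuses)))

    statuses = ['ONESHOT', 'ONESHOT']
    assert _every_index_is('ONESHOT')
    assert not _every_index_is('REALTIME')

    statuses = []
    assert not _every_index_is('ONESHOT')  # no indexes yet: do not stop the dispatcher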
@@ -93,14 +97,19 @@ async def _fetch_contracts(self) -> None:
             self._ctx.config.contracts[contract.name] = contract_config
         self._ctx.config.pre_initialize()
 
-    async def _subscribe_to_datasources(self) -> None:
+    async def _subscribe_to_datasource_events(self) -> None:
         for datasource in self._ctx.datasources.values():
             if not isinstance(datasource, IndexDatasource):
                 continue
             datasource.on_operations(self._dispatch_operations)
             datasource.on_big_maps(self._dispatch_big_maps)
             datasource.on_rollback(self._rollback)
 
+    async def _set_datasource_heads(self) -> None:
+        for datasource in self._ctx.datasources.values():
+            if isinstance(datasource, TzktDatasource):
+                await datasource.set_head_from_http()
+
     async def _load_index_states(self) -> None:
         await self._fetch_contracts()
         index_states = await IndexState.filter().all()
@@ -207,14 +216,15 @@ async def run(self, reindex: bool, oneshot: bool) -> None:
         await self._initialize_schema()
         await self._set_up_hasura(stack, tasks)
 
+        spawn_datasources_event: Optional[Event] = None
         if not oneshot:
             await self._set_up_scheduler(stack, tasks)
-            await self._spawn_datasources(tasks)
+            spawn_datasources_event = await self._spawn_datasources(tasks)
 
         for name in self._config.indexes:
            await self._ctx._spawn_index(name)
 
-        await self._set_up_index_dispatcher(tasks)
+        await self._set_up_index_dispatcher(tasks, spawn_datasources_event)
 
         await gather(*tasks)
@@ -313,19 +323,26 @@ async def _set_up_hasura(self, stack: AsyncExitStack, tasks: Set[Task]) -> None:
         tasks.add(create_task(hasura_gateway.configure()))
 
     async def _set_up_datasources(self, stack: AsyncExitStack) -> None:
-        # FIXME: Find a better way to do this
-        # if self._datasources:
-        #     raise RuntimeError
         await self._create_datasources()
         for datasource in self._datasources.values():
             await stack.enter_async_context(datasource)
 
-    async def _set_up_index_dispatcher(self, tasks: Set[Task]) -> None:
+    async def _set_up_index_dispatcher(self, tasks: Set[Task], spawn_datasources_event: Optional[Event]) -> None:
         index_dispatcher = IndexDispatcher(self._ctx)
-        tasks.add(create_task(index_dispatcher.run()))
+        tasks.add(create_task(index_dispatcher.run(spawn_datasources_event)))
+
+    async def _spawn_datasources(self, tasks: Set[Task]) -> Event:
+        event = Event()
+
+        async def _wrapper():
+            self._logger.info('Waiting for IndexDispatcher to spawn datasources')
+            await event.wait()
+            self._logger.info('Spawning datasources')
+            _tasks = [create_task(d.run()) for d in self._datasources.values()]
+            await gather(*_tasks)
 
-    async def _spawn_datasources(self, tasks: Set[Task]) -> None:
-        tasks.update(create_task(d.run()) for d in self._datasources.values())
+        tasks.add(create_task(_wrapper()))
+        return event
 
     async def _set_up_scheduler(self, stack: AsyncExitStack, tasks: Set[Task]) -> None:
         job_failed = Event()
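The `_wrapper` coroutine turns datasource startup into a deferred task. A minimal self-contained sketch of that handshake with toy coroutines (not dipdup's classes): the dispatcher sets an `asyncio.Event` once it has spawned its indexes, and only then do the realtime datasource connections start, so indexes can sync against a stable head first.

    import asyncio

    async def dispatcher(event: asyncio.Event) -> None:
        print('indexes spawned')  # stands in for draining pending_indexes
        event.set()               # allow datasources to start

    async def datasource_runner(event: asyncio.Event) -> None:
        await event.wait()        # deferred until the dispatcher is ready
        print('datasource running')

    async def main() -> None:
        event = asyncio.Event()
        await asyncio.gather(datasource_runner(event), dispatcher(event))

    asyncio.run(main())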

src/dipdup/index.py

Lines changed: 18 additions & 12 deletions
@@ -20,7 +20,7 @@
 )
 from dipdup.context import DipDupContext
 from dipdup.datasources.tzkt.datasource import BigMapFetcher, OperationFetcher, TzktDatasource
-from dipdup.exceptions import InvalidDataError
+from dipdup.exceptions import ConfigInitializationException, InvalidDataError
 from dipdup.models import BigMapData, BigMapDiff, HeadBlockData
 from dipdup.models import Index as IndexState
 from dipdup.models import IndexStatus, OperationData, Origination, Transaction
@@ -86,7 +86,11 @@ async def process(self) -> None:
 
         elif self._datasource.sync_level is None:
             self._logger.info('Datasource is not active, sync to the latest block')
-            last_level = (await self._datasource.get_head_block()).level
+            # NOTE: Late establishing connection to the WebSocket
+            if self.datasource.head:
+                last_level = self.datasource.head.level
+            else:
+                last_level = (await self._datasource.get_head_block()).level
             await self._synchronize(last_level)
 
         elif self._datasource.sync_level > self.state.level:
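A hedged sketch of the new branch: prefer the head cached by `set_head_from_http` (or by earlier WS head messages) and fall back to a REST call only when no head is cached yet; `resolve_sync_target` is a hypothetical helper, not a dipdup function.

    from dipdup.datasources.tzkt.datasource import TzktDatasource

    async def resolve_sync_target(datasource: TzktDatasource) -> int:
        # Prefer the head set by set_head_from_http() / WS head messages
        if datasource.head:
            return datasource.head.level
        # Fall back to one REST call when nothing is cached yet
        return (await datasource.get_head_block()).level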
@@ -154,6 +158,7 @@ async def single_level_rollback(self, from_level: int) -> None:
             raise RuntimeError('Index level is higher than rollback level')
 
     async def _process_queue(self) -> None:
+        """Process WebSocket queue"""
         if not self._queue:
             return
         self._logger.info('Processing websocket queue')
@@ -195,8 +200,8 @@ async def _synchronize(self, last_level: int, cache: bool = False) -> None:
         await self._exit_sync_state(last_level)
 
     async def _process_level_operations(self, level: int, operations: List[OperationData], block: Optional[HeadBlockData] = None) -> None:
-        if level < self.state.level:
-            raise RuntimeError(f'Level of operation batch is lower than index state level: {level} < {self.state.level}')
+        if level <= self.state.level:
+            raise RuntimeError(f'Level of operation batch must be higher than index state level: {level} <= {self.state.level}')
 
         if self._rollback_level:
             levels = {
@@ -214,7 +219,7 @@ async def _process_level_operations(self, level: int, operations: List[Operation
             received_hashes = set([op.hash for op in operations])
             reused_hashes = received_hashes & expected_hashes
             if reused_hashes != expected_hashes:
-                await self._ctx.reindex(reason='attempted a single level rollback, but arrived block differs from processed one')
+                await self._ctx.reindex(reason='attempted a single level rollback, but arrived block has additional transactions')
 
             self._rollback_level = None
             self._last_hashes = set()
@@ -324,7 +329,7 @@ async def _on_match(
         """Prepare handler arguments, parse parameter and storage. Schedule callback in executor."""
         self._logger.info('%s: `%s` handler matched!', operation_subgroup.hash, handler_config.callback)
         if not handler_config.parent:
-            raise RuntimeError('Handler must have a parent')
+            raise ConfigInitializationException
 
         args: List[Optional[Union[Transaction, Origination, OperationData]]] = []
         for pattern_config, operation in zip(handler_config.pattern, matched_operations):
@@ -415,7 +420,8 @@ def __init__(self, ctx: DipDupContext, config: BigMapIndexConfig, datasource: Tz
     def push(self, level: int, big_maps: List[BigMapData], block: Optional[HeadBlockData] = None):
         self._queue.append((level, big_maps, block))
 
-    async def _process_queue(self):
+    async def _process_queue(self) -> None:
+        """Process WebSocket queue"""
         if not self._queue:
             return
         self._logger.info('Processing websocket queue')
@@ -450,8 +456,8 @@ async def _synchronize(self, last_level: int, cache: bool = False) -> None:
         await self._exit_sync_state(last_level)
 
     async def _process_level_big_maps(self, level: int, big_maps: List[BigMapData], block: Optional[HeadBlockData] = None):
-        if level < self.state.level:
-            raise RuntimeError(f'Level of operation batch is lower than index state level: {level} < {self.state.level}')
+        if level <= self.state.level:
+            raise RuntimeError(f'Level of big map batch must be higher than index state level: {level} <= {self.state.level}')
 
         async with in_global_transaction():
             self._logger.info('Processing %s big map diffs of level %s', len(big_maps), level)
@@ -479,7 +485,7 @@ async def _on_match(
         """Prepare handler arguments, parse key and value. Schedule callback in executor."""
         self._logger.info('%s: `%s` handler matched!', matched_big_map.operation_id, handler_config.callback)
         if not handler_config.parent:
-            raise RuntimeError('Handler must have a parent')
+            raise ConfigInitializationException
 
         if matched_big_map.action.has_key:
             key_type = handler_config.key_type_cls
@@ -525,14 +531,14 @@ async def _process_big_maps(self, big_maps: List[BigMapData]) -> None:
                 await self._on_match(handler_config, big_map)
 
     async def _get_big_map_addresses(self) -> Set[str]:
-        """Get addresses to fetch transactions from during initial synchronization"""
+        """Get addresses to fetch big map diffs from during initial synchronization"""
        addresses = set()
         for handler_config in self._config.handlers:
             addresses.add(cast(ContractConfig, handler_config.contract).address)
         return addresses
 
     async def _get_big_map_paths(self) -> Set[str]:
-        """Get addresses to fetch transactions from during initial synchronization"""
+        """Get addresses to fetch big map diffs from during initial synchronization"""
         paths = set()
         for handler_config in self._config.handlers:
             paths.add(handler_config.path)
