Skip to content

Commit 7865bb8

Browse files
Fix dispatching bigmaps to indexes with wrong datasource, refactor event emitter related code (#75)
1 parent 9f8fa0e commit 7865bb8

File tree

5 files changed

+122
-47
lines changed

5 files changed

+122
-47
lines changed

src/dipdup/context.py

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -12,20 +12,14 @@
1212

1313

1414
# TODO: Dataclasses are cool, everyone loves them. Resolve issue with pydantic in HandlerContext.
15-
class HandlerContext:
16-
"""Common handler context."""
17-
15+
class DipDupContext:
1816
def __init__(
1917
self,
2018
datasources: Dict[str, DatasourceT],
2119
config: DipDupConfig,
22-
logger: FormattedLogger,
23-
template_values: Optional[Dict[str, str]],
2420
) -> None:
2521
self.datasources = datasources
2622
self.config = config
27-
self.logger = logger
28-
self.template_values = template_values
2923
self._updated: bool = False
3024

3125
def commit(self) -> None:
@@ -70,6 +64,23 @@ async def reindex(self) -> None:
7064
await Tortoise._drop_databases()
7165
await self.restart()
7266

67+
68+
class HandlerContext(DipDupContext):
69+
"""Common handler context."""
70+
71+
def __init__(
72+
self,
73+
datasources: Dict[str, DatasourceT],
74+
config: DipDupConfig,
75+
logger: FormattedLogger,
76+
template_values: Optional[Dict[str, str]],
77+
datasource: DatasourceT,
78+
) -> None:
79+
super().__init__(datasources, config)
80+
self.logger = logger
81+
self.template_values = template_values
82+
self.datasource = datasource
83+
7384
def add_contract(self, name: str, address: str, typename: Optional[str] = None) -> None:
7485
if name in self.config.contracts:
7586
raise ConfigurationError(f'Contract `{name}` is already exists')
@@ -91,16 +102,17 @@ def add_index(self, name: str, template: str, values: Dict[str, Any]) -> None:
91102

92103

93104
class RollbackHandlerContext(HandlerContext):
105+
template_values: None
106+
94107
def __init__(
95108
self,
96109
datasources: Dict[str, DatasourceT],
97110
config: DipDupConfig,
98111
logger: FormattedLogger,
99-
datasource: str,
112+
datasource: DatasourceT,
100113
from_level: int,
101114
to_level: int,
102115
) -> None:
103-
super().__init__(datasources, config, logger, None)
104-
self.datasource = datasource
116+
super().__init__(datasources, config, logger, None, datasource)
105117
self.from_level = from_level
106118
self.to_level = to_level
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
from abc import ABC
2+
from enum import Enum
3+
from typing import Awaitable, List, Protocol
4+
5+
from pyee import AsyncIOEventEmitter # type: ignore
6+
7+
from dipdup.models import BigMapData, OperationData
8+
9+
10+
class EventType(Enum):
11+
operations = 'operatitions'
12+
big_maps = 'big_maps'
13+
rollback = 'rollback'
14+
15+
16+
class OperationsCallback(Protocol):
17+
def __call__(self, datasource: 'IndexDatasource', operations: List[OperationData]) -> Awaitable[None]:
18+
...
19+
20+
21+
class BigMapsCallback(Protocol):
22+
def __call__(self, datasource: 'IndexDatasource', big_maps: List[BigMapData]) -> Awaitable[None]:
23+
...
24+
25+
26+
class RollbackCallback(Protocol):
27+
def __call__(self, datasource: 'IndexDatasource', from_level: int, to_level: int) -> Awaitable[None]:
28+
...
29+
30+
31+
class IndexDatasource(ABC, AsyncIOEventEmitter):
32+
def on(self, event, f=None) -> None:
33+
raise RuntimeError('Do not use `on` directly')
34+
35+
def emit(self, event: str, *args, **kwargs) -> None:
36+
if event not in ('new_listener', 'error'):
37+
raise RuntimeError('Do not use `emit` directly')
38+
super().emit(event, *args, **kwargs)
39+
40+
def on_operations(self, fn: OperationsCallback) -> None:
41+
super().on(EventType.operations, fn)
42+
43+
def on_big_maps(self, fn: BigMapsCallback) -> None:
44+
super().on(EventType.big_maps, fn)
45+
46+
def on_rollback(self, fn: RollbackCallback) -> None:
47+
super().on(EventType.rollback, fn)
48+
49+
def emit_operations(self, operations: List[OperationData]) -> None:
50+
super().emit(EventType.operations, datasource=self, operations=operations)
51+
52+
def emit_big_maps(self, big_maps: List[BigMapData]) -> None:
53+
super().emit(EventType.big_maps, datasource=self, big_maps=big_maps)
54+
55+
def emit_rollback(self, from_level: int, to_level: int) -> None:
56+
super().emit(EventType.rollback, datasource=self, from_level=from_level, to_level=to_level)

src/dipdup/datasources/tzkt/datasource.py

Lines changed: 19 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
from aiosignalrcore.hub_connection_builder import HubConnectionBuilder # type: ignore
88
from aiosignalrcore.messages.completion_message import CompletionMessage # type: ignore
99
from aiosignalrcore.transport.websockets.connection import ConnectionState # type: ignore
10-
from pyee import AsyncIOEventEmitter # type: ignore
1110

1211
from dipdup.config import (
1312
BigMapIndexConfig,
@@ -16,6 +15,7 @@
1615
OperationHandlerOriginationPatternConfig,
1716
OperationIndexConfig,
1817
)
18+
from dipdup.datasources.datasource import IndexDatasource
1919
from dipdup.datasources.proxy import DatasourceRequestProxy
2020
from dipdup.datasources.tzkt.enums import TzktMessageType
2121
from dipdup.models import BigMapAction, BigMapData, OperationData
@@ -254,7 +254,7 @@ async def fetch_big_maps_by_level(self) -> AsyncGenerator[Tuple[int, List[BigMap
254254
yield big_maps[0].level, big_maps[: i + 1]
255255

256256

257-
class TzktDatasource(AsyncIOEventEmitter):
257+
class TzktDatasource(IndexDatasource):
258258
"""Bridge between REST/WS TzKT endpoints and DipDup.
259259
260260
* Converts raw API data to models
@@ -452,7 +452,7 @@ async def add_index(self, index_config: IndexConfigTemplateT) -> None:
452452
else:
453453
raise NotImplementedError(f'Index kind `{index_config.kind}` is not supported')
454454

455-
await self.on_connect()
455+
await self._on_connect()
456456

457457
def _get_client(self) -> BaseHubConnection:
458458
"""Create SignalR client, register message callbacks"""
@@ -471,10 +471,10 @@ def _get_client(self) -> BaseHubConnection:
471471
)
472472
).build()
473473

474-
self._client.on_open(self.on_connect)
475-
self._client.on_error(self.on_error)
476-
self._client.on('operations', self.on_operation_message)
477-
self._client.on('bigmaps', self.on_big_map_message)
474+
self._client.on_open(self._on_connect)
475+
self._client.on_error(self._on_error)
476+
self._client.on('operations', self._on_operation_message)
477+
self._client.on('bigmaps', self._on_big_map_message)
478478

479479
return self._client
480480

@@ -485,7 +485,7 @@ async def run(self) -> None:
485485
self._logger.info('Starting websocket client')
486486
await self._get_client().start()
487487

488-
async def on_connect(self) -> None:
488+
async def _on_connect(self) -> None:
489489
"""Subscribe to all required channels on established WS connection"""
490490
if self._get_client().transport.state != ConnectionState.connected:
491491
return
@@ -499,7 +499,7 @@ async def on_connect(self) -> None:
499499
for address, paths in self._big_map_subscriptions.items():
500500
await self.subscribe_to_big_maps(address, paths)
501501

502-
def on_error(self, message: CompletionMessage) -> NoReturn:
502+
def _on_error(self, message: CompletionMessage) -> NoReturn:
503503
"""Raise exception from WS server's error message"""
504504
raise Exception(message.error)
505505

@@ -554,7 +554,7 @@ async def subscribe_to_big_maps(self, address: str, paths: List[str]) -> None:
554554
],
555555
)
556556

557-
async def on_operation_message(
557+
async def _on_operation_message(
558558
self,
559559
message: List[Dict[str, Any]],
560560
) -> None:
@@ -577,18 +577,19 @@ async def on_operation_message(
577577
if operation.status != 'applied':
578578
continue
579579
operations.append(operation)
580-
self.emit("operations", operations)
580+
self.emit_operations(operations)
581581

582582
elif message_type == TzktMessageType.REORG:
583-
self.emit("rollback", self.level, current_level)
583+
if self.level is None:
584+
raise RuntimeError
585+
self.emit_rollback(self.level, current_level)
584586

585587
else:
586588
raise NotImplementedError
587589

588-
async def on_big_map_message(
590+
async def _on_big_map_message(
589591
self,
590592
message: List[Dict[str, Any]],
591-
sync=False,
592593
) -> None:
593594
"""Parse and emit raw big map diffs from WS"""
594595
for item in message:
@@ -606,10 +607,12 @@ async def on_big_map_message(
606607
for big_map_json in item['data']:
607608
big_map = self.convert_big_map(big_map_json)
608609
big_maps.append(big_map)
609-
self.emit("big_maps", big_maps)
610+
self.emit_big_maps(big_maps)
610611

611612
elif message_type == TzktMessageType.REORG:
612-
self.emit("rollback", self.level, current_level)
613+
if self.level is None:
614+
raise RuntimeError
615+
self.emit_rollback(self.level, current_level)
613616

614617
else:
615618
raise NotImplementedError

src/dipdup/dipdup.py

Lines changed: 15 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import asyncio
22
import hashlib
33
import logging
4-
from functools import partial
54
from os.path import join
65
from posix import listdir
76
from typing import Dict, List, cast
@@ -26,18 +25,19 @@
2625
StaticTemplateConfig,
2726
TzktDatasourceConfig,
2827
)
29-
from dipdup.context import RollbackHandlerContext
28+
from dipdup.context import DipDupContext, RollbackHandlerContext
3029
from dipdup.datasources import DatasourceT
3130
from dipdup.datasources.bcd.datasource import BcdDatasource
31+
from dipdup.datasources.datasource import IndexDatasource
3232
from dipdup.datasources.tzkt.datasource import TzktDatasource
3333
from dipdup.exceptions import ConfigurationError, HandlerImportError
3434
from dipdup.hasura import configure_hasura
35-
from dipdup.index import BigMapIndex, HandlerContext, Index, OperationIndex
35+
from dipdup.index import BigMapIndex, Index, OperationIndex
3636
from dipdup.models import BigMapData, IndexType, OperationData, State
3737

3838

3939
class IndexDispatcher:
40-
def __init__(self, ctx: HandlerContext) -> None:
40+
def __init__(self, ctx: DipDupContext) -> None:
4141
self._ctx = ctx
4242

4343
self._logger = logging.getLogger(__name__)
@@ -82,21 +82,21 @@ async def reload_config(self) -> None:
8282

8383
self._ctx.reset()
8484

85-
async def dispatch_operations(self, operations: List[OperationData]) -> None:
85+
async def dispatch_operations(self, datasource: TzktDatasource, operations: List[OperationData]) -> None:
8686
assert len(set(op.level for op in operations)) == 1
8787
level = operations[0].level
8888
for index in self._indexes.values():
89-
if isinstance(index, OperationIndex):
89+
if isinstance(index, OperationIndex) and index.datasource == datasource:
9090
index.push(level, operations)
9191

92-
async def dispatch_big_maps(self, big_maps: List[BigMapData]) -> None:
92+
async def dispatch_big_maps(self, datasource: TzktDatasource, big_maps: List[BigMapData]) -> None:
9393
assert len(set(op.level for op in big_maps)) == 1
9494
level = big_maps[0].level
9595
for index in self._indexes.values():
96-
if isinstance(index, BigMapIndex):
96+
if isinstance(index, BigMapIndex) and index.datasource == datasource:
9797
index.push(level, big_maps)
9898

99-
async def _rollback(self, datasource: str, from_level: int, to_level: int) -> None:
99+
async def _rollback(self, datasource: TzktDatasource, from_level: int, to_level: int) -> None:
100100
logger = utils.FormattedLogger(ROLLBACK_HANDLER)
101101
rollback_fn = self._ctx.config.get_rollback_fn()
102102
ctx = RollbackHandlerContext(
@@ -111,12 +111,12 @@ async def _rollback(self, datasource: str, from_level: int, to_level: int) -> No
111111

112112
async def run(self, oneshot=False) -> None:
113113
self._logger.info('Starting index dispatcher')
114-
for name, datasource in self._ctx.datasources.items():
115-
if not isinstance(datasource, TzktDatasource):
114+
for datasource in self._ctx.datasources.values():
115+
if not isinstance(datasource, IndexDatasource):
116116
continue
117-
datasource.on('operations', self.dispatch_operations)
118-
datasource.on('big_maps', self.dispatch_big_maps)
119-
datasource.on('rollback', partial(self._rollback, datasource=name))
117+
datasource.on_operations(self.dispatch_operations)
118+
datasource.on_big_maps(self.dispatch_big_maps)
119+
datasource.on_rollback(self._rollback)
120120

121121
self._ctx.commit()
122122

@@ -143,11 +143,9 @@ def __init__(self, config: DipDupConfig) -> None:
143143
self._config = config
144144
self._datasources: Dict[str, DatasourceT] = {}
145145
self._datasources_by_config: Dict[DatasourceConfigT, DatasourceT] = {}
146-
self._ctx = HandlerContext(
146+
self._ctx = DipDupContext(
147147
config=self._config,
148148
datasources=self._datasources,
149-
logger=utils.FormattedLogger(__name__),
150-
template_values=None,
151149
)
152150
self._index_dispatcher = IndexDispatcher(self._ctx)
153151

src/dipdup/index.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
OperationIndexConfig,
1717
OperationType,
1818
)
19-
from dipdup.context import HandlerContext
19+
from dipdup.context import DipDupContext, HandlerContext
2020
from dipdup.datasources.tzkt.datasource import BigMapFetcher, OperationFetcher, TzktDatasource
2121
from dipdup.models import BigMapAction, BigMapData, BigMapDiff, OperationData, Origination, State, TemporaryState, Transaction
2222
from dipdup.utils import FormattedLogger, in_global_transaction
@@ -25,14 +25,18 @@
2525

2626

2727
class Index:
28-
def __init__(self, ctx: HandlerContext, config: IndexConfigTemplateT, datasource: TzktDatasource) -> None:
28+
def __init__(self, ctx: DipDupContext, config: IndexConfigTemplateT, datasource: TzktDatasource) -> None:
2929
self._ctx = ctx
3030
self._config = config
3131
self._datasource = datasource
3232

3333
self._logger = logging.getLogger(__name__)
3434
self._state: Optional[State] = None
3535

36+
@property
37+
def datasource(self) -> TzktDatasource:
38+
return self._datasource
39+
3640
async def get_state(self) -> State:
3741
"""Get state of index containing current level and config hash"""
3842
if self._state is None:
@@ -90,7 +94,7 @@ async def _initialize_index_state(self) -> None:
9094
class OperationIndex(Index):
9195
_config: OperationIndexConfig
9296

93-
def __init__(self, ctx: HandlerContext, config: OperationIndexConfig, datasource: TzktDatasource) -> None:
97+
def __init__(self, ctx: DipDupContext, config: OperationIndexConfig, datasource: TzktDatasource) -> None:
9498
super().__init__(ctx, config, datasource)
9599
self._queue: Deque[Tuple[int, List[OperationData]]] = deque()
96100
self._contract_hashes: Dict[str, Tuple[str, str]] = {}
@@ -283,6 +287,7 @@ async def _on_match(
283287
config=self._ctx.config,
284288
logger=logger,
285289
template_values=self._config.template_values,
290+
datasource=self.datasource,
286291
)
287292

288293
await handler_config.callback_fn(handler_context, *args)
@@ -325,7 +330,7 @@ async def _get_contract_hashes(self, address: str) -> Tuple[str, str]:
325330
class BigMapIndex(Index):
326331
_config: BigMapIndexConfig
327332

328-
def __init__(self, ctx: HandlerContext, config: BigMapIndexConfig, datasource: TzktDatasource) -> None:
333+
def __init__(self, ctx: DipDupContext, config: BigMapIndexConfig, datasource: TzktDatasource) -> None:
329334
super().__init__(ctx, config, datasource)
330335
self._queue: Deque[Tuple[int, List[BigMapData]]] = deque()
331336

@@ -423,6 +428,7 @@ async def _on_match(
423428
config=self._ctx.config,
424429
logger=logger,
425430
template_values=self._config.template_values,
431+
datasource=self.datasource,
426432
)
427433

428434
await handler_config.callback_fn(handler_context, big_map_context)

0 commit comments

Comments
 (0)