Skip to content

Commit c146f77

Browse files
Fix handlers being spawned with wrong index (#128)
1 parent 221c633 commit c146f77

File tree

5 files changed

+84
-40
lines changed

5 files changed

+84
-40
lines changed

src/dipdup/config.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -322,8 +322,8 @@ def parent(self) -> Optional[T]:
322322

323323
@parent.setter
324324
def parent(self, config: T) -> None:
325-
if self._parent:
326-
raise RuntimeError("Can't unset parent once set")
325+
if self._parent and config is not self._parent:
326+
raise ConfigInitializationException
327327
self._parent = config
328328

329329

@@ -525,6 +525,8 @@ def callback_fn(self, fn: Callable) -> None:
525525
self._callback_fn = fn
526526

527527
def initialize_callback_fn(self, package: str):
528+
if self._callback_fn:
529+
return
528530
_logger.debug('Registering %s callback `%s`', self.kind, self.callback)
529531
module_name = f'{package}.{self.kind}s.{self.callback}'
530532
fn_name = self.callback
@@ -1122,6 +1124,7 @@ def _resolve_index_links(self, index_config: IndexConfigT) -> None:
11221124
index_config.datasource = self.get_tzkt_datasource(index_config.datasource)
11231125

11241126
for handler in index_config.handlers:
1127+
handler.parent = index_config
11251128
# TODO: Verify callback uniqueness
11261129
# self._callback_patterns[handler.callback].append(handler.pattern)
11271130
if isinstance(handler.contract, str):

src/dipdup/context.py

Lines changed: 64 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from contextlib import contextmanager, suppress
77
from os.path import exists, join
88
from pprint import pformat
9-
from typing import Any, Dict, Iterator, Optional, Union, cast
9+
from typing import Any, Dict, Iterator, Optional, Tuple, Union, cast
1010

1111
import sqlparse # type: ignore
1212
from tortoise import Tortoise
@@ -19,7 +19,6 @@
1919
DipDupConfig,
2020
HandlerConfig,
2121
HookConfig,
22-
IndexConfig,
2322
IndexTemplateConfig,
2423
OperationIndexConfig,
2524
PostgresDatabaseConfig,
@@ -59,11 +58,25 @@ def __init__(
5958
def __str__(self) -> str:
6059
return pformat(self.__dict__)
6160

62-
async def fire_hook(self, name: str, fmt: Optional[str] = None, *args, **kwargs: Any) -> None:
61+
async def fire_hook(
62+
self,
63+
name: str,
64+
fmt: Optional[str] = None,
65+
*args,
66+
**kwargs: Any,
67+
) -> None:
6368
await self.callbacks.fire_hook(self, name, fmt, *args, **kwargs)
6469

65-
async def fire_handler(self, name: str, datasource: Datasource, fmt: Optional[str] = None, *args, **kwargs: Any) -> None:
66-
await self.callbacks.fire_handler(self, name, datasource, fmt, *args, **kwargs)
70+
async def fire_handler(
71+
self,
72+
name: str,
73+
index: str,
74+
datasource: Datasource,
75+
fmt: Optional[str] = None,
76+
*args,
77+
**kwargs: Any,
78+
) -> None:
79+
await self.callbacks.fire_handler(self, name, index, datasource, fmt, *args, **kwargs)
6780

6881
async def execute_sql(self, name: str) -> None:
6982
await self.callbacks.execute_sql(self, name)
@@ -200,50 +213,54 @@ def __init__(
200213
self.logger = logger
201214
self.handler_config = handler_config
202215
self.datasource = datasource
203-
template_values = cast(IndexConfig, handler_config.parent).template_values if handler_config.parent else {}
216+
template_values = handler_config.parent.template_values if handler_config.parent else {}
204217
self.template_values = TemplateValuesDict(self, **template_values)
205218

206219

207220
class CallbackManager:
208221
def __init__(self, package: str) -> None:
209222
self._logger = logging.getLogger('dipdup.callback')
210223
self._package = package
211-
self._handlers: Dict[str, HandlerConfig] = {}
224+
self._handlers: Dict[Tuple[str, str], HandlerConfig] = {}
212225
self._hooks: Dict[str, HookConfig] = {}
213226

214227
def register_handler(self, handler_config: HandlerConfig) -> None:
215-
if handler_config.callback not in self._handlers:
216-
self._handlers[handler_config.callback] = handler_config
228+
if not handler_config.parent:
229+
raise RuntimeError('Handler must have a parent index')
230+
231+
# NOTE: Same handlers can be linked to different indexes, we need to use exact config
232+
key = (handler_config.callback, handler_config.parent.name)
233+
if key not in self._handlers:
234+
self._handlers[key] = handler_config
217235
handler_config.initialize_callback_fn(self._package)
218236

219237
def register_hook(self, hook_config: HookConfig) -> None:
220-
if hook_config.callback not in self._hooks:
221-
self._hooks[hook_config.callback] = hook_config
238+
key = hook_config.callback
239+
if key not in self._hooks:
240+
self._hooks[key] = hook_config
222241
hook_config.initialize_callback_fn(self._package)
223242

224243
async def fire_handler(
225244
self,
226245
ctx: 'DipDupContext',
227246
name: str,
247+
index: str,
228248
datasource: Datasource,
229249
fmt: Optional[str] = None,
230250
*args,
231251
**kwargs: Any,
232252
) -> None:
233-
try:
234-
new_ctx = HandlerContext(
235-
datasources=ctx.datasources,
236-
config=ctx.config,
237-
callbacks=ctx.callbacks,
238-
logger=FormattedLogger(f'dipdup.handlers.{name}', fmt),
239-
handler_config=self._handlers[name],
240-
datasource=datasource,
241-
)
242-
except KeyError as e:
243-
raise ConfigurationError(f'Attempt to fire unregistered handler `{name}`') from e
244-
253+
handler_config = self._get_handler(name, index)
254+
new_ctx = HandlerContext(
255+
datasources=ctx.datasources,
256+
config=ctx.config,
257+
callbacks=ctx.callbacks,
258+
logger=FormattedLogger(f'dipdup.handlers.{name}', fmt),
259+
handler_config=handler_config,
260+
datasource=datasource,
261+
)
245262
with self._wrapper('handler', name):
246-
await new_ctx.handler_config.callback_fn(new_ctx, *args, **kwargs)
263+
await handler_config.callback_fn(new_ctx, *args, **kwargs)
247264

248265
async def fire_hook(
249266
self,
@@ -253,20 +270,18 @@ async def fire_hook(
253270
*args,
254271
**kwargs: Any,
255272
) -> None:
256-
try:
257-
ctx = HookContext(
258-
datasources=ctx.datasources,
259-
config=ctx.config,
260-
callbacks=ctx.callbacks,
261-
logger=FormattedLogger(f'dipdup.hooks.{name}', fmt),
262-
hook_config=self._hooks[name],
263-
)
264-
except KeyError as e:
265-
raise ConfigurationError(f'Attempt to fire unregistered hook `{name}`') from e
273+
hook_config = self._get_hook(name)
274+
new_ctx = HookContext(
275+
datasources=ctx.datasources,
276+
config=ctx.config,
277+
callbacks=ctx.callbacks,
278+
logger=FormattedLogger(f'dipdup.hooks.{name}', fmt),
279+
hook_config=hook_config,
280+
)
266281

267-
self._verify_arguments(ctx, *args, **kwargs)
282+
self._verify_arguments(new_ctx, *args, **kwargs)
268283
with self._wrapper('hook', name):
269-
await ctx.hook_config.callback_fn(ctx, *args, **kwargs)
284+
await hook_config.callback_fn(ctx, *args, **kwargs)
270285

271286
async def execute_sql(self, ctx: 'DipDupContext', name: str) -> None:
272287
"""Execute SQL included with project"""
@@ -328,3 +343,16 @@ def _verify_arguments(cls, ctx: HookContext, *args, **kwargs) -> None:
328343
type_=type(arg),
329344
expected_type=expected_type,
330345
)
346+
347+
def _get_handler(self, name: str, index: str) -> HandlerConfig:
348+
try:
349+
return self._handlers[(name, index)]
350+
except KeyError as e:
351+
raise ConfigurationError(f'Attempt to fire unregistered handler `{name}` of index `{index}`') from e
352+
353+
def _get_hook(self, name: str) -> HookConfig:
354+
355+
try:
356+
return self._hooks[name]
357+
except KeyError as e:
358+
raise ConfigurationError(f'Attempt to fire unregistered hook `{name}`') from e

src/dipdup/datasources/tzkt/datasource.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -667,7 +667,7 @@ async def _extract_message_data(self, channel: str, message: List[Any]) -> Async
667667
if self._level and head_level < self._level:
668668
raise RuntimeError('Received data message from level lower than current: {head_level} < {self._level}')
669669

670-
# TODO: State messages will be dropped from TzKT
670+
# NOTE: State messages will be replaced with negotiation some day
671671
if message_type == TzktMessageType.STATE:
672672
if self._sync_level != head_level:
673673
self._logger.info('Datasource level set to %s', head_level)

src/dipdup/http.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,13 @@
1515
from dipdup import __version__
1616
from dipdup.config import HTTPConfig # type: ignore
1717

18+
safe_exceptions = (
19+
aiohttp.ClientConnectionError,
20+
aiohttp.ClientConnectorError,
21+
aiohttp.ClientResponseError,
22+
aiohttp.ClientPayloadError,
23+
)
24+
1825

1926
class HTTPGateway(ABC):
2027
"""Base class for datasources which connect to remote HTTP endpoints"""
@@ -108,7 +115,7 @@ async def _retry_request(self, method: str, url: str, weight: int = 1, **kwargs)
108115
weight=weight,
109116
**kwargs,
110117
)
111-
except (aiohttp.ClientConnectionError, aiohttp.ClientConnectorError, aiohttp.ClientResponseError) as e:
118+
except safe_exceptions as e:
112119
if self._config.retry_count and attempt - 1 == self._config.retry_count:
113120
raise e
114121

src/dipdup/index.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -325,6 +325,8 @@ async def _on_match(
325325
):
326326
"""Prepare handler arguments, parse parameter and storage. Schedule callback in executor."""
327327
self._logger.info('%s: `%s` handler matched!', operation_subgroup.hash, handler_config.callback)
328+
if not handler_config.parent:
329+
raise RuntimeError('Handler must have a parent')
328330

329331
args: List[Optional[Union[Transaction, Origination, OperationData]]] = []
330332
for pattern_config, operation in zip(handler_config.pattern, matched_operations):
@@ -367,6 +369,7 @@ async def _on_match(
367369

368370
await self._ctx.fire_handler(
369371
handler_config.callback,
372+
handler_config.parent.name,
370373
self.datasource,
371374
operation_subgroup.hash + ': {}',
372375
*args,
@@ -474,6 +477,8 @@ async def _on_match(
474477
) -> None:
475478
"""Prepare handler arguments, parse key and value. Schedule callback in executor."""
476479
self._logger.info('%s: `%s` handler matched!', matched_big_map.operation_id, handler_config.callback)
480+
if not handler_config.parent:
481+
raise RuntimeError('Handler must have a parent')
477482

478483
if matched_big_map.action.has_key:
479484
key_type = handler_config.key_type_cls
@@ -502,6 +507,7 @@ async def _on_match(
502507

503508
await self._ctx.fire_handler(
504509
handler_config.callback,
510+
handler_config.parent.name,
505511
self.datasource,
506512
# FIXME: missing `operation_id` field in API to identify operation
507513
None,

0 commit comments

Comments
 (0)