Skip to content

Commit f22a1bc

Browse files
Call rollback hook instead of triggering reindex when single-level rollback has failed (#369)
* Call rollback hook instead of triggering reindex when single-level rollback has failed * Typo in changelog * Allow mixing oneshot and regular indexes in a single config (#367) * Decrease the size of generic and `-pytezos` Docker images by 11% and 16%, respectively (#370) * Fix crash with `RuntimeError` after continuous realtime connection loss (#366) * Fix crash with `RuntimeError` after continuous realtime connection loss * Less agressive backoff * Lint
1 parent e6ea103 commit f22a1bc

File tree

6 files changed

+70
-43
lines changed

6 files changed

+70
-43
lines changed

CHANGELOG.md

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
### Fixed
66

77
* index: Allow mixing oneshot and regular indexes in a single config.
8+
* index: Call rollback hook instead of triggering reindex when single-level rollback has failed.
89
* index: Fixed crash with `RuntimeError` after continuous realtime connection loss.
910
* index: Fixed `OperationIndexConfig.types` field being partially ignored.
1011
* tzkt: Fixed `origination` subscription missing when `merge_subscriptions` flag is set.
@@ -52,7 +53,7 @@
5253
* ci: Push `X` and `X.Y` tags to the Docker Hub on release.
5354
* cli: Added `config env` command to export env-file with default values.
5455
* cli: Show warning when running an outdated version of DipDup.
55-
* hooks: Added new hook `on_index_rollback` to perform per-index rollbacks.
56+
* hooks: Added a new hook `on_index_rollback` to perform per-index rollbacks.
5657

5758
### Fixed
5859

@@ -70,7 +71,7 @@
7071

7172
* exceptions: Fixed incorrect formatting and broken links in help messages.
7273
* index: Fixed crash when the only index in config is `head`.
73-
* index: Fixed fetching originations during initial sync.
74+
* index: Fixed fetching originations during the initial sync.
7475

7576
## 5.0.3 - 2022-05-04
7677

poetry.lock

Lines changed: 12 additions & 12 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/dipdup/config.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,9 +52,12 @@
5252
from dipdup.enums import SkipHistory
5353
from dipdup.exceptions import ConfigInitializationException
5454
from dipdup.exceptions import ConfigurationError
55+
from dipdup.exceptions import ConflictingHooksError
5556
from dipdup.exceptions import IndexAlreadyExistsError
57+
from dipdup.exceptions import InitializationRequiredError
5658
from dipdup.utils import exclude_none
5759
from dipdup.utils import import_from
60+
from dipdup.utils import is_importable
5861
from dipdup.utils import pascal_to_snake
5962
from dipdup.utils import snake_to_pascal
6063

@@ -1225,6 +1228,23 @@ def package_path(self) -> str:
12251228
except ImportError:
12261229
return os.path.join(os.getcwd(), self.package)
12271230

1231+
# TODO: Remove in 6.0
1232+
@cached_property
1233+
def per_index_rollback(self) -> bool:
1234+
"""Check if package has `on_index_rollback` hook"""
1235+
new_hook = is_importable(f'{self.package}.hooks.on_index_rollback', 'on_index_rollback')
1236+
old_hook = is_importable(f'{self.package}.hooks.on_rollback', 'on_rollback')
1237+
if new_hook and new_hook:
1238+
raise ConflictingHooksError('on_rollback', 'on_index_rollback')
1239+
elif not new_hook and not old_hook:
1240+
raise InitializationRequiredError('none of `on_rollback` or `on_index_rollback` hooks found')
1241+
elif new_hook:
1242+
return True
1243+
elif old_hook:
1244+
return False
1245+
else:
1246+
raise RuntimeError
1247+
12281248
@property
12291249
def oneshot(self) -> bool:
12301250
"""Whether all indexes have `last_level` field set"""

src/dipdup/dipdup.py

Lines changed: 3 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -68,9 +68,8 @@
6868

6969

7070
class IndexDispatcher:
71-
def __init__(self, ctx: DipDupContext, index_rollback: bool = True) -> None:
71+
def __init__(self, ctx: DipDupContext) -> None:
7272
self._ctx = ctx
73-
self._index_rollback = index_rollback
7473

7574
self._logger = logging.getLogger('dipdup')
7675
self._indexes: Dict[str, Index] = {}
@@ -332,7 +331,7 @@ async def _on_rollback(self, datasource: IndexDatasource, type_: MessageType, fr
332331
self._logger.info('`%s` rollback complete', channel)
333332
return
334333

335-
if self._index_rollback:
334+
if self._ctx.config.per_index_rollback:
336335
hook_name = 'on_index_rollback'
337336
for index_name in unprocessed_indexes:
338337
self._logger.warning('`%s`: can\'t process, firing `%s` hook', index_name, hook_name)
@@ -562,17 +561,7 @@ async def _set_up_index_dispatcher(
562561
start_scheduler_event: Event,
563562
early_realtime: bool,
564563
) -> None:
565-
# NOTE: Decide how to handle rollbacks depending on hooks presence
566-
# TODO: Remove in 6.0
567-
old_hook, new_hook = 'on_rollback', 'on_index_rollback'
568-
has_old_hook = is_importable(f'{self._config.package}.hooks.{old_hook}', old_hook)
569-
has_new_hook = is_importable(f'{self._config.package}.hooks.{new_hook}', new_hook)
570-
if has_old_hook and has_new_hook:
571-
raise ConflictingHooksError(old_hook, new_hook)
572-
elif not has_old_hook and not has_new_hook:
573-
raise InitializationRequiredError('none of `on_rollback` or `on_index_rollback` hooks found')
574-
575-
index_dispatcher = IndexDispatcher(self._ctx, index_rollback=has_new_hook)
564+
index_dispatcher = IndexDispatcher(self._ctx)
576565
tasks.add(
577566
create_task(
578567
index_dispatcher.run(

src/dipdup/index.py

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -390,11 +390,13 @@ async def _process_level_operations(self, operation_subgroups: Tuple[OperationSu
390390
if batch_level < index_level:
391391
raise RuntimeError(f'Batch level is lower than index level: {batch_level} < {index_level}')
392392

393+
# NOTE: Single level rollback was triggered
393394
if head_level := self._next_head_level:
394395
if head_level != index_level:
395396
raise RuntimeError(f'New head level is not equal to index level: {head_level} != {index_level}')
396397

397398
self._logger.info('Rolling back to the previous level, verifying processed operations')
399+
rollback_hook_called = False
398400
old_head_hashes = set(self._head_hashes)
399401
old_head_matched_hashes = {k for k, v in self._head_hashes.items() if v}
400402
new_head_hashes = {s.hash for s in operation_subgroups}
@@ -404,19 +406,31 @@ async def _process_level_operations(self, operation_subgroups: Tuple[OperationSu
404406

405407
self._logger.info('Comparing hashes: %s new, %s missing', len(unprocessed_hashes), len(missing_hashes))
406408
if missing_hashes:
407-
self._logger.info('Some operations were backtracked, requesting reindexing')
408-
await self._ctx.reindex(
409-
ReindexingReason.rollback,
410-
datasource=self._datasource.name,
411-
from_level=head_level,
412-
# NOTE: Index level is not decreased on a single-level rollback
413-
to_level=head_level - 1,
414-
missing_hashes=', '.join(missing_hashes),
415-
)
409+
rollback_hook_called = True
410+
self._logger.info('Some operations were backtracked, calling rollback hook')
411+
if self._ctx.config.per_index_rollback:
412+
hook_name = 'on_index_rollback'
413+
await self._ctx.fire_hook(
414+
hook_name,
415+
index=self,
416+
from_level=head_level,
417+
to_level=head_level - 1,
418+
)
419+
else:
420+
hook_name = 'on_rollback'
421+
await self._ctx.fire_hook(
422+
hook_name,
423+
datasource=self.datasource,
424+
from_level=head_level + 1,
425+
to_level=head_level,
426+
)
416427

417428
self._next_head_level = None
418429
self._head_hashes.clear()
419430

431+
if rollback_hook_called:
432+
return
433+
420434
operation_subgroups = tuple(filter(lambda subgroup: subgroup.hash in unprocessed_hashes, operation_subgroups))
421435

422436
self._logger.debug('Processing %s operation subgroups of level %s', len(operation_subgroups), batch_level)

tests/test_dipdup/test_rollback.py

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,12 @@
2525

2626

2727
def _get_index_dispatcher() -> IndexDispatcher:
28-
return IndexDispatcher(
29-
ctx=Mock(spec=DipDupContext),
28+
index_dispatcher = IndexDispatcher(
29+
ctx=MagicMock(spec=DipDupContext),
3030
)
31+
index_dispatcher._ctx.reindex = AsyncMock() # type: ignore
32+
index_dispatcher._ctx.config = AsyncMock() # type: ignore
33+
return index_dispatcher
3134

3235

3336
def _get_operation_index(level: int) -> OperationIndex:
@@ -62,6 +65,7 @@ def _get_operation_index(level: int) -> OperationIndex:
6265
index._state.save = AsyncMock() # type: ignore
6366
index._call_matched_handler = AsyncMock() # type: ignore
6467
index._ctx.reindex = AsyncMock() # type: ignore
68+
index._ctx.config = MagicMock()
6569

6670
return index
6771

@@ -352,12 +356,11 @@ async def test_single_level_new_head_less(self) -> None:
352356
assert operation_index._next_head_level is None
353357
assert operation_index.state.level == from_level
354358

355-
operation_index._ctx.reindex.assert_awaited_with(
356-
ReindexingReason.rollback,
357-
datasource=operation_index.datasource.name,
359+
operation_index._ctx.fire_hook.assert_awaited_with(
360+
'on_index_rollback',
361+
index=operation_index,
358362
from_level=from_level,
359363
to_level=to_level,
360-
missing_hashes='1',
361364
)
362365
assert operation_index._call_matched_handler.await_count == 2
363366

0 commit comments

Comments
 (0)