Skip to content

Commit 4554be2

Browse files
Fix crash when adding an index with new subscriptions in runtime (#429)
1 parent 3d261bf commit 4554be2

File tree

3 files changed

+14
-9
lines changed

3 files changed

+14
-9
lines changed

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,12 @@ All notable changes to this project will be documented in this file.
55
The format is based on [Keep a Changelog],
66
and this project adheres to [Semantic Versioning].
77

8+
## [Unreleased]
9+
10+
### Fixed
11+
12+
- index: Fixed crash when adding an index with new subscriptions in runtime.
13+
814
## [5.2.4] - 2022-07-17
915

1016
### Fixed

src/dipdup/datasources/subscription.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,10 +53,7 @@ def missing_subscriptions(self) -> Set[Subscription]:
5353
return {k for k, v in self._subscriptions.items() if k is not None and v is None}
5454

5555
def add(self, subscription: Subscription) -> None:
56-
if subscription in self._subscriptions:
57-
if not self._merge_subscriptions:
58-
_logger.warning(f'Subscription already exists: {subscription}')
59-
else:
56+
if subscription not in self._subscriptions:
6057
self._subscriptions[subscription] = None
6158

6259
def remove(self, subscription: Subscription) -> None:

src/dipdup/index.py

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -209,13 +209,15 @@ async def process(self) -> None:
209209
await self._synchronize(head_level)
210210
await self.state.update_status(IndexStatus.ONESHOT, head_level)
211211

212+
index_level = self.state.level
212213
sync_levels = {self.datasource.get_sync_level(s) for s in self._config.subscriptions}
213-
sync_level = sync_levels.pop()
214-
if sync_level is None:
214+
if not sync_levels:
215+
raise RuntimeError('Index has no subscriptions')
216+
if None in sync_levels:
215217
raise RuntimeError('Call `set_sync_level` before starting IndexDispatcher')
216-
if sync_levels:
217-
raise RuntimeError(f'Multiple sync levels: {sync_level}, {sync_levels}')
218-
index_level = self.state.level
218+
# NOTE: Multiple sync levels means index with new subscriptions was added in runtime.
219+
# NOTE: Choose the highest level; outdated realtime messages will be dropped from the queue anyway.
220+
sync_level = max(cast(Set[int], sync_levels))
219221

220222
if index_level < sync_level:
221223
self._logger.info('Index is behind datasource level, syncing: %s -> %s', index_level, sync_level)

0 commit comments

Comments
 (0)