Skip to content

Commit e6ea103

Browse files
Fix crash with RuntimeError after continuous realtime connection loss (#366)
* Fix crash with `RuntimeError` after continuous realtime connection loss * Less agressive backoff
1 parent c1b14cf commit e6ea103

File tree

5 files changed

+59
-31
lines changed

5 files changed

+59
-31
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
### Fixed
66

77
* index: Allow mixing oneshot and regular indexes in a single config.
8+
* index: Fixed crash with `RuntimeError` after continuous realtime connection loss.
89
* index: Fixed `OperationIndexConfig.types` field being partially ignored.
910
* tzkt: Fixed `origination` subscription missing when `merge_subscriptions` flag is set.
1011

src/dipdup/datasources/datasource.py

Lines changed: 33 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
_logger = logging.getLogger('dipdup.datasource')
2323

2424

25+
EmptyCallbackT = Callable[[], Awaitable[None]]
2526
HeadCallbackT = Callable[['IndexDatasource', HeadBlockData], Awaitable[None]]
2627
OperationsCallbackT = Callable[['IndexDatasource', Tuple[OperationData, ...]], Awaitable[None]]
2728
BigMapsCallbackT = Callable[['IndexDatasource', Tuple[BigMapData, ...]], Awaitable[None]]
@@ -69,10 +70,12 @@ class GraphQLDatasource(Datasource):
6970
class IndexDatasource(Datasource):
7071
def __init__(self, url: str, http_config: HTTPConfig, merge_subscriptions: bool = False) -> None:
7172
super().__init__(url, http_config)
72-
self._on_head: Set[HeadCallbackT] = set()
73-
self._on_operations: Set[OperationsCallbackT] = set()
74-
self._on_big_maps: Set[BigMapsCallbackT] = set()
75-
self._on_rollback: Set[RollbackCallbackT] = set()
73+
self._on_connected_callbacks: Set[EmptyCallbackT] = set()
74+
self._on_disconnected_callbacks: Set[EmptyCallbackT] = set()
75+
self._on_head_callbacks: Set[HeadCallbackT] = set()
76+
self._on_operations_callbacks: Set[OperationsCallbackT] = set()
77+
self._on_big_maps_callbacks: Set[BigMapsCallbackT] = set()
78+
self._on_rollback_callbacks: Set[RollbackCallbackT] = set()
7679
self._subscriptions: SubscriptionManager = SubscriptionManager(merge_subscriptions)
7780
self._subscriptions.add(HeadSubscription())
7881
self._network: Optional[str] = None
@@ -91,34 +94,48 @@ def network(self) -> str:
9194
async def subscribe(self) -> None:
9295
...
9396

94-
def on_head(self, fn: HeadCallbackT) -> None:
95-
self._on_head.add(fn)
97+
def call_on_head(self, fn: HeadCallbackT) -> None:
98+
self._on_head_callbacks.add(fn)
9699

97-
def on_operations(self, fn: OperationsCallbackT) -> None:
98-
self._on_operations.add(fn)
100+
def call_on_operations(self, fn: OperationsCallbackT) -> None:
101+
self._on_operations_callbacks.add(fn)
99102

100-
def on_big_maps(self, fn: BigMapsCallbackT) -> None:
101-
self._on_big_maps.add(fn)
103+
def call_on_big_maps(self, fn: BigMapsCallbackT) -> None:
104+
self._on_big_maps_callbacks.add(fn)
102105

103-
def on_rollback(self, fn: RollbackCallbackT) -> None:
104-
self._on_rollback.add(fn)
106+
def call_on_rollback(self, fn: RollbackCallbackT) -> None:
107+
self._on_rollback_callbacks.add(fn)
108+
109+
def call_on_connected(self, fn: EmptyCallbackT) -> None:
110+
self._on_connected_callbacks.add(fn)
111+
112+
def call_on_disconnected(self, fn: EmptyCallbackT) -> None:
113+
self._on_disconnected_callbacks.add(fn)
105114

106115
async def emit_head(self, head: HeadBlockData) -> None:
107-
for fn in self._on_head:
116+
for fn in self._on_head_callbacks:
108117
await fn(self, head)
109118

110119
async def emit_operations(self, operations: Tuple[OperationData, ...]) -> None:
111-
for fn in self._on_operations:
120+
for fn in self._on_operations_callbacks:
112121
await fn(self, operations)
113122

114123
async def emit_big_maps(self, big_maps: Tuple[BigMapData, ...]) -> None:
115-
for fn in self._on_big_maps:
124+
for fn in self._on_big_maps_callbacks:
116125
await fn(self, big_maps)
117126

118127
async def emit_rollback(self, type_: MessageType, from_level: int, to_level: int) -> None:
119-
for fn in self._on_rollback:
128+
for fn in self._on_rollback_callbacks:
120129
await fn(self, type_, from_level, to_level)
121130

131+
async def emit_connected(self) -> None:
132+
for fn in self._on_connected_callbacks:
133+
await fn()
134+
135+
async def emit_disconnected(self) -> None:
136+
for fn in self._on_disconnected_callbacks:
137+
await fn()
138+
122139
def set_network(self, network: str) -> None:
123140
if self._network:
124141
raise RuntimeError('Network is already set')

src/dipdup/datasources/tzkt/datasource.py

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -359,7 +359,7 @@ class TzktDatasource(IndexDatasource):
359359
_default_http_config = HTTPConfig(
360360
cache=True,
361361
retry_sleep=1,
362-
retry_multiplier=2,
362+
retry_multiplier=1.1,
363363
ratelimit_rate=100,
364364
ratelimit_period=1,
365365
connection_limit=25,
@@ -894,8 +894,8 @@ def _get_ws_client(self) -> SignalRClient:
894894
max_size=None,
895895
)
896896

897-
self._ws_client.on_open(self._on_connect)
898-
self._ws_client.on_close(self._on_disconnect)
897+
self._ws_client.on_open(self._on_connected)
898+
self._ws_client.on_close(self._on_disconnected)
899899
self._ws_client.on_error(self._on_error)
900900

901901
self._ws_client.on('operations', partial(self._on_message, MessageType.operation))
@@ -915,6 +915,7 @@ async def _wrapper():
915915
await ws.run()
916916
except WebsocketConnectionError as e:
917917
self._logger.error('Websocket connection error: %s', e)
918+
await self.emit_disconnected()
918919
await asyncio.sleep(retry_sleep)
919920
retry_sleep *= self._http_config.retry_multiplier
920921

@@ -925,13 +926,15 @@ async def _wrapper():
925926

926927
await gather(*tasks)
927928

928-
async def _on_connect(self) -> None:
929+
async def _on_connected(self) -> None:
929930
self._logger.info('Realtime connection established')
930931
# NOTE: Subscribing here will block WebSocket loop
932+
await self.emit_connected()
931933

932-
async def _on_disconnect(self) -> None:
934+
async def _on_disconnected(self) -> None:
933935
self._logger.info('Realtime connection lost')
934936
self._subscriptions.reset()
937+
await self.emit_disconnected()
935938

936939
async def _on_error(self, message: CompletionMessage) -> NoReturn:
937940
"""Raise exception from WS server's error message"""

src/dipdup/dipdup.py

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -172,15 +172,6 @@ async def _fetch_contracts(self) -> None:
172172
self._ctx.config.contracts[contract.name] = contract_config
173173
self._ctx.config.initialize(skip_imports=True)
174174

175-
async def _subscribe_to_datasource_events(self) -> None:
176-
for datasource in self._ctx.datasources.values():
177-
if not isinstance(datasource, IndexDatasource):
178-
continue
179-
datasource.on_head(self._on_head)
180-
datasource.on_operations(self._on_operations)
181-
datasource.on_big_maps(self._on_big_maps)
182-
datasource.on_rollback(self._on_rollback)
183-
184175
async def _load_index_states(self) -> None:
185176
if self._indexes:
186177
raise RuntimeError('Index states are already loaded')
@@ -224,6 +215,22 @@ async def _process(index_state: IndexState) -> None:
224215
tasks = (create_task(_process(index_state)) for index_state in await IndexState.all())
225216
await gather(*tasks)
226217

218+
async def _subscribe_to_datasource_events(self) -> None:
219+
for datasource in self._ctx.datasources.values():
220+
if not isinstance(datasource, IndexDatasource):
221+
continue
222+
datasource.call_on_disconnected(self._on_disconnected)
223+
datasource.call_on_head(self._on_head)
224+
datasource.call_on_operations(self._on_operations)
225+
datasource.call_on_big_maps(self._on_big_maps)
226+
datasource.call_on_rollback(self._on_rollback)
227+
228+
async def _on_disconnected(self) -> None:
229+
# NOTE: Invalidate realtime queues; sync level will be reset
230+
self._logger.info('Datasource disconnected, dropping realtime queues')
231+
for index in self._indexes.values():
232+
index._queue.clear()
233+
227234
async def _on_head(self, datasource: IndexDatasource, head: HeadBlockData) -> None:
228235
# NOTE: Do not await query results - blocked database connection may cause Websocket timeout.
229236
self._tasks.append(

tests/test_dipdup/test_datasources/test_tzkt.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,7 @@ async def test_on_operation_message_data(self) -> None:
213213
message = {'type': 1, 'state': 2, 'data': operations_json}
214214
async with with_tzkt(1) as tzkt:
215215
emit_mock = AsyncMock()
216-
tzkt.on_operations(emit_mock)
216+
tzkt.call_on_operations(emit_mock)
217217
tzkt.set_sync_level(HeadSubscription(), 1)
218218

219219
level = tzkt.get_channel_level(MessageType.operation)

0 commit comments

Comments
 (0)