Skip to content

Commit 4bf3140

Browse files
Ignore indexes with different message types on rollback (#343)
1 parent 02896c0 commit 4bf3140

File tree

6 files changed

+53
-10
lines changed

6 files changed

+53
-10
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
### Fixed
66

7+
* index: Ignore indexes with different message types on rollback.
78
* metadata: Add `ithacanet` to available networks.
89

910
## 5.1.0 - 2022-05-12

src/dipdup/datasources/datasource.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
from dipdup.datasources.subscription import HeadSubscription
1313
from dipdup.datasources.subscription import Subscription
1414
from dipdup.datasources.subscription import SubscriptionManager
15+
from dipdup.enums import MessageType
1516
from dipdup.http import HTTPGateway
1617
from dipdup.models import BigMapData
1718
from dipdup.models import HeadBlockData
@@ -24,7 +25,7 @@
2425
HeadCallbackT = Callable[['IndexDatasource', HeadBlockData], Awaitable[None]]
2526
OperationsCallbackT = Callable[['IndexDatasource', Tuple[OperationData, ...]], Awaitable[None]]
2627
BigMapsCallbackT = Callable[['IndexDatasource', Tuple[BigMapData, ...]], Awaitable[None]]
27-
RollbackCallbackT = Callable[['IndexDatasource', int, int], Awaitable[None]]
28+
RollbackCallbackT = Callable[['IndexDatasource', MessageType, int, int], Awaitable[None]]
2829

2930

3031
class Datasource(HTTPGateway):
@@ -114,9 +115,9 @@ async def emit_big_maps(self, big_maps: Tuple[BigMapData, ...]) -> None:
114115
for fn in self._on_big_maps:
115116
await fn(self, big_maps)
116117

117-
async def emit_rollback(self, from_level: int, to_level: int) -> None:
118+
async def emit_rollback(self, type_: MessageType, from_level: int, to_level: int) -> None:
118119
for fn in self._on_rollback:
119-
await fn(self, from_level, to_level)
120+
await fn(self, type_, from_level, to_level)
120121

121122
def set_network(self, network: str) -> None:
122123
if self._network:

src/dipdup/datasources/tzkt/datasource.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -967,7 +967,7 @@ async def _on_message(self, type_: MessageType, message: List[Dict[str, Any]]) -
967967
# NOTE: Try to process rollback automatically, emit if failed
968968
elif tzkt_type == TzktMessageType.REORG:
969969
if not self._buffer.rollback(type_, channel_level, message_level):
970-
await self.emit_rollback(channel_level, message_level)
970+
await self.emit_rollback(type_, channel_level, message_level)
971971

972972
else:
973973
raise NotImplementedError(f'Unknown message type: {tzkt_type}')

src/dipdup/dipdup.py

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
from dipdup.datasources.datasource import IndexDatasource
3838
from dipdup.datasources.factory import DatasourceFactory
3939
from dipdup.datasources.tzkt.datasource import TzktDatasource
40+
from dipdup.enums import MessageType
4041
from dipdup.enums import ReindexingReason
4142
from dipdup.exceptions import ConfigInitializationException
4243
from dipdup.exceptions import ConflictingHooksError
@@ -269,12 +270,13 @@ async def _on_big_maps(self, datasource: TzktDatasource, big_maps: Tuple[BigMapD
269270
for index in big_map_indexes:
270271
index.push_big_maps(big_maps)
271272

272-
async def _on_rollback(self, datasource: TzktDatasource, from_level: int, to_level: int) -> None:
273+
async def _on_rollback(self, datasource: TzktDatasource, type_: MessageType, from_level: int, to_level: int) -> None:
273274
"""Perform a single level rollback when possible, otherwise call `on_rollback` hook"""
274275
if from_level <= to_level:
275276
raise RuntimeError(f'Attempt to rollback forward: {from_level} -> {to_level}')
276277

277-
self._logger.warning('Datasource `%s` has rolled back: %s -> %s', datasource.name, from_level, to_level)
278+
channel = f'{datasource.name}:{type_.value}'
279+
self._logger.info('Channel `%s` has rolled back: %s -> %s', channel, from_level, to_level)
278280
if Metrics.enabled:
279281
Metrics.set_datasource_rollback(datasource.name)
280282

@@ -286,12 +288,16 @@ async def _on_rollback(self, datasource: TzktDatasource, from_level: int, to_lev
286288
for index_name, index in self._indexes.items():
287289
index_level = index.state.level
288290

289-
if index.datasource != datasource:
290-
self._logger.debug('%s: another datasource, skipping', index_name)
291+
if index.message_type != type_:
292+
self._logger.debug('%s: different channel, skipping', index_name)
293+
ignored_indexes.add(index_name)
294+
295+
elif index.datasource != datasource:
296+
self._logger.debug('%s: different datasource, skipping', index_name)
291297
ignored_indexes.add(index_name)
292298

293299
elif to_level >= index_level:
294-
self._logger.debug('%s: not affected, skipping', index_name)
300+
self._logger.debug('%s: level is too low, skipping', index_name)
295301
ignored_indexes.add(index_name)
296302

297303
elif from_level - to_level == 1:
@@ -321,7 +327,7 @@ async def _on_rollback(self, datasource: TzktDatasource, from_level: int, to_lev
321327
index.push_rollback(from_level)
322328

323329
if not unprocessed_indexes:
324-
self._logger.info('Rollback complete')
330+
self._logger.info('`%s` rollback complete', channel)
325331
return
326332

327333
if self._index_rollback:

src/dipdup/index.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
from dipdup.datasources.tzkt.datasource import TokenTransferFetcher
4444
from dipdup.datasources.tzkt.datasource import TzktDatasource
4545
from dipdup.datasources.tzkt.models import deserialize_storage
46+
from dipdup.enums import MessageType
4647
from dipdup.exceptions import ConfigInitializationException
4748
from dipdup.exceptions import ConfigurationError
4849
from dipdup.exceptions import InvalidDataError
@@ -138,6 +139,7 @@ class Index:
138139
Provides common interface for managing index state and switching between sync and realtime modes.
139140
"""
140141

142+
message_type: MessageType
141143
_queue: Deque
142144

143145
def __init__(self, ctx: DipDupContext, config: ResolvedIndexConfigT, datasource: TzktDatasource) -> None:
@@ -272,6 +274,7 @@ def _extract_level(
272274

273275

274276
class OperationIndex(Index):
277+
message_type = MessageType.operation
275278
_config: OperationIndexConfig
276279

277280
def __init__(self, ctx: DipDupContext, config: OperationIndexConfig, datasource: TzktDatasource) -> None:
@@ -618,6 +621,7 @@ async def _get_contract_hashes(self, address: str) -> Tuple[int, int]:
618621

619622

620623
class BigMapIndex(Index):
624+
message_type = MessageType.big_map
621625
_config: BigMapIndexConfig
622626

623627
def __init__(self, ctx: DipDupContext, config: BigMapIndexConfig, datasource: TzktDatasource) -> None:
@@ -842,6 +846,7 @@ def _get_big_map_pairs(self) -> Set[Tuple[str, str]]:
842846

843847

844848
class HeadIndex(Index):
849+
message_type: MessageType = MessageType.head
845850
_config: HeadIndexConfig
846851

847852
def __init__(self, ctx: DipDupContext, config: HeadIndexConfig, datasource: TzktDatasource) -> None:
@@ -886,6 +891,7 @@ def push_head(self, head: HeadBlockData) -> None:
886891

887892

888893
class TokenTransferIndex(Index):
894+
message_type = MessageType.token_transfer
889895
_config: TokenTransferIndexConfig
890896

891897
def __init__(self, ctx: DipDupContext, config: TokenTransferIndexConfig, datasource: TzktDatasource) -> None:

tests/test_dipdup/test_rollback.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
from dipdup.context import DipDupContext
1616
from dipdup.datasources.tzkt.datasource import TzktDatasource
1717
from dipdup.dipdup import IndexDispatcher
18+
from dipdup.enums import MessageType
1819
from dipdup.enums import ReindexingReason
1920
from dipdup.index import BigMapIndex
2021
from dipdup.index import HeadIndex
@@ -133,6 +134,7 @@ async def test_forward_rollback(self) -> None:
133134
with self.assertRaises(RuntimeError):
134135
await dispatcher._on_rollback(
135136
datasource=Mock(spec=TzktDatasource),
137+
type_=MessageType.operation,
136138
from_level=from_level,
137139
to_level=to_level,
138140
)
@@ -146,6 +148,7 @@ async def test_not_affected_by_level(self) -> None:
146148
}
147149
await dispatcher._on_rollback(
148150
datasource=operation_index.datasource,
151+
type_=MessageType.operation,
149152
from_level=from_level,
150153
to_level=to_level,
151154
)
@@ -163,6 +166,24 @@ async def test_not_affected_by_datasource(self) -> None:
163166
}
164167
await dispatcher._on_rollback(
165168
datasource=other_datasource,
169+
type_=MessageType.operation,
170+
from_level=from_level,
171+
to_level=to_level,
172+
)
173+
self.assertIsNone(operation_index._next_head_level)
174+
dispatcher._ctx.fire_hook.assert_not_awaited() # type: ignore
175+
self.assertEqual(0, len(operation_index._queue))
176+
177+
async def test_not_affected_by_type(self) -> None:
178+
index_level, from_level, to_level = 20, 20, 15
179+
dispatcher = _get_index_dispatcher()
180+
operation_index = _get_operation_index(level=index_level)
181+
dispatcher._indexes = {
182+
'operation': operation_index,
183+
}
184+
await dispatcher._on_rollback(
185+
datasource=operation_index.datasource,
186+
type_=MessageType.head,
166187
from_level=from_level,
167188
to_level=to_level,
168189
)
@@ -179,6 +200,7 @@ async def test_unprocessed_head(self) -> None:
179200
}
180201
await dispatcher._on_rollback(
181202
datasource=head_index.datasource,
203+
type_=MessageType.head,
182204
from_level=from_level,
183205
to_level=to_level,
184206
)
@@ -199,6 +221,7 @@ async def test_unprocessed(self) -> None:
199221
}
200222
await dispatcher._on_rollback(
201223
datasource=operation_index.datasource,
224+
type_=MessageType.operation,
202225
from_level=from_level,
203226
to_level=to_level,
204227
)
@@ -219,6 +242,7 @@ async def test_single_level_supported(self) -> None:
219242
}
220243
await dispatcher._on_rollback(
221244
datasource=operation_index.datasource,
245+
type_=MessageType.operation,
222246
from_level=from_level,
223247
to_level=to_level,
224248
)
@@ -235,6 +259,7 @@ async def test_single_level_not_supported(self) -> None:
235259
}
236260
await dispatcher._on_rollback(
237261
datasource=big_map_index.datasource,
262+
type_=MessageType.big_map,
238263
from_level=from_level,
239264
to_level=to_level,
240265
)
@@ -266,6 +291,7 @@ async def test_single_level_new_head_equal(self) -> None:
266291

267292
await dispatcher._on_rollback(
268293
datasource=operation_index.datasource,
294+
type_=MessageType.operation,
269295
from_level=from_level,
270296
to_level=to_level,
271297
)
@@ -307,6 +333,7 @@ async def test_single_level_new_head_less(self) -> None:
307333

308334
await dispatcher._on_rollback(
309335
datasource=operation_index.datasource,
336+
type_=MessageType.operation,
310337
from_level=from_level,
311338
to_level=to_level,
312339
)
@@ -354,6 +381,7 @@ async def test_single_level_new_head_less_not_matched(self) -> None:
354381

355382
await dispatcher._on_rollback(
356383
datasource=operation_index.datasource,
384+
type_=MessageType.operation,
357385
from_level=from_level,
358386
to_level=to_level,
359387
)
@@ -394,6 +422,7 @@ async def test_single_level_new_head_more(self) -> None:
394422

395423
await dispatcher._on_rollback(
396424
datasource=operation_index.datasource,
425+
type_=MessageType.operation,
397426
from_level=from_level,
398427
to_level=to_level,
399428
)

0 commit comments

Comments
 (0)