Skip to content

Commit ce1a834

Browse files
Backport: Fix issue with processing rollbacks while sync is in progress (#832)
1 parent 4789ab9 commit ce1a834

File tree

10 files changed

+82
-38
lines changed

10 files changed

+82
-38
lines changed

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,12 @@ All notable changes to this project will be documented in this file.
44

55
The format is based on [Keep a Changelog], and this project adheres to [Semantic Versioning].
66

7+
## [Unreleased]
8+
9+
### Fixed
10+
11+
- tzkt: Fixed issue with processing rollbacks while sync is in progress.
12+
713
## [6.5.11] - 2023-09-02
814

915
### Fixed

src/dipdup/datasources/tzkt/datasource.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
from dipdup.models import OperationData
4444
from dipdup.models import QuoteData
4545
from dipdup.models import TokenTransferData
46+
from dipdup.models import TzktRollbackMessage
4647
from dipdup.utils import FormattedLogger
4748
from dipdup.utils import split_by_chunks
4849

@@ -119,7 +120,7 @@ class TzktMessageType(Enum):
119120
REORG = 2
120121

121122

122-
MessageData = dict[str, Any] | list[dict[str, Any]]
123+
MessageData = dict[str, Any] | list[dict[str, Any]] | TzktRollbackMessage
123124

124125

125126
class BufferedMessage(NamedTuple):

src/dipdup/dipdup.py

Lines changed: 10 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
from dipdup.models import OperationData
5656
from dipdup.models import Schema
5757
from dipdup.models import TokenTransferData
58+
from dipdup.models import TzktRollbackMessage
5859
from dipdup.prometheus import Metrics
5960
from dipdup.scheduler import SchedulerManager
6061
from dipdup.transactions import TransactionManager
@@ -300,7 +301,11 @@ async def _on_events(self, datasource: IndexDatasource, events: tuple[EventData,
300301
index.push_events(events)
301302

302303
async def _on_rollback(
303-
self, datasource: IndexDatasource, type_: MessageType, from_level: int, to_level: int
304+
self,
305+
datasource: IndexDatasource,
306+
type_: MessageType,
307+
from_level: int,
308+
to_level: int,
304309
) -> None:
305310
"""Call `on_index_rollback` hook for each index that is affected by rollback"""
306311
if from_level <= to_level:
@@ -312,8 +317,6 @@ async def _on_rollback(
312317
Metrics.set_datasource_rollback(datasource.name)
313318

314319
# NOTE: Choose action for each index
315-
affected_indexes: set[str] = set()
316-
317320
for index_name, index in self._indexes.items():
318321
index_level = index.state.level
319322

@@ -328,19 +331,11 @@ async def _on_rollback(
328331

329332
else:
330333
self._logger.debug('%s: affected', index_name)
331-
affected_indexes.add(index_name)
332-
333-
hook_name = 'on_index_rollback'
334-
for index_name in affected_indexes:
335-
self._logger.warning('`%s` index is affected by rollback; firing `%s` hook', index_name, hook_name)
336-
await self._ctx.fire_hook(
337-
hook_name,
338-
index=self._indexes[index_name],
339-
from_level=from_level,
340-
to_level=to_level,
341-
)
334+
index.push_realtime_message(
335+
TzktRollbackMessage(from_level, to_level),
336+
)
342337

343-
self._logger.info('`%s` rollback complete', channel)
338+
self._logger.info('`%s` rollback processed', channel)
344339

345340

346341
class DipDup:

src/dipdup/index.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -196,3 +196,18 @@ async def _update_state(
196196
state.status = status or state.status
197197
state.level = level or state.level
198198
await state.save()
199+
200+
# TODO: Move to TezosTzktIndex
201+
async def _tzkt_rollback(
202+
self,
203+
from_level: int,
204+
to_level: int,
205+
) -> None:
206+
hook_name = 'on_index_rollback'
207+
self._logger.warning('Affected by rollback; firing `%s` hook', self.name, hook_name)
208+
await self._ctx.fire_hook(
209+
name=hook_name,
210+
index=self,
211+
from_level=from_level,
212+
to_level=to_level,
213+
)

src/dipdup/indexes/big_map/index.py

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,10 @@
1717
from dipdup.models import BigMapAction
1818
from dipdup.models import BigMapData
1919
from dipdup.models import BigMapDiff
20+
from dipdup.models import TzktRollbackMessage
2021
from dipdup.prometheus import Metrics
2122

22-
BigMapQueueItem = tuple[BigMapData, ...]
23+
BigMapQueueItem = tuple[BigMapData, ...] | TzktRollbackMessage
2324

2425

2526
class BigMapIndex(
@@ -35,16 +36,20 @@ async def _process_queue(self) -> None:
3536
if self._queue:
3637
self._logger.debug('Processing websocket queue')
3738
while self._queue:
38-
big_maps = self._queue.popleft()
39-
message_level = big_maps[0].level
39+
message = self._queue.popleft()
40+
if isinstance(message, TzktRollbackMessage):
41+
await self._tzkt_rollback(message.from_level, message.to_level)
42+
continue
43+
44+
message_level = message[0].level
4045
if message_level <= self.state.level:
4146
self._logger.debug('Skipping outdated message: %s <= %s', message_level, self.state.level)
4247
continue
4348

4449
with ExitStack() as stack:
4550
if Metrics.enabled:
4651
stack.enter_context(Metrics.measure_level_realtime_duration())
47-
await self._process_level_big_maps(big_maps, message_level)
52+
await self._process_level_big_maps(message, message_level)
4853

4954
async def _synchronize(self, sync_level: int) -> None:
5055
"""Fetch operations via Fetcher and pass to message callback"""

src/dipdup/indexes/event/index.py

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,11 @@
1313
from dipdup.indexes.event.matcher import match_events
1414
from dipdup.models import Event
1515
from dipdup.models import EventData
16+
from dipdup.models import TzktRollbackMessage
1617
from dipdup.models import UnknownEvent
1718
from dipdup.prometheus import Metrics
1819

19-
EventQueueItem = tuple[EventData, ...]
20+
EventQueueItem = tuple[EventData, ...] | TzktRollbackMessage
2021

2122

2223
class EventIndex(
@@ -31,16 +32,20 @@ async def _process_queue(self) -> None:
3132
if self._queue:
3233
self._logger.debug('Processing websocket queue')
3334
while self._queue:
34-
events = self._queue.popleft()
35-
message_level = events[0].level
35+
message = self._queue.popleft()
36+
if isinstance(message, TzktRollbackMessage):
37+
await self._tzkt_rollback(message.from_level, message.to_level)
38+
continue
39+
40+
message_level = message[0].level
3641
if message_level <= self.state.level:
3742
self._logger.debug('Skipping outdated message: %s <= %s', message_level, self.state.level)
3843
continue
3944

4045
with ExitStack() as stack:
4146
if Metrics.enabled:
4247
stack.enter_context(Metrics.measure_level_realtime_duration())
43-
await self._process_level_events(events, message_level)
48+
await self._process_level_events(message, message_level)
4449

4550
def _create_fetcher(self, first_level: int, last_level: int) -> EventFetcher:
4651
event_addresses = self._get_event_addresses()

src/dipdup/indexes/head/index.py

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,9 @@
77
from dipdup.exceptions import FrameworkException
88
from dipdup.index import Index
99
from dipdup.models import HeadBlockData
10+
from dipdup.models import TzktRollbackMessage
1011

11-
HeadQueueItem = HeadBlockData
12+
HeadQueueItem = HeadBlockData | TzktRollbackMessage
1213

1314

1415
class HeadIndex(
@@ -24,23 +25,27 @@ async def _synchronize(self, sync_level: int) -> None:
2425

2526
async def _process_queue(self) -> None:
2627
while self._queue:
27-
head = self._queue.popleft()
28-
message_level = head.level
28+
message = self._queue.popleft()
29+
if isinstance(message, TzktRollbackMessage):
30+
await self._tzkt_rollback(message.from_level, message.to_level)
31+
continue
32+
33+
message_level = message.level
2934
if message_level <= self.state.level:
3035
self._logger.debug('Skipping outdated message: %s <= %s', message_level, self.state.level)
3136
continue
3237

3338
self._logger.debug('Processing head realtime message, %s left in queue', len(self._queue))
3439

35-
batch_level = head.level
40+
batch_level = message.level
3641
index_level = self.state.level
3742
if batch_level <= index_level:
3843
raise FrameworkException(f'Batch level is lower than index level: {batch_level} <= {index_level}')
3944

4045
async with self._ctx._transactions.in_transaction(batch_level, message_level, self.name):
4146
self._logger.debug('Processing head info of level %s', batch_level)
4247
for handler_config in self._config.handlers:
43-
await self._call_matched_handler(handler_config, head)
48+
await self._call_matched_handler(handler_config, message)
4449
await self._update_state(level=batch_level)
4550

4651
async def _call_matched_handler(self, handler_config: HeadHandlerConfig, head: HeadBlockData) -> None:

src/dipdup/indexes/operation/index.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,12 @@
2727
from dipdup.indexes.operation.matcher import match_operation_subgroup
2828
from dipdup.indexes.operation.matcher import match_operation_unfiltered_subgroup
2929
from dipdup.models import OperationData
30+
from dipdup.models import TzktRollbackMessage
3031
from dipdup.prometheus import Metrics
3132

3233
_logger = logging.getLogger('dipdup.matcher')
3334

34-
OperationQueueItem = tuple[OperationSubgroup, ...]
35+
OperationQueueItem = tuple[OperationSubgroup, ...] | TzktRollbackMessage
3536

3637

3738
def entrypoint_filter(handlers: tuple[OperationHandlerConfig, ...]) -> set[str | None]:
@@ -190,12 +191,12 @@ async def _process_queue(self) -> None:
190191

191192
while self._queue:
192193
message = self._queue.popleft()
193-
messages_left = len(self._queue)
194-
195-
if not message:
196-
raise FrameworkException('Got empty message from realtime queue')
194+
if isinstance(message, TzktRollbackMessage):
195+
await self._tzkt_rollback(message.from_level, message.to_level)
196+
continue
197197

198198
if Metrics.enabled:
199+
messages_left = len(self._queue)
199200
Metrics.set_levels_to_realtime(self._config.name, messages_left)
200201

201202
message_level = message[0].operations[0].level

src/dipdup/indexes/token_transfer/index.py

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,10 @@
1010
from dipdup.indexes.token_transfer.fetcher import TokenTransferFetcher
1111
from dipdup.indexes.token_transfer.matcher import match_token_transfers
1212
from dipdup.models import TokenTransferData
13+
from dipdup.models import TzktRollbackMessage
1314
from dipdup.prometheus import Metrics
1415

15-
TokenTransferQueueItem = tuple[TokenTransferData, ...]
16+
TokenTransferQueueItem = tuple[TokenTransferData, ...] | TzktRollbackMessage
1617

1718

1819
class TokenTransferIndex(
@@ -115,13 +116,17 @@ async def _process_queue(self) -> None:
115116
if self._queue:
116117
self._logger.debug('Processing websocket queue')
117118
while self._queue:
118-
token_transfers = self._queue.popleft()
119-
message_level = token_transfers[0].level
119+
message = self._queue.popleft()
120+
if isinstance(message, TzktRollbackMessage):
121+
await self._tzkt_rollback(message.from_level, message.to_level)
122+
continue
123+
124+
message_level = message[0].level
120125
if message_level <= self.state.level:
121126
self._logger.debug('Skipping outdated message: %s <= %s', message_level, self.state.level)
122127
continue
123128

124129
with ExitStack() as stack:
125130
if Metrics.enabled:
126131
stack.enter_context(Metrics.measure_level_realtime_duration())
127-
await self._process_level_token_transfers(token_transfers, message_level)
132+
await self._process_level_token_transfers(message, message_level)

src/dipdup/models.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -264,6 +264,12 @@ class UnknownEvent:
264264
payload: Any | None
265265

266266

267+
@dataclass(frozen=True)
268+
class TzktRollbackMessage:
269+
from_level: int
270+
to_level: int
271+
272+
267273
# ===> Model Versioning
268274

269275

0 commit comments

Comments
 (0)