Skip to content

Commit 76061d5

Browse files
Fix TzKT realtime message processing (#304)
* TzKT fixes * Awaits * Stupid test * Review * Changelog * Changelog * Fix buffer
1 parent f374715 commit 76061d5

File tree

3 files changed

+91
-36
lines changed

3 files changed

+91
-36
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
* cli: Fixed `schema init` command crash with SQLite databases.
88
* index: Fixed spawning datasources in oneshot mode.
9+
* tzkt: Fixed processing realtime messages.
910

1011
## 5.0.0 - 2022-04-08
1112

src/dipdup/datasources/tzkt/datasource.py

Lines changed: 64 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
from typing import DefaultDict
1818
from typing import Deque
1919
from typing import Dict
20+
from typing import Generator
2021
from typing import List
2122
from typing import NoReturn
2223
from typing import Optional
@@ -293,6 +294,21 @@ def __init__(
293294
def request_limit(self) -> int:
294295
return cast(int, self._http_config.batch_size)
295296

297+
def get_channel_level(self, message_type: MessageType) -> int:
298+
"""Get current level of the channel, or sync level if no messages were received yet."""
299+
channel_level = self._level[message_type]
300+
if channel_level is None:
301+
# NOTE: If no data messages were received since run, use sync level instead
302+
# NOTE: There's only one sync level for all channels, otherwise `Index.process` would fail
303+
channel_level = self.get_sync_level(HeadSubscription())
304+
if channel_level is None:
305+
raise RuntimeError('Neither current nor sync level is known')
306+
307+
return channel_level
308+
309+
def _set_channel_level(self, message_type: MessageType, level: int) -> None:
310+
self._level[message_type] = level
311+
296312
async def get_similar_contracts(
297313
self,
298314
address: str,
@@ -795,66 +811,78 @@ async def _extract_message_data(self, type_: MessageType, message: List[Any]) ->
795811
if tzkt_type == TzktMessageType.STATE:
796812
continue
797813

798-
message_level, current_level = item['state'], self._level[type_]
799-
self._level[type_] = message_level
814+
message_level = item['state']
815+
channel_level = self.get_channel_level(type_)
816+
self._set_channel_level(type_, message_level)
817+
800818
self._logger.info(
801819
'Realtime message received: %s, %s, %s -> %s',
802820
type_.value,
803821
tzkt_type.name,
804-
current_level,
822+
channel_level,
805823
message_level,
806824
)
807825

808826
# NOTE: Put data messages into the buffer, grouped by level
809827
if tzkt_type == TzktMessageType.DATA:
810-
self._buffer[message_level].append((type_, item['data']))
828+
await self._process_data_message(type_, message_level, item['data'])
811829

812830
# NOTE: Try to process rollback automatically, emit if failed
813831
elif tzkt_type == TzktMessageType.REORG:
814-
# NOTE: operation/big_map channels have their own levels
815-
if type_ == MessageType.head:
816-
return
817-
818-
# NOTE: If no data messages were received since run, use sync level instead
819-
if current_level is None:
820-
# NOTE: There's only one sync level for all channels, otherwise `Index.process` would fail
821-
current_level = self.get_sync_level(HeadSubscription())
822-
if not current_level:
823-
raise RuntimeError('Reorg message received, but neither current nor sync level is known')
824-
825-
# NOTE: This rollback does not affect us, so we can safely ignore it
826-
if current_level <= message_level:
827-
return
828-
829-
self._logger.info('Rollback requested from %s to %s', current_level, message_level)
830-
831-
# NOTE: Drop buffered messages in reversed order while possible
832-
rolled_back_levels = range(current_level, message_level, -1)
833-
for rolled_back_level in rolled_back_levels:
834-
if self._buffer.pop(rolled_back_level, None):
835-
self._logger.info('Level %s is buffered', rolled_back_level)
836-
else:
837-
self._logger.info('Level %s is not buffered, emitting rollback', rolled_back_level)
838-
await self.emit_rollback(current_level, message_level)
839-
return
840-
841-
self._logger.info('Rollback is not required, continuing')
832+
await self._process_reorg_message(type_, channel_level, message_level)
842833

843834
else:
844-
raise NotImplementedError
835+
raise NotImplementedError('Unknown message type')
845836

846837
# NOTE: Yield excess data from buffer (levels beyond the configured buffer size)
838+
for item in self._yield_from_buffer(type_):
839+
yield item
840+
841+
def _yield_from_buffer(self, type_: MessageType) -> Generator[Dict, None, None]:
847842
buffered_levels = sorted(self._buffer.keys())
848-
emitted_levels = buffered_levels[: len(buffered_levels) - self._buffer_size]
843+
if len(buffered_levels) < self._buffer_size:
844+
return
849845

850-
for level in emitted_levels:
846+
yielded_levels = buffered_levels[: len(buffered_levels) - self._buffer_size]
847+
for level in yielded_levels:
851848
for idx, level_data in enumerate(self._buffer[level]):
852849
level_message_type, level_message = level_data
853850
if level_message_type == type_:
854851
yield level_message
855852
self._buffer[level].pop(idx)
853+
856854
if not self._buffer[level]:
857-
self._buffer.pop(level)
855+
del self._buffer[level]
856+
857+
async def _process_data_message(self, type_: MessageType, message_level: int, message_data: Dict[str, Any]) -> None:
858+
self._buffer[message_level].append((type_, message_data))
859+
860+
async def _process_reorg_message(self, type_: MessageType, channel_level: int, message_level: int) -> None:
861+
# NOTE: No action required for this channel
862+
if type_ == MessageType.head:
863+
return
864+
865+
# NOTE: This rollback does not affect us, so we can safely ignore it
866+
if channel_level <= message_level:
867+
return
868+
869+
self._logger.info('Rollback requested from %s to %s', channel_level, message_level)
870+
871+
# NOTE: Drop buffered messages in reversed order while possible
872+
rolled_back_levels = range(channel_level, message_level, -1)
873+
for rolled_back_level in rolled_back_levels:
874+
if self._buffer.pop(rolled_back_level, None):
875+
self._logger.info('Level %s is buffered', rolled_back_level)
876+
else:
877+
self._logger.info(
878+
'Level %s is not buffered, emitting rollback to %s',
879+
rolled_back_level,
880+
message_level,
881+
)
882+
await self.emit_rollback(channel_level, message_level)
883+
return
884+
else:
885+
self._logger.info('Rollback is not required, continuing')
858886

859887
async def _on_operations_message(self, message: List[Dict[str, Any]]) -> None:
860888
"""Parse and emit raw operations from WS"""

tests/test_dipdup/test_datasources/test_tzkt/test_datasource.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,18 @@
1+
import json
12
from contextlib import asynccontextmanager
3+
from os.path import dirname
4+
from os.path import join
25
from typing import AsyncIterator
36
from typing import Tuple
47
from typing import TypeVar
58
from unittest import IsolatedAsyncioTestCase
9+
from unittest.mock import AsyncMock
610

711
from dipdup.config import HTTPConfig
12+
from dipdup.datasources.subscription import HeadSubscription
813
from dipdup.datasources.tzkt.datasource import TzktDatasource
14+
from dipdup.enums import MessageType
15+
from dipdup.models import OperationData
916

1017

1118
@asynccontextmanager
@@ -192,3 +199,22 @@ async def test_iter_migration_originations(self) -> None:
192199
originations = await take_two(tzkt.iter_migration_originations())
193200
self.assertEqual(67955553, originations[0].id)
194201
self.assertEqual(67955554, originations[1].id)
202+
203+
async def test_on_operation_message_data(self) -> None:
204+
with open(join(dirname(__file__), '..', '..', 'ftzfun.json')) as f:
205+
operations_json = json.load(f)
206+
207+
message = {'type': 1, 'state': 2, 'data': operations_json}
208+
async with with_tzkt(1) as tzkt:
209+
emit_mock = AsyncMock()
210+
tzkt.on_operations(emit_mock)
211+
tzkt.set_sync_level(HeadSubscription(), 1)
212+
213+
level = tzkt.get_channel_level(MessageType.operation)
214+
self.assertEqual(1, level)
215+
216+
await tzkt._on_operations_message([message])
217+
218+
level = tzkt.get_channel_level(MessageType.operation)
219+
self.assertEqual(2, level)
220+
self.assertIsInstance(emit_mock.await_args_list[0][0][1][0], OperationData)

0 commit comments

Comments
 (0)