Skip to content

Commit 17caca5

Browse files
Ability to process realtime messages with lag (#275)
* WIP
* wip
* Bump hasura, POC buffer
* Fix config validators
* Fix emitted levels
* Better logging
* Changelog
* Docs
1 parent 76266c9 commit 17caca5

File tree

4 files changed

+56
-10
lines changed

4 files changed

+56
-10
lines changed

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1,5 +1,11 @@
11
# Changelog
22

3+
## [unreleased]
4+
5+
### Added
6+
7+
* tzkt: Ability to process realtime messages with lag.
8+
39
## 4.2.7 - 2022-03-02
410

511
### Fixed

src/dipdup/config.py

Lines changed: 5 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -211,11 +211,13 @@ class TzktDatasourceConfig(NameMixin):
211211
:param kind: always 'tzkt'
212212
:param url: Base API URL, e.g. https://api.tzkt.io/
213213
:param http: HTTP client configuration
214+
:param buffer_size: Number of levels to keep in FIFO buffer before processing
214215
"""
215216

216217
kind: Literal['tzkt']
217218
url: str
218219
http: Optional[HTTPConfig] = None
220+
buffer_size: int = 0
219221

220222
def __hash__(self) -> int:
221223
return hash(self.kind + self.url)
@@ -737,8 +739,10 @@ def hash(self) -> str:
737739
# FIXME: How to convert pydantic dataclass into dict without json.dumps? asdict is not recursive.
738740
config_dict = json.loads(config_json)
739741

740-
# NOTE: We need to preserve datasource URL but remove it's HTTP tunables to avoid false-positives.
742+
# NOTE: We need to preserve datasource URL but remove its HTTP tunables to avoid false-positives.
741743
config_dict['datasource'].pop('http', None)
744+
# NOTE: TzKT tunable
745+
config_dict['datasource'].pop('buffer_size', None)
742746
# NOTE: Same for BigMapIndex tunables
743747
config_dict.pop('skip_history', None)
744748

src/dipdup/datasources/factory.py

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -32,6 +32,7 @@ def _build_datasource(cls, name: str, config: DipDupConfig) -> Datasource:
3232
url=datasource_config.url,
3333
http_config=datasource_config.http,
3434
merge_subscriptions=config.advanced.merge_subscriptions,
35+
buffer_size=datasource_config.buffer_size,
3536
)
3637

3738
if isinstance(datasource_config, CoinbaseDatasourceConfig):

src/dipdup/datasources/tzkt/datasource.py

Lines changed: 44 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -274,6 +274,7 @@ def __init__(
274274
http_config: Optional[HTTPConfig] = None,
275275
watchdog: Optional[Watchdog] = None,
276276
merge_subscriptions: bool = False,
277+
buffer_size: int = 0,
277278
) -> None:
278279
super().__init__(
279280
url=url,
@@ -282,6 +283,8 @@ def __init__(
282283
)
283284
self._logger = logging.getLogger('dipdup.tzkt')
284285
self._watchdog = watchdog
286+
self._buffer_size = buffer_size
287+
self._buffer: DefaultDict[int, List[Tuple[MessageType, Dict[str, Any]]]] = defaultdict(list)
285288

286289
self._ws_client: Optional[SignalRClient] = None
287290
self._level: DefaultDict[MessageType, Optional[int]] = defaultdict(lambda: None)
@@ -785,20 +788,28 @@ async def _on_error(self, message: CompletionMessage) -> NoReturn:
785788

786789
async def _extract_message_data(self, type_: MessageType, message: List[Any]) -> AsyncGenerator[Dict, None]:
787790
"""Parse message received from Websocket, ensure it's correct in the current context and yield data."""
791+
# NOTE: Parse messages and either buffer or yield data
788792
for item in message:
789793
tzkt_type = TzktMessageType(item['type'])
794+
# NOTE: Legacy, sync level returned by TzKT during negotiation
790795
if tzkt_type == TzktMessageType.STATE:
791796
continue
792797

793-
level, current_level = item['state'], self._level[type_]
794-
self._level[type_] = level
795-
self._logger.info('Realtime message received: %s, %s, %s -> %s', type_.value, tzkt_type.name, current_level, level)
798+
message_level, current_level = item['state'], self._level[type_]
799+
self._level[type_] = message_level
800+
self._logger.info(
801+
'Realtime message received: %s, %s, %s -> %s',
802+
type_.value,
803+
tzkt_type.name,
804+
current_level,
805+
message_level,
806+
)
796807

797-
# NOTE: Just yield data
808+
# NOTE: Put data messages to buffer by level
798809
if tzkt_type == TzktMessageType.DATA:
799-
yield item['data']
810+
self._buffer[message_level].append((type_, item['data']))
800811

801-
# NOTE: Emit rollback, but not on `head` message
812+
# NOTE: Try to process rollback automatically, emit if failed
802813
elif tzkt_type == TzktMessageType.REORG:
803814
# NOTE: operation/big_map channels have their own levels
804815
if type_ == MessageType.head:
@@ -812,15 +823,39 @@ async def _extract_message_data(self, type_: MessageType, message: List[Any]) ->
812823
raise RuntimeError('Reorg message received, but neither current nor sync level is known')
813824

814825
# NOTE: This rollback does not affect us, so we can safely ignore it
815-
if current_level <= level:
826+
if current_level <= message_level:
816827
return
817828

818-
self._logger.info('Emitting rollback from %s to %s', current_level, level)
819-
await self.emit_rollback(current_level, level)
829+
self._logger.info('Rollback requested from %s to %s', current_level, message_level)
830+
831+
# NOTE: Drop buffered messages in reversed order while possible
832+
rolled_back_levels = range(current_level, message_level, -1)
833+
for rolled_back_level in rolled_back_levels:
834+
if self._buffer.pop(rolled_back_level, None):
835+
self._logger.info('Level %s is buffered', rolled_back_level)
836+
else:
837+
self._logger.info('Level %s is not buffered, emitting rollback', rolled_back_level)
838+
await self.emit_rollback(current_level, message_level)
839+
return
840+
841+
self._logger.info('Rollback is not required, continuing')
820842

821843
else:
822844
raise NotImplementedError
823845

846+
# NOTE: Yield extensive data from buffer
847+
buffered_levels = sorted(self._buffer.keys())
848+
emitted_levels = buffered_levels[: len(buffered_levels) - self._buffer_size]
849+
850+
for level in emitted_levels:
851+
for idx, level_data in enumerate(self._buffer[level]):
852+
level_message_type, level_message = level_data
853+
if level_message_type == type_:
854+
yield level_message
855+
self._buffer[level].pop(idx)
856+
if not self._buffer[level]:
857+
self._buffer.pop(level)
858+
824859
async def _on_operations_message(self, message: List[Dict[str, Any]]) -> None:
825860
"""Parse and emit raw operations from WS"""
826861
level_operations: DefaultDict[int, Deque[OperationData]] = defaultdict(deque)

0 commit comments

Comments (0)