Skip to content

Commit 5f2557f

Browse files
authored
Merge pull request #391 from dipdup-net/fix/transfers_subscription
Fix token transfers handling for realtime index status
2 parents ed183ca + 869701f commit 5f2557f

File tree

4 files changed

+39
-1
lines changed

4 files changed

+39
-1
lines changed

src/dipdup/datasources/datasource.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
from dipdup.models import BigMapData
1818
from dipdup.models import HeadBlockData
1919
from dipdup.models import OperationData
20+
from dipdup.models import TokenTransferData
2021
from dipdup.utils import FormattedLogger
2122

2223
_logger = logging.getLogger('dipdup.datasource')
@@ -25,6 +26,7 @@
2526
EmptyCallbackT = Callable[[], Awaitable[None]]
2627
HeadCallbackT = Callable[['IndexDatasource', HeadBlockData], Awaitable[None]]
2728
OperationsCallbackT = Callable[['IndexDatasource', Tuple[OperationData, ...]], Awaitable[None]]
29+
TokenTransfersCallbackT = Callable[['IndexDatasource', Tuple[TokenTransferData, ...]], Awaitable[None]]
2830
BigMapsCallbackT = Callable[['IndexDatasource', Tuple[BigMapData, ...]], Awaitable[None]]
2931
RollbackCallbackT = Callable[['IndexDatasource', MessageType, int, int], Awaitable[None]]
3032

@@ -74,6 +76,7 @@ def __init__(self, url: str, http_config: HTTPConfig, merge_subscriptions: bool
7476
self._on_disconnected_callbacks: Set[EmptyCallbackT] = set()
7577
self._on_head_callbacks: Set[HeadCallbackT] = set()
7678
self._on_operations_callbacks: Set[OperationsCallbackT] = set()
79+
self._on_token_transfers_callbacks: Set[TokenTransfersCallbackT] = set()
7780
self._on_big_maps_callbacks: Set[BigMapsCallbackT] = set()
7881
self._on_rollback_callbacks: Set[RollbackCallbackT] = set()
7982
self._subscriptions: SubscriptionManager = SubscriptionManager(merge_subscriptions)
@@ -100,6 +103,9 @@ def call_on_head(self, fn: HeadCallbackT) -> None:
100103
def call_on_operations(self, fn: OperationsCallbackT) -> None:
101104
self._on_operations_callbacks.add(fn)
102105

106+
def call_on_token_transfers(self, fn: TokenTransfersCallbackT) -> None:
107+
self._on_token_transfers_callbacks.add(fn)
108+
103109
def call_on_big_maps(self, fn: BigMapsCallbackT) -> None:
104110
self._on_big_maps_callbacks.add(fn)
105111

@@ -120,6 +126,10 @@ async def emit_operations(self, operations: Tuple[OperationData, ...]) -> None:
120126
for fn in self._on_operations_callbacks:
121127
await fn(self, operations)
122128

129+
async def emit_token_transfers(self, token_transfers: Tuple[TokenTransferData, ...]) -> None:
130+
for fn in self._on_token_transfers_callbacks:
131+
await fn(self, token_transfers)
132+
123133
async def emit_big_maps(self, big_maps: Tuple[BigMapData, ...]) -> None:
124134
for fn in self._on_big_maps_callbacks:
125135
await fn(self, big_maps)

src/dipdup/datasources/tzkt/datasource.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -899,6 +899,7 @@ def _get_ws_client(self) -> SignalRClient:
899899
self._ws_client.on_error(self._on_error)
900900

901901
self._ws_client.on('operations', partial(self._on_message, MessageType.operation))
902+
self._ws_client.on('transfers', partial(self._on_message, MessageType.token_transfer))
902903
self._ws_client.on('bigmaps', partial(self._on_message, MessageType.big_map))
903904
self._ws_client.on('head', partial(self._on_message, MessageType.head))
904905

@@ -977,6 +978,8 @@ async def _on_message(self, type_: MessageType, message: List[Dict[str, Any]]) -
977978
for buffered_message in self._buffer.yield_from():
978979
if buffered_message.type == MessageType.operation:
979980
await self._process_operations_data(cast(list, buffered_message.data))
981+
elif buffered_message.type == MessageType.token_transfer:
982+
await self._process_token_transfers_data(cast(list, buffered_message.data))
980983
elif buffered_message.type == MessageType.big_map:
981984
await self._process_big_maps_data(cast(list, buffered_message.data))
982985
elif buffered_message.type == MessageType.head:
@@ -997,6 +1000,17 @@ async def _process_operations_data(self, data: List[Dict[str, Any]]) -> None:
9971000
for _level, operations in level_operations.items():
9981001
await self.emit_operations(tuple(operations))
9991002

1003+
async def _process_token_transfers_data(self, data: List[Dict[str, Any]]) -> None:
1004+
"""Parse and emit raw token transfers from WS"""
1005+
level_token_transfers: DefaultDict[int, Deque[TokenTransferData]] = defaultdict(deque)
1006+
1007+
for token_transfer_json in data:
1008+
token_transfer = self.convert_token_transfer(token_transfer_json)
1009+
level_token_transfers[token_transfer.level].append(token_transfer)
1010+
1011+
for _level, token_transfers in level_token_transfers.items():
1012+
await self.emit_token_transfers(tuple(token_transfers))
1013+
10001014
async def _process_big_maps_data(self, data: List[Dict[str, Any]]) -> None:
10011015
"""Parse and emit raw big map diffs from WS"""
10021016
level_big_maps: DefaultDict[int, Deque[BigMapData]] = defaultdict(deque)

src/dipdup/dipdup.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
from dipdup.index import HeadIndex
4747
from dipdup.index import Index
4848
from dipdup.index import OperationIndex
49+
from dipdup.index import TokenTransferIndex
4950
from dipdup.index import extract_operation_subgroups
5051
from dipdup.models import BigMapData
5152
from dipdup.models import Contract
@@ -55,6 +56,7 @@
5556
from dipdup.models import IndexStatus
5657
from dipdup.models import OperationData
5758
from dipdup.models import Schema
59+
from dipdup.models import TokenTransferData
5860
from dipdup.prometheus import Metrics
5961
from dipdup.scheduler import SchedulerManager
6062
from dipdup.utils import is_importable
@@ -220,6 +222,7 @@ async def _subscribe_to_datasource_events(self) -> None:
220222
continue
221223
datasource.call_on_head(self._on_head)
222224
datasource.call_on_operations(self._on_operations)
225+
datasource.call_on_token_transfers(self._on_token_transfers)
223226
datasource.call_on_big_maps(self._on_big_maps)
224227
datasource.call_on_rollback(self._on_rollback)
225228

@@ -259,6 +262,11 @@ async def _on_operations(self, datasource: IndexDatasource, operations: Tuple[Op
259262
for index in operation_indexes:
260263
index.push_operations(operation_subgroups)
261264

265+
async def _on_token_transfers(self, datasource: IndexDatasource, token_transfers: Tuple[TokenTransferData, ...]) -> None:
266+
token_transfer_indexes = (i for i in self._indexes.values() if isinstance(i, TokenTransferIndex) and i.datasource == datasource)
267+
for index in token_transfer_indexes:
268+
index.push_token_transfers(token_transfers)
269+
262270
async def _on_big_maps(self, datasource: IndexDatasource, big_maps: Tuple[BigMapData, ...]) -> None:
263271
big_map_indexes = (i for i in self._indexes.values() if isinstance(i, BigMapIndex) and i.datasource == datasource)
264272
for index in big_map_indexes:

src/dipdup/index.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -937,8 +937,14 @@ def __init__(self, ctx: DipDupContext, config: TokenTransferIndexConfig, datasou
937937
super().__init__(ctx, config, datasource)
938938
self._queue: Deque[Tuple[TokenTransferData, ...]] = deque()
939939

940+
def push_token_transfers(self, token_transfers: Tuple[TokenTransferData, ...]) -> None:
941+
self._queue.append(token_transfers)
942+
943+
if Metrics.enabled:
944+
Metrics.set_levels_to_realtime(self._config.name, len(self._queue))
945+
940946
async def _synchronize(self, last_level: int, cache: bool = False) -> None:
941-
"""Fetch operations via Fetcher and pass to message callback"""
947+
"""Fetch token transfers via Fetcher and pass to message callback"""
942948
first_level = await self._enter_sync_state(last_level)
943949
if first_level is None:
944950
return

0 commit comments

Comments
 (0)