Skip to content

Commit 38875d5

Browse files
Fix processing realtime messages with data from multiple levels (#260)
1 parent 4eaf39b commit 38875d5

File tree

2 files changed

+10
-5
lines changed

2 files changed

+10
-5
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ Please use [this](https://docs.gitlab.com/ee/development/changelog.html) documen
77
### Fixed
88

99
* database: Fixed generating table names from uppercase model names.
10+
* tzkt: Fixed processing realtime messages with data from multiple levels.
1011

1112
## 4.2.5 - 2022-02-21
1213

src/dipdup/datasources/tzkt/datasource.py

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -706,23 +706,27 @@ async def _extract_message_data(self, type_: MessageType, message: List[Any]) ->
706706

707707
async def _on_operations_message(self, message: List[Dict[str, Any]]) -> None:
708708
"""Parse and emit raw operations from WS"""
709+
level_operations: DefaultDict[int, Deque[OperationData]] = defaultdict(deque)
709710
async for data in self._extract_message_data(MessageType.operation, message):
710-
operations: Deque[OperationData] = deque()
711711
for operation_json in data:
712712
if operation_json['status'] != 'applied':
713713
continue
714714
operation = self.convert_operation(operation_json)
715-
operations.append(operation)
716-
if operations:
717-
await self.emit_operations(tuple(operations))
715+
level_operations[operation.level].append(operation)
716+
717+
for _level, operations in level_operations.items():
718+
await self.emit_operations(tuple(operations))
718719

719720
async def _on_big_maps_message(self, message: List[Dict[str, Any]]) -> None:
720721
"""Parse and emit raw big map diffs from WS"""
722+
level_big_maps: DefaultDict[int, Deque[BigMapData]] = defaultdict(deque)
721723
async for data in self._extract_message_data(MessageType.big_map, message):
722724
big_maps: Deque[BigMapData] = deque()
723725
for big_map_json in data:
724726
big_map = self.convert_big_map(big_map_json)
725-
big_maps.append(big_map)
727+
level_big_maps[big_map.level].append(big_map)
728+
729+
for _level, big_maps in level_big_maps.items():
726730
await self.emit_big_maps(tuple(big_maps))
727731

728732
async def _on_head_message(self, message: List[Dict[str, Any]]) -> None:

0 commit comments

Comments
 (0)