|
1 | 1 | import asyncio |
2 | 2 | import logging |
3 | | -import os |
4 | | -import sys |
| 3 | +from collections import deque |
5 | 4 | from enum import Enum |
6 | | -from typing import Any, Awaitable, Callable, Dict, List, Optional, Union, cast |
| 5 | +from typing import Any, Awaitable, Callable, Deque, Dict, List, Optional, Union, cast |
7 | 6 |
|
8 | 7 | from aiosignalrcore.hub.base_hub_connection import BaseHubConnection # type: ignore |
9 | 8 | from aiosignalrcore.hub_connection_builder import HubConnectionBuilder # type: ignore |
@@ -92,6 +91,24 @@ class OperationFetcherChannel(Enum): |
92 | 91 | originations = 'originations' |
93 | 92 |
|
94 | 93 |
|
| 94 | +class CallbackExecutor: |
| 95 | + def __init__(self) -> None: |
| 96 | + self._queue: Deque[Awaitable] = deque() |
| 97 | + |
| 98 | + def submit(self, fn, *args, **kwargs): |
| 99 | + self._queue.append(fn(*args, **kwargs)) |
| 100 | + |
| 101 | + async def run(self): |
| 102 | + while True: |
| 103 | + try: |
| 104 | + coro = self._queue.popleft() |
| 105 | + await coro |
| 106 | + except IndexError: |
| 107 | + await asyncio.sleep(0.1) |
| 108 | + except asyncio.CancelledError: |
| 109 | + return |
| 110 | + |
| 111 | + |
95 | 112 | class OperationFetcher: |
96 | 113 | def __init__( |
97 | 114 | self, |
@@ -262,6 +279,7 @@ def __init__(self, url: str, cache: bool): |
262 | 279 | self._rollback_fn: Optional[Callable[[int, int], Awaitable[None]]] = None |
263 | 280 | self._package: Optional[str] = None |
264 | 281 | self._proxy = TzktRequestProxy(cache) |
| 282 | + self._callback_executor = CallbackExecutor() |
265 | 283 |
|
266 | 284 | async def add_index(self, index_name: str, index_config: Union[OperationIndexConfig, BigMapIndexConfig, BlockIndexConfig]): |
267 | 285 | self._logger.info('Adding index `%s`', index_name) |
@@ -297,10 +315,17 @@ def _get_client(self) -> BaseHubConnection: |
297 | 315 | } |
298 | 316 | ) |
299 | 317 | ).build() |
| 318 | + |
| 319 | + async def operation_callback(*args, **kwargs) -> None: |
| 320 | + self._callback_executor.submit(self.on_operation_message, *args, **kwargs) |
| 321 | + |
| 322 | + async def big_map_callback(*args, **kwargs) -> None: |
| 323 | + self._callback_executor.submit(self.on_big_map_message, *args, **kwargs) |
| 324 | + |
300 | 325 | self._client.on_open(self.on_connect) |
301 | 326 | self._client.on_error(self.on_error) |
302 | | - self._client.on('operations', self.on_operation_message) |
303 | | - self._client.on('bigmaps', self.on_big_map_message) |
| 327 | + self._client.on('operations', operation_callback) |
| 328 | + self._client.on('bigmaps', big_map_callback) |
304 | 329 |
|
305 | 330 | return self._client |
306 | 331 |
|
@@ -343,7 +368,10 @@ async def start(self): |
343 | 368 |
|
344 | 369 | if not rest_only: |
345 | 370 | self._logger.info('Starting websocket client') |
346 | | - await self._get_client().start() |
| 371 | + await asyncio.gather( |
| 372 | + await self._get_client().start(), |
| 373 | + await self._callback_executor.run(), |
| 374 | + ) |
347 | 375 |
|
348 | 376 | async def stop(self): |
349 | 377 | ... |
|
0 commit comments