diff --git a/async_substrate_interface/async_substrate.py b/async_substrate_interface/async_substrate.py index 94abf59..cc78a0e 100644 --- a/async_substrate_interface/async_substrate.py +++ b/async_substrate_interface/async_substrate.py @@ -516,7 +516,7 @@ def __init__( # TODO reconnection logic self.ws_url = ws_url self.ws: Optional["ClientConnection"] = None - self.max_subscriptions = max_subscriptions + self.max_subscriptions = asyncio.Semaphore(max_subscriptions) self.max_connections = max_connections self.shutdown_timer = shutdown_timer self._received = {} @@ -631,6 +631,7 @@ async def send(self, payload: dict) -> int: # async with self._lock: original_id = get_next_id() # self._open_subscriptions += 1 + await self.max_subscriptions.acquire() try: await self.ws.send(json.dumps({**payload, **{"id": original_id}})) return original_id @@ -649,7 +650,9 @@ async def retrieve(self, item_id: int) -> Optional[dict]: retrieved item """ try: - return self._received.pop(item_id) + item = self._received.pop(item_id) + self.max_subscriptions.release() + return item except KeyError: await asyncio.sleep(0.001) return None @@ -876,7 +879,7 @@ async def decode_scale( scale_bytes: bytes, _attempt=1, _retries=3, - return_scale_obj=False, + return_scale_obj: bool = False, ) -> Union[ScaleObj, Any]: """ Helper function to decode arbitrary SCALE-bytes (e.g. 0x02000000) according to given RUST type_string @@ -2528,13 +2531,13 @@ async def runtime_call( Returns: ScaleType from the runtime call """ - await self.init_runtime(block_hash=block_hash) + runtime = await self.init_runtime(block_hash=block_hash) if params is None: params = {} try: - metadata_v15_value = self.runtime.metadata_v15.value() + metadata_v15_value = runtime.metadata_v15.value() apis = {entry["name"]: entry for entry in metadata_v15_value["apis"]} api_entry = apis[api]