Skip to content
Merged
4 changes: 2 additions & 2 deletions .github/workflows/unit-and-integration-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,12 @@ jobs:
PYTHONUNBUFFERED: "1"
run: |
source venv/bin/activate
python -m uv run pytest -n 2 tests/unit_tests/ --reruns 3
python -m uv run pytest tests/unit_tests/ --reruns 3 -s

- name: Integration tests
timeout-minutes: 20
env:
PYTHONUNBUFFERED: "1"
run: |
source venv/bin/activate
python -m uv run pytest -n 2 tests/integration_tests/ --reruns 3
python -m uv run pytest tests/integration_tests/ --reruns 3 -s
13 changes: 10 additions & 3 deletions async_substrate_interface/async_substrate.py
Original file line number Diff line number Diff line change
Expand Up @@ -842,9 +842,15 @@ async def _exit_with_timer(self):
"""
try:
if self.shutdown_timer is not None:
logger.debug("Exiting with timer")
await asyncio.sleep(self.shutdown_timer)
logger.debug("Exiting with timer")
await self.shutdown()
if (
self.state != State.CONNECTING
and self._sending.qsize() == 0
and not self._received_subscriptions
and self._waiting_for_response <= 0
):
await self.shutdown()
except asyncio.CancelledError:
pass

Expand Down Expand Up @@ -985,6 +991,7 @@ async def unsubscribe(
original_id = get_next_id()
while original_id in self._in_use_ids:
original_id = get_next_id()
logger.debug(f"Unwatched extrinsic subscription {subscription_id}")
self._received_subscriptions.pop(subscription_id, None)

to_send = {
Expand Down Expand Up @@ -2516,6 +2523,7 @@ async def _make_rpc_request(
subscription_added = False

async with self.ws as ws:
await ws.mark_waiting_for_response()
for payload in payloads:
item_id = await ws.send(payload["payload"])
request_manager.add_request(item_id, payload["id"])
Expand All @@ -2527,7 +2535,6 @@ async def _make_rpc_request(
logger.debug(
f"Submitted payload ID {payload['id']} with websocket ID {item_id}: {output_payload}"
)
await ws.mark_waiting_for_response()

while True:
for item_id in request_manager.unresponded():
Expand Down
11 changes: 9 additions & 2 deletions tests/helpers/proxy_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,21 @@


class ProxyServer:
def __init__(self, upstream: str, time_til_pause: float, time_til_resume: float):
def __init__(
self,
upstream: str,
time_til_pause: float,
time_til_resume: float,
port: int = 8080,
):
self.upstream_server = upstream
self.time_til_pause = time_til_pause
self.time_til_resume = time_til_resume
self.upstream_connection = None
self.connection_time = 0
self.shutdown_time = 0
self.resume_time = 0
self.port = port

def connect(self):
self.upstream_connection = connect(self.upstream_server)
Expand All @@ -41,7 +48,7 @@ def proxy_request(self, websocket: ServerConnection):
websocket.send(recd)

def serve(self):
with serve(self.proxy_request, "localhost", 8080) as self.server:
with serve(self.proxy_request, "localhost", self.port) as self.server:
self.server.serve_forever()

def connect_and_serve(self):
Expand Down
35 changes: 30 additions & 5 deletions tests/integration_tests/test_async_substrate_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import os.path
import time
import threading
import socket

import bittensor_wallet
import pytest
Expand Down Expand Up @@ -197,6 +198,13 @@ async def test_query_map_with_odd_number_of_params():

@pytest.mark.asyncio
async def test_improved_reconnection():
def get_free_port():
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind(("", 0)) # Bind to port 0 = OS picks free port
s.listen(1)
port_ = s.getsockname()[1]
return port_

print("Testing test_improved_reconnection")
ws_logger_path = "/tmp/websockets-proxy-test"
ws_logger = logging.getLogger("websockets.proxy")
Expand All @@ -210,14 +218,15 @@ async def test_improved_reconnection():
os.remove(asi_logger_path)
logger.setLevel(logging.DEBUG)
logger.addHandler(logging.FileHandler(asi_logger_path))
port = get_free_port()
print(f"Testing using server on port {port}")
proxy = ProxyServer("wss://archive.sub.latent.to", 10, 20, port=port)

proxy = ProxyServer("wss://archive.sub.latent.to", 10, 20)

server_thread = threading.Thread(target=proxy.connect_and_serve)
server_thread = threading.Thread(target=proxy.connect_and_serve, daemon=True)
server_thread.start()
await asyncio.sleep(3) # give the server start up time
async with AsyncSubstrateInterface(
"ws://localhost:8080",
f"ws://localhost:{port}",
ss58_format=42,
chain_name="Bittensor",
retry_timeout=10.0,
Expand Down Expand Up @@ -250,7 +259,7 @@ async def test_improved_reconnection():
shutdown_thread = threading.Thread(target=proxy.close)
shutdown_thread.start()
shutdown_thread.join(timeout=5)
server_thread.join(timeout=5)
# server_thread.join(timeout=5)
print("test_improved_reconnection succeeded")


Expand Down Expand Up @@ -319,3 +328,19 @@ async def concurrent_task(substrate, task_id):
await asyncio.gather(*tasks)

print("test_concurrent_rpc_requests succeeded")


@pytest.mark.asyncio
async def test_wait_for_block():
async def handler(_):
return True

substrate = AsyncSubstrateInterface(
LATENT_LITE_ENTRYPOINT, ss58_format=42, chain_name="Bittensor"
)
await substrate.initialize()
current_block = await substrate.get_block_number(None)
result = await substrate.wait_for_block(
current_block + 3, result_handler=handler, task_return=False
)
assert result is True
Loading