Skip to content

Commit 6360483

Browse files
committed
websocket timeouts and retries
1 parent 5366f99 commit 6360483

File tree

7 files changed

+77
-37
lines changed

7 files changed

+77
-37
lines changed

apps/hip-3-pusher/pyproject.toml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,17 +5,18 @@ description = "Hyperliquid HIP-3 market oracle pusher"
55
readme = "README.md"
66
requires-python = "==3.13.*"
77
dependencies = [
8-
"boto3~=1.40.34",
8+
"boto3~=1.40.38",
99
"cryptography~=46.0.1",
1010
"hyperliquid-python-sdk~=0.19.0",
1111
"loguru~=0.7.3",
1212
"opentelemetry-exporter-prometheus~=0.58b0",
1313
"opentelemetry-sdk~=1.37.0",
1414
"prometheus-client~=0.23.1",
15+
"tenacity~=9.1.2",
1516
"websockets~=15.0.1",
1617
]
1718

1819
[dependency-groups]
1920
dev = [
20-
"pytest>=8.4.2",
21+
"pytest~=8.4.2",
2122
]

apps/hip-3-pusher/src/pusher/config.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
from pydantic import BaseModel
22

3+
STALE_TIMEOUT_SECONDS = 5
4+
35

46
class KMSConfig(BaseModel):
57
enable_kms: bool
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
class StaleConnection(Exception):
2+
pass

apps/hip-3-pusher/src/pusher/hermes_listener.py

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,10 @@
33
from loguru import logger
44
import time
55
import websockets
6+
from tenacity import retry, retry_if_exception_type, wait_exponential
67

7-
from pusher.config import Config
8+
from pusher.config import Config, STALE_TIMEOUT_SECONDS
9+
from pusher.exception import StaleConnection
810
from pusher.price_state import PriceState, PriceUpdate
911

1012

@@ -31,14 +33,13 @@ def get_subscribe_request(self):
3133
async def subscribe_all(self):
3234
await asyncio.gather(*(self.subscribe_single(url) for url in self.hermes_urls))
3335

36+
@retry(
37+
retry=retry_if_exception_type((StaleConnection, websockets.ConnectionClosed)),
38+
wait=wait_exponential(multiplier=1, min=1, max=10),
39+
reraise=True,
40+
)
3441
async def subscribe_single(self, url):
35-
while True:
36-
try:
37-
await self.subscribe_single_inner(url)
38-
except websockets.ConnectionClosed:
39-
logger.error("Connection to {} closed; retrying", url)
40-
except Exception as e:
41-
logger.exception("Error on {}: {}", url, e)
42+
return await self.subscribe_single_inner(url)
4243

4344
async def subscribe_single_inner(self, url):
4445
async with websockets.connect(url) as ws:
@@ -48,12 +49,19 @@ async def subscribe_single_inner(self, url):
4849
logger.info("Sent Hermes subscribe request to {}", url)
4950

5051
# listen for updates
51-
async for message in ws:
52+
while True:
5253
try:
54+
message = await asyncio.wait_for(ws.recv(), timeout=STALE_TIMEOUT_SECONDS)
5355
data = json.loads(message)
5456
self.parse_hermes_message(data)
57+
except asyncio.TimeoutError:
58+
raise StaleConnection(f"No messages in {STALE_TIMEOUT_SECONDS} seconds, reconnecting")
59+
except websockets.ConnectionClosed:
60+
raise
5561
except json.JSONDecodeError as e:
5662
logger.error("Failed to decode JSON message: {}", e)
63+
except Exception as e:
64+
logger.error("Unexpected exception: {}", e)
5765

5866
def parse_hermes_message(self, data):
5967
"""

apps/hip-3-pusher/src/pusher/hyperliquid_listener.py

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,11 @@
22
import json
33
import websockets
44
from loguru import logger
5+
from tenacity import retry, retry_if_exception_type, wait_exponential
56
import time
67

7-
from pusher.config import Config
8+
from pusher.config import Config, STALE_TIMEOUT_SECONDS
9+
from pusher.exception import StaleConnection
810
from pusher.price_state import PriceState, PriceUpdate
911

1012
# This will be in config, but note here.
@@ -32,14 +34,13 @@ def get_subscribe_request(self, asset):
3234
async def subscribe_all(self):
3335
await asyncio.gather(*(self.subscribe_single(hyperliquid_ws_url) for hyperliquid_ws_url in self.hyperliquid_ws_urls))
3436

37+
@retry(
38+
retry=retry_if_exception_type((StaleConnection, websockets.ConnectionClosed)),
39+
wait=wait_exponential(multiplier=1, min=1, max=10),
40+
reraise=True,
41+
)
3542
async def subscribe_single(self, url):
36-
while True:
37-
try:
38-
await self.subscribe_single_inner(url)
39-
except websockets.ConnectionClosed:
40-
logger.error("Connection to {} closed; retrying", url)
41-
except Exception as e:
42-
logger.exception("Error on {}: {}", url, e)
43+
return await self.subscribe_single_inner(url)
4344

4445
async def subscribe_single_inner(self, url):
4546
async with websockets.connect(url) as ws:
@@ -48,8 +49,9 @@ async def subscribe_single_inner(self, url):
4849
logger.info("Sent subscribe request to {}", url)
4950

5051
# listen for updates
51-
async for message in ws:
52+
while True:
5253
try:
54+
message = await asyncio.wait_for(ws.recv(), timeout=STALE_TIMEOUT_SECONDS)
5355
data = json.loads(message)
5456
channel = data.get("channel", None)
5557
if not channel:
@@ -62,8 +64,14 @@ async def subscribe_single_inner(self, url):
6264
self.parse_hyperliquid_ws_message(data)
6365
else:
6466
logger.error("Received unknown channel: {}", channel)
67+
except asyncio.TimeoutError:
68+
raise StaleConnection(f"No messages in {STALE_TIMEOUT_SECONDS} seconds, reconnecting...")
69+
except websockets.ConnectionClosed:
70+
raise
6571
except json.JSONDecodeError as e:
6672
logger.error("Failed to decode JSON message: {} error: {}", message, e)
73+
except Exception as e:
74+
logger.error("Unexpected exception: {}", e)
6775

6876
def parse_hyperliquid_ws_message(self, message):
6977
try:

apps/hip-3-pusher/src/pusher/lazer_listener.py

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,10 @@
33
from loguru import logger
44
import time
55
import websockets
6+
from tenacity import retry, retry_if_exception_type, wait_exponential
67

7-
from pusher.config import Config
8+
from pusher.config import Config, STALE_TIMEOUT_SECONDS
9+
from pusher.exception import StaleConnection
810
from pusher.price_state import PriceState, PriceUpdate
911

1012

@@ -35,14 +37,13 @@ def get_subscribe_request(self, subscription_id: int):
3537
async def subscribe_all(self):
3638
await asyncio.gather(*(self.subscribe_single(router_url) for router_url in self.lazer_urls))
3739

40+
@retry(
41+
retry=retry_if_exception_type((StaleConnection, websockets.ConnectionClosed)),
42+
wait=wait_exponential(multiplier=1, min=1, max=10),
43+
reraise=True,
44+
)
3845
async def subscribe_single(self, router_url):
39-
while True:
40-
try:
41-
await self.subscribe_single_inner(router_url)
42-
except websockets.ConnectionClosed:
43-
logger.error("Connection to {} closed; retrying", router_url)
44-
except Exception as e:
45-
logger.exception("Error on {}: {}", router_url, e)
46+
return await self.subscribe_single_inner(router_url)
4647

4748
async def subscribe_single_inner(self, router_url):
4849
headers = {
@@ -56,12 +57,19 @@ async def subscribe_single_inner(self, router_url):
5657
logger.info("Sent Lazer subscribe request to {}", router_url)
5758

5859
# listen for updates
59-
async for message in ws:
60+
while True:
6061
try:
62+
message = await asyncio.wait_for(ws.recv(), timeout=STALE_TIMEOUT_SECONDS)
6163
data = json.loads(message)
6264
self.parse_lazer_message(data)
65+
except asyncio.TimeoutError:
66+
raise StaleConnection(f"No messages in {STALE_TIMEOUT_SECONDS} seconds, reconnecting")
67+
except websockets.ConnectionClosed:
68+
raise
6369
except json.JSONDecodeError as e:
6470
logger.error("Failed to decode JSON message: {}", e)
71+
except Exception as e:
72+
logger.error("Unexpected exception: {}", e)
6573

6674
def parse_lazer_message(self, data):
6775
"""

apps/hip-3-pusher/uv.lock

Lines changed: 19 additions & 8 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)