Skip to content

Commit e1c83bc

Browse files
authored
feat(hip-3-pusher): set up pytest, websocket timeout and better reconnect handling
2 parents 6f4abf4 + 42cf325 commit e1c83bc

File tree

14 files changed

+228
-54
lines changed

14 files changed

+228
-54
lines changed

apps/hip-3-pusher/Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,4 +30,4 @@ RUN --mount=type=cache,target=/root/.cache/uv \
3030
uv sync --locked --no-dev
3131

3232
# Run the app by default
33-
CMD ["uv", "run", "src/main.py", "--config", "config/config.toml"]
33+
CMD ["uv", "run", "src/pusher/main.py", "--config", "config/config.toml"]

apps/hip-3-pusher/pyproject.toml

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,18 @@ description = "Hyperliquid HIP-3 market oracle pusher"
55
readme = "README.md"
66
requires-python = "==3.13.*"
77
dependencies = [
8-
"boto3~=1.40.34",
8+
"boto3~=1.40.38",
99
"cryptography~=46.0.1",
1010
"hyperliquid-python-sdk~=0.19.0",
1111
"loguru~=0.7.3",
1212
"opentelemetry-exporter-prometheus~=0.58b0",
1313
"opentelemetry-sdk~=1.37.0",
1414
"prometheus-client~=0.23.1",
15+
"tenacity~=9.1.2",
1516
"websockets~=15.0.1",
1617
]
18+
19+
[dependency-groups]
20+
dev = [
21+
"pytest~=8.4.2",
22+
]

apps/hip-3-pusher/src/config.py renamed to apps/hip-3-pusher/src/pusher/config.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
from pydantic import BaseModel
22

3+
STALE_TIMEOUT_SECONDS = 5
4+
35

46
class KMSConfig(BaseModel):
57
enable_kms: bool
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
class StaleConnection(Exception):
2+
pass

apps/hip-3-pusher/src/hermes_listener.py renamed to apps/hip-3-pusher/src/pusher/hermes_listener.py

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,11 @@
33
from loguru import logger
44
import time
55
import websockets
6+
from tenacity import retry, retry_if_exception_type, wait_exponential
67

7-
from config import Config
8-
from price_state import PriceState, PriceUpdate
8+
from pusher.config import Config, STALE_TIMEOUT_SECONDS
9+
from pusher.exception import StaleConnection
10+
from pusher.price_state import PriceState, PriceUpdate
911

1012

1113
class HermesListener:
@@ -31,14 +33,13 @@ def get_subscribe_request(self):
3133
async def subscribe_all(self):
3234
await asyncio.gather(*(self.subscribe_single(url) for url in self.hermes_urls))
3335

36+
@retry(
37+
retry=retry_if_exception_type((StaleConnection, websockets.ConnectionClosed)),
38+
wait=wait_exponential(multiplier=1, min=1, max=10),
39+
reraise=True,
40+
)
3441
async def subscribe_single(self, url):
35-
while True:
36-
try:
37-
await self.subscribe_single_inner(url)
38-
except websockets.ConnectionClosed:
39-
logger.error("Connection to {} closed; retrying", url)
40-
except Exception as e:
41-
logger.exception("Error on {}: {}", url, e)
42+
return await self.subscribe_single_inner(url)
4243

4344
async def subscribe_single_inner(self, url):
4445
async with websockets.connect(url) as ws:
@@ -48,12 +49,19 @@ async def subscribe_single_inner(self, url):
4849
logger.info("Sent Hermes subscribe request to {}", url)
4950

5051
# listen for updates
51-
async for message in ws:
52+
while True:
5253
try:
54+
message = await asyncio.wait_for(ws.recv(), timeout=STALE_TIMEOUT_SECONDS)
5355
data = json.loads(message)
5456
self.parse_hermes_message(data)
57+
except asyncio.TimeoutError:
58+
raise StaleConnection(f"No messages in {STALE_TIMEOUT_SECONDS} seconds, reconnecting")
59+
except websockets.ConnectionClosed:
60+
raise
5561
except json.JSONDecodeError as e:
5662
logger.error("Failed to decode JSON message: {}", e)
63+
except Exception as e:
64+
logger.error("Unexpected exception: {}", e)
5765

5866
def parse_hermes_message(self, data):
5967
"""

apps/hip-3-pusher/src/hyperliquid_listener.py renamed to apps/hip-3-pusher/src/pusher/hyperliquid_listener.py

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,12 @@
22
import json
33
import websockets
44
from loguru import logger
5+
from tenacity import retry, retry_if_exception_type, wait_exponential
56
import time
67

7-
from config import Config
8-
from price_state import PriceState, PriceUpdate
8+
from pusher.config import Config, STALE_TIMEOUT_SECONDS
9+
from pusher.exception import StaleConnection
10+
from pusher.price_state import PriceState, PriceUpdate
911

1012
# This will be in config, but note here.
1113
# Other RPC providers exist but so far we've seen their support is incomplete.
@@ -32,14 +34,13 @@ def get_subscribe_request(self, asset):
3234
async def subscribe_all(self):
3335
await asyncio.gather(*(self.subscribe_single(hyperliquid_ws_url) for hyperliquid_ws_url in self.hyperliquid_ws_urls))
3436

37+
@retry(
38+
retry=retry_if_exception_type((StaleConnection, websockets.ConnectionClosed)),
39+
wait=wait_exponential(multiplier=1, min=1, max=10),
40+
reraise=True,
41+
)
3542
async def subscribe_single(self, url):
36-
while True:
37-
try:
38-
await self.subscribe_single_inner(url)
39-
except websockets.ConnectionClosed:
40-
logger.error("Connection to {} closed; retrying", url)
41-
except Exception as e:
42-
logger.exception("Error on {}: {}", url, e)
43+
return await self.subscribe_single_inner(url)
4344

4445
async def subscribe_single_inner(self, url):
4546
async with websockets.connect(url) as ws:
@@ -48,8 +49,9 @@ async def subscribe_single_inner(self, url):
4849
logger.info("Sent subscribe request to {}", url)
4950

5051
# listen for updates
51-
async for message in ws:
52+
while True:
5253
try:
54+
message = await asyncio.wait_for(ws.recv(), timeout=STALE_TIMEOUT_SECONDS)
5355
data = json.loads(message)
5456
channel = data.get("channel", None)
5557
if not channel:
@@ -62,8 +64,14 @@ async def subscribe_single_inner(self, url):
6264
self.parse_hyperliquid_ws_message(data)
6365
else:
6466
logger.error("Received unknown channel: {}", channel)
67+
except asyncio.TimeoutError:
68+
raise StaleConnection(f"No messages in {STALE_TIMEOUT_SECONDS} seconds, reconnecting...")
69+
except websockets.ConnectionClosed:
70+
raise
6571
except json.JSONDecodeError as e:
6672
logger.error("Failed to decode JSON message: {} error: {}", message, e)
73+
except Exception as e:
74+
logger.error("Unexpected exception: {}", e)
6775

6876
def parse_hyperliquid_ws_message(self, message):
6977
try:

apps/hip-3-pusher/src/kms_signer.py renamed to apps/hip-3-pusher/src/pusher/kms_signer.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
from hyperliquid.utils.signing import get_timestamp_ms, action_hash, construct_phantom_agent, l1_payload
1111
from loguru import logger
1212

13-
from config import Config
13+
from pusher.config import Config
1414

1515
SECP256K1_N_HALF = SECP256K1_N // 2
1616

apps/hip-3-pusher/src/lazer_listener.py renamed to apps/hip-3-pusher/src/pusher/lazer_listener.py

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,11 @@
33
from loguru import logger
44
import time
55
import websockets
6+
from tenacity import retry, retry_if_exception_type, wait_exponential
67

7-
from config import Config
8-
from price_state import PriceState, PriceUpdate
8+
from pusher.config import Config, STALE_TIMEOUT_SECONDS
9+
from pusher.exception import StaleConnection
10+
from pusher.price_state import PriceState, PriceUpdate
911

1012

1113
class LazerListener:
@@ -35,14 +37,13 @@ def get_subscribe_request(self, subscription_id: int):
3537
async def subscribe_all(self):
3638
await asyncio.gather(*(self.subscribe_single(router_url) for router_url in self.lazer_urls))
3739

40+
@retry(
41+
retry=retry_if_exception_type((StaleConnection, websockets.ConnectionClosed)),
42+
wait=wait_exponential(multiplier=1, min=1, max=10),
43+
reraise=True,
44+
)
3845
async def subscribe_single(self, router_url):
39-
while True:
40-
try:
41-
await self.subscribe_single_inner(router_url)
42-
except websockets.ConnectionClosed:
43-
logger.error("Connection to {} closed; retrying", router_url)
44-
except Exception as e:
45-
logger.exception("Error on {}: {}", router_url, e)
46+
return await self.subscribe_single_inner(router_url)
4647

4748
async def subscribe_single_inner(self, router_url):
4849
headers = {
@@ -56,12 +57,19 @@ async def subscribe_single_inner(self, router_url):
5657
logger.info("Sent Lazer subscribe request to {}", router_url)
5758

5859
# listen for updates
59-
async for message in ws:
60+
while True:
6061
try:
62+
message = await asyncio.wait_for(ws.recv(), timeout=STALE_TIMEOUT_SECONDS)
6163
data = json.loads(message)
6264
self.parse_lazer_message(data)
65+
except asyncio.TimeoutError:
66+
raise StaleConnection(f"No messages in {STALE_TIMEOUT_SECONDS} seconds, reconnecting")
67+
except websockets.ConnectionClosed:
68+
raise
6369
except json.JSONDecodeError as e:
6470
logger.error("Failed to decode JSON message: {}", e)
71+
except Exception as e:
72+
logger.error("Unexpected exception: {}", e)
6573

6674
def parse_lazer_message(self, data):
6775
"""

apps/hip-3-pusher/src/main.py renamed to apps/hip-3-pusher/src/pusher/main.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,13 @@
55
import sys
66
import tomllib
77

8-
from config import Config
9-
from hyperliquid_listener import HyperliquidListener
10-
from lazer_listener import LazerListener
11-
from hermes_listener import HermesListener
12-
from price_state import PriceState
13-
from publisher import Publisher
14-
from metrics import Metrics
8+
from pusher.config import Config
9+
from pusher.hyperliquid_listener import HyperliquidListener
10+
from pusher.lazer_listener import LazerListener
11+
from pusher.hermes_listener import HermesListener
12+
from pusher.price_state import PriceState
13+
from pusher.publisher import Publisher
14+
from pusher.metrics import Metrics
1515

1616

1717
def load_config():

apps/hip-3-pusher/src/metrics.py renamed to apps/hip-3-pusher/src/pusher/metrics.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
from opentelemetry.metrics import get_meter_provider, set_meter_provider
44
from opentelemetry.sdk.metrics import MeterProvider
55

6-
from config import Config
6+
from pusher.config import Config
77

88
METER_NAME = "hip3pusher"
99

0 commit comments

Comments
 (0)