Skip to content

Commit 4d480d1

Browse files
committed
Better timeout and error handling
1 parent e1b59b3 commit 4d480d1

File tree

8 files changed

+46
-35
lines changed

8 files changed

+46
-35
lines changed

apps/hip-3-pusher/config/config.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ market_symbol = "BTC"
88
use_testnet = false
99
oracle_pusher_key_path = "/path/to/oracle_pusher_key.txt"
1010
publish_interval = 3.0
11+
publish_timeout = 5.0
1112
enable_publish = false
1213

1314
[kms]

apps/hip-3-pusher/src/pusher/config.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ class HyperliquidConfig(BaseModel):
3535
use_testnet: bool
3636
oracle_pusher_key_path: FilePath
3737
publish_interval: float
38+
publish_timeout: float
3839
enable_publish: bool
3940

4041
@model_validator(mode="after")
Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,6 @@
1-
class StaleConnection(Exception):
1+
class StaleConnectionError(Exception):
2+
pass
3+
4+
5+
class PushError(Exception):
26
pass

apps/hip-3-pusher/src/pusher/hermes_listener.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from tenacity import retry, retry_if_exception_type, wait_exponential
77

88
from pusher.config import Config, STALE_TIMEOUT_SECONDS
9-
from pusher.exception import StaleConnection
9+
from pusher.exception import StaleConnectionError
1010
from pusher.price_state import PriceState, PriceUpdate
1111

1212

@@ -34,7 +34,7 @@ async def subscribe_all(self):
3434
await asyncio.gather(*(self.subscribe_single(url) for url in self.hermes_urls))
3535

3636
@retry(
37-
retry=retry_if_exception_type((StaleConnection, websockets.ConnectionClosed)),
37+
retry=retry_if_exception_type((StaleConnectionError, websockets.ConnectionClosed)),
3838
wait=wait_exponential(multiplier=1, min=1, max=10),
3939
reraise=True,
4040
)
@@ -55,7 +55,7 @@ async def subscribe_single_inner(self, url):
5555
data = json.loads(message)
5656
self.parse_hermes_message(data)
5757
except asyncio.TimeoutError:
58-
raise StaleConnection(f"No messages in {STALE_TIMEOUT_SECONDS} seconds, reconnecting")
58+
raise StaleConnectionError(f"No messages in {STALE_TIMEOUT_SECONDS} seconds, reconnecting")
5959
except websockets.ConnectionClosed:
6060
raise
6161
except json.JSONDecodeError as e:

apps/hip-3-pusher/src/pusher/hyperliquid_listener.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
import time
77

88
from pusher.config import Config, STALE_TIMEOUT_SECONDS
9-
from pusher.exception import StaleConnection
9+
from pusher.exception import StaleConnectionError
1010
from pusher.price_state import PriceState, PriceUpdate
1111

1212
# This will be in config, but note here.
@@ -35,7 +35,7 @@ async def subscribe_all(self):
3535
await asyncio.gather(*(self.subscribe_single(hyperliquid_ws_url) for hyperliquid_ws_url in self.hyperliquid_ws_urls))
3636

3737
@retry(
38-
retry=retry_if_exception_type((StaleConnection, websockets.ConnectionClosed)),
38+
retry=retry_if_exception_type((StaleConnectionError, websockets.ConnectionClosed)),
3939
wait=wait_exponential(multiplier=1, min=1, max=10),
4040
reraise=True,
4141
)
@@ -65,7 +65,7 @@ async def subscribe_single_inner(self, url):
6565
else:
6666
logger.error("Received unknown channel: {}", channel)
6767
except asyncio.TimeoutError:
68-
raise StaleConnection(f"No messages in {STALE_TIMEOUT_SECONDS} seconds, reconnecting...")
68+
raise StaleConnectionError(f"No messages in {STALE_TIMEOUT_SECONDS} seconds, reconnecting...")
6969
except websockets.ConnectionClosed:
7070
raise
7171
except json.JSONDecodeError as e:

apps/hip-3-pusher/src/pusher/kms_signer.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
21
import boto3
32
from cryptography.hazmat.primitives import serialization
43
from cryptography.hazmat.primitives.asymmetric.utils import decode_dss_signature
@@ -12,6 +11,7 @@
1211
from pathlib import Path
1312

1413
from pusher.config import Config
14+
from pusher.exception import PushError
1515

1616
SECP256K1_N_HALF = SECP256K1_N // 2
1717

@@ -94,7 +94,7 @@ def _send_update(self, action, signature, timestamp):
9494
except Exception as e:
9595
logger.exception("perp_deploy_set_oracle exception for endpoint: {} error: {}", exchange.base_url, repr(e))
9696

97-
return None
97+
raise PushError("all push endpoints failed")
9898

9999
def sign_l1_action(self, action, nonce, is_mainnet):
100100
hash = action_hash(action, vault_address=None, nonce=nonce, expires_after=None)

apps/hip-3-pusher/src/pusher/lazer_listener.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from tenacity import retry, retry_if_exception_type, wait_exponential
77

88
from pusher.config import Config, STALE_TIMEOUT_SECONDS
9-
from pusher.exception import StaleConnection
9+
from pusher.exception import StaleConnectionError
1010
from pusher.price_state import PriceState, PriceUpdate
1111

1212

@@ -38,7 +38,7 @@ async def subscribe_all(self):
3838
await asyncio.gather(*(self.subscribe_single(router_url) for router_url in self.lazer_urls))
3939

4040
@retry(
41-
retry=retry_if_exception_type((StaleConnection, websockets.ConnectionClosed)),
41+
retry=retry_if_exception_type((StaleConnectionError, websockets.ConnectionClosed)),
4242
wait=wait_exponential(multiplier=1, min=1, max=10),
4343
reraise=True,
4444
)
@@ -63,7 +63,7 @@ async def subscribe_single_inner(self, router_url):
6363
data = json.loads(message)
6464
self.parse_lazer_message(data)
6565
except asyncio.TimeoutError:
66-
raise StaleConnection(f"No messages in {STALE_TIMEOUT_SECONDS} seconds, reconnecting")
66+
raise StaleConnectionError(f"No messages in {STALE_TIMEOUT_SECONDS} seconds, reconnecting")
6767
except websockets.ConnectionClosed:
6868
raise
6969
except json.JSONDecodeError as e:

apps/hip-3-pusher/src/pusher/publisher.py

Lines changed: 28 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from hyperliquid.exchange import Exchange
88

99
from pusher.config import Config
10+
from pusher.exception import PushError
1011
from pusher.kms_signer import KMSSigner
1112
from pusher.metrics import Metrics
1213
from pusher.price_state import PriceState
@@ -30,7 +31,10 @@ def __init__(self, config: Config, price_state: PriceState, metrics: Metrics):
3031
oracle_pusher_key = Path(config.hyperliquid.oracle_pusher_key_path).read_text().strip()
3132
oracle_account: LocalAccount = Account.from_key(oracle_pusher_key)
3233
logger.info("oracle pusher local pubkey: {}", oracle_account.address)
33-
self.publisher_exchanges = [Exchange(wallet=oracle_account, base_url=url) for url in self.push_urls]
34+
self.publisher_exchanges = [Exchange(wallet=oracle_account,
35+
base_url=url,
36+
timeout=config.hyperliquid.publish_timeout)
37+
for url in self.push_urls]
3438
if config.kms.enable_kms:
3539
self.enable_kms = True
3640
self.kms_signer = KMSSigner(config, self.publisher_exchanges)
@@ -70,23 +74,29 @@ def publish(self):
7074
# TODO: "Each update can change oraclePx and markPx by at most 1%."
7175
# TODO: "The markPx cannot be updated such that open interest would be 10x the open interest cap."
7276

73-
push_response = None
7477
if self.enable_publish:
75-
if self.enable_kms:
76-
push_response = self.kms_signer.set_oracle(
77-
dex=self.market_name,
78-
oracle_pxs=oracle_pxs,
79-
all_mark_pxs=mark_pxs,
80-
external_perp_pxs=external_perp_pxs,
81-
)
82-
else:
83-
push_response = self._send_update(
84-
oracle_pxs=oracle_pxs,
85-
all_mark_pxs=mark_pxs,
86-
external_perp_pxs=external_perp_pxs,
87-
)
88-
89-
self._handle_response(push_response)
78+
try:
79+
if self.enable_kms:
80+
push_response = self.kms_signer.set_oracle(
81+
dex=self.market_name,
82+
oracle_pxs=oracle_pxs,
83+
all_mark_pxs=mark_pxs,
84+
external_perp_pxs=external_perp_pxs,
85+
)
86+
else:
87+
push_response = self._send_update(
88+
oracle_pxs=oracle_pxs,
89+
all_mark_pxs=mark_pxs,
90+
external_perp_pxs=external_perp_pxs,
91+
)
92+
self._handle_response(push_response)
93+
except PushError:
94+
logger.error("All push attempts failed")
95+
self.metrics.failed_push_counter.add(1, self.metrics_labels)
96+
except Exception as e:
97+
logger.exception("Unexpected exception in push request: {}", repr(e))
98+
else:
99+
logger.debug("push disabled")
90100

91101
def _send_update(self, oracle_pxs, all_mark_pxs, external_perp_pxs):
92102
for exchange in self.publisher_exchanges:
@@ -100,14 +110,9 @@ def _send_update(self, oracle_pxs, all_mark_pxs, external_perp_pxs):
100110
except Exception as e:
101111
logger.exception("perp_deploy_set_oracle exception for endpoint: {} error: {}", exchange.base_url, repr(e))
102112

103-
return None
113+
raise PushError("all push endpoints failed")
104114

105115
def _handle_response(self, response):
106-
if response is None:
107-
logger.error("Push API call failed")
108-
self.metrics.failed_push_counter.add(1, self.metrics_labels)
109-
return
110-
111116
logger.debug("publish: push response: {} {}", response, type(response))
112117
status = response.get("status")
113118
if status == "ok":

0 commit comments

Comments
 (0)