Skip to content

Commit 75f2e24

Browse files
authored
feat(hip-3-pusher): AWS client config, backup write support, timeout/metric fixes
2 parents f6df66d + ab709a7 commit 75f2e24

File tree

12 files changed

+167
-131
lines changed

12 files changed

+167
-131
lines changed

apps/hip-3-pusher/config/config.toml

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,12 @@ market_symbol = "BTC"
88
use_testnet = false
99
oracle_pusher_key_path = "/path/to/oracle_pusher_key.txt"
1010
publish_interval = 3.0
11+
publish_timeout = 5.0
1112
enable_publish = false
1213

1314
[kms]
1415
enable_kms = false
15-
key_path = "/path/to/aws_kms_key_id.txt"
16-
access_key_id_path = "/path/to/aws_access_key_id.txt"
17-
secret_access_key_path = "/path/to/aws_secret_access_key.txt"
18-
aws_region_name = "ap-northeast-1"
16+
aws_kms_key_id_path = "/path/to/aws_kms_key_id.txt"
1917

2018
[lazer]
2119
lazer_urls = ["wss://pyth-lazer-0.dourolabs.app/v1/stream", "wss://pyth-lazer-1.dourolabs.app/v1/stream"]

apps/hip-3-pusher/pyproject.toml

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,8 @@ dependencies = [
1212
"opentelemetry-exporter-prometheus~=0.58b0",
1313
"opentelemetry-sdk~=1.37.0",
1414
"prometheus-client~=0.23.1",
15-
"setuptools~=80.9",
1615
"tenacity~=9.1.2",
1716
"websockets~=15.0.1",
18-
"wheel~=0.45.1",
1917
]
2018

2119
[build-system]

apps/hip-3-pusher/src/pusher/config.py

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,13 @@
1-
from pydantic import BaseModel
1+
from hyperliquid.utils.constants import MAINNET_API_URL, TESTNET_API_URL
2+
from pydantic import BaseModel, FilePath, model_validator
3+
from typing import Optional
24

35
STALE_TIMEOUT_SECONDS = 5
46

57

68
class KMSConfig(BaseModel):
79
enable_kms: bool
8-
aws_region_name: str
9-
key_path: str
10-
access_key_id_path: str
11-
secret_access_key_path: str
10+
aws_kms_key_id_path: FilePath
1211

1312

1413
class LazerConfig(BaseModel):
@@ -30,13 +29,21 @@ class HermesConfig(BaseModel):
3029

3130
class HyperliquidConfig(BaseModel):
3231
hyperliquid_ws_urls: list[str]
32+
push_urls: Optional[list[str]] = None
3333
market_name: str
3434
market_symbol: str
3535
use_testnet: bool
36-
oracle_pusher_key_path: str
36+
oracle_pusher_key_path: FilePath
3737
publish_interval: float
38+
publish_timeout: float
3839
enable_publish: bool
3940

41+
@model_validator(mode="after")
42+
def set_default_urls(self):
43+
if self.push_urls is None:
44+
self.push_urls = [TESTNET_API_URL] if self.use_testnet else [MAINNET_API_URL]
45+
return self
46+
4047

4148
class Config(BaseModel):
4249
stale_price_threshold_seconds: int
Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,6 @@
1-
class StaleConnection(Exception):
1+
class StaleConnectionError(Exception):
2+
pass
3+
4+
5+
class PushError(Exception):
26
pass

apps/hip-3-pusher/src/pusher/hermes_listener.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from tenacity import retry, retry_if_exception_type, wait_exponential
77

88
from pusher.config import Config, STALE_TIMEOUT_SECONDS
9-
from pusher.exception import StaleConnection
9+
from pusher.exception import StaleConnectionError
1010
from pusher.price_state import PriceState, PriceUpdate
1111

1212

@@ -34,7 +34,7 @@ async def subscribe_all(self):
3434
await asyncio.gather(*(self.subscribe_single(url) for url in self.hermes_urls))
3535

3636
@retry(
37-
retry=retry_if_exception_type((StaleConnection, websockets.ConnectionClosed)),
37+
retry=retry_if_exception_type((StaleConnectionError, websockets.ConnectionClosed)),
3838
wait=wait_exponential(multiplier=1, min=1, max=10),
3939
reraise=True,
4040
)
@@ -55,7 +55,7 @@ async def subscribe_single_inner(self, url):
5555
data = json.loads(message)
5656
self.parse_hermes_message(data)
5757
except asyncio.TimeoutError:
58-
raise StaleConnection(f"No messages in {STALE_TIMEOUT_SECONDS} seconds, reconnecting")
58+
raise StaleConnectionError(f"No messages in {STALE_TIMEOUT_SECONDS} seconds, reconnecting")
5959
except websockets.ConnectionClosed:
6060
raise
6161
except json.JSONDecodeError as e:

apps/hip-3-pusher/src/pusher/hyperliquid_listener.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
import time
77

88
from pusher.config import Config, STALE_TIMEOUT_SECONDS
9-
from pusher.exception import StaleConnection
9+
from pusher.exception import StaleConnectionError
1010
from pusher.price_state import PriceState, PriceUpdate
1111

1212
# This will be in config, but note here.
@@ -35,7 +35,7 @@ async def subscribe_all(self):
3535
await asyncio.gather(*(self.subscribe_single(hyperliquid_ws_url) for hyperliquid_ws_url in self.hyperliquid_ws_urls))
3636

3737
@retry(
38-
retry=retry_if_exception_type((StaleConnection, websockets.ConnectionClosed)),
38+
retry=retry_if_exception_type((StaleConnectionError, websockets.ConnectionClosed)),
3939
wait=wait_exponential(multiplier=1, min=1, max=10),
4040
reraise=True,
4141
)
@@ -65,7 +65,7 @@ async def subscribe_single_inner(self, url):
6565
else:
6666
logger.error("Received unknown channel: {}", channel)
6767
except asyncio.TimeoutError:
68-
raise StaleConnection(f"No messages in {STALE_TIMEOUT_SECONDS} seconds, reconnecting...")
68+
raise StaleConnectionError(f"No messages in {STALE_TIMEOUT_SECONDS} seconds, reconnecting...")
6969
except websockets.ConnectionClosed:
7070
raise
7171
except json.JSONDecodeError as e:

apps/hip-3-pusher/src/pusher/kms_signer.py

Lines changed: 58 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -6,51 +6,61 @@
66
from eth_keys.datatypes import Signature
77
from eth_utils import keccak, to_hex
88
from hyperliquid.exchange import Exchange
9-
from hyperliquid.utils.constants import TESTNET_API_URL, MAINNET_API_URL
109
from hyperliquid.utils.signing import get_timestamp_ms, action_hash, construct_phantom_agent, l1_payload
1110
from loguru import logger
11+
from pathlib import Path
1212

1313
from pusher.config import Config
14+
from pusher.exception import PushError
1415

1516
SECP256K1_N_HALF = SECP256K1_N // 2
1617

1718

19+
def _init_client():
20+
# AWS_DEFAULT_REGION, AWS_ACCESS_KEY_ID, and AWS_SECRET_ACCESS_KEY should be set as environment variables
21+
return boto3.client(
22+
"kms",
23+
# can specify an endpoint for e.g. LocalStack
24+
# endpoint_url="http://localhost:4566"
25+
)
26+
27+
1828
class KMSSigner:
19-
def __init__(self, config: Config):
20-
use_testnet = config.hyperliquid.use_testnet
21-
url = TESTNET_API_URL if use_testnet else MAINNET_API_URL
22-
self.oracle_publisher_exchange: Exchange = Exchange(wallet=None, base_url=url)
23-
self.client = self._init_client(config)
29+
def __init__(self, config: Config, publisher_exchanges: list[Exchange]):
30+
self.use_testnet = config.hyperliquid.use_testnet
31+
self.publisher_exchanges = publisher_exchanges
32+
33+
# AWS client and public key load
34+
self.client = _init_client()
35+
try:
36+
self._load_public_key(config.kms.aws_kms_key_id_path)
37+
except Exception as e:
38+
logger.exception("Failed to load public key from KMS; it might be incorrectly configured; error: {}", repr(e))
39+
exit()
2440

41+
def _load_public_key(self, key_path: str):
2542
# Fetch public key once so we can derive address and check recovery id
26-
key_path = config.kms.key_path
27-
self.key_id = open(key_path, "r").read().strip()
28-
self.pubkey_der = self.client.get_public_key(KeyId=self.key_id)["PublicKey"]
43+
self.aws_kms_key_id = Path(key_path).read_text().strip()
44+
pubkey_der = self.client.get_public_key(KeyId=self.aws_kms_key_id)["PublicKey"]
45+
self.pubkey = serialization.load_der_public_key(pubkey_der)
46+
self._construct_pubkey_address_and_bytes()
47+
48+
def _construct_pubkey_address_and_bytes(self):
2949
# Construct eth address to log
30-
pub = serialization.load_der_public_key(self.pubkey_der)
31-
numbers = pub.public_numbers()
50+
numbers = self.pubkey.public_numbers()
3251
x = numbers.x.to_bytes(32, "big")
3352
y = numbers.y.to_bytes(32, "big")
3453
uncompressed = b"\x04" + x + y
35-
self.public_key_bytes = uncompressed
3654
self.address = "0x" + keccak(uncompressed[1:])[-20:].hex()
37-
logger.info("KMSSigner address: {}", self.address)
38-
39-
def _init_client(self, config):
40-
aws_region_name = config.kms.aws_region_name
41-
access_key_id_path = config.kms.access_key_id_path
42-
access_key_id = open(access_key_id_path, "r").read().strip()
43-
secret_access_key_path = config.kms.secret_access_key_path
44-
secret_access_key = open(secret_access_key_path, "r").read().strip()
45-
46-
return boto3.client(
47-
"kms",
48-
region_name=aws_region_name,
49-
aws_access_key_id=access_key_id,
50-
aws_secret_access_key=secret_access_key,
51-
# can specify an endpoint for e.g. LocalStack
52-
# endpoint_url="http://localhost:4566"
55+
logger.info("public key loaded from KMS: {}", self.address)
56+
57+
# Parse KMS public key into uncompressed secp256k1 bytes
58+
pubkey_bytes = self.pubkey.public_bytes(
59+
serialization.Encoding.X962,
60+
serialization.PublicFormat.UncompressedPoint,
5361
)
62+
# Strip leading 0x04 (uncompressed point indicator)
63+
self.raw_pubkey_bytes = pubkey_bytes[1:]
5464

5565
def set_oracle(self, dex, oracle_pxs, all_mark_pxs, external_perp_pxs):
5666
timestamp = get_timestamp_ms()
@@ -67,15 +77,24 @@ def set_oracle(self, dex, oracle_pxs, all_mark_pxs, external_perp_pxs):
6777
},
6878
}
6979
signature = self.sign_l1_action(
70-
action,
71-
timestamp,
72-
self.oracle_publisher_exchange.base_url == MAINNET_API_URL,
73-
)
74-
return self.oracle_publisher_exchange._post_action(
75-
action,
76-
signature,
77-
timestamp,
80+
action=action,
81+
nonce=timestamp,
82+
is_mainnet=not self.use_testnet,
7883
)
84+
return self._send_update(action, signature, timestamp)
85+
86+
def _send_update(self, action, signature, timestamp):
87+
for exchange in self.publisher_exchanges:
88+
try:
89+
return exchange._post_action(
90+
action=action,
91+
signature=signature,
92+
nonce=timestamp,
93+
)
94+
except Exception as e:
95+
logger.exception("perp_deploy_set_oracle exception for endpoint: {} error: {}", exchange.base_url, repr(e))
96+
97+
raise PushError("all push endpoints failed")
7998

8099
def sign_l1_action(self, action, nonce, is_mainnet):
81100
hash = action_hash(action, vault_address=None, nonce=nonce, expires_after=None)
@@ -88,7 +107,7 @@ def sign_l1_action(self, action, nonce, is_mainnet):
88107
def sign_message(self, message_hash: bytes) -> dict:
89108
# Send message hash to KMS for signing
90109
resp = self.client.sign(
91-
KeyId=self.key_id,
110+
KeyId=self.aws_kms_key_id,
92111
Message=message_hash,
93112
MessageType="DIGEST",
94113
SigningAlgorithm="ECDSA_SHA_256", # required for secp256k1
@@ -99,20 +118,12 @@ def sign_message(self, message_hash: bytes) -> dict:
99118
# Ethereum requires low-s form
100119
if s > SECP256K1_N_HALF:
101120
s = SECP256K1_N - s
102-
# Parse KMS public key into uncompressed secp256k1 bytes
103-
# TODO: Pull this into init
104-
pubkey = serialization.load_der_public_key(self.pubkey_der)
105-
pubkey_bytes = pubkey.public_bytes(
106-
serialization.Encoding.X962,
107-
serialization.PublicFormat.UncompressedPoint,
108-
)
109-
# Strip leading 0x04 (uncompressed point indicator)
110-
raw_pubkey_bytes = pubkey_bytes[1:]
121+
111122
# Try both recovery ids
112123
for v in (0, 1):
113124
sig_obj = Signature(vrs=(v, r, s))
114125
recovered_pub = sig_obj.recover_public_key_from_msg_hash(message_hash)
115-
if recovered_pub.to_bytes() == raw_pubkey_bytes:
126+
if recovered_pub.to_bytes() == self.raw_pubkey_bytes:
116127
return {
117128
"r": to_hex(r),
118129
"s": to_hex(s),

apps/hip-3-pusher/src/pusher/lazer_listener.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from tenacity import retry, retry_if_exception_type, wait_exponential
77

88
from pusher.config import Config, STALE_TIMEOUT_SECONDS
9-
from pusher.exception import StaleConnection
9+
from pusher.exception import StaleConnectionError
1010
from pusher.price_state import PriceState, PriceUpdate
1111

1212

@@ -38,7 +38,7 @@ async def subscribe_all(self):
3838
await asyncio.gather(*(self.subscribe_single(router_url) for router_url in self.lazer_urls))
3939

4040
@retry(
41-
retry=retry_if_exception_type((StaleConnection, websockets.ConnectionClosed)),
41+
retry=retry_if_exception_type((StaleConnectionError, websockets.ConnectionClosed)),
4242
wait=wait_exponential(multiplier=1, min=1, max=10),
4343
reraise=True,
4444
)
@@ -63,7 +63,7 @@ async def subscribe_single_inner(self, router_url):
6363
data = json.loads(message)
6464
self.parse_lazer_message(data)
6565
except asyncio.TimeoutError:
66-
raise StaleConnection(f"No messages in {STALE_TIMEOUT_SECONDS} seconds, reconnecting")
66+
raise StaleConnectionError(f"No messages in {STALE_TIMEOUT_SECONDS} seconds, reconnecting")
6767
except websockets.ConnectionClosed:
6868
raise
6969
except json.JSONDecodeError as e:

apps/hip-3-pusher/src/pusher/main.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,4 +62,4 @@ async def main():
6262
try:
6363
asyncio.run(main())
6464
except Exception as e:
65-
logger.exception("Uncaught exception, exiting: {}", e)
65+
logger.exception("Uncaught exception, exiting; error: {}", repr(e))

apps/hip-3-pusher/src/pusher/metrics.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,7 @@ def __init__(self, config: Config):
1717
reader = PrometheusMetricReader()
1818
# Meter is responsible for creating and recording metrics
1919
set_meter_provider(MeterProvider(metric_readers=[reader]))
20-
# TODO: sync version number and add?
2120
self.meter = get_meter_provider().get_meter(METER_NAME)
22-
2321
self._init_metrics()
2422

2523
def _init_metrics(self):
@@ -35,5 +33,8 @@ def _init_metrics(self):
3533
name="hip_3_pusher_failed_push_count",
3634
description="Number of failed push attempts",
3735
)
38-
39-
# TODO: labels/attributes
36+
self.push_interval_histogram = self.meter.create_histogram(
37+
name="hip_3_pusher_push_interval",
38+
description="Interval between push requests (seconds)",
39+
unit="s",
40+
)

0 commit comments

Comments
 (0)