Skip to content

Commit 7166cfe

Browse files
authored
fix(hip-3-pusher): KMS support, various enhancements
2 parents 67fb0f1 + 46c383b commit 7166cfe

File tree

12 files changed

+325
-250
lines changed

12 files changed

+325
-250
lines changed

apps/hip-3-pusher/README.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
# HIP-3 Pusher
2+
3+
`hip-3-pusher` is intended to serve as an oracle updater for
4+
[HIP-3 markets](https://hyperliquid.gitbook.io/hyperliquid-docs/hyperliquid-improvement-proposals-hips/hip-3-builder-deployed-perpetuals).
5+
6+
Currently it:
7+
- Sources market data from Hyperliquid, Pyth Lazer, and Pythnet
8+
- Supports KMS for signing oracle updates
9+
- Provides telemetry to Pyth's internal observability system

apps/hip-3-pusher/pyproject.toml

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,16 @@
11
[project]
22
name = "hip-3-pusher"
3-
version = "0.1.3"
3+
version = "0.1.4"
44
description = "Hyperliquid HIP-3 market oracle pusher"
55
readme = "README.md"
6-
requires-python = ">=3.13"
6+
requires-python = "==3.13.*"
77
dependencies = [
8-
"asn1crypto>=1.5.1",
9-
"boto3>=1.40.31",
10-
"cryptography>=45.0.7",
11-
"hyperliquid-python-sdk>=0.19.0",
12-
"loguru>=0.7.3",
13-
"opentelemetry-exporter-prometheus>=0.58b0",
14-
"opentelemetry-sdk>=1.37.0",
15-
"prometheus-client>=0.23.1",
16-
"toml>=0.10.2",
17-
"websockets>=15.0.1",
8+
"boto3~=1.40.34",
9+
"cryptography~=46.0.1",
10+
"hyperliquid-python-sdk~=0.19.0",
11+
"loguru~=0.7.3",
12+
"opentelemetry-exporter-prometheus~=0.58b0",
13+
"opentelemetry-sdk~=1.37.0",
14+
"prometheus-client~=0.23.1",
15+
"websockets~=15.0.1",
1816
]

apps/hip-3-pusher/src/config.py

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
from pydantic import BaseModel
2+
3+
4+
class KMSConfig(BaseModel):
5+
enable_kms: bool
6+
aws_region_name: str
7+
key_path: str
8+
access_key_id_path: str
9+
secret_access_key_path: str
10+
11+
12+
class LazerConfig(BaseModel):
13+
lazer_urls: list[str]
14+
lazer_api_key: str
15+
base_feed_id: int
16+
base_feed_exponent: int
17+
quote_feed_id: int
18+
quote_feed_exponent: int
19+
20+
21+
class HermesConfig(BaseModel):
22+
hermes_urls: list[str]
23+
base_feed_id: str
24+
base_feed_exponent: int
25+
quote_feed_id: str
26+
quote_feed_exponent: int
27+
28+
29+
class HyperliquidConfig(BaseModel):
30+
hyperliquid_ws_urls: list[str]
31+
market_name: str
32+
market_symbol: str
33+
use_testnet: bool
34+
oracle_pusher_key_path: str
35+
publish_interval: float
36+
enable_publish: bool
37+
38+
39+
class Config(BaseModel):
40+
stale_price_threshold_seconds: int
41+
prometheus_port: int
42+
hyperliquid: HyperliquidConfig
43+
kms: KMSConfig
44+
lazer: LazerConfig
45+
hermes: HermesConfig

apps/hip-3-pusher/src/hermes_listener.py

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,17 +4,18 @@
44
import time
55
import websockets
66

7-
from price_state import PriceState
7+
from config import Config
8+
from price_state import PriceState, PriceUpdate
89

910

1011
class HermesListener:
1112
"""
1213
Subscribe to Hermes price updates for needed feeds.
1314
"""
14-
def __init__(self, config, price_state: PriceState):
15-
self.hermes_urls = config["hermes"]["hermes_urls"]
16-
self.base_feed_id = config["hermes"]["base_feed_id"]
17-
self.quote_feed_id = config["hermes"]["quote_feed_id"]
15+
def __init__(self, config: Config, price_state: PriceState):
16+
self.hermes_urls = config.hermes.hermes_urls
17+
self.base_feed_id = config.hermes.base_feed_id
18+
self.quote_feed_id = config.hermes.quote_feed_id
1819
self.price_state = price_state
1920

2021
def get_subscribe_request(self):
@@ -71,10 +72,10 @@ def parse_hermes_message(self, data):
7172
expo = price_object["expo"]
7273
publish_time = price_object["publish_time"]
7374
logger.debug("Hermes update: {} {} {} {}", id, price, expo, publish_time)
75+
now = time.time()
7476
if id == self.base_feed_id:
75-
self.price_state.hermes_base_price = price
77+
self.price_state.hermes_base_price = PriceUpdate(price, now)
7678
if id == self.quote_feed_id:
77-
self.price_state.hermes_quote_price = price
78-
self.price_state.latest_hermes_timestamp = time.time()
79+
self.price_state.hermes_quote_price = PriceUpdate(price, now)
7980
except Exception as e:
8081
logger.error("parse_hermes_message error: {}", e)
Lines changed: 64 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,35 +1,77 @@
1+
import asyncio
2+
import json
3+
import websockets
14
from loguru import logger
25
import time
36

4-
from hyperliquid.info import Info
5-
from hyperliquid.utils.constants import TESTNET_API_URL, MAINNET_API_URL
7+
from config import Config
8+
from price_state import PriceState, PriceUpdate
69

7-
from price_state import PriceState
10+
# This will be in config, but note here.
11+
# Other RPC providers exist but so far we've seen their support is incomplete.
12+
HYPERLIQUID_MAINNET_WS_URL = "wss://api.hyperliquid.xyz/ws"
13+
HYPERLIQUID_TESTNET_WS_URL = "wss://api.hyperliquid-testnet.xyz/ws"
814

915

1016
class HyperliquidListener:
1117
"""
1218
Subscribe to any relevant Hyperliquid websocket streams
1319
See https://hyperliquid.gitbook.io/hyperliquid-docs/for-developers/api/websocket
1420
"""
15-
def __init__(self, config: dict, price_state: PriceState):
16-
self.market_symbol = config["hyperliquid"]["market_symbol"]
17-
url = TESTNET_API_URL if config["hyperliquid"].get("use_testnet", True) else MAINNET_API_URL
18-
self.info = Info(base_url=url)
21+
def __init__(self, config: Config, price_state: PriceState):
22+
self.hyperliquid_ws_urls = config.hyperliquid.hyperliquid_ws_urls
23+
self.market_symbol = config.hyperliquid.market_symbol
1924
self.price_state = price_state
2025

21-
def subscribe(self):
22-
self.info.subscribe({"type": "activeAssetCtx", "coin": self.market_symbol}, self.on_activeAssetCtx)
23-
24-
def on_activeAssetCtx(self, message):
25-
"""
26-
Parse oraclePx and markPx from perp context update
27-
28-
:param message: activeAssetCtx websocket update message
29-
:return: None
30-
"""
31-
ctx = message["data"]["ctx"]
32-
self.price_state.hl_oracle_price = ctx["oraclePx"]
33-
self.price_state.hl_mark_price = ctx["markPx"]
34-
logger.debug("on_activeAssetCtx: oraclePx: {} marketPx: {}", self.price_state.hl_oracle_price, self.price_state.hl_mark_price)
35-
self.price_state.latest_hl_timestamp = time.time()
26+
def get_subscribe_request(self, asset):
27+
return {
28+
"method": "subscribe",
29+
"subscription": {"type": "activeAssetCtx", "coin": asset}
30+
}
31+
32+
async def subscribe_all(self):
33+
await asyncio.gather(*(self.subscribe_single(hyperliquid_ws_url) for hyperliquid_ws_url in self.hyperliquid_ws_urls))
34+
35+
async def subscribe_single(self, url):
36+
while True:
37+
try:
38+
await self.subscribe_single_inner(url)
39+
except websockets.ConnectionClosed:
40+
logger.error("Connection to {} closed; retrying", url)
41+
except Exception as e:
42+
logger.exception("Error on {}: {}", url, e)
43+
44+
async def subscribe_single_inner(self, url):
45+
async with websockets.connect(url) as ws:
46+
subscribe_request = self.get_subscribe_request(self.market_symbol)
47+
await ws.send(json.dumps(subscribe_request))
48+
logger.info("Sent subscribe request to {}", url)
49+
50+
# listen for updates
51+
async for message in ws:
52+
try:
53+
data = json.loads(message)
54+
channel = data.get("channel", None)
55+
if not channel:
56+
logger.error("No channel in message: {}", data)
57+
elif channel == "subscriptionResponse":
58+
logger.debug("Received subscription response: {}", data)
59+
elif channel == "error":
60+
logger.error("Received Hyperliquid error response: {}", data)
61+
elif channel == "activeAssetCtx":
62+
self.parse_hyperliquid_ws_message(data)
63+
else:
64+
logger.error("Received unknown channel: {}", channel)
65+
except json.JSONDecodeError as e:
66+
logger.error("Failed to decode JSON message: {} error: {}", message, e)
67+
68+
def parse_hyperliquid_ws_message(self, message):
69+
try:
70+
ctx = message["data"]["ctx"]
71+
now = time.time()
72+
self.price_state.hl_oracle_price = PriceUpdate(ctx["oraclePx"], now)
73+
self.price_state.hl_mark_price = PriceUpdate(ctx["markPx"], now)
74+
logger.debug("on_activeAssetCtx: oraclePx: {} marketPx: {}", self.price_state.hl_oracle_price,
75+
self.price_state.hl_mark_price)
76+
except Exception as e:
77+
logger.error("parse_hyperliquid_ws_message error: message: {} e: {}", message, e)
Lines changed: 60 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,33 @@
11
import boto3
2-
from asn1crypto import core
2+
from cryptography.hazmat.primitives import serialization
3+
from cryptography.hazmat.primitives.asymmetric.utils import decode_dss_signature
34
from eth_account.messages import encode_typed_data, _hash_eip191_message
5+
from eth_keys.backends.native.ecdsa import N as SECP256K1_N
46
from eth_keys.datatypes import Signature
57
from eth_utils import keccak, to_hex
68
from hyperliquid.exchange import Exchange
79
from hyperliquid.utils.constants import TESTNET_API_URL, MAINNET_API_URL
810
from hyperliquid.utils.signing import get_timestamp_ms, action_hash, construct_phantom_agent, l1_payload
911
from loguru import logger
1012

13+
from config import Config
14+
15+
SECP256K1_N_HALF = SECP256K1_N // 2
16+
1117

1218
class KMSSigner:
13-
def __init__(self, key_id, aws_region_name, use_testnet):
19+
def __init__(self, config: Config):
20+
use_testnet = config.hyperliquid.use_testnet
1421
url = TESTNET_API_URL if use_testnet else MAINNET_API_URL
1522
self.oracle_publisher_exchange: Exchange = Exchange(wallet=None, base_url=url)
23+
self.client = self._init_client(config)
1624

17-
self.key_id = key_id
18-
self.client = boto3.client("kms", region_name=aws_region_name)
1925
# Fetch public key once so we can derive address and check recovery id
20-
pub_der = self.client.get_public_key(KeyId=key_id)["PublicKey"]
21-
22-
from cryptography.hazmat.primitives import serialization
23-
pub = serialization.load_der_public_key(pub_der)
26+
key_path = config.kms.key_path
27+
self.key_id = open(key_path, "r").read().strip()
28+
self.pubkey_der = self.client.get_public_key(KeyId=self.key_id)["PublicKey"]
29+
# Construct eth address to log
30+
pub = serialization.load_der_public_key(self.pubkey_der)
2431
numbers = pub.public_numbers()
2532
x = numbers.x.to_bytes(32, "big")
2633
y = numbers.y.to_bytes(32, "big")
@@ -29,6 +36,22 @@ def __init__(self, key_id, aws_region_name, use_testnet):
2936
self.address = "0x" + keccak(uncompressed[1:])[-20:].hex()
3037
logger.info("KMSSigner address: {}", self.address)
3138

39+
def _init_client(self, config):
40+
aws_region_name = config.kms.aws_region_name
41+
access_key_id_path = config.kms.access_key_id_path
42+
access_key_id = open(access_key_id_path, "r").read().strip()
43+
secret_access_key_path = config.kms.secret_access_key_path
44+
secret_access_key = open(secret_access_key_path, "r").read().strip()
45+
46+
return boto3.client(
47+
"kms",
48+
region_name=aws_region_name,
49+
aws_access_key_id=access_key_id,
50+
aws_secret_access_key=secret_access_key,
51+
# can specify an endpoint for e.g. LocalStack
52+
# endpoint_url="http://localhost:4566"
53+
)
54+
3255
def set_oracle(self, dex, oracle_pxs, all_mark_pxs, external_perp_pxs):
3356
timestamp = get_timestamp_ms()
3457
oracle_pxs_wire = sorted(list(oracle_pxs.items()))
@@ -60,34 +83,39 @@ def sign_l1_action(self, action, nonce, is_mainnet):
6083
data = l1_payload(phantom_agent)
6184
structured_data = encode_typed_data(full_message=data)
6285
message_hash = _hash_eip191_message(structured_data)
63-
signed = self.sign_message(message_hash)
64-
return {"r": to_hex(signed["r"]), "s": to_hex(signed["s"]), "v": signed["v"]}
86+
return self.sign_message(message_hash)
6587

66-
def sign_message(self, message_hash: bytes):
88+
def sign_message(self, message_hash: bytes) -> dict:
89+
# Send message hash to KMS for signing
6790
resp = self.client.sign(
6891
KeyId=self.key_id,
6992
Message=message_hash,
7093
MessageType="DIGEST",
7194
SigningAlgorithm="ECDSA_SHA_256", # required for secp256k1
7295
)
73-
der_sig = resp["Signature"]
74-
75-
seq = core.Sequence.load(der_sig)
76-
r = int(seq[0].native)
77-
s = int(seq[1].native)
78-
79-
for recovery_id in (0, 1):
80-
candidate = Signature(vrs=(recovery_id, r, s))
81-
pubkey = candidate.recover_public_key_from_msg_hash(message_hash)
82-
if pubkey.to_bytes() == self.public_key_bytes:
83-
v = recovery_id + 27
84-
break
85-
else:
86-
raise ValueError("Failed to determine recovery id")
87-
88-
return {
89-
"r": r,
90-
"s": s,
91-
"v": v,
92-
"signature": Signature(vrs=(v, r, s)).to_bytes().hex(),
93-
}
96+
kms_signature = resp["Signature"]
97+
# Decode the KMS DER signature -> (r, s)
98+
r, s = decode_dss_signature(kms_signature)
99+
# Ethereum requires low-s form
100+
if s > SECP256K1_N_HALF:
101+
s = SECP256K1_N - s
102+
# Parse KMS public key into uncompressed secp256k1 bytes
103+
# TODO: Pull this into init
104+
pubkey = serialization.load_der_public_key(self.pubkey_der)
105+
pubkey_bytes = pubkey.public_bytes(
106+
serialization.Encoding.X962,
107+
serialization.PublicFormat.UncompressedPoint,
108+
)
109+
# Strip leading 0x04 (uncompressed point indicator)
110+
raw_pubkey_bytes = pubkey_bytes[1:]
111+
# Try both recovery ids
112+
for v in (0, 1):
113+
sig_obj = Signature(vrs=(v, r, s))
114+
recovered_pub = sig_obj.recover_public_key_from_msg_hash(message_hash)
115+
if recovered_pub.to_bytes() == raw_pubkey_bytes:
116+
return {
117+
"r": to_hex(r),
118+
"s": to_hex(s),
119+
"v": v + 27,
120+
}
121+
raise ValueError("Could not recover public key; signature mismatch")

0 commit comments

Comments
 (0)