Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions apps/hip-3-pusher/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# HIP-3 Pusher

`hip-3-pusher` is intended to serve as an oracle updater for
[HIP-3 markets](https://hyperliquid.gitbook.io/hyperliquid-docs/hyperliquid-improvement-proposals-hips/hip-3-builder-deployed-perpetuals).

Currently it:
- Sources market data from Hyperliquid, Pyth Lazer, and Pythnet
- Supports KMS for signing oracle updates
- Provides telemetry to Pyth's internal observability system
22 changes: 10 additions & 12 deletions apps/hip-3-pusher/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,18 +1,16 @@
[project]
name = "hip-3-pusher"
version = "0.1.3"
version = "0.1.4"
description = "Hyperliquid HIP-3 market oracle pusher"
readme = "README.md"
requires-python = ">=3.13"
requires-python = "==3.13.*"
dependencies = [
"asn1crypto>=1.5.1",
"boto3>=1.40.31",
"cryptography>=45.0.7",
"hyperliquid-python-sdk>=0.19.0",
"loguru>=0.7.3",
"opentelemetry-exporter-prometheus>=0.58b0",
"opentelemetry-sdk>=1.37.0",
"prometheus-client>=0.23.1",
"toml>=0.10.2",
"websockets>=15.0.1",
"boto3~=1.40.34",
"cryptography~=46.0.1",
"hyperliquid-python-sdk~=0.19.0",
"loguru~=0.7.3",
"opentelemetry-exporter-prometheus~=0.58b0",
"opentelemetry-sdk~=1.37.0",
"prometheus-client~=0.23.1",
"websockets~=15.0.1",
Copy link

Copilot AI Sep 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing pydantic dependency which is required for the new Config classes. This will cause runtime import errors.

Suggested change
"websockets~=15.0.1",
"websockets~=15.0.1",
"pydantic~=2.7.1",

Copilot uses AI. Check for mistakes.

]
45 changes: 45 additions & 0 deletions apps/hip-3-pusher/src/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
from pydantic import BaseModel


class KMSConfig(BaseModel):
enable_kms: bool
aws_region_name: str
key_path: str
access_key_id_path: str
secret_access_key_path: str


class LazerConfig(BaseModel):
lazer_urls: list[str]
lazer_api_key: str
base_feed_id: int
base_feed_exponent: int
quote_feed_id: int
quote_feed_exponent: int


class HermesConfig(BaseModel):
hermes_urls: list[str]
base_feed_id: str
base_feed_exponent: int
quote_feed_id: str
quote_feed_exponent: int


class HyperliquidConfig(BaseModel):
hyperliquid_ws_urls: list[str]
market_name: str
market_symbol: str
use_testnet: bool
oracle_pusher_key_path: str
publish_interval: float
enable_publish: bool


class Config(BaseModel):
stale_price_threshold_seconds: int
prometheus_port: int
hyperliquid: HyperliquidConfig
kms: KMSConfig
lazer: LazerConfig
hermes: HermesConfig
17 changes: 9 additions & 8 deletions apps/hip-3-pusher/src/hermes_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,18 @@
import time
import websockets

from price_state import PriceState
from config import Config
from price_state import PriceState, PriceUpdate


class HermesListener:
"""
Subscribe to Hermes price updates for needed feeds.
"""
def __init__(self, config, price_state: PriceState):
self.hermes_urls = config["hermes"]["hermes_urls"]
self.base_feed_id = config["hermes"]["base_feed_id"]
self.quote_feed_id = config["hermes"]["quote_feed_id"]
def __init__(self, config: Config, price_state: PriceState):
self.hermes_urls = config.hermes.hermes_urls
self.base_feed_id = config.hermes.base_feed_id
self.quote_feed_id = config.hermes.quote_feed_id
self.price_state = price_state

def get_subscribe_request(self):
Expand Down Expand Up @@ -71,10 +72,10 @@ def parse_hermes_message(self, data):
expo = price_object["expo"]
publish_time = price_object["publish_time"]
logger.debug("Hermes update: {} {} {} {}", id, price, expo, publish_time)
now = time.time()
if id == self.base_feed_id:
self.price_state.hermes_base_price = price
self.price_state.hermes_base_price = PriceUpdate(price, now)
if id == self.quote_feed_id:
self.price_state.hermes_quote_price = price
self.price_state.latest_hermes_timestamp = time.time()
self.price_state.hermes_quote_price = PriceUpdate(price, now)
except Exception as e:
logger.error("parse_hermes_message error: {}", e)
86 changes: 64 additions & 22 deletions apps/hip-3-pusher/src/hyperliquid_listener.py
Original file line number Diff line number Diff line change
@@ -1,35 +1,77 @@
import asyncio
import json
import websockets
from loguru import logger
import time

from hyperliquid.info import Info
from hyperliquid.utils.constants import TESTNET_API_URL, MAINNET_API_URL
from config import Config
from price_state import PriceState, PriceUpdate

from price_state import PriceState
# This will be in config, but note here.
# Other RPC providers exist but so far we've seen their support is incomplete.
HYPERLIQUID_MAINNET_WS_URL = "wss://api.hyperliquid.xyz/ws"
HYPERLIQUID_TESTNET_WS_URL = "wss://api.hyperliquid-testnet.xyz/ws"


class HyperliquidListener:
"""
Subscribe to any relevant Hyperliquid websocket streams
See https://hyperliquid.gitbook.io/hyperliquid-docs/for-developers/api/websocket
"""
def __init__(self, config: dict, price_state: PriceState):
self.market_symbol = config["hyperliquid"]["market_symbol"]
url = TESTNET_API_URL if config["hyperliquid"].get("use_testnet", True) else MAINNET_API_URL
self.info = Info(base_url=url)
def __init__(self, config: Config, price_state: PriceState):
self.hyperliquid_ws_urls = config.hyperliquid.hyperliquid_ws_urls
self.market_symbol = config.hyperliquid.market_symbol
self.price_state = price_state

def subscribe(self):
self.info.subscribe({"type": "activeAssetCtx", "coin": self.market_symbol}, self.on_activeAssetCtx)

def on_activeAssetCtx(self, message):
"""
Parse oraclePx and markPx from perp context update

:param message: activeAssetCtx websocket update message
:return: None
"""
ctx = message["data"]["ctx"]
self.price_state.hl_oracle_price = ctx["oraclePx"]
self.price_state.hl_mark_price = ctx["markPx"]
logger.debug("on_activeAssetCtx: oraclePx: {} marketPx: {}", self.price_state.hl_oracle_price, self.price_state.hl_mark_price)
self.price_state.latest_hl_timestamp = time.time()
def get_subscribe_request(self, asset):
return {
"method": "subscribe",
"subscription": {"type": "activeAssetCtx", "coin": asset}
}

async def subscribe_all(self):
await asyncio.gather(*(self.subscribe_single(hyperliquid_ws_url) for hyperliquid_ws_url in self.hyperliquid_ws_urls))

async def subscribe_single(self, url):
while True:
try:
await self.subscribe_single_inner(url)
except websockets.ConnectionClosed:
logger.error("Connection to {} closed; retrying", url)
except Exception as e:
logger.exception("Error on {}: {}", url, e)

async def subscribe_single_inner(self, url):
async with websockets.connect(url) as ws:
subscribe_request = self.get_subscribe_request(self.market_symbol)
await ws.send(json.dumps(subscribe_request))
logger.info("Sent subscribe request to {}", url)

# listen for updates
async for message in ws:
try:
data = json.loads(message)
channel = data.get("channel", None)
if not channel:
logger.error("No channel in message: {}", data)
elif channel == "subscriptionResponse":
logger.debug("Received subscription response: {}", data)
elif channel == "error":
logger.error("Received Hyperliquid error response: {}", data)
elif channel == "activeAssetCtx":
self.parse_hyperliquid_ws_message(data)
else:
logger.error("Received unknown channel: {}", channel)
except json.JSONDecodeError as e:
logger.error("Failed to decode JSON message: {} error: {}", message, e)

def parse_hyperliquid_ws_message(self, message):
try:
ctx = message["data"]["ctx"]
now = time.time()
self.price_state.hl_oracle_price = PriceUpdate(ctx["oraclePx"], now)
self.price_state.hl_mark_price = PriceUpdate(ctx["markPx"], now)
logger.debug("on_activeAssetCtx: oraclePx: {} marketPx: {}", self.price_state.hl_oracle_price,
self.price_state.hl_mark_price)
except Exception as e:
logger.error("parse_hyperliquid_ws_message error: message: {} e: {}", message, e)
92 changes: 60 additions & 32 deletions apps/hip-3-pusher/src/kms_signer.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,33 @@
import boto3
from asn1crypto import core
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric.utils import decode_dss_signature
from eth_account.messages import encode_typed_data, _hash_eip191_message
from eth_keys.backends.native.ecdsa import N as SECP256K1_N
from eth_keys.datatypes import Signature
from eth_utils import keccak, to_hex
from hyperliquid.exchange import Exchange
from hyperliquid.utils.constants import TESTNET_API_URL, MAINNET_API_URL
from hyperliquid.utils.signing import get_timestamp_ms, action_hash, construct_phantom_agent, l1_payload
from loguru import logger

from config import Config

SECP256K1_N_HALF = SECP256K1_N // 2


class KMSSigner:
def __init__(self, key_id, aws_region_name, use_testnet):
def __init__(self, config: Config):
use_testnet = config.hyperliquid.use_testnet
url = TESTNET_API_URL if use_testnet else MAINNET_API_URL
self.oracle_publisher_exchange: Exchange = Exchange(wallet=None, base_url=url)
self.client = self._init_client(config)

self.key_id = key_id
self.client = boto3.client("kms", region_name=aws_region_name)
# Fetch public key once so we can derive address and check recovery id
pub_der = self.client.get_public_key(KeyId=key_id)["PublicKey"]

from cryptography.hazmat.primitives import serialization
pub = serialization.load_der_public_key(pub_der)
key_path = config.kms.key_path
self.key_id = open(key_path, "r").read().strip()
self.pubkey_der = self.client.get_public_key(KeyId=self.key_id)["PublicKey"]
# Construct eth address to log
pub = serialization.load_der_public_key(self.pubkey_der)
numbers = pub.public_numbers()
x = numbers.x.to_bytes(32, "big")
y = numbers.y.to_bytes(32, "big")
Expand All @@ -29,6 +36,22 @@ def __init__(self, key_id, aws_region_name, use_testnet):
self.address = "0x" + keccak(uncompressed[1:])[-20:].hex()
logger.info("KMSSigner address: {}", self.address)

def _init_client(self, config):
aws_region_name = config.kms.aws_region_name
access_key_id_path = config.kms.access_key_id_path
access_key_id = open(access_key_id_path, "r").read().strip()
Copy link

Copilot AI Sep 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AWS credentials are loaded from files but not cleared from memory after use. Consider using secure memory handling or environment variables to reduce the risk of credential exposure in memory dumps.

Copilot uses AI. Check for mistakes.

secret_access_key_path = config.kms.secret_access_key_path
secret_access_key = open(secret_access_key_path, "r").read().strip()
Copy link

Copilot AI Sep 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AWS credentials are loaded from files but not cleared from memory after use. Consider using secure memory handling or environment variables to reduce the risk of credential exposure in memory dumps.

Copilot uses AI. Check for mistakes.


return boto3.client(
"kms",
region_name=aws_region_name,
aws_access_key_id=access_key_id,
aws_secret_access_key=secret_access_key,
# can specify an endpoint for e.g. LocalStack
# endpoint_url="http://localhost:4566"
)

def set_oracle(self, dex, oracle_pxs, all_mark_pxs, external_perp_pxs):
timestamp = get_timestamp_ms()
oracle_pxs_wire = sorted(list(oracle_pxs.items()))
Expand Down Expand Up @@ -60,34 +83,39 @@ def sign_l1_action(self, action, nonce, is_mainnet):
data = l1_payload(phantom_agent)
structured_data = encode_typed_data(full_message=data)
message_hash = _hash_eip191_message(structured_data)
signed = self.sign_message(message_hash)
return {"r": to_hex(signed["r"]), "s": to_hex(signed["s"]), "v": signed["v"]}
return self.sign_message(message_hash)

def sign_message(self, message_hash: bytes):
def sign_message(self, message_hash: bytes) -> dict:
# Send message hash to KMS for signing
resp = self.client.sign(
KeyId=self.key_id,
Message=message_hash,
MessageType="DIGEST",
SigningAlgorithm="ECDSA_SHA_256", # required for secp256k1
)
der_sig = resp["Signature"]

seq = core.Sequence.load(der_sig)
r = int(seq[0].native)
s = int(seq[1].native)

for recovery_id in (0, 1):
candidate = Signature(vrs=(recovery_id, r, s))
pubkey = candidate.recover_public_key_from_msg_hash(message_hash)
if pubkey.to_bytes() == self.public_key_bytes:
v = recovery_id + 27
break
else:
raise ValueError("Failed to determine recovery id")

return {
"r": r,
"s": s,
"v": v,
"signature": Signature(vrs=(v, r, s)).to_bytes().hex(),
}
kms_signature = resp["Signature"]
# Decode the KMS DER signature -> (r, s)
r, s = decode_dss_signature(kms_signature)
# Ethereum requires low-s form
if s > SECP256K1_N_HALF:
s = SECP256K1_N - s
# Parse KMS public key into uncompressed secp256k1 bytes
# TODO: Pull this into init
pubkey = serialization.load_der_public_key(self.pubkey_der)
pubkey_bytes = pubkey.public_bytes(
serialization.Encoding.X962,
serialization.PublicFormat.UncompressedPoint,
)
# Strip leading 0x04 (uncompressed point indicator)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there nothing in KMS that'll do this for us?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a bit over my head here, but I believe there are a couple things at play: KMS not providing this "recovery id" (v) as part of signing because in normal cryptography usage it's not needed, whereas here in crypto land we need it to determine the pubkey from two possible choices; and a quirk of the encoding (X9.62). If you dig in the eth libraries you'll see they have to strip this byte in various places.

Is there a cleaner way to implement this function? Good question.

raw_pubkey_bytes = pubkey_bytes[1:]
# Try both recovery ids
for v in (0, 1):
sig_obj = Signature(vrs=(v, r, s))
recovered_pub = sig_obj.recover_public_key_from_msg_hash(message_hash)
if recovered_pub.to_bytes() == raw_pubkey_bytes:
return {
"r": to_hex(r),
"s": to_hex(s),
"v": v + 27,
}
raise ValueError("Could not recover public key; signature mismatch")
Loading