-
Notifications
You must be signed in to change notification settings - Fork 298
fix(hip-3-pusher): KMS support, various enhancements #3072
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
1942884
6c285be
57bd3b8
fbe1fd3
46c383b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,9 @@ | ||
# HIP-3 Pusher | ||
|
||
`hip-3-pusher` is intended to serve as an oracle updater for | ||
[HIP-3 markets](https://hyperliquid.gitbook.io/hyperliquid-docs/hyperliquid-improvement-proposals-hips/hip-3-builder-deployed-perpetuals). | ||
|
||
Currently it: | ||
- Sources market data from Hyperliquid, Pyth Lazer, and Pythnet | ||
- Supports KMS for signing oracle updates | ||
- Provides telemetry to Pyth's internal observability system |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,18 +1,16 @@ | ||
[project] | ||
name = "hip-3-pusher" | ||
version = "0.1.3" | ||
version = "0.1.4" | ||
description = "Hyperliquid HIP-3 market oracle pusher" | ||
readme = "README.md" | ||
requires-python = ">=3.13" | ||
requires-python = "==3.13.*" | ||
dependencies = [ | ||
"asn1crypto>=1.5.1", | ||
"boto3>=1.40.31", | ||
"cryptography>=45.0.7", | ||
"hyperliquid-python-sdk>=0.19.0", | ||
"loguru>=0.7.3", | ||
"opentelemetry-exporter-prometheus>=0.58b0", | ||
"opentelemetry-sdk>=1.37.0", | ||
"prometheus-client>=0.23.1", | ||
"toml>=0.10.2", | ||
"websockets>=15.0.1", | ||
"boto3~=1.40.34", | ||
"cryptography~=46.0.1", | ||
"hyperliquid-python-sdk~=0.19.0", | ||
"loguru~=0.7.3", | ||
"opentelemetry-exporter-prometheus~=0.58b0", | ||
"opentelemetry-sdk~=1.37.0", | ||
"prometheus-client~=0.23.1", | ||
"websockets~=15.0.1", | ||
] |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,45 @@ | ||
from pydantic import BaseModel | ||
|
||
|
||
class KMSConfig(BaseModel): | ||
enable_kms: bool | ||
aws_region_name: str | ||
key_path: str | ||
access_key_id_path: str | ||
secret_access_key_path: str | ||
|
||
|
||
class LazerConfig(BaseModel): | ||
lazer_urls: list[str] | ||
lazer_api_key: str | ||
base_feed_id: int | ||
base_feed_exponent: int | ||
quote_feed_id: int | ||
quote_feed_exponent: int | ||
|
||
|
||
class HermesConfig(BaseModel): | ||
hermes_urls: list[str] | ||
base_feed_id: str | ||
base_feed_exponent: int | ||
quote_feed_id: str | ||
quote_feed_exponent: int | ||
|
||
|
||
class HyperliquidConfig(BaseModel): | ||
hyperliquid_ws_urls: list[str] | ||
market_name: str | ||
market_symbol: str | ||
use_testnet: bool | ||
oracle_pusher_key_path: str | ||
publish_interval: float | ||
enable_publish: bool | ||
|
||
|
||
class Config(BaseModel): | ||
stale_price_threshold_seconds: int | ||
prometheus_port: int | ||
hyperliquid: HyperliquidConfig | ||
kms: KMSConfig | ||
lazer: LazerConfig | ||
hermes: HermesConfig |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,35 +1,77 @@ | ||
import asyncio | ||
import json | ||
import websockets | ||
from loguru import logger | ||
import time | ||
|
||
from hyperliquid.info import Info | ||
from hyperliquid.utils.constants import TESTNET_API_URL, MAINNET_API_URL | ||
from config import Config | ||
from price_state import PriceState, PriceUpdate | ||
|
||
from price_state import PriceState | ||
# This will be in config, but note here. | ||
# Other RPC providers exist but so far we've seen their support is incomplete. | ||
HYPERLIQUID_MAINNET_WS_URL = "wss://api.hyperliquid.xyz/ws" | ||
HYPERLIQUID_TESTNET_WS_URL = "wss://api.hyperliquid-testnet.xyz/ws" | ||
|
||
|
||
class HyperliquidListener: | ||
""" | ||
Subscribe to any relevant Hyperliquid websocket streams | ||
See https://hyperliquid.gitbook.io/hyperliquid-docs/for-developers/api/websocket | ||
""" | ||
def __init__(self, config: dict, price_state: PriceState): | ||
self.market_symbol = config["hyperliquid"]["market_symbol"] | ||
url = TESTNET_API_URL if config["hyperliquid"].get("use_testnet", True) else MAINNET_API_URL | ||
self.info = Info(base_url=url) | ||
def __init__(self, config: Config, price_state: PriceState): | ||
self.hyperliquid_ws_urls = config.hyperliquid.hyperliquid_ws_urls | ||
self.market_symbol = config.hyperliquid.market_symbol | ||
self.price_state = price_state | ||
|
||
def subscribe(self): | ||
self.info.subscribe({"type": "activeAssetCtx", "coin": self.market_symbol}, self.on_activeAssetCtx) | ||
|
||
def on_activeAssetCtx(self, message): | ||
""" | ||
Parse oraclePx and markPx from perp context update | ||
|
||
:param message: activeAssetCtx websocket update message | ||
:return: None | ||
""" | ||
ctx = message["data"]["ctx"] | ||
self.price_state.hl_oracle_price = ctx["oraclePx"] | ||
self.price_state.hl_mark_price = ctx["markPx"] | ||
logger.debug("on_activeAssetCtx: oraclePx: {} marketPx: {}", self.price_state.hl_oracle_price, self.price_state.hl_mark_price) | ||
self.price_state.latest_hl_timestamp = time.time() | ||
def get_subscribe_request(self, asset): | ||
return { | ||
"method": "subscribe", | ||
"subscription": {"type": "activeAssetCtx", "coin": asset} | ||
} | ||
|
||
async def subscribe_all(self): | ||
await asyncio.gather(*(self.subscribe_single(hyperliquid_ws_url) for hyperliquid_ws_url in self.hyperliquid_ws_urls)) | ||
|
||
async def subscribe_single(self, url): | ||
while True: | ||
try: | ||
await self.subscribe_single_inner(url) | ||
except websockets.ConnectionClosed: | ||
logger.error("Connection to {} closed; retrying", url) | ||
except Exception as e: | ||
logger.exception("Error on {}: {}", url, e) | ||
|
||
async def subscribe_single_inner(self, url): | ||
async with websockets.connect(url) as ws: | ||
subscribe_request = self.get_subscribe_request(self.market_symbol) | ||
await ws.send(json.dumps(subscribe_request)) | ||
logger.info("Sent subscribe request to {}", url) | ||
|
||
# listen for updates | ||
async for message in ws: | ||
try: | ||
data = json.loads(message) | ||
channel = data.get("channel", None) | ||
if not channel: | ||
logger.error("No channel in message: {}", data) | ||
elif channel == "subscriptionResponse": | ||
logger.debug("Received subscription response: {}", data) | ||
elif channel == "error": | ||
logger.error("Received Hyperliquid error response: {}", data) | ||
elif channel == "activeAssetCtx": | ||
self.parse_hyperliquid_ws_message(data) | ||
else: | ||
logger.error("Received unknown channel: {}", channel) | ||
except json.JSONDecodeError as e: | ||
logger.error("Failed to decode JSON message: {} error: {}", message, e) | ||
|
||
def parse_hyperliquid_ws_message(self, message): | ||
try: | ||
ctx = message["data"]["ctx"] | ||
now = time.time() | ||
self.price_state.hl_oracle_price = PriceUpdate(ctx["oraclePx"], now) | ||
self.price_state.hl_mark_price = PriceUpdate(ctx["markPx"], now) | ||
logger.debug("on_activeAssetCtx: oraclePx: {} marketPx: {}", self.price_state.hl_oracle_price, | ||
self.price_state.hl_mark_price) | ||
except Exception as e: | ||
logger.error("parse_hyperliquid_ws_message error: message: {} e: {}", message, e) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,26 +1,33 @@ | ||
import boto3 | ||
from asn1crypto import core | ||
from cryptography.hazmat.primitives import serialization | ||
from cryptography.hazmat.primitives.asymmetric.utils import decode_dss_signature | ||
from eth_account.messages import encode_typed_data, _hash_eip191_message | ||
from eth_keys.backends.native.ecdsa import N as SECP256K1_N | ||
from eth_keys.datatypes import Signature | ||
from eth_utils import keccak, to_hex | ||
from hyperliquid.exchange import Exchange | ||
from hyperliquid.utils.constants import TESTNET_API_URL, MAINNET_API_URL | ||
from hyperliquid.utils.signing import get_timestamp_ms, action_hash, construct_phantom_agent, l1_payload | ||
from loguru import logger | ||
|
||
from config import Config | ||
|
||
SECP256K1_N_HALF = SECP256K1_N // 2 | ||
|
||
|
||
class KMSSigner: | ||
def __init__(self, key_id, aws_region_name, use_testnet): | ||
def __init__(self, config: Config): | ||
use_testnet = config.hyperliquid.use_testnet | ||
url = TESTNET_API_URL if use_testnet else MAINNET_API_URL | ||
self.oracle_publisher_exchange: Exchange = Exchange(wallet=None, base_url=url) | ||
self.client = self._init_client(config) | ||
|
||
self.key_id = key_id | ||
self.client = boto3.client("kms", region_name=aws_region_name) | ||
# Fetch public key once so we can derive address and check recovery id | ||
pub_der = self.client.get_public_key(KeyId=key_id)["PublicKey"] | ||
|
||
from cryptography.hazmat.primitives import serialization | ||
pub = serialization.load_der_public_key(pub_der) | ||
key_path = config.kms.key_path | ||
self.key_id = open(key_path, "r").read().strip() | ||
self.pubkey_der = self.client.get_public_key(KeyId=self.key_id)["PublicKey"] | ||
# Construct eth address to log | ||
pub = serialization.load_der_public_key(self.pubkey_der) | ||
numbers = pub.public_numbers() | ||
x = numbers.x.to_bytes(32, "big") | ||
y = numbers.y.to_bytes(32, "big") | ||
|
@@ -29,6 +36,22 @@ def __init__(self, key_id, aws_region_name, use_testnet): | |
self.address = "0x" + keccak(uncompressed[1:])[-20:].hex() | ||
logger.info("KMSSigner address: {}", self.address) | ||
|
||
def _init_client(self, config): | ||
aws_region_name = config.kms.aws_region_name | ||
access_key_id_path = config.kms.access_key_id_path | ||
access_key_id = open(access_key_id_path, "r").read().strip() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. AWS credentials are loaded from files but not cleared from memory after use. Consider using secure memory handling or environment variables to reduce the risk of credential exposure in memory dumps. Copilot uses AI. Check for mistakes. Positive FeedbackNegative Feedback |
||
secret_access_key_path = config.kms.secret_access_key_path | ||
secret_access_key = open(secret_access_key_path, "r").read().strip() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. AWS credentials are loaded from files but not cleared from memory after use. Consider using secure memory handling or environment variables to reduce the risk of credential exposure in memory dumps. Copilot uses AI. Check for mistakes. Positive FeedbackNegative Feedback |
||
|
||
return boto3.client( | ||
"kms", | ||
region_name=aws_region_name, | ||
aws_access_key_id=access_key_id, | ||
aws_secret_access_key=secret_access_key, | ||
# can specify an endpoint for e.g. LocalStack | ||
# endpoint_url="http://localhost:4566" | ||
) | ||
|
||
def set_oracle(self, dex, oracle_pxs, all_mark_pxs, external_perp_pxs): | ||
timestamp = get_timestamp_ms() | ||
oracle_pxs_wire = sorted(list(oracle_pxs.items())) | ||
|
@@ -60,34 +83,39 @@ def sign_l1_action(self, action, nonce, is_mainnet): | |
data = l1_payload(phantom_agent) | ||
structured_data = encode_typed_data(full_message=data) | ||
message_hash = _hash_eip191_message(structured_data) | ||
signed = self.sign_message(message_hash) | ||
return {"r": to_hex(signed["r"]), "s": to_hex(signed["s"]), "v": signed["v"]} | ||
return self.sign_message(message_hash) | ||
|
||
def sign_message(self, message_hash: bytes): | ||
def sign_message(self, message_hash: bytes) -> dict: | ||
# Send message hash to KMS for signing | ||
resp = self.client.sign( | ||
KeyId=self.key_id, | ||
Message=message_hash, | ||
MessageType="DIGEST", | ||
SigningAlgorithm="ECDSA_SHA_256", # required for secp256k1 | ||
) | ||
der_sig = resp["Signature"] | ||
|
||
seq = core.Sequence.load(der_sig) | ||
r = int(seq[0].native) | ||
s = int(seq[1].native) | ||
|
||
for recovery_id in (0, 1): | ||
candidate = Signature(vrs=(recovery_id, r, s)) | ||
pubkey = candidate.recover_public_key_from_msg_hash(message_hash) | ||
if pubkey.to_bytes() == self.public_key_bytes: | ||
v = recovery_id + 27 | ||
break | ||
else: | ||
raise ValueError("Failed to determine recovery id") | ||
|
||
return { | ||
"r": r, | ||
"s": s, | ||
"v": v, | ||
"signature": Signature(vrs=(v, r, s)).to_bytes().hex(), | ||
} | ||
kms_signature = resp["Signature"] | ||
# Decode the KMS DER signature -> (r, s) | ||
r, s = decode_dss_signature(kms_signature) | ||
# Ethereum requires low-s form | ||
if s > SECP256K1_N_HALF: | ||
s = SECP256K1_N - s | ||
# Parse KMS public key into uncompressed secp256k1 bytes | ||
bplatak marked this conversation as resolved.
Show resolved
Hide resolved
|
||
# TODO: Pull this into init | ||
pubkey = serialization.load_der_public_key(self.pubkey_der) | ||
pubkey_bytes = pubkey.public_bytes( | ||
serialization.Encoding.X962, | ||
serialization.PublicFormat.UncompressedPoint, | ||
) | ||
# Strip leading 0x04 (uncompressed point indicator) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is there nothing in KMS that'll do this for us? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is a bit over my head here, but I believe there are a couple things at play: KMS not providing this "recovery id" (v) as part of signing because in normal cryptography usage it's not needed, whereas here in crypto land we need it to determine the pubkey from two possible choices; and a quirk of the encoding (X9.62). If you dig in the eth libraries you'll see they have to strip this byte in various places. Is there a cleaner way to implement this function? Good question. |
||
raw_pubkey_bytes = pubkey_bytes[1:] | ||
# Try both recovery ids | ||
for v in (0, 1): | ||
sig_obj = Signature(vrs=(v, r, s)) | ||
recovered_pub = sig_obj.recover_public_key_from_msg_hash(message_hash) | ||
if recovered_pub.to_bytes() == raw_pubkey_bytes: | ||
return { | ||
"r": to_hex(r), | ||
"s": to_hex(s), | ||
"v": v + 27, | ||
} | ||
raise ValueError("Could not recover public key; signature mismatch") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing pydantic dependency which is required for the new Config classes. This will cause runtime import errors.
Copilot uses AI. Check for mistakes.