-
Notifications
You must be signed in to change notification settings - Fork 299
feat: HIP-3 oracle pusher #3060
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
2896e43
462179a
e8a255d
a57a941
66ad097
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,55 @@ | ||
name: Build and Push hip-3-pusher Image | ||
on: | ||
push: | ||
tags: | ||
- hip-3-pusher-v* | ||
pull_request: | ||
paths: | ||
- "apps/hip-3-pusher/**" | ||
workflow_dispatch: | ||
inputs: | ||
dispatch_description: | ||
description: "Dispatch description" | ||
required: true | ||
type: string | ||
permissions: | ||
contents: read | ||
id-token: write | ||
packages: write | ||
env: | ||
REGISTRY: ghcr.io | ||
IMAGE_NAME: pyth-network/hip-3-pusher | ||
jobs: | ||
hip-3-pusher-image: | ||
runs-on: ubuntu-latest | ||
steps: | ||
- uses: actions/checkout@v2 | ||
- name: Set image tag to version of the git tag | ||
if: ${{ startsWith(github.ref, 'refs/tags/hip-3-pusher-v') }} | ||
run: | | ||
PREFIX="refs/tags/hip-3-pusher-" | ||
VERSION="${GITHUB_REF:${#PREFIX}}" | ||
echo "IMAGE_TAG=${VERSION}" >> "${GITHUB_ENV}" | ||
- name: Set image tag to the git commit hash | ||
if: ${{ !startsWith(github.ref, 'refs/tags/hip-3-pusher-v') }} | ||
run: | | ||
echo "IMAGE_TAG=${{ github.sha }}" >> "${GITHUB_ENV}" | ||
- name: Log in to the Container registry | ||
uses: docker/login-action@65b78e6e13532edd9afa3aa52ac7964289d1a9c1 | ||
with: | ||
registry: ${{ env.REGISTRY }} | ||
username: ${{ github.actor }} | ||
password: ${{ secrets.GITHUB_TOKEN }} | ||
- name: Extract metadata (tags, labels) for Docker | ||
id: metadata_hip_3_pusher | ||
uses: docker/metadata-action@9ec57ed1fcdbf14dcef7dfbe97b2010124a938b7 | ||
with: | ||
images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }} | ||
- name: Build and push server docker image | ||
uses: docker/build-push-action@f2a1d5e99d037542a71f64918e516c093c6f3fc4 | ||
with: | ||
context: . | ||
file: "./apps/hip-3-pusher/Dockerfile" | ||
push: ${{ github.event_name != 'pull_request' }} | ||
tags: ${{ steps.metadata_hip_3_pusher.outputs.tags }} | ||
labels: ${{ steps.metadata_hip_3_pusher.outputs.labels }} |
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also add There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Intent is for the config.toml in git to be an example. Locally I use separately named config files, and of course k8s mounts our deployment configs. |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,10 @@ | ||
# Python-generated files | ||
__pycache__/ | ||
*.py[oc] | ||
build/ | ||
dist/ | ||
wheels/ | ||
*.egg-info | ||
|
||
# Virtual environments | ||
.venv |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,33 @@ | ||
# Use a Python image with uv pre-installed | ||
FROM ghcr.io/astral-sh/uv:python3.13-trixie-slim | ||
|
||
# Install the project into `/app` | ||
WORKDIR /app | ||
|
||
# Enable bytecode compilation | ||
ENV UV_COMPILE_BYTECODE=1 | ||
|
||
# Copy from the cache instead of linking since it's a mounted volume | ||
ENV UV_LINK_MODE=copy | ||
|
||
# Ensure installed tools can be executed out of the box | ||
ENV UV_TOOL_BIN_DIR=/usr/local/bin | ||
|
||
COPY apps/hip-3-pusher/uv.lock . | ||
COPY apps/hip-3-pusher/pyproject.toml . | ||
|
||
# Install the project's dependencies using the lockfile and settings | ||
RUN --mount=type=cache,target=/root/.cache/uv \ | ||
--mount=type=bind,source=apps/hip-3-pusher/uv.lock,target=uv.lock \ | ||
--mount=type=bind,source=apps/hip-3-pusher/pyproject.toml,target=pyproject.toml \ | ||
uv sync --locked --no-install-project --no-dev | ||
|
||
# Then, add the rest of the project source code and install it | ||
# Installing separately from its dependencies allows optimal layer caching | ||
COPY apps/hip-3-pusher/src/ ./src/ | ||
COPY apps/hip-3-pusher/config/ ./config/ | ||
RUN --mount=type=cache,target=/root/.cache/uv \ | ||
uv sync --locked --no-dev | ||
Comment on lines
+19
to
+30
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Great that we have proper layer caching :) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, that was courtesy of the uv docker example. Astral is crushing it. |
||
|
||
# Run the app by default | ||
CMD ["uv", "run", "src/main.py", "--config", "config/config.toml"] |
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Pls dont forget to add a readme! |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
[hyperliquid] | ||
market_name = "" | ||
market_symbol = "BTC" | ||
use_testnet = false | ||
oracle_pusher_key_path = "/path/to/oracle_pusher_key.txt" | ||
publish_interval = 3.0 | ||
enable_publish = false | ||
|
||
[kms] | ||
enable_kms = false | ||
key_path = "/path/to/kms_key.txt" | ||
aws_region_name = "ap-northeast-1" | ||
|
||
[lazer] | ||
router_urls = ["wss://pyth-lazer-0.dourolabs.app/v1/stream", "wss://pyth-lazer-1.dourolabs.app/v1/stream"] | ||
api_key = "lazer_api_key" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. consider loading this from the env There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You mean env var or from a 1pw secret? |
||
base_feed_id = 1 # BTC | ||
quote_feed_id = 8 # USDT | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What's the quote asset used for? |
||
|
||
[hermes] | ||
urls = ["wss://hermes.pyth.network/ws"] | ||
base_id = "e62df6c8b4a85fe1a67db44dc12de5db330f7ac66b72dc658afedf0f4a415b43" # BTC | ||
quote_id = "2b89b9dc8fdf9f34709a5b106b472f0f39bb6ca9ce04b0fd7f2e971688e2e53b" # USDT |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
[project] | ||
name = "hip-3-pusher" | ||
version = "0.1.0" | ||
description = "Add your description here" | ||
merolish marked this conversation as resolved.
Show resolved
Hide resolved
|
||
readme = "README.md" | ||
requires-python = ">=3.13" | ||
dependencies = [ | ||
"asn1crypto>=1.5.1", | ||
"boto3>=1.40.31", | ||
"cryptography>=45.0.7", | ||
"hyperliquid-python-sdk>=0.19.0", | ||
"loguru>=0.7.3", | ||
"toml>=0.10.2", | ||
"websockets>=15.0.1", | ||
] | ||
Comment on lines
+7
to
+15
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Using >= opens us to to breaking major version changes. Let's use ^ (minor version updates allowed) or ~= (patch version updates allowed). |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,79 @@ | ||
import asyncio | ||
import json | ||
from loguru import logger | ||
import websockets | ||
|
||
from price_state import PriceState | ||
|
||
|
||
class HermesListener: | ||
""" | ||
Subscribe to Hermes price updates for needed feeds. | ||
TODO: Will need to handle specific conversions/factors and exponents. | ||
""" | ||
def __init__(self, config, price_state: PriceState): | ||
self.urls = config["hermes"]["urls"] | ||
self.base_id = config["hermes"]["base_id"] | ||
self.quote_id = config["hermes"]["quote_id"] | ||
self.price_state = price_state | ||
|
||
def get_subscribe_request(self): | ||
return { | ||
"type": "subscribe", | ||
"ids": [self.base_id, self.quote_id], | ||
"verbose": False, | ||
"binary": True, | ||
"allow_out_of_order": False, | ||
"ignore_invalid_price_ids": False, | ||
} | ||
|
||
async def subscribe_all(self): | ||
await asyncio.gather(*(self.subscribe_single(url) for url in self.urls)) | ||
|
||
async def subscribe_single(self, url): | ||
while True: | ||
try: | ||
await self.subscribe_single_inner(url) | ||
except websockets.ConnectionClosed: | ||
logger.error("Connection to {} closed; retrying", url) | ||
except Exception as e: | ||
logger.exception("Error on {}: {}", url, e) | ||
|
||
async def subscribe_single_inner(self, url): | ||
async with websockets.connect(url) as ws: | ||
subscribe_request = self.get_subscribe_request() | ||
|
||
await ws.send(json.dumps(subscribe_request)) | ||
logger.info("Sent Hermes subscribe request to {}", url) | ||
|
||
# listen for updates | ||
async for message in ws: | ||
try: | ||
data = json.loads(message) | ||
self.parse_hermes_message(data) | ||
except json.JSONDecodeError as e: | ||
logger.error("Failed to decode JSON message: {}", e) | ||
|
||
def parse_hermes_message(self, data): | ||
""" | ||
For now, simply insert received prices into price_state | ||
|
||
:param data: Hermes price update json message | ||
:return: None (update price_state) | ||
""" | ||
try: | ||
if data.get("type", "") != "price_update": | ||
return | ||
price_feed = data["price_feed"] | ||
id = price_feed["id"] | ||
price_object = data["price_feed"]["price"] | ||
price = price_object["price"] | ||
expo = price_object["expo"] | ||
publish_time = price_object["publish_time"] | ||
logger.debug("Hermes update: {} {} {} {}", id, price, expo, publish_time) | ||
if id == self.base_id: | ||
self.price_state.hermes_base_price = price | ||
if id == self.quote_id: | ||
self.price_state.hermes_quote_price = price | ||
except Exception as e: | ||
logger.error("parse_hermes_message error: {}", e) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,33 @@ | ||
from loguru import logger | ||
|
||
from hyperliquid.info import Info | ||
from hyperliquid.utils.constants import TESTNET_API_URL, MAINNET_API_URL | ||
|
||
from price_state import PriceState | ||
|
||
|
||
class HyperliquidListener: | ||
""" | ||
Subscribe to any relevant Hyperliquid websocket streams | ||
See https://hyperliquid.gitbook.io/hyperliquid-docs/for-developers/api/websocket | ||
""" | ||
def __init__(self, config: dict, price_state: PriceState): | ||
self.market_symbol = config["hyperliquid"]["market_symbol"] | ||
url = TESTNET_API_URL if config["hyperliquid"].get("use_testnet", True) else MAINNET_API_URL | ||
self.info = Info(base_url=url) | ||
self.price_state = price_state | ||
|
||
def subscribe(self): | ||
self.info.subscribe({"type": "activeAssetCtx", "coin": self.market_symbol}, self.on_activeAssetCtx) | ||
Comment on lines
+20
to
+21
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Needs the retry wrapper that exists for the Lazer and Hermes feeds. It might be worthwhile to extract all that out into a LIstener superclass. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Right, I think I need to scrap the sdk here and handle the connect/subscription manually, don't think this sdk function is resilient or even gives feedback at all. |
||
|
||
def on_activeAssetCtx(self, message): | ||
""" | ||
Parse oraclePx and markPx from perp context update | ||
|
||
:param message: activeAssetCtx websocket update message | ||
:return: None | ||
""" | ||
ctx = message["data"]["ctx"] | ||
self.price_state.latest_oracle_price = ctx["oraclePx"] | ||
self.price_state.latest_mark_price = ctx["markPx"] | ||
Comment on lines
+31
to
+32
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should also store the timestamp so we know how stale the price is in case the data source disconnects and have to serve stale prices. It could also inform fallback behavior (it can choose the most recent price from the backup data sources.) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nvm, i see you do this in #3061 ✅ |
||
logger.debug("on_activeAssetCtx: oraclePx: {} marketPx: {}", self.price_state.latest_oracle_price, self.price_state.latest_mark_price) |
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Will hold off on reviewing this file since its WIP There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Unfortunately yes, hopefully I can get it to match, it's unfortunately a bit of reverse engineering both KMS and the sdk and eth libraries. |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,93 @@ | ||
import boto3 | ||
from asn1crypto import core | ||
from eth_account.messages import encode_typed_data, _hash_eip191_message | ||
from eth_keys.datatypes import Signature | ||
from eth_utils import keccak, to_hex | ||
from hyperliquid.exchange import Exchange | ||
from hyperliquid.utils.constants import TESTNET_API_URL, MAINNET_API_URL | ||
from hyperliquid.utils.signing import get_timestamp_ms, action_hash, construct_phantom_agent, l1_payload | ||
from loguru import logger | ||
|
||
|
||
class KMSSigner: | ||
def __init__(self, key_id, aws_region_name, use_testnet): | ||
url = TESTNET_API_URL if use_testnet else MAINNET_API_URL | ||
self.oracle_publisher_exchange: Exchange = Exchange(wallet=None, base_url=url) | ||
|
||
self.key_id = key_id | ||
self.client = boto3.client("kms", region_name=aws_region_name) | ||
# Fetch public key once so we can derive address and check recovery id | ||
pub_der = self.client.get_public_key(KeyId=key_id)["PublicKey"] | ||
|
||
from cryptography.hazmat.primitives import serialization | ||
pub = serialization.load_der_public_key(pub_der) | ||
numbers = pub.public_numbers() | ||
x = numbers.x.to_bytes(32, "big") | ||
y = numbers.y.to_bytes(32, "big") | ||
uncompressed = b"\x04" + x + y | ||
self.public_key_bytes = uncompressed | ||
self.address = "0x" + keccak(uncompressed[1:])[-20:].hex() | ||
logger.info("KMSSigner address: {}", self.address) | ||
|
||
def set_oracle(self, dex, oracle_pxs, all_mark_pxs, external_perp_pxs): | ||
timestamp = get_timestamp_ms() | ||
oracle_pxs_wire = sorted(list(oracle_pxs.items())) | ||
mark_pxs_wire = [sorted(list(mark_pxs.items())) for mark_pxs in all_mark_pxs] | ||
external_perp_pxs_wire = sorted(list(external_perp_pxs.items())) | ||
action = { | ||
"type": "perpDeploy", | ||
"setOracle": { | ||
"dex": dex, | ||
"oraclePxs": oracle_pxs_wire, | ||
"markPxs": mark_pxs_wire, | ||
"externalPerpPxs": external_perp_pxs_wire, | ||
}, | ||
} | ||
signature = self.sign_l1_action( | ||
action, | ||
timestamp, | ||
self.oracle_publisher_exchange.base_url == MAINNET_API_URL, | ||
) | ||
return self.oracle_publisher_exchange._post_action( | ||
action, | ||
signature, | ||
timestamp, | ||
) | ||
|
||
def sign_l1_action(self, action, nonce, is_mainnet): | ||
hash = action_hash(action, vault_address=None, nonce=nonce, expires_after=None) | ||
phantom_agent = construct_phantom_agent(hash, is_mainnet) | ||
data = l1_payload(phantom_agent) | ||
structured_data = encode_typed_data(full_message=data) | ||
message_hash = _hash_eip191_message(structured_data) | ||
signed = self.sign_message(message_hash) | ||
return {"r": to_hex(signed["r"]), "s": to_hex(signed["s"]), "v": signed["v"]} | ||
|
||
def sign_message(self, message_hash: bytes): | ||
resp = self.client.sign( | ||
KeyId=self.key_id, | ||
Message=message_hash, | ||
MessageType="DIGEST", | ||
SigningAlgorithm="ECDSA_SHA_256", # required for secp256k1 | ||
) | ||
der_sig = resp["Signature"] | ||
|
||
seq = core.Sequence.load(der_sig) | ||
r = int(seq[0].native) | ||
s = int(seq[1].native) | ||
|
||
for recovery_id in (0, 1): | ||
candidate = Signature(vrs=(recovery_id, r, s)) | ||
pubkey = candidate.recover_public_key_from_msg_hash(message_hash) | ||
if pubkey.to_bytes() == self.public_key_bytes: | ||
v = recovery_id + 27 | ||
break | ||
else: | ||
raise ValueError("Failed to determine recovery id") | ||
|
||
return { | ||
"r": r, | ||
"s": s, | ||
"v": v, | ||
"signature": Signature(vrs=(v, r, s)).to_bytes().hex(), | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Probably don't want to build and push whenever a PR is opened/updated. Would remove this trigger
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The push part actually doesn't happen on PRs for whatever reason. Somebody on the team explained it to me at some point when I was working on agent.