Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 55 additions & 0 deletions .github/workflows/docker-hip-3-pusher.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
name: Build and Push hip-3-pusher Image
on:
push:
tags:
- hip-3-pusher-v*
pull_request:
paths:
- "apps/hip-3-pusher/**"
Comment on lines +6 to +8
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably don't want to build and push whenever a PR is opened/updated. Would remove this trigger

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The push part actually doesn't happen on PRs for whatever reason. Somebody on the team explained it to me at some point when I was working on agent.

workflow_dispatch:
inputs:
dispatch_description:
description: "Dispatch description"
required: true
type: string
permissions:
contents: read
id-token: write
packages: write
env:
REGISTRY: ghcr.io
IMAGE_NAME: pyth-network/hip-3-pusher
jobs:
hip-3-pusher-image:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Set image tag to version of the git tag
if: ${{ startsWith(github.ref, 'refs/tags/hip-3-pusher-v') }}
run: |
PREFIX="refs/tags/hip-3-pusher-"
VERSION="${GITHUB_REF:${#PREFIX}}"
echo "IMAGE_TAG=${VERSION}" >> "${GITHUB_ENV}"
- name: Set image tag to the git commit hash
if: ${{ !startsWith(github.ref, 'refs/tags/hip-3-pusher-v') }}
run: |
echo "IMAGE_TAG=${{ github.sha }}" >> "${GITHUB_ENV}"
- name: Log in to the Container registry
uses: docker/login-action@65b78e6e13532edd9afa3aa52ac7964289d1a9c1
with:
registry: ${{ env.REGISTRY }}
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
- name: Extract metadata (tags, labels) for Docker
id: metadata_hip_3_pusher
uses: docker/metadata-action@9ec57ed1fcdbf14dcef7dfbe97b2010124a938b7
with:
images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}
- name: Build and push server docker image
uses: docker/build-push-action@f2a1d5e99d037542a71f64918e516c093c6f3fc4
with:
context: .
file: "./apps/hip-3-pusher/Dockerfile"
push: ${{ github.event_name != 'pull_request' }}
tags: ${{ steps.metadata_hip_3_pusher.outputs.tags }}
labels: ${{ steps.metadata_hip_3_pusher.outputs.labels }}
10 changes: 10 additions & 0 deletions apps/hip-3-pusher/.gitignore
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also add **/config.toml to avoid accidentally pushing secrets

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Intent is for the config.toml in git to be an example. Locally I use separately named config files, and of course k8s mounts our deployment configs.

Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# Python-generated files
__pycache__/
*.py[oc]
build/
dist/
wheels/
*.egg-info

# Virtual environments
.venv
33 changes: 33 additions & 0 deletions apps/hip-3-pusher/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# Use a Python image with uv pre-installed
FROM ghcr.io/astral-sh/uv:python3.13-trixie-slim

# Install the project into `/app`
WORKDIR /app

# Enable bytecode compilation
ENV UV_COMPILE_BYTECODE=1

# Copy from the cache instead of linking since it's a mounted volume
ENV UV_LINK_MODE=copy

# Ensure installed tools can be executed out of the box
ENV UV_TOOL_BIN_DIR=/usr/local/bin

COPY apps/hip-3-pusher/uv.lock .
COPY apps/hip-3-pusher/pyproject.toml .

# Install the project's dependencies using the lockfile and settings
RUN --mount=type=cache,target=/root/.cache/uv \
--mount=type=bind,source=apps/hip-3-pusher/uv.lock,target=uv.lock \
--mount=type=bind,source=apps/hip-3-pusher/pyproject.toml,target=pyproject.toml \
uv sync --locked --no-install-project --no-dev

# Then, add the rest of the project source code and install it
# Installing separately from its dependencies allows optimal layer caching
COPY apps/hip-3-pusher/src/ ./src/
COPY apps/hip-3-pusher/config/ ./config/
RUN --mount=type=cache,target=/root/.cache/uv \
uv sync --locked --no-dev
Comment on lines +19 to +30
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great that we have proper layer caching :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, that was courtesy of the uv docker example. Astral is crushing it.


# Run the app by default
CMD ["uv", "run", "src/main.py", "--config", "config/config.toml"]
Empty file added apps/hip-3-pusher/README.md
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pls dont forget to add a readme!

Empty file.
23 changes: 23 additions & 0 deletions apps/hip-3-pusher/config/config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
[hyperliquid]
market_name = ""
market_symbol = "BTC"
use_testnet = false
oracle_pusher_key_path = "/path/to/oracle_pusher_key.txt"
publish_interval = 3.0
enable_publish = false

[kms]
enable_kms = false
key_path = "/path/to/kms_key.txt"
aws_region_name = "ap-northeast-1"

[lazer]
router_urls = ["wss://pyth-lazer-0.dourolabs.app/v1/stream", "wss://pyth-lazer-1.dourolabs.app/v1/stream"]
api_key = "lazer_api_key"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

consider loading this from the env

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You mean env var or from a 1pw secret?

base_feed_id = 1 # BTC
quote_feed_id = 8 # USDT
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the quote asset used for?


[hermes]
urls = ["wss://hermes.pyth.network/ws"]
base_id = "e62df6c8b4a85fe1a67db44dc12de5db330f7ac66b72dc658afedf0f4a415b43" # BTC
quote_id = "2b89b9dc8fdf9f34709a5b106b472f0f39bb6ca9ce04b0fd7f2e971688e2e53b" # USDT
15 changes: 15 additions & 0 deletions apps/hip-3-pusher/pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
[project]
name = "hip-3-pusher"
version = "0.1.0"
description = "Add your description here"
readme = "README.md"
requires-python = ">=3.13"
dependencies = [
"asn1crypto>=1.5.1",
"boto3>=1.40.31",
"cryptography>=45.0.7",
"hyperliquid-python-sdk>=0.19.0",
"loguru>=0.7.3",
"toml>=0.10.2",
"websockets>=15.0.1",
]
Comment on lines +7 to +15
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using >= opens us to to breaking major version changes. Let's use ^ (minor version updates allowed) or ~= (patch version updates allowed).

79 changes: 79 additions & 0 deletions apps/hip-3-pusher/src/hermes_listener.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import asyncio
import json
from loguru import logger
import websockets

from price_state import PriceState


class HermesListener:
"""
Subscribe to Hermes price updates for needed feeds.
TODO: Will need to handle specific conversions/factors and exponents.
"""
def __init__(self, config, price_state: PriceState):
self.urls = config["hermes"]["urls"]
self.base_id = config["hermes"]["base_id"]
self.quote_id = config["hermes"]["quote_id"]
self.price_state = price_state

def get_subscribe_request(self):
return {
"type": "subscribe",
"ids": [self.base_id, self.quote_id],
"verbose": False,
"binary": True,
"allow_out_of_order": False,
"ignore_invalid_price_ids": False,
}

async def subscribe_all(self):
await asyncio.gather(*(self.subscribe_single(url) for url in self.urls))

async def subscribe_single(self, url):
while True:
try:
await self.subscribe_single_inner(url)
except websockets.ConnectionClosed:
logger.error("Connection to {} closed; retrying", url)
except Exception as e:
logger.exception("Error on {}: {}", url, e)

async def subscribe_single_inner(self, url):
async with websockets.connect(url) as ws:
subscribe_request = self.get_subscribe_request()

await ws.send(json.dumps(subscribe_request))
logger.info("Sent Hermes subscribe request to {}", url)

# listen for updates
async for message in ws:
try:
data = json.loads(message)
self.parse_hermes_message(data)
except json.JSONDecodeError as e:
logger.error("Failed to decode JSON message: {}", e)

def parse_hermes_message(self, data):
"""
For now, simply insert received prices into price_state

:param data: Hermes price update json message
:return: None (update price_state)
"""
try:
if data.get("type", "") != "price_update":
return
price_feed = data["price_feed"]
id = price_feed["id"]
price_object = data["price_feed"]["price"]
price = price_object["price"]
expo = price_object["expo"]
publish_time = price_object["publish_time"]
logger.debug("Hermes update: {} {} {} {}", id, price, expo, publish_time)
if id == self.base_id:
self.price_state.hermes_base_price = price
if id == self.quote_id:
self.price_state.hermes_quote_price = price
except Exception as e:
logger.error("parse_hermes_message error: {}", e)
33 changes: 33 additions & 0 deletions apps/hip-3-pusher/src/hyperliquid_listener.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
from loguru import logger

from hyperliquid.info import Info
from hyperliquid.utils.constants import TESTNET_API_URL, MAINNET_API_URL

from price_state import PriceState


class HyperliquidListener:
"""
Subscribe to any relevant Hyperliquid websocket streams
See https://hyperliquid.gitbook.io/hyperliquid-docs/for-developers/api/websocket
"""
def __init__(self, config: dict, price_state: PriceState):
self.market_symbol = config["hyperliquid"]["market_symbol"]
url = TESTNET_API_URL if config["hyperliquid"].get("use_testnet", True) else MAINNET_API_URL
self.info = Info(base_url=url)
self.price_state = price_state

def subscribe(self):
self.info.subscribe({"type": "activeAssetCtx", "coin": self.market_symbol}, self.on_activeAssetCtx)
Comment on lines +20 to +21
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Needs the retry wrapper that exists for the Lazer and Hermes feeds. It might be worthwhile to extract all that out into a LIstener superclass.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, I think I need to scrap the sdk here and handle the connect/subscription manually, don't think this sdk function is resilient or even gives feedback at all.


def on_activeAssetCtx(self, message):
"""
Parse oraclePx and markPx from perp context update

:param message: activeAssetCtx websocket update message
:return: None
"""
ctx = message["data"]["ctx"]
self.price_state.latest_oracle_price = ctx["oraclePx"]
self.price_state.latest_mark_price = ctx["markPx"]
Comment on lines +31 to +32
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should also store the timestamp so we know how stale the price is in case the data source disconnects and have to serve stale prices. It could also inform fallback behavior (it can choose the most recent price from the backup data sources.)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nvm, i see you do this in #3061

logger.debug("on_activeAssetCtx: oraclePx: {} marketPx: {}", self.price_state.latest_oracle_price, self.price_state.latest_mark_price)
93 changes: 93 additions & 0 deletions apps/hip-3-pusher/src/kms_signer.py
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will hold off on reviewing this file since its WIP

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately yes, hopefully I can get it to match, it's unfortunately a bit of reverse engineering both KMS and the sdk and eth libraries.

Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
import boto3
from asn1crypto import core
from eth_account.messages import encode_typed_data, _hash_eip191_message
from eth_keys.datatypes import Signature
from eth_utils import keccak, to_hex
from hyperliquid.exchange import Exchange
from hyperliquid.utils.constants import TESTNET_API_URL, MAINNET_API_URL
from hyperliquid.utils.signing import get_timestamp_ms, action_hash, construct_phantom_agent, l1_payload
from loguru import logger


class KMSSigner:
def __init__(self, key_id, aws_region_name, use_testnet):
url = TESTNET_API_URL if use_testnet else MAINNET_API_URL
self.oracle_publisher_exchange: Exchange = Exchange(wallet=None, base_url=url)

self.key_id = key_id
self.client = boto3.client("kms", region_name=aws_region_name)
# Fetch public key once so we can derive address and check recovery id
pub_der = self.client.get_public_key(KeyId=key_id)["PublicKey"]

from cryptography.hazmat.primitives import serialization
pub = serialization.load_der_public_key(pub_der)
numbers = pub.public_numbers()
x = numbers.x.to_bytes(32, "big")
y = numbers.y.to_bytes(32, "big")
uncompressed = b"\x04" + x + y
self.public_key_bytes = uncompressed
self.address = "0x" + keccak(uncompressed[1:])[-20:].hex()
logger.info("KMSSigner address: {}", self.address)

def set_oracle(self, dex, oracle_pxs, all_mark_pxs, external_perp_pxs):
timestamp = get_timestamp_ms()
oracle_pxs_wire = sorted(list(oracle_pxs.items()))
mark_pxs_wire = [sorted(list(mark_pxs.items())) for mark_pxs in all_mark_pxs]
external_perp_pxs_wire = sorted(list(external_perp_pxs.items()))
action = {
"type": "perpDeploy",
"setOracle": {
"dex": dex,
"oraclePxs": oracle_pxs_wire,
"markPxs": mark_pxs_wire,
"externalPerpPxs": external_perp_pxs_wire,
},
}
signature = self.sign_l1_action(
action,
timestamp,
self.oracle_publisher_exchange.base_url == MAINNET_API_URL,
)
return self.oracle_publisher_exchange._post_action(
action,
signature,
timestamp,
)

def sign_l1_action(self, action, nonce, is_mainnet):
hash = action_hash(action, vault_address=None, nonce=nonce, expires_after=None)
phantom_agent = construct_phantom_agent(hash, is_mainnet)
data = l1_payload(phantom_agent)
structured_data = encode_typed_data(full_message=data)
message_hash = _hash_eip191_message(structured_data)
signed = self.sign_message(message_hash)
return {"r": to_hex(signed["r"]), "s": to_hex(signed["s"]), "v": signed["v"]}

def sign_message(self, message_hash: bytes):
resp = self.client.sign(
KeyId=self.key_id,
Message=message_hash,
MessageType="DIGEST",
SigningAlgorithm="ECDSA_SHA_256", # required for secp256k1
)
der_sig = resp["Signature"]

seq = core.Sequence.load(der_sig)
r = int(seq[0].native)
s = int(seq[1].native)

for recovery_id in (0, 1):
candidate = Signature(vrs=(recovery_id, r, s))
pubkey = candidate.recover_public_key_from_msg_hash(message_hash)
if pubkey.to_bytes() == self.public_key_bytes:
v = recovery_id + 27
break
else:
raise ValueError("Failed to determine recovery id")

return {
"r": r,
"s": s,
"v": v,
"signature": Signature(vrs=(v, r, s)).to_bytes().hex(),
}
Loading