Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion apps/hip-3-pusher/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,4 @@ RUN --mount=type=cache,target=/root/.cache/uv \
uv sync --locked --no-dev

# Run the app by default
CMD ["uv", "run", "src/main.py", "--config", "config/config.toml"]
CMD ["uv", "run", "src/pusher/main.py", "--config", "config/config.toml"]
8 changes: 7 additions & 1 deletion apps/hip-3-pusher/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,18 @@ description = "Hyperliquid HIP-3 market oracle pusher"
readme = "README.md"
requires-python = "==3.13.*"
dependencies = [
"boto3~=1.40.34",
"boto3~=1.40.38",
"cryptography~=46.0.1",
"hyperliquid-python-sdk~=0.19.0",
"loguru~=0.7.3",
"opentelemetry-exporter-prometheus~=0.58b0",
"opentelemetry-sdk~=1.37.0",
"prometheus-client~=0.23.1",
"tenacity~=9.1.2",
"websockets~=15.0.1",
]

[dependency-groups]
dev = [
"pytest~=8.4.2",
]
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from pydantic import BaseModel

STALE_TIMEOUT_SECONDS = 5


class KMSConfig(BaseModel):
enable_kms: bool
Expand Down
2 changes: 2 additions & 0 deletions apps/hip-3-pusher/src/pusher/exception.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
class StaleConnection(Exception):
pass
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@
from loguru import logger
import time
import websockets
from tenacity import retry, retry_if_exception_type, wait_exponential

from config import Config
from price_state import PriceState, PriceUpdate
from pusher.config import Config, STALE_TIMEOUT_SECONDS
from pusher.exception import StaleConnection
from pusher.price_state import PriceState, PriceUpdate


class HermesListener:
Expand All @@ -31,14 +33,13 @@ def get_subscribe_request(self):
async def subscribe_all(self):
await asyncio.gather(*(self.subscribe_single(url) for url in self.hermes_urls))

@retry(
retry=retry_if_exception_type((StaleConnection, websockets.ConnectionClosed)),
wait=wait_exponential(multiplier=1, min=1, max=10),
reraise=True,
)
async def subscribe_single(self, url):
while True:
try:
await self.subscribe_single_inner(url)
except websockets.ConnectionClosed:
logger.error("Connection to {} closed; retrying", url)
except Exception as e:
logger.exception("Error on {}: {}", url, e)
return await self.subscribe_single_inner(url)

async def subscribe_single_inner(self, url):
async with websockets.connect(url) as ws:
Expand All @@ -48,12 +49,19 @@ async def subscribe_single_inner(self, url):
logger.info("Sent Hermes subscribe request to {}", url)

# listen for updates
async for message in ws:
while True:
try:
message = await asyncio.wait_for(ws.recv(), timeout=STALE_TIMEOUT_SECONDS)
data = json.loads(message)
self.parse_hermes_message(data)
except asyncio.TimeoutError:
raise StaleConnection(f"No messages in {STALE_TIMEOUT_SECONDS} seconds, reconnecting")
except websockets.ConnectionClosed:
raise
except json.JSONDecodeError as e:
logger.error("Failed to decode JSON message: {}", e)
except Exception as e:
logger.error("Unexpected exception: {}", e)

def parse_hermes_message(self, data):
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@
import json
import websockets
from loguru import logger
from tenacity import retry, retry_if_exception_type, wait_exponential
import time

from config import Config
from price_state import PriceState, PriceUpdate
from pusher.config import Config, STALE_TIMEOUT_SECONDS
from pusher.exception import StaleConnection
from pusher.price_state import PriceState, PriceUpdate

# This will be in config, but note here.
# Other RPC providers exist but so far we've seen their support is incomplete.
Expand All @@ -32,14 +34,13 @@ def get_subscribe_request(self, asset):
async def subscribe_all(self):
await asyncio.gather(*(self.subscribe_single(hyperliquid_ws_url) for hyperliquid_ws_url in self.hyperliquid_ws_urls))

@retry(
retry=retry_if_exception_type((StaleConnection, websockets.ConnectionClosed)),
wait=wait_exponential(multiplier=1, min=1, max=10),
reraise=True,
)
async def subscribe_single(self, url):
while True:
try:
await self.subscribe_single_inner(url)
except websockets.ConnectionClosed:
logger.error("Connection to {} closed; retrying", url)
except Exception as e:
logger.exception("Error on {}: {}", url, e)
return await self.subscribe_single_inner(url)

async def subscribe_single_inner(self, url):
async with websockets.connect(url) as ws:
Expand All @@ -48,8 +49,9 @@ async def subscribe_single_inner(self, url):
logger.info("Sent subscribe request to {}", url)

# listen for updates
async for message in ws:
while True:
try:
message = await asyncio.wait_for(ws.recv(), timeout=STALE_TIMEOUT_SECONDS)
data = json.loads(message)
channel = data.get("channel", None)
if not channel:
Expand All @@ -62,8 +64,14 @@ async def subscribe_single_inner(self, url):
self.parse_hyperliquid_ws_message(data)
else:
logger.error("Received unknown channel: {}", channel)
except asyncio.TimeoutError:
raise StaleConnection(f"No messages in {STALE_TIMEOUT_SECONDS} seconds, reconnecting...")
except websockets.ConnectionClosed:
raise
except json.JSONDecodeError as e:
logger.error("Failed to decode JSON message: {} error: {}", message, e)
except Exception as e:
logger.error("Unexpected exception: {}", e)

def parse_hyperliquid_ws_message(self, message):
try:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from hyperliquid.utils.signing import get_timestamp_ms, action_hash, construct_phantom_agent, l1_payload
from loguru import logger

from config import Config
from pusher.config import Config

SECP256K1_N_HALF = SECP256K1_N // 2

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@
from loguru import logger
import time
import websockets
from tenacity import retry, retry_if_exception_type, wait_exponential

from config import Config
from price_state import PriceState, PriceUpdate
from pusher.config import Config, STALE_TIMEOUT_SECONDS
from pusher.exception import StaleConnection
from pusher.price_state import PriceState, PriceUpdate


class LazerListener:
Expand Down Expand Up @@ -35,14 +37,13 @@ def get_subscribe_request(self, subscription_id: int):
async def subscribe_all(self):
await asyncio.gather(*(self.subscribe_single(router_url) for router_url in self.lazer_urls))

@retry(
retry=retry_if_exception_type((StaleConnection, websockets.ConnectionClosed)),
wait=wait_exponential(multiplier=1, min=1, max=10),
reraise=True,
)
async def subscribe_single(self, router_url):
while True:
try:
await self.subscribe_single_inner(router_url)
except websockets.ConnectionClosed:
logger.error("Connection to {} closed; retrying", router_url)
except Exception as e:
logger.exception("Error on {}: {}", router_url, e)
return await self.subscribe_single_inner(router_url)

async def subscribe_single_inner(self, router_url):
headers = {
Expand All @@ -56,12 +57,19 @@ async def subscribe_single_inner(self, router_url):
logger.info("Sent Lazer subscribe request to {}", router_url)

# listen for updates
async for message in ws:
while True:
try:
message = await asyncio.wait_for(ws.recv(), timeout=STALE_TIMEOUT_SECONDS)
data = json.loads(message)
self.parse_lazer_message(data)
except asyncio.TimeoutError:
raise StaleConnection(f"No messages in {STALE_TIMEOUT_SECONDS} seconds, reconnecting")
except websockets.ConnectionClosed:
raise
except json.JSONDecodeError as e:
logger.error("Failed to decode JSON message: {}", e)
except Exception as e:
logger.error("Unexpected exception: {}", e)

def parse_lazer_message(self, data):
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@
import sys
import tomllib

from config import Config
from hyperliquid_listener import HyperliquidListener
from lazer_listener import LazerListener
from hermes_listener import HermesListener
from price_state import PriceState
from publisher import Publisher
from metrics import Metrics
from pusher.config import Config
from pusher.hyperliquid_listener import HyperliquidListener
from pusher.lazer_listener import LazerListener
from pusher.hermes_listener import HermesListener
from pusher.price_state import PriceState
from pusher.publisher import Publisher
from pusher.metrics import Metrics


def load_config():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from opentelemetry.metrics import get_meter_provider, set_meter_provider
from opentelemetry.sdk.metrics import MeterProvider

from config import Config
from pusher.config import Config

METER_NAME = "hip3pusher"

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from loguru import logger
import time

from config import Config
from pusher.config import Config

DEFAULT_STALE_PRICE_THRESHOLD_SECONDS = 5

Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
import asyncio

from loguru import logger

from eth_account import Account
from eth_account.signers.local import LocalAccount
from hyperliquid.exchange import Exchange
from hyperliquid.utils.constants import TESTNET_API_URL, MAINNET_API_URL

from config import Config
from kms_signer import KMSSigner
from metrics import Metrics
from price_state import PriceState
from pusher.config import Config
from pusher.kms_signer import KMSSigner
from pusher.metrics import Metrics
from pusher.price_state import PriceState


class Publisher:
Expand Down
70 changes: 70 additions & 0 deletions apps/hip-3-pusher/tests/test_price_state.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import time

from pusher.config import Config, LazerConfig, HermesConfig
from pusher.price_state import PriceState, PriceUpdate


def get_config():
config: Config = Config.model_construct()
config.stale_price_threshold_seconds = 5
config.lazer = LazerConfig.model_construct()
config.lazer.base_feed_exponent = -8
config.lazer.quote_feed_exponent = -8
config.hermes = HermesConfig.model_construct()
config.hermes.base_feed_exponent = -8
config.hermes.quote_feed_exponent = -8
return config


def test_good_hl_price():
config = get_config()
price_state = PriceState(config)
now = time.time()
price_state.hl_oracle_price = PriceUpdate("110000.0", now - price_state.stale_price_threshold_seconds / 2.0)

oracle_px = price_state.get_current_oracle_price()
assert oracle_px == price_state.hl_oracle_price.price
assert oracle_px == "110000.0"



def test_fallback_lazer():
config = get_config()
price_state = PriceState(config)
now = time.time()
price_state.hl_oracle_price = PriceUpdate("110000.0", now - price_state.stale_price_threshold_seconds - 1.0)
price_state.lazer_base_price = PriceUpdate("11050000000000", now - price_state.stale_price_threshold_seconds / 2.0)
price_state.lazer_quote_price = PriceUpdate("99000000", now - price_state.stale_price_threshold_seconds / 2.0)

oracle_px = price_state.get_current_oracle_price()
assert oracle_px == price_state.get_lazer_price()
assert oracle_px == "111616.16"



def test_fallback_hermes():
config = get_config()
price_state = PriceState(config)
now = time.time()
price_state.hl_oracle_price = PriceUpdate("110000.0", now - price_state.stale_price_threshold_seconds - 1.0)
price_state.lazer_base_price = PriceUpdate("11050000000000", now - price_state.stale_price_threshold_seconds - 1.0)
price_state.lazer_quote_price = PriceUpdate("99000000", now - price_state.stale_price_threshold_seconds / 2.0)
price_state.hermes_base_price = PriceUpdate("11100000000000", now - price_state.stale_price_threshold_seconds / 2.0)
price_state.hermes_quote_price = PriceUpdate("98000000", now - price_state.stale_price_threshold_seconds / 2.0)

oracle_px = price_state.get_current_oracle_price()
assert oracle_px == price_state.get_hermes_price()
assert oracle_px == "113265.31"


def test_all_fail():
config = get_config()
price_state = PriceState(config)
now = time.time()
price_state.hl_oracle_price = PriceUpdate("110000.0", now - price_state.stale_price_threshold_seconds - 1.0)
price_state.hl_oracle_price = PriceUpdate("110000.0", now - price_state.stale_price_threshold_seconds - 1.0)
price_state.lazer_base_price = PriceUpdate("11050000000000", now - price_state.stale_price_threshold_seconds - 1.0)
price_state.lazer_quote_price = PriceUpdate("99000000", now - price_state.stale_price_threshold_seconds - 1.0)
price_state.hermes_base_price = PriceUpdate("11100000000000", now - price_state.stale_price_threshold_seconds - 1.0)
price_state.hermes_quote_price = PriceUpdate("98000000", now - price_state.stale_price_threshold_seconds - 1.0)
assert price_state.get_current_oracle_price() is None
Loading