|
| 1 | +import asyncio |
| 2 | +import json |
| 3 | +import websockets |
1 | 4 | from loguru import logger
|
2 | 5 | import time
|
3 | 6 |
|
4 |
| -from hyperliquid.info import Info |
5 |
| -from hyperliquid.utils.constants import TESTNET_API_URL, MAINNET_API_URL |
6 |
| - |
| 7 | +from config import Config |
7 | 8 | from price_state import PriceState
|
8 | 9 |
|
| 10 | +# This will be in config, but note here. |
| 11 | +# Other RPC providers exist but so far we've seen their support is incomplete. |
| 12 | +HYPERLIQUID_MAINNET_WS_URL = "wss://api.hyperliquid.xyz/ws" |
| 13 | +HYPERLIQUID_TESTNET_WS_URL = "wss://api.hyperliquid-testnet.xyz/ws" |
| 14 | + |
9 | 15 |
|
10 | 16 | class HyperliquidListener:
|
11 | 17 | """
|
12 | 18 | Subscribe to any relevant Hyperliquid websocket streams
|
13 | 19 | See https://hyperliquid.gitbook.io/hyperliquid-docs/for-developers/api/websocket
|
14 | 20 | """
|
15 |
| - def __init__(self, config: dict, price_state: PriceState): |
16 |
| - self.market_symbol = config["hyperliquid"]["market_symbol"] |
17 |
| - url = TESTNET_API_URL if config["hyperliquid"].get("use_testnet", True) else MAINNET_API_URL |
18 |
| - self.info = Info(base_url=url) |
| 21 | + def __init__(self, config: Config, price_state: PriceState): |
| 22 | + self.hyperliquid_ws_urls = config.hyperliquid.hyperliquid_ws_urls |
| 23 | + self.market_symbol = config.hyperliquid.market_symbol |
19 | 24 | self.price_state = price_state
|
20 | 25 |
|
21 |
| - def subscribe(self): |
22 |
| - self.info.subscribe({"type": "activeAssetCtx", "coin": self.market_symbol}, self.on_activeAssetCtx) |
23 |
| - |
24 |
| - def on_activeAssetCtx(self, message): |
25 |
| - """ |
26 |
| - Parse oraclePx and markPx from perp context update |
27 |
| -
|
28 |
| - :param message: activeAssetCtx websocket update message |
29 |
| - :return: None |
30 |
| - """ |
31 |
| - ctx = message["data"]["ctx"] |
32 |
| - self.price_state.hl_oracle_price = ctx["oraclePx"] |
33 |
| - self.price_state.hl_mark_price = ctx["markPx"] |
34 |
| - logger.debug("on_activeAssetCtx: oraclePx: {} marketPx: {}", self.price_state.hl_oracle_price, self.price_state.hl_mark_price) |
35 |
| - self.price_state.latest_hl_timestamp = time.time() |
| 26 | + def get_subscribe_request(self, asset): |
| 27 | + return { |
| 28 | + "method": "subscribe", |
| 29 | + "subscription": {"type": "activeAssetCtx", "coin": asset} |
| 30 | + } |
| 31 | + |
| 32 | + async def subscribe_all(self): |
| 33 | + await asyncio.gather(*(self.subscribe_single(hyperliquid_ws_url) for hyperliquid_ws_url in self.hyperliquid_ws_urls)) |
| 34 | + |
| 35 | + async def subscribe_single(self, url): |
| 36 | + while True: |
| 37 | + try: |
| 38 | + await self.subscribe_single_inner(url) |
| 39 | + except websockets.ConnectionClosed: |
| 40 | + logger.error("Connection to {} closed; retrying", url) |
| 41 | + except Exception as e: |
| 42 | + logger.exception("Error on {}: {}", url, e) |
| 43 | + |
| 44 | + async def subscribe_single_inner(self, url): |
| 45 | + async with websockets.connect(url) as ws: |
| 46 | + subscribe_request = self.get_subscribe_request(self.market_symbol) |
| 47 | + await ws.send(json.dumps(subscribe_request)) |
| 48 | + logger.info("Sent subscribe request to {}", url) |
| 49 | + |
| 50 | + # listen for updates |
| 51 | + async for message in ws: |
| 52 | + try: |
| 53 | + data = json.loads(message) |
| 54 | + if "ctx" not in message: |
| 55 | + # TODO: Should check subscription status response |
| 56 | + logger.debug("HL update without ctx: {}", message) |
| 57 | + continue |
| 58 | + self.parse_hyperliquid_ws_message(data) |
| 59 | + except json.JSONDecodeError as e: |
| 60 | + logger.error("Failed to decode JSON message: {}", e) |
| 61 | + |
| 62 | + def parse_hyperliquid_ws_message(self, message): |
| 63 | + try: |
| 64 | + ctx = message["data"]["ctx"] |
| 65 | + self.price_state.hl_oracle_price = ctx["oraclePx"] |
| 66 | + self.price_state.hl_mark_price = ctx["markPx"] |
| 67 | + logger.debug("on_activeAssetCtx: oraclePx: {} marketPx: {}", self.price_state.hl_oracle_price, |
| 68 | + self.price_state.hl_mark_price) |
| 69 | + self.price_state.latest_hl_timestamp = time.time() |
| 70 | + except Exception as e: |
| 71 | + logger.error("parse_hyperliquid_ws_message error: message: {} e: {}", e) |
0 commit comments