1
1
import asyncio
2
2
from typing import TypedDict
3
-
4
3
import httpx
5
4
import os
5
+ import json
6
+ import websockets
6
7
7
8
from .price_feeds import Price
8
9
9
- HERMES_ENDPOINT_HTTPS = "https://hermes.pyth.network/api/ "
10
+ HERMES_ENDPOINT_HTTPS = "https://hermes.pyth.network/"
10
11
HERMES_ENDPOINT_WSS = "wss://hermes.pyth.network/ws"
11
12
12
13
13
14
class PriceFeed (TypedDict ):
14
15
feed_id : str
15
16
price : Price
16
17
ema_price : Price
17
- vaa : str
18
+ update_data : list [str ]
19
+
18
20
21
+ def parse_unsupported_version (version ):
22
+ if isinstance (version , int ):
23
+ raise ValueError ("Version number {version} not supported" )
24
+ else :
25
+ raise TypeError ("Version must be an integer" )
19
26
20
27
21
28
class HermesClient :
22
- def __init__ (self , feed_ids : list [str ], endpoint = HERMES_ENDPOINT_HTTPS , ws_endpoint = HERMES_ENDPOINT_WSS ):
29
+ def __init__ (self , feed_ids : list [str ], endpoint = HERMES_ENDPOINT_HTTPS , ws_endpoint = HERMES_ENDPOINT_WSS , feed_batch_size = 100 ):
23
30
self .feed_ids = feed_ids
24
31
self .pending_feed_ids = feed_ids
25
32
self .prices_dict : dict [str , PriceFeed ] = {}
26
33
self .client = httpx .AsyncClient ()
27
34
self .endpoint = endpoint
28
35
self .ws_endpoint = ws_endpoint
36
+ self .feed_batch_size = feed_batch_size
29
37
30
38
async def get_price_feed_ids (self ) -> list [str ]:
31
39
"""
32
40
Queries the Hermes https endpoint for a list of the IDs of all Pyth price feeds.
33
41
"""
34
42
35
- url = os .path .join (self .endpoint , "price_feed_ids" )
36
-
37
- client = httpx .AsyncClient ()
43
+ url = os .path .join (self .endpoint , "api/price_feed_ids" )
38
44
39
- data = (await client .get (url )).json ()
45
+ data = (await self . client .get (url )).json ()
40
46
41
47
return data
42
48
@@ -46,74 +52,113 @@ def add_feed_ids(self, feed_ids: list[str]):
46
52
self .pending_feed_ids += feed_ids
47
53
48
54
@staticmethod
49
- def extract_price_feed (data : dict ) -> PriceFeed :
55
+ def extract_price_feed_v1 (data : dict ) -> PriceFeed :
50
56
"""
51
- Extracts a PriceFeed object from the JSON response from Hermes.
57
+ Extracts PriceFeed object from the v1 JSON response (individual price feed) from Hermes.
52
58
"""
53
59
price = Price .from_dict (data ["price" ])
54
60
ema_price = Price .from_dict (data ["ema_price" ])
55
- vaa = data ["vaa" ]
61
+ update_data = data ["vaa" ]
56
62
price_feed = {
57
63
"feed_id" : data ["id" ],
58
64
"price" : price ,
59
65
"ema_price" : ema_price ,
60
- "vaa " : vaa ,
66
+ "update_data " : [ update_data ] ,
61
67
}
62
68
return price_feed
69
+
70
+ @staticmethod
71
+ def extract_price_feed_v2 (data : dict ) -> list [PriceFeed ]:
72
+ """
73
+ Extracts PriceFeed objects from the v2 JSON response (multiple price feeds) from Hermes.
74
+ """
75
+ update_data = data ["binary" ]["data" ]
63
76
64
- async def get_pyth_prices_latest (self , feedIds : list [str ]) -> list [PriceFeed ]:
77
+ price_feeds = []
78
+
79
+ for feed in data ["parsed" ]:
80
+ price = Price .from_dict (feed ["price" ])
81
+ ema_price = Price .from_dict (feed ["ema_price" ])
82
+ price_feed = {
83
+ "feed_id" : feed ["id" ],
84
+ "price" : price ,
85
+ "ema_price" : ema_price ,
86
+ "update_data" : update_data ,
87
+ }
88
+ price_feeds .append (price_feed )
89
+
90
+ return price_feeds
91
+
92
+ async def get_pyth_prices_latest (self , feedIds : list [str ], version = 2 ) -> list [PriceFeed ]:
65
93
"""
66
94
Queries the Hermes https endpoint for the latest price feeds for a list of Pyth feed IDs.
67
95
"""
68
- url = os .path .join (self .endpoint , "latest_price_feeds?" )
69
- params = {"ids[]" : feedIds , "binary" : "true" }
96
+ if version == 1 :
97
+ url = os .path .join (self .endpoint , "api/latest_price_feeds" )
98
+ params = {"ids[]" : feedIds , "binary" : "true" }
99
+ elif version == 2 :
100
+ url = os .path .join (self .endpoint , "v2/updates/price/latest" )
101
+ params = {"ids[]" : feedIds , "encoding" : "base64" , "parsed" : "true" }
102
+ else :
103
+ parse_unsupported_version (version )
70
104
71
105
data = (await self .client .get (url , params = params )).json ()
72
106
73
- results = []
74
- for res in data :
75
- price_feed = self .extract_price_feed (res )
76
- results .append (price_feed )
107
+ if version == 1 :
108
+ results = []
109
+ for res in data :
110
+ price_feed = self .extract_price_feed_v1 (res )
111
+ results .append (price_feed )
112
+ elif version == 2 :
113
+ results = self .extract_price_feed_v2 (data )
77
114
78
115
return results
79
116
80
- async def get_pyth_price_at_time (self , feed_id : str , timestamp : int ) -> PriceFeed :
117
+ async def get_pyth_price_at_time (self , feed_id : str , timestamp : int , version = 2 ) -> PriceFeed :
81
118
"""
82
119
Queries the Hermes https endpoint for the price feed for a Pyth feed ID at a given timestamp.
83
120
"""
84
- url = os .path .join (self .endpoint , "get_price_feed" )
85
- params = {"id" : feed_id , "publish_time" : timestamp , "binary" : "true" }
121
+ if version == 1 :
122
+ url = os .path .join (self .endpoint , "api/get_price_feed" )
123
+ params = {"id" : feed_id , "publish_time" : timestamp , "binary" : "true" }
124
+ elif version == 2 :
125
+ url = os .path .join (self .endpoint , f"v2/updates/price/{ timestamp } " )
126
+ params = {"ids[]" : [feed_id ], "encoding" : "base64" , "parsed" : "true" }
127
+ else :
128
+ parse_unsupported_version (version )
86
129
87
130
data = (await self .client .get (url , params = params )).json ()
88
131
89
- price_feed = self .extract_price_feed (data )
132
+ if version == 1 :
133
+ price_feed = self .extract_price_feed_v1 (data )
134
+ elif version == 2 :
135
+ price_feed = self .extract_price_feed_v2 (data )[0 ]
90
136
91
137
return price_feed
92
138
93
- async def get_all_prices (self ) -> dict [str , PriceFeed ]:
139
+ async def get_all_prices (self , version = 2 ) -> dict [str , PriceFeed ]:
94
140
"""
95
141
Queries the Hermes http endpoint for the latest price feeds for all feed IDs in the class object.
96
142
97
143
There are limitations on the number of feed IDs that can be queried at once, so this function queries the feed IDs in batches.
98
144
"""
99
145
pyth_prices_latest = []
100
146
i = 0
101
- batch_size = 100
102
- while len (self .feed_ids [i : i + batch_size ]) > 0 :
147
+ while len (self .feed_ids [i : i + self .feed_batch_size ]) > 0 :
103
148
pyth_prices_latest += await self .get_pyth_prices_latest (
104
- self .feed_ids [i : i + batch_size ]
149
+ self .feed_ids [i : i + self .feed_batch_size ],
150
+ version = version ,
105
151
)
106
- i += batch_size
152
+ i += self . feed_batch_size
107
153
108
154
return dict ([(feed ['feed_id' ], feed ) for feed in pyth_prices_latest ])
109
155
110
- async def ws_pyth_prices (self ):
156
+ async def ws_pyth_prices (self , version = 1 ):
111
157
"""
112
158
Opens a websocket connection to Hermes for latest prices for all feed IDs in the class object.
113
159
"""
114
- import json
115
-
116
- import websockets
160
+ if version != 1 :
161
+ parse_unsupported_version (version )
117
162
118
163
async with websockets .connect (self .ws_endpoint ) as ws :
119
164
while True :
@@ -139,30 +184,32 @@ async def ws_pyth_prices(self):
139
184
feed_id = msg ["price_feed" ]["id" ]
140
185
new_feed = msg ["price_feed" ]
141
186
142
- self .prices_dict [feed_id ] = self .extract_price_feed (new_feed )
187
+ self .prices_dict [feed_id ] = self .extract_price_feed_v1 (new_feed )
143
188
144
- except :
145
- raise Exception ("Error in price_update message" , msg )
189
+ except Exception as e :
190
+ raise Exception (f "Error in price_update message: { msg } " ) from e
146
191
147
192
148
193
async def main ():
149
194
hermes_client = HermesClient ([])
150
195
feed_ids = await hermes_client .get_price_feed_ids ()
151
196
feed_ids_rel = feed_ids [:50 ]
197
+ version = 2
152
198
153
199
hermes_client .add_feed_ids (feed_ids_rel )
154
-
155
- prices_latest = await hermes_client .get_pyth_prices_latest (feed_ids_rel )
200
+
201
+ prices_latest = await hermes_client .get_pyth_prices_latest (feed_ids [: 50 ], version = version )
156
202
157
203
try :
158
- price_at_time = await hermes_client .get_pyth_price_at_time (feed_ids [0 ], 1_700_000_000 )
204
+ price_at_time = await hermes_client .get_pyth_price_at_time (feed_ids [0 ], 1_700_000_000 , version = version )
205
+ print (price_at_time )
159
206
except Exception as e :
160
207
print (f"Error in get_pyth_price_at_time, { e } " )
161
208
162
- all_prices = await hermes_client .get_all_prices ()
209
+ all_prices = await hermes_client .get_all_prices (version = version )
163
210
164
211
print ("Starting web socket..." )
165
- ws_call = hermes_client .ws_pyth_prices ()
212
+ ws_call = hermes_client .ws_pyth_prices (version = version )
166
213
asyncio .create_task (ws_call )
167
214
168
215
while True :
0 commit comments