1+ use crate :: config:: Config ;
2+ use crate :: lazer_publisher:: LazerPublisher ;
3+ use crate :: websocket_utils:: { handle_websocket_error, send_text} ;
4+ use anyhow:: Error ;
5+ use futures:: { AsyncRead , AsyncWrite } ;
6+ use futures_util:: io:: { BufReader , BufWriter } ;
7+ use hyper_util:: rt:: TokioIo ;
8+ use pyth_lazer_protocol:: jrpc:: {
9+ GetMetadataParams , JrpcCall , JrpcError , JrpcErrorResponse , JrpcResponse , JrpcSuccessResponse ,
10+ JsonRpcVersion , PythLazerAgentJrpcV1 , SymbolMetadata ,
11+ } ;
12+ use soketto:: Sender ;
13+ use soketto:: handshake:: http:: Server ;
14+ use std:: str:: FromStr ;
15+ use tokio:: { pin, select} ;
16+ use tokio_util:: compat:: TokioAsyncReadCompatExt ;
17+ use tracing:: { debug, error, instrument} ;
18+ use url:: Url ;
19+
20+ const DEFAULT_HISTORY_SERVICE_URL : & str =
21+ "https://history.pyth-lazer.dourolabs.app/history/v1/symbols" ;
22+
23+ pub struct JrpcConnectionContext { }
24+
25+ #[ instrument(
26+ skip( server, request, lazer_publisher, context) ,
27+ fields( component = "jrpc_ws" )
28+ ) ]
29+ pub async fn handle_jrpc (
30+ config : Config ,
31+ server : Server ,
32+ request : hyper:: Request < hyper:: body:: Incoming > ,
33+ context : JrpcConnectionContext ,
34+ lazer_publisher : LazerPublisher ,
35+ ) {
36+ if let Err ( err) = try_handle_jrpc ( config, server, request, context, lazer_publisher) . await {
37+ handle_websocket_error ( err) ;
38+ }
39+ }
40+
41+ #[ instrument(
42+ skip( server, request, lazer_publisher, _context) ,
43+ fields( component = "jrpc_ws" )
44+ ) ]
45+ async fn try_handle_jrpc (
46+ config : Config ,
47+ server : Server ,
48+ request : hyper:: Request < hyper:: body:: Incoming > ,
49+ _context : JrpcConnectionContext ,
50+ lazer_publisher : LazerPublisher ,
51+ ) -> anyhow:: Result < ( ) > {
52+ let stream = hyper:: upgrade:: on ( request) . await ?;
53+ let io = TokioIo :: new ( stream) ;
54+ let stream = BufReader :: new ( BufWriter :: new ( io. compat ( ) ) ) ;
55+ let ( mut ws_sender, mut ws_receiver) = server. into_builder ( stream) . finish ( ) ;
56+
57+ let mut receive_buf = Vec :: new ( ) ;
58+
59+ loop {
60+ receive_buf. clear ( ) ;
61+ {
62+ // soketto is not cancel-safe, so we need to store the future and poll it
63+ // in the inner loop.
64+ let receive = async { ws_receiver. receive ( & mut receive_buf) . await } ;
65+ pin ! ( receive) ;
66+ #[ allow( clippy:: never_loop, reason = "false positive" ) ] // false positive
67+ loop {
68+ select ! {
69+ _result = & mut receive => {
70+ break
71+ }
72+ }
73+ }
74+ }
75+
76+ match handle_jrpc_inner ( & config, & mut ws_sender, & mut receive_buf, & lazer_publisher) . await {
77+ Ok ( _) => { }
78+ Err ( err) => {
79+ debug ! ( "Error handling JRPC request: {}" , err) ;
80+ send_text (
81+ & mut ws_sender,
82+ serde_json:: to_string :: < JrpcResponse < ( ) > > ( & JrpcResponse :: Error (
83+ JrpcErrorResponse {
84+ jsonrpc : JsonRpcVersion :: V2 ,
85+ error : JrpcError :: InternalError . into ( ) ,
86+ id : None ,
87+ } ,
88+ ) ) ?
89+ . as_str ( ) ,
90+ )
91+ . await ?;
92+ }
93+ }
94+ }
95+ }
96+
97+ async fn handle_jrpc_inner < T : AsyncRead + AsyncWrite + Unpin > (
98+ config : & Config ,
99+ sender : & mut Sender < T > ,
100+ receive_buf : & mut Vec < u8 > ,
101+ lazer_publisher : & LazerPublisher ,
102+ ) -> anyhow:: Result < ( ) > {
103+ match serde_json:: from_slice :: < PythLazerAgentJrpcV1 > ( receive_buf. as_slice ( ) ) {
104+ Ok ( jrpc_request) => match jrpc_request. params {
105+ JrpcCall :: PushUpdate ( request_params) => {
106+ match lazer_publisher
107+ . push_feed_update ( request_params. into ( ) )
108+ . await
109+ {
110+ Ok ( _) => {
111+ send_text (
112+ sender,
113+ serde_json:: to_string :: < JrpcResponse < String > > ( & JrpcResponse :: Success (
114+ JrpcSuccessResponse :: < String > {
115+ jsonrpc : JsonRpcVersion :: V2 ,
116+ result : "success" . to_string ( ) ,
117+ id : jrpc_request. id ,
118+ } ,
119+ ) ) ?
120+ . as_str ( ) ,
121+ )
122+ . await ?;
123+ }
124+ Err ( err) => {
125+ debug ! ( "error while sending updates: {:?}" , err) ;
126+ send_text (
127+ sender,
128+ serde_json:: to_string :: < JrpcResponse < ( ) > > ( & JrpcResponse :: Error (
129+ JrpcErrorResponse {
130+ jsonrpc : JsonRpcVersion :: V2 ,
131+ error : JrpcError :: InternalError . into ( ) ,
132+ id : Some ( jrpc_request. id ) ,
133+ } ,
134+ ) ) ?
135+ . as_str ( ) ,
136+ )
137+ . await ?;
138+ }
139+ }
140+ }
141+ JrpcCall :: GetMetadata ( request_params) => match get_metadata ( config. clone ( ) ) . await {
142+ Ok ( symbols) => {
143+ let symbols = filter_symbols ( symbols. clone ( ) , request_params) ;
144+
145+ send_text (
146+ sender,
147+ serde_json:: to_string :: < JrpcResponse < Vec < SymbolMetadata > > > (
148+ & JrpcResponse :: Success ( JrpcSuccessResponse :: < Vec < SymbolMetadata > > {
149+ jsonrpc : JsonRpcVersion :: V2 ,
150+ result : symbols,
151+ id : jrpc_request. id ,
152+ } ) ,
153+ ) ?
154+ . as_str ( ) ,
155+ )
156+ . await ?;
157+ }
158+ Err ( err) => {
159+ error ! ( "error while retrieving metadata: {:?}" , err) ;
160+ send_text (
161+ sender,
162+ serde_json:: to_string :: < JrpcResponse < ( ) > > ( & JrpcResponse :: Error (
163+ JrpcErrorResponse {
164+ jsonrpc : JsonRpcVersion :: V2 ,
165+ // note: right now specifying an invalid method results in a parse error
166+ error : JrpcError :: InternalError . into ( ) ,
167+ id : None ,
168+ } ,
169+ ) ) ?
170+ . as_str ( ) ,
171+ )
172+ . await ?;
173+ }
174+ } ,
175+ } ,
176+ Err ( err) => {
177+ debug ! ( "Error parsing JRPC request: {}" , err) ;
178+ send_text (
179+ sender,
180+ serde_json:: to_string :: < JrpcResponse < ( ) > > ( & JrpcResponse :: Error (
181+ JrpcErrorResponse {
182+ jsonrpc : JsonRpcVersion :: V2 ,
183+ error : JrpcError :: ParseError ( err. to_string ( ) ) . into ( ) ,
184+ id : None ,
185+ } ,
186+ ) ) ?
187+ . as_str ( ) ,
188+ )
189+ . await ?;
190+ }
191+ }
192+ Ok ( ( ) )
193+ }
194+
195+ async fn get_metadata ( config : Config ) -> Result < Vec < SymbolMetadata > , Error > {
196+ let result = reqwest:: get (
197+ config
198+ . history_service_url
199+ . unwrap_or ( Url :: from_str ( DEFAULT_HISTORY_SERVICE_URL ) ?) ,
200+ )
201+ . await ?;
202+
203+ if result. status ( ) . is_success ( ) {
204+ Ok ( serde_json:: from_str :: < Vec < SymbolMetadata > > (
205+ & result. text ( ) . await ?,
206+ ) ?)
207+ } else {
208+ Err ( anyhow:: anyhow!(
209+ "Error getting metadata (status_code={}, body={})" ,
210+ result. status( ) ,
211+ result. text( ) . await . unwrap_or( "none" . to_string( ) )
212+ ) )
213+ }
214+ }
215+
216+ fn filter_symbols (
217+ symbols : Vec < SymbolMetadata > ,
218+ get_metadata_params : GetMetadataParams ,
219+ ) -> Vec < SymbolMetadata > {
220+ let names = & get_metadata_params. names . clone ( ) ;
221+ let asset_types = & get_metadata_params. asset_types . clone ( ) ;
222+
223+ let res: Vec < SymbolMetadata > = symbols
224+ . into_iter ( )
225+ . filter ( |symbol| {
226+ if let Some ( names) = names {
227+ if !names. contains ( & symbol. name ) {
228+ return false ;
229+ }
230+ }
231+
232+ if let Some ( asset_types) = asset_types {
233+ if !asset_types. contains ( & symbol. asset_type ) {
234+ return false ;
235+ }
236+ }
237+
238+ true
239+ } )
240+ . collect ( ) ;
241+
242+ res
243+ }
244+
245+ #[ cfg( test) ]
246+ pub mod tests {
247+ use super :: * ;
248+ use pyth_lazer_protocol:: router:: { Channel , FixedRate , PriceFeedId } ;
249+ use pyth_lazer_protocol:: symbol_state:: SymbolState ;
250+ use std:: net:: SocketAddr ;
251+
252+ fn gen_test_symbol ( name : String , asset_type : String ) -> SymbolMetadata {
253+ SymbolMetadata {
254+ pyth_lazer_id : PriceFeedId ( 1 ) ,
255+ name,
256+ symbol : "" . to_string ( ) ,
257+ description : "" . to_string ( ) ,
258+ asset_type,
259+ exponent : 0 ,
260+ cmc_id : None ,
261+ funding_rate_interval : None ,
262+ min_publishers : 0 ,
263+ min_channel : Channel :: FixedRate ( FixedRate :: MIN ) ,
264+ state : SymbolState :: Stable ,
265+ hermes_id : None ,
266+ quote_currency : None ,
267+ }
268+ }
269+
270+ #[ tokio:: test]
271+ #[ ignore]
272+ async fn test_try_get_metadata ( ) {
273+ let config = Config {
274+ listen_address : SocketAddr :: from ( ( [ 127 , 0 , 0 , 1 ] , 0 ) ) ,
275+ relayer_urls : vec ! [ ] ,
276+ authorization_token : None ,
277+ publish_keypair_path : Default :: default ( ) ,
278+ publish_interval_duration : Default :: default ( ) ,
279+ history_service_url : None ,
280+ } ;
281+
282+ println ! ( "{:?}" , get_metadata( config) . await . unwrap( ) ) ;
283+ }
284+
285+ #[ test]
286+ fn test_filter_symbols ( ) {
287+ let symbol1 = gen_test_symbol ( "BTC" . to_string ( ) , "crypto" . to_string ( ) ) ;
288+ let symbol2 = gen_test_symbol ( "XMR" . to_string ( ) , "crypto" . to_string ( ) ) ;
289+ let symbol3 = gen_test_symbol ( "BTCUSDT" . to_string ( ) , "funding-rate" . to_string ( ) ) ;
290+ let symbols = vec ! [ symbol1. clone( ) , symbol2. clone( ) , symbol3. clone( ) ] ;
291+
292+ // just a name filter
293+ assert_eq ! (
294+ filter_symbols(
295+ symbols. clone( ) ,
296+ GetMetadataParams {
297+ names: Some ( vec![ "XMR" . to_string( ) ] ) ,
298+ asset_types: None ,
299+ } ,
300+ ) ,
301+ vec![ symbol2. clone( ) ]
302+ ) ;
303+
304+ // just an asset type filter
305+ assert_eq ! (
306+ filter_symbols(
307+ symbols. clone( ) ,
308+ GetMetadataParams {
309+ names: None ,
310+ asset_types: Some ( vec![ "crypto" . to_string( ) ] ) ,
311+ } ,
312+ ) ,
313+ vec![ symbol1. clone( ) , symbol2. clone( ) ]
314+ ) ;
315+
316+ // name and asset type
317+ assert_eq ! (
318+ filter_symbols(
319+ symbols. clone( ) ,
320+ GetMetadataParams {
321+ names: Some ( vec![ "BTC" . to_string( ) ] ) ,
322+ asset_types: Some ( vec![ "crypto" . to_string( ) ] ) ,
323+ } ,
324+ ) ,
325+ vec![ symbol1. clone( ) ]
326+ ) ;
327+ }
328+ }
0 commit comments