1
- use std:: sync:: Arc ;
2
- use std:: time:: Duration ;
3
- use anyhow:: Result ;
4
- use futures_util:: SinkExt ;
5
- use futures_util:: stream:: { SplitSink , SplitStream , StreamExt } ;
6
- use http:: HeaderValue ;
7
- use pyth_lazer_protocol:: publisher:: PriceFeedDataV1 ;
8
- use reqwest:: Client ;
9
- use serde:: Deserialize ;
10
- use tokio:: net:: TcpStream ;
11
- use tokio:: task:: JoinHandle ;
12
- use tracing:: instrument;
13
- use tracing;
14
- use tokio_tungstenite:: WebSocketStream ;
15
- use tokio_tungstenite:: {
16
- connect_async_with_config,
17
- tungstenite:: { client:: IntoClientRequest , Message } ,
18
- MaybeTlsStream ,
1
+ use {
2
+ crate :: agent:: state,
3
+ anyhow:: Result ,
4
+ futures_util:: {
5
+ stream:: {
6
+ SplitSink ,
7
+ SplitStream ,
8
+ StreamExt ,
9
+ } ,
10
+ SinkExt ,
11
+ } ,
12
+ http:: HeaderValue ,
13
+ pyth_lazer_protocol:: publisher:: PriceFeedDataV1 ,
14
+ reqwest:: Client ,
15
+ serde:: Deserialize ,
16
+ std:: {
17
+ sync:: Arc ,
18
+ time:: Duration ,
19
+ } ,
20
+ tokio:: {
21
+ net:: TcpStream ,
22
+ task:: JoinHandle ,
23
+ } ,
24
+ tokio_tungstenite:: {
25
+ connect_async_with_config,
26
+ tungstenite:: {
27
+ client:: IntoClientRequest ,
28
+ Message ,
29
+ } ,
30
+ MaybeTlsStream ,
31
+ WebSocketStream ,
32
+ } ,
33
+ tokio_util:: bytes:: {
34
+ BufMut ,
35
+ BytesMut ,
36
+ } ,
37
+ tracing:: {
38
+ self ,
39
+ instrument,
40
+ } ,
41
+ url:: Url ,
19
42
} ;
20
- use tokio_util:: bytes:: { BufMut , BytesMut } ;
21
- use url:: Url ;
22
- use crate :: agent:: state;
23
43
24
44
#[ derive( Clone , Debug , Deserialize ) ]
25
45
pub struct Config {
26
- pub history_url : Url ,
27
- pub relayer_urls : Vec < Url > ,
28
- pub authorization_token : String ,
46
+ pub history_url : Url ,
47
+ pub relayer_urls : Vec < Url > ,
48
+ pub authorization_token : String ,
29
49
#[ serde( with = "humantime_serde" ) ]
30
50
pub publish_interval_duration : Duration ,
31
51
}
@@ -84,61 +104,77 @@ async fn connect_to_relayers(
84
104
relayer_senders. push ( relayer_sender) ;
85
105
relayer_receivers. push ( relayer_receiver) ;
86
106
}
87
- let sender = RelayerSender { ws_senders : relayer_senders } ;
107
+ let sender = RelayerSender {
108
+ ws_senders : relayer_senders,
109
+ } ;
88
110
tracing:: info!( "connected to relayers: {:?}" , config. relayer_urls) ;
89
111
Ok ( ( sender, relayer_receivers) )
90
112
}
91
113
92
114
#[ derive( Deserialize ) ]
93
115
struct SymbolResponse {
94
- pub pyth_lazer_id : u32 ,
95
- pub name : String ,
96
- pub symbol : String ,
97
- pub description : String ,
98
- pub asset_type : String ,
99
- pub exponent : i32 ,
100
- pub cmc_id : Option < u32 > ,
101
- pub interval : Option < String > ,
116
+ pub pyth_lazer_id : u32 ,
117
+ pub name : String ,
118
+ pub symbol : String ,
119
+ pub description : String ,
120
+ pub asset_type : String ,
121
+ pub exponent : i32 ,
122
+ pub cmc_id : Option < u32 > ,
123
+ pub interval : Option < String > ,
102
124
pub min_publishers : u16 ,
103
- pub min_channel : String ,
104
- pub state : String ,
105
- pub hermes_id : Option < String > ,
125
+ pub min_channel : String ,
126
+ pub state : String ,
127
+ pub hermes_id : Option < String > ,
106
128
}
107
129
108
130
async fn fetch_symbols ( history_url : & Url ) -> Result < Vec < SymbolResponse > > {
109
131
let mut url = history_url. clone ( ) ;
110
132
url. set_scheme ( "http" ) . unwrap ( ) ;
111
133
url. set_path ( "/history/v1/symbols" ) ;
112
134
let client = Client :: new ( ) ;
113
- let response = client
114
- . get ( url)
115
- . send ( )
116
- . await ?
117
- . text ( )
118
- . await ?;
135
+ let response = client. get ( url) . send ( ) . await ?. text ( ) . await ?;
119
136
Ok ( serde_json:: from_str ( & response) ?)
120
137
}
121
138
122
139
#[ instrument( skip( config, state) ) ]
123
- pub fn lazer_exporter ( config : Config , state : Arc < state:: State > ) -> Vec < JoinHandle < ( ) > >
124
- {
140
+ pub fn lazer_exporter ( config : Config , state : Arc < state:: State > ) -> Vec < JoinHandle < ( ) > > {
125
141
// TODO: add loop to handle relayer failure/retry
126
142
let mut handles = Vec :: new ( ) ;
127
- handles. push ( tokio:: spawn ( lazer_exporter:: lazer_exporter ( config. clone ( ) , state) ) ) ;
143
+ handles. push ( tokio:: spawn ( lazer_exporter:: lazer_exporter (
144
+ config. clone ( ) ,
145
+ state,
146
+ ) ) ) ;
128
147
handles
129
148
}
130
149
131
150
mod lazer_exporter {
132
- use std:: collections:: HashMap ;
133
- use std:: num:: NonZeroI64 ;
134
- use std:: sync:: Arc ;
135
- use std:: time:: Duration ;
136
- use futures_util:: StreamExt ;
137
- use pyth_lazer_protocol:: publisher:: PriceFeedDataV1 ;
138
- use pyth_lazer_protocol:: router:: { Price , PriceFeedId , TimestampUs } ;
139
- use tokio_stream:: StreamMap ;
140
- use crate :: agent:: services:: lazer_exporter:: { Config , connect_to_relayers, fetch_symbols, SymbolResponse } ;
141
- use crate :: agent:: state:: local:: LocalStore ;
151
+ use {
152
+ crate :: agent:: {
153
+ services:: lazer_exporter:: {
154
+ connect_to_relayers,
155
+ fetch_symbols,
156
+ Config ,
157
+ SymbolResponse ,
158
+ } ,
159
+ state:: local:: LocalStore ,
160
+ } ,
161
+ futures_util:: StreamExt ,
162
+ pyth_lazer_protocol:: {
163
+ publisher:: PriceFeedDataV1 ,
164
+ router:: {
165
+ Price ,
166
+ PriceFeedId ,
167
+ TimestampUs ,
168
+ } ,
169
+ } ,
170
+ std:: {
171
+ collections:: HashMap ,
172
+ num:: NonZeroI64 ,
173
+ sync:: Arc ,
174
+ time:: Duration ,
175
+ } ,
176
+ tokio_stream:: StreamMap ,
177
+ } ;
142
178
143
179
pub async fn lazer_exporter < S > ( config : Config , state : Arc < S > )
144
180
where
@@ -152,7 +188,11 @@ mod lazer_exporter {
152
188
run ( & config, state. clone ( ) ) . await ;
153
189
154
190
failure_count += 1 ;
155
- tracing:: error!( "Lazer exporter failed {} times; retrying in {:?}" , failure_count, retry_duration) ;
191
+ tracing:: error!(
192
+ "Lazer exporter failed {} times; retrying in {:?}" ,
193
+ failure_count,
194
+ retry_duration
195
+ ) ;
156
196
tokio:: time:: sleep ( retry_duration) . await ;
157
197
158
198
// TODO: Back off or crash altogether on persistent failure
@@ -165,20 +205,23 @@ mod lazer_exporter {
165
205
S : Send + Sync + ' static ,
166
206
{
167
207
// TODO: Re-fetch on an interval?
168
- let lazer_symbols: HashMap < String , SymbolResponse > = match fetch_symbols ( & config. history_url ) . await {
169
- Ok ( symbols) => symbols. into_iter ( ) . filter_map ( |symbol| {
170
- symbol. hermes_id . clone ( ) . map ( |id| ( id, symbol) )
171
- } ) . collect ( ) ,
172
- Err ( e) => {
173
- tracing:: error!( "Failed to fetch Lazer symbols: {e:?}" ) ;
174
- return ;
175
- }
176
- } ;
208
+ let lazer_symbols: HashMap < String , SymbolResponse > =
209
+ match fetch_symbols ( & config. history_url ) . await {
210
+ Ok ( symbols) => symbols
211
+ . into_iter ( )
212
+ . filter_map ( |symbol| symbol. hermes_id . clone ( ) . map ( |id| ( id, symbol) ) )
213
+ . collect ( ) ,
214
+ Err ( e) => {
215
+ tracing:: error!( "Failed to fetch Lazer symbols: {e:?}" ) ;
216
+ return ;
217
+ }
218
+ } ;
177
219
178
220
// Establish relayer connections
179
221
// Relayer will drop the connection if no data received in 5s
180
222
let ( mut relayer_sender, relayer_receivers) = connect_to_relayers ( & config)
181
- . await . expect ( "failed to connect to relayers" ) ;
223
+ . await
224
+ . expect ( "failed to connect to relayers" ) ;
182
225
let mut stream_map = StreamMap :: new ( ) ;
183
226
for ( i, receiver) in relayer_receivers. into_iter ( ) . enumerate ( ) {
184
227
stream_map. insert ( config. relayer_urls [ i] . clone ( ) , receiver) ;
0 commit comments