Skip to content

Commit def7899

Browse files
committed
fixes as per review
1 parent c4fad12 commit def7899

File tree

1 file changed

+42
-32
lines changed

1 file changed

+42
-32
lines changed

src/agent/services/lazer_exporter.rs

Lines changed: 42 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -46,10 +46,14 @@ pub struct Config {
4646
pub history_url: Url,
4747
pub relayer_urls: Vec<Url>,
4848
pub authorization_token: String,
49-
#[serde(with = "humantime_serde")]
49+
#[serde(with = "humantime_serde", default = "default_publish_interval")]
5050
pub publish_interval_duration: Duration,
5151
}
5252

53+
fn default_publish_interval() -> Duration {
54+
Duration::from_millis(10)
55+
}
56+
5357
struct RelayerSender {
5458
ws_senders: Vec<SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>>,
5559
}
@@ -111,20 +115,22 @@ async fn connect_to_relayers(
111115
Ok((sender, relayer_receivers))
112116
}
113117

118+
// TODO: This is copied from history-service; move to Lazer protocol sdk.
114119
#[derive(Deserialize)]
115120
struct SymbolResponse {
116-
pub pyth_lazer_id: u32,
117-
pub name: String,
118-
pub symbol: String,
119-
pub description: String,
120-
pub asset_type: String,
121-
pub exponent: i32,
122-
pub cmc_id: Option<u32>,
123-
pub interval: Option<String>,
124-
pub min_publishers: u16,
125-
pub min_channel: String,
126-
pub state: String,
127-
pub hermes_id: Option<String>,
121+
pub pyth_lazer_id: u32,
122+
pub _name: String,
123+
pub _symbol: String,
124+
pub _description: String,
125+
pub _asset_type: String,
126+
pub _exponent: i16,
127+
pub _cmc_id: Option<u32>,
128+
pub _interval: Option<String>,
129+
pub _min_publishers: u16,
130+
pub _min_channel: String,
131+
pub _state: String,
132+
pub _schedule: String,
133+
pub hermes_id: Option<String>,
128134
}
129135

130136
async fn fetch_symbols(history_url: &Url) -> Result<Vec<SymbolResponse>> {
@@ -138,7 +144,6 @@ async fn fetch_symbols(history_url: &Url) -> Result<Vec<SymbolResponse>> {
138144

139145
#[instrument(skip(config, state))]
140146
pub fn lazer_exporter(config: Config, state: Arc<state::State>) -> Vec<JoinHandle<()>> {
141-
// TODO: add loop to handle relayer failure/retry
142147
let mut handles = Vec::new();
143148
handles.push(tokio::spawn(lazer_exporter::lazer_exporter(
144149
config.clone(),
@@ -158,6 +163,7 @@ mod lazer_exporter {
158163
},
159164
state::local::LocalStore,
160165
},
166+
anyhow::bail,
161167
futures_util::StreamExt,
162168
pyth_lazer_protocol::{
163169
publisher::PriceFeedDataV1,
@@ -185,21 +191,26 @@ mod lazer_exporter {
185191
let retry_duration = Duration::from_secs(1);
186192

187193
loop {
188-
run(&config, state.clone()).await;
189-
190-
failure_count += 1;
191-
tracing::error!(
192-
"Lazer exporter failed {} times; retrying in {:?}",
193-
failure_count,
194-
retry_duration
195-
);
196-
tokio::time::sleep(retry_duration).await;
197-
198-
// TODO: Back off or crash altogether on persistent failure
194+
match run(&config, state.clone()).await {
195+
Ok(()) => {
196+
tracing::info!("lazer_exporter graceful shutdown");
197+
return;
198+
}
199+
Err(e) => {
200+
failure_count += 1;
201+
tracing::error!(
202+
"lazer_exporter failed with error: {:?}, failure_count: {}; retrying in {:?}",
203+
e,
204+
failure_count,
205+
retry_duration
206+
);
207+
tokio::time::sleep(retry_duration).await;
208+
}
209+
}
199210
}
200211
}
201212

202-
async fn run<S>(config: &Config, state: Arc<S>)
213+
async fn run<S>(config: &Config, state: Arc<S>) -> anyhow::Result<()>
203214
where
204215
S: LocalStore,
205216
S: Send + Sync + 'static,
@@ -213,15 +224,13 @@ mod lazer_exporter {
213224
.collect(),
214225
Err(e) => {
215226
tracing::error!("Failed to fetch Lazer symbols: {e:?}");
216-
return;
227+
bail!("Failed to fetch Lazer symbols: {e:?}");
217228
}
218229
};
219230

220231
// Establish relayer connections
221232
// Relayer will drop the connection if no data received in 5s
222-
let (mut relayer_sender, relayer_receivers) = connect_to_relayers(&config)
223-
.await
224-
.expect("failed to connect to relayers");
233+
let (mut relayer_sender, relayer_receivers) = connect_to_relayers(&config).await?;
225234
let mut stream_map = StreamMap::new();
226235
for (i, receiver) in relayer_receivers.into_iter().enumerate() {
227236
stream_map.insert(config.relayer_urls[i].clone(), receiver);
@@ -232,6 +241,7 @@ mod lazer_exporter {
232241
loop {
233242
tokio::select! {
234243
_ = publish_interval.tick() => {
244+
// TODO: This read locks and clones local::Store::prices, which may not meet performance needs.
235245
for (identifier, price_info) in state.get_all_price_infos().await {
236246
if let Some(symbol) = lazer_symbols.get(&identifier.to_string()) {
237247
if let Err(e) = relayer_sender.send_price_update(&PriceFeedDataV1 {
@@ -243,7 +253,7 @@ mod lazer_exporter {
243253
source_timestamp_us: TimestampUs(price_info.timestamp.and_utc().timestamp_micros() as u64),
244254
}).await {
245255
tracing::error!("Error sending price update to relayer: {e:?}");
246-
return;
256+
bail!("Failed to send price update to relayer: {e:?}");
247257
}
248258
}
249259
}
@@ -260,7 +270,7 @@ mod lazer_exporter {
260270
None => {
261271
// TODO: Probably still appropriate to return here, but retry in caller.
262272
tracing::error!("relayer connection closed");
263-
return;
273+
bail!("relayer connection closed");
264274
}
265275
}
266276
}

0 commit comments

Comments
 (0)