Skip to content

Commit 0a4a099

Browse files
committed
Parallelize channel send; exponential backoff
1 parent 0496481 commit 0a4a099

File tree

3 files changed

+39
-12
lines changed

3 files changed

+39
-12
lines changed

Cargo.lock

Lines changed: 21 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ path = "src/bin/agent.rs"
99

1010
[dependencies]
1111
anyhow = "1.0.81"
12+
backoff = "0.4.0"
1213
ed25519-dalek = "2.1.1"
1314
serde = { version = "1.0.197", features = ["derive", "rc"] }
1415
async-trait = "0.1.79"

src/agent/services/lazer_exporter.rs

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,10 @@ use {
55
anyhow,
66
bail,
77
},
8+
backoff::{
9+
ExponentialBackoffBuilder,
10+
backoff::Backoff,
11+
},
812
futures_util::{
913
SinkExt,
1014
stream::{
@@ -112,7 +116,13 @@ struct RelayerSessionTask {
112116
impl RelayerSessionTask {
113117
pub async fn run(&mut self) {
114118
let mut failure_count = 0;
115-
let retry_duration = Duration::from_secs(1);
119+
let initial_interval = Duration::from_millis(100);
120+
let max_interval = Duration::from_secs(5);
121+
let mut backoff = ExponentialBackoffBuilder::new()
122+
.with_initial_interval(initial_interval)
123+
.with_max_interval(max_interval)
124+
.with_max_elapsed_time(None)
125+
.build();
116126

117127
loop {
118128
match self.run_relayer_connection().await {
@@ -122,13 +132,14 @@ impl RelayerSessionTask {
122132
}
123133
Err(e) => {
124134
failure_count += 1;
135+
let next_backoff = backoff.next_backoff().unwrap_or(max_interval);
125136
tracing::error!(
126137
"relayer session failed with error: {:?}, failure_count: {}; retrying in {:?}",
127138
e,
128139
failure_count,
129-
retry_duration
140+
next_backoff
130141
);
131-
tokio::time::sleep(retry_duration).await;
142+
tokio::time::sleep(next_backoff).await;
132143
}
133144
}
134145
}
@@ -405,15 +416,9 @@ mod lazer_exporter {
405416
payload: Some(buf),
406417
special_fields: Default::default(),
407418
};
408-
for relayer_sender in relayer_senders.iter() {
409-
if let Err(e) = relayer_sender
410-
.send(signed_lazer_transaction.clone())
411-
.await
412-
{
413-
tracing::error!("Error sending transaction to Lazer relayer session: {e:?}");
414-
// TODO: Under what circumstances would the channel be hosed and is it worth retry?
415-
}
416-
}
419+
futures::future::join_all(relayer_senders.iter().map(|relayer_sender|
420+
relayer_sender.send(signed_lazer_transaction.clone()))
421+
).await;
417422
}
418423
}
419424
}

0 commit comments

Comments
 (0)