Skip to content

Commit 4fb7646

Browse files
committed
chore: do not use tokio streams, handle loop
1 parent 2a9df76 commit 4fb7646

File tree

2 files changed

+24
-13
lines changed

2 files changed

+24
-13
lines changed

Cargo.toml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@ chrono = "0.4.40"
4545
# OAuth
4646
oauth2 = { version = "5.0.0", optional = true }
4747
tokio = { version = "1.36.0", optional = true }
48-
tokio-stream = { version = "0.1", optional = true }
4948

5049
# Other
5150
axum = "0.8.1"
@@ -73,7 +72,7 @@ default = ["alloy", "rustls"]
7372
alloy = ["dep:alloy"]
7473
aws = ["alloy", "alloy?/signer-aws", "dep:async-trait", "dep:aws-config", "dep:aws-sdk-kms"]
7574
perms = ["dep:oauth2", "dep:tokio", "dep:reqwest", "dep:signet-tx-cache"]
76-
block_watcher = ["dep:tokio", "dep:tokio-stream"]
75+
block_watcher = ["dep:tokio"]
7776
rustls = ["dep:rustls", "rustls/aws-lc-rs"]
7877

7978
[[example]]

src/utils/block_watcher.rs

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,11 @@ use alloy::{
66
providers::{Provider, RootProvider},
77
transports::TransportError,
88
};
9-
use tokio::{sync::watch, task::JoinHandle};
10-
use tokio_stream::StreamExt;
11-
use tracing::{debug, error, trace};
9+
use tokio::{
10+
sync::{broadcast::error::RecvError, watch},
11+
task::JoinHandle,
12+
};
13+
use tracing::{debug, error, trace, warn};
1214

1315
/// Errors that can occur on the [`BlockWatcher`] task.
1416
#[derive(Debug, thiserror::Error)]
@@ -64,21 +66,31 @@ impl BlockWatcher {
6466
}
6567

6668
async fn task_future(self) {
67-
let sub = match self.host_provider.subscribe_blocks().await {
69+
let mut sub = match self.host_provider.subscribe_blocks().await {
6870
Ok(sub) => sub,
69-
Err(err) => {
70-
error!(error = ?err, "failed to subscribe to host chain blocks");
71+
Err(error) => {
72+
error!(%error);
7173
return;
7274
}
7375
};
74-
let mut stream = sub.into_stream();
7576

7677
debug!("subscribed to host chain blocks");
7778

78-
while let Some(header) = stream.next().await {
79-
let block_number = header.number;
80-
self.block_number.send_replace(block_number);
81-
trace!(block_number, "updated host block number");
79+
loop {
80+
match sub.recv().await {
81+
Ok(header) => {
82+
let block_number = header.number;
83+
self.block_number.send_replace(block_number);
84+
trace!(block_number, "updated host block number");
85+
}
86+
Err(RecvError::Lagged(missed)) => {
87+
warn!(%missed, "block subscription lagged");
88+
}
89+
Err(RecvError::Closed) => {
90+
error!("block subscription closed");
91+
break;
92+
}
93+
}
8294
}
8395
}
8496
}

0 commit comments

Comments
 (0)