Skip to content

Commit 88fe2a3

Browse files
authored
feat(utils): BlockWatcher (#108)
* feat(utils): `BlockWatcher` * feat: add helper type for subscribing to block number upates * chore: do not use tokio streams, handle loop * chore: code review changes * chore: comments
1 parent fc7182b commit 88fe2a3

File tree

3 files changed

+125
-0
lines changed

3 files changed

+125
-0
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ default = ["alloy", "rustls"]
7272
alloy = ["dep:alloy"]
7373
aws = ["alloy", "alloy?/signer-aws", "dep:async-trait", "dep:aws-config", "dep:aws-sdk-kms"]
7474
perms = ["dep:oauth2", "dep:tokio", "dep:reqwest", "dep:signet-tx-cache"]
75+
block_watcher = ["dep:tokio"]
7576
rustls = ["dep:rustls", "rustls/aws-lc-rs"]
7677

7778
[[example]]

src/lib.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,10 @@ pub mod utils {
4747

4848
/// Tracing utilities.
4949
pub mod tracing;
50+
51+
/// Block watcher utilities.
52+
#[cfg(feature = "block_watcher")]
53+
pub mod block_watcher;
5054
}
5155

5256
/// Re-exports of common dependencies.

src/utils/block_watcher.rs

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
//! Host chain block watcher that subscribes to new blocks and tracks the
2+
//! current host block number.
3+
4+
use alloy::{
5+
network::Ethereum,
6+
providers::{Provider, RootProvider},
7+
transports::TransportError,
8+
};
9+
use tokio::{
10+
sync::{broadcast::error::RecvError, watch},
11+
task::JoinHandle,
12+
};
13+
use tracing::{debug, error, trace};
14+
15+
/// Host chain block watcher that subscribes to new blocks and broadcasts
16+
/// updates via a watch channel.
17+
#[derive(Debug)]
18+
pub struct BlockWatcher {
19+
/// Watch channel responsible for broadcasting block number updates.
20+
block_number: watch::Sender<u64>,
21+
22+
/// Host chain provider.
23+
host_provider: RootProvider<Ethereum>,
24+
}
25+
26+
impl BlockWatcher {
27+
/// Creates a new [`BlockWatcher`] with the given provider and initial
28+
/// block number.
29+
pub fn new(host_provider: RootProvider<Ethereum>, initial: u64) -> Self {
30+
Self {
31+
block_number: watch::channel(initial).0,
32+
host_provider,
33+
}
34+
}
35+
36+
/// Creates a new [`BlockWatcher`], fetching the current block number first.
37+
pub async fn with_current_block(
38+
host_provider: RootProvider<Ethereum>,
39+
) -> Result<Self, TransportError> {
40+
let block_number = host_provider.get_block_number().await?;
41+
Ok(Self::new(host_provider, block_number))
42+
}
43+
44+
/// Subscribe to block number updates.
45+
pub fn subscribe(&self) -> SharedBlockNumber {
46+
self.block_number.subscribe().into()
47+
}
48+
49+
/// Spawns the block watcher task.
50+
pub fn spawn(self) -> (SharedBlockNumber, JoinHandle<()>) {
51+
(self.subscribe(), tokio::spawn(self.task_future()))
52+
}
53+
54+
async fn task_future(self) {
55+
let mut sub = match self.host_provider.subscribe_blocks().await {
56+
Ok(sub) => sub,
57+
Err(error) => {
58+
error!(%error);
59+
return;
60+
}
61+
};
62+
63+
debug!("subscribed to host chain blocks");
64+
65+
loop {
66+
match sub.recv().await {
67+
Ok(header) => {
68+
let block_number = header.number;
69+
self.block_number.send_replace(block_number);
70+
trace!(block_number, "updated host block number");
71+
}
72+
Err(RecvError::Lagged(missed)) => {
73+
debug!(%missed, "block subscription lagged");
74+
}
75+
Err(RecvError::Closed) => {
76+
debug!("block subscription closed");
77+
break;
78+
}
79+
}
80+
}
81+
}
82+
}
83+
84+
/// A shared block number, wrapped in a [`tokio::sync::watch`] Receiver.
85+
///
86+
/// The block number is periodically updated by a [`BlockWatcher`] task, and
87+
/// can be read or awaited for changes. This allows multiple tasks to observe
88+
/// block number updates.
89+
#[derive(Debug, Clone)]
90+
pub struct SharedBlockNumber(watch::Receiver<u64>);
91+
92+
impl From<watch::Receiver<u64>> for SharedBlockNumber {
93+
fn from(inner: watch::Receiver<u64>) -> Self {
94+
Self(inner)
95+
}
96+
}
97+
98+
impl SharedBlockNumber {
99+
/// Get the current block number.
100+
pub fn get(&self) -> u64 {
101+
*self.0.borrow()
102+
}
103+
104+
/// Wait for the block number to change, then return the new value.
105+
///
106+
/// This is implemented using [`Receiver::changed`].
107+
///
108+
/// [`Receiver::changed`]: tokio::sync::watch::Receiver::changed
109+
pub async fn changed(&mut self) -> Result<u64, watch::error::RecvError> {
110+
self.0.changed().await?;
111+
Ok(*self.0.borrow_and_update())
112+
}
113+
114+
/// Wait for the block number to reach at least `target`.
115+
///
116+
/// Returns the block number once it is >= `target`.
117+
pub async fn wait_until(&mut self, target: u64) -> Result<u64, watch::error::RecvError> {
118+
self.0.wait_for(|&n| n >= target).await.map(|r| *r)
119+
}
120+
}

0 commit comments

Comments
 (0)