Skip to content
This repository was archived by the owner on Jan 9, 2026. It is now read-only.

Commit 3c7301e

Browse files
authored
feat(optimism): Add launch_wss_flashblocks_service function spawning a task sending last pending block (paradigmxyz#18067)
1 parent 87647b2 commit 3c7301e

File tree

2 files changed

+57
-0
lines changed

2 files changed

+57
-0
lines changed
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
use crate::{FlashBlockService, FlashBlockWsStream};
2+
use futures_util::StreamExt;
3+
use reth_chain_state::ExecutedBlock;
4+
use reth_evm::ConfigureEvm;
5+
use reth_primitives_traits::{BlockTy, HeaderTy, NodePrimitives, ReceiptTy};
6+
use reth_rpc_eth_api::helpers::pending_block::BuildPendingEnv;
7+
use reth_storage_api::{BlockReaderIdExt, StateProviderFactory};
8+
use tokio::sync::watch;
9+
use url::Url;
10+
11+
/// Spawns a background task that subscribes over websocket to `ws_url`.
12+
///
13+
/// Returns a [`FlashBlockRx`] that receives the most recent [`ExecutedBlock`] built from
14+
/// [`FlashBlock`]s.
15+
///
16+
/// [`FlashBlock`]: crate::FlashBlock
17+
pub fn launch_wss_flashblocks_service<N, EvmConfig, Provider>(
18+
ws_url: Url,
19+
evm_config: EvmConfig,
20+
provider: Provider,
21+
) -> FlashBlockRx<N>
22+
where
23+
N: NodePrimitives,
24+
EvmConfig: ConfigureEvm<
25+
Primitives = N,
26+
NextBlockEnvCtx: BuildPendingEnv<N::BlockHeader> + Unpin + 'static,
27+
> + 'static,
28+
Provider: StateProviderFactory
29+
+ BlockReaderIdExt<
30+
Header = HeaderTy<N>,
31+
Block = BlockTy<N>,
32+
Transaction = N::SignedTx,
33+
Receipt = ReceiptTy<N>,
34+
> + Unpin
35+
+ 'static,
36+
{
37+
let stream = FlashBlockWsStream::new(ws_url);
38+
let mut service = FlashBlockService::new(stream, evm_config, provider, ());
39+
let (tx, rx) = watch::channel(None);
40+
41+
tokio::spawn(async move {
42+
while let Some(block) = service.next().await {
43+
if let Ok(block) = block.inspect_err(|e| tracing::error!("{e}")) {
44+
let _ = tx.send(Some(block)).inspect_err(|e| tracing::error!("{e}"));
45+
}
46+
}
47+
});
48+
49+
rx
50+
}
51+
52+
/// Receiver of the most recent [`ExecutedBlock`] built out of [`FlashBlock`]s.
53+
///
54+
/// [`FlashBlock`]: crate::FlashBlock
55+
pub type FlashBlockRx<N> = watch::Receiver<Option<ExecutedBlock<N>>>;
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
//! A downstream integration of Flashblocks.
22
3+
pub use app::{launch_wss_flashblocks_service, FlashBlockRx};
34
pub use payload::{
45
ExecutionPayloadBaseV1, ExecutionPayloadFlashblockDeltaV1, FlashBlock, Metadata,
56
};
67
pub use service::FlashBlockService;
78
pub use ws::FlashBlockWsStream;
89

10+
mod app;
911
mod payload;
1012
mod service;
1113
mod ws;

0 commit comments

Comments
 (0)