Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion crates/client/flashblocks/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ base-client-node.workspace = true

# reth
reth.workspace = true
reth-exex.workspace = true
reth-evm.workspace = true
reth-primitives.workspace = true
reth-rpc.workspace = true
Expand Down
58 changes: 19 additions & 39 deletions crates/client/flashblocks/src/extension.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
//! Contains the [`FlashblocksExtension`] which wires up the flashblocks feature
//! (both the canon ExEx and RPC surface) on the Base node builder.
//! (canonical block subscription and RPC surface) on the Base node builder.

use std::sync::Arc;

use base_client_node::{BaseNodeExtension, FromExtensionConfig, OpBuilder, OpProvider};
use futures_util::TryStreamExt;
use once_cell::sync::OnceCell;
use reth_exex::ExExEvent;
use reth::providers::CanonStateSubscriptions;
use tokio_stream::{StreamExt, wrappers::BroadcastStream};
use tracing::info;
use url::Url;

Expand Down Expand Up @@ -55,48 +55,17 @@ impl BaseNodeExtension for FlashblocksExtension {
};

let flashblocks_cell = self.cell;
let cfg_for_rpc = cfg.clone();
let flashblocks_cell_for_rpc = flashblocks_cell.clone();

// Install the canon ExEx
let builder = builder.install_exex("flashblocks-canon", move |mut ctx| {
let flashblocks_cell = flashblocks_cell.clone();
async move {
let fb = flashblocks_cell
.get_or_init(|| {
Arc::new(FlashblocksState::new(
ctx.provider().clone(),
cfg.max_pending_blocks_depth,
))
})
.clone();

Ok(async move {
while let Some(note) = ctx.notifications.try_next().await? {
if let Some(committed) = note.committed_chain() {
let tip = committed.tip().num_hash();
let chain = Arc::unwrap_or_clone(committed);
for (_, block) in chain.into_blocks() {
fb.on_canonical_block_received(block);
}
let _ = ctx.events.send(ExExEvent::FinishedHeight(tip));
}
}
Ok(())
})
}
});

// Extend with RPC modules
// Extend with RPC modules and start canonical block subscription
builder.extend_rpc_modules(move |ctx| {
info!(message = "Starting Flashblocks RPC");

let ws_url = Url::parse(cfg_for_rpc.websocket_url.as_str())?;
let fb = flashblocks_cell_for_rpc
let ws_url = Url::parse(cfg.websocket_url.as_str())?;
let fb = flashblocks_cell
.get_or_init(|| {
Arc::new(FlashblocksState::new(
ctx.provider().clone(),
cfg_for_rpc.max_pending_blocks_depth,
cfg.max_pending_blocks_depth,
))
})
.clone();
Expand All @@ -115,9 +84,20 @@ impl BaseNodeExtension for FlashblocksExtension {
// Register the eth_subscribe subscription endpoint for flashblocks
// Uses replace_configured since eth_subscribe already exists from reth's standard module
// Pass eth_api to enable proxying standard subscription types to reth's implementation
let eth_pubsub = EthPubSub::new(ctx.registry.eth_api().clone(), fb);
let eth_pubsub = EthPubSub::new(ctx.registry.eth_api().clone(), fb.clone());
ctx.modules.replace_configured(eth_pubsub.into_rpc())?;

let mut canonical_stream =
BroadcastStream::new(ctx.provider().subscribe_to_canonical_state());
tokio::spawn(async move {
while let Some(Ok(notification)) = canonical_stream.next().await {
let committed = notification.committed();
for block in committed.blocks_iter() {
fb.on_canonical_block_received(block.clone());
}
}
});

Ok(())
})
}
Expand Down
121 changes: 55 additions & 66 deletions crates/client/flashblocks/src/test_harness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use once_cell::sync::OnceCell;
use reth::providers::CanonStateSubscriptions;
use reth_optimism_chainspec::OpChainSpec;
use tokio::sync::{mpsc, oneshot};
use tokio_stream::StreamExt;
use tokio_stream::{StreamExt, wrappers::BroadcastStream};

use crate::{
EthApiExt, EthApiOverrideServer, EthPubSub, EthPubSubApiServer, FlashblocksReceiver,
Expand Down Expand Up @@ -120,76 +120,65 @@ impl BaseNodeExtension for FlashblocksTestExtension {
let receiver = self.inner.receiver.clone();
let process_canonical = self.inner.process_canonical;

let fb_cell_for_exex = fb_cell.clone();

builder
.install_exex("flashblocks-canon", move |mut ctx| {
let fb_cell = fb_cell_for_exex.clone();
async move {
let provider = ctx.provider().clone();
let fb = init_flashblocks_state(&fb_cell, &provider);
Ok(async move {
use reth_exex::ExExEvent;
while let Some(note) = ctx.notifications.try_next().await? {
if let Some(committed) = note.committed_chain() {
let hash = committed.tip().num_hash();
if process_canonical {
// Many suites drive canonical updates manually to reproduce race conditions, so
// allowing this to be disabled keeps canonical replay deterministic.
let chain = Arc::unwrap_or_clone(committed);
for (_, block) in chain.into_blocks() {
fb.on_canonical_block_received(block);
}
}
let _ = ctx.events.send(ExExEvent::FinishedHeight(hash));
}
}
Ok(())
})
builder.extend_rpc_modules(move |ctx| {
let provider = ctx.provider().clone();
let fb = init_flashblocks_state(&fb_cell, &provider);

// Spawn a task to forward canonical state notifications to the in-memory state
let provider_for_notify = provider.clone();
let mut canon_notify_stream =
BroadcastStream::new(ctx.provider().subscribe_to_canonical_state());
tokio::spawn(async move {
while let Some(Ok(notification)) = canon_notify_stream.next().await {
provider_for_notify
.canonical_in_memory_state()
.notify_canon_state(notification);
}
})
.extend_rpc_modules(move |ctx| {
let fb_cell = fb_cell.clone();
let provider = ctx.provider().clone();
let fb = init_flashblocks_state(&fb_cell, &provider);

let mut canon_stream = tokio_stream::wrappers::BroadcastStream::new(
ctx.provider().subscribe_to_canonical_state(),
);
tokio::spawn(async move {
use tokio_stream::StreamExt;
while let Some(Ok(notification)) = canon_stream.next().await {
provider.canonical_in_memory_state().notify_canon_state(notification);
}
});
let api_ext = EthApiExt::new(
ctx.registry.eth_api().clone(),
ctx.registry.eth_handlers().filter.clone(),
fb.clone(),
);
ctx.modules.replace_configured(api_ext.into_rpc())?;

// Register eth_subscribe subscription endpoint for flashblocks
// Uses replace_configured since eth_subscribe already exists from reth's standard module
// Pass eth_api to enable proxying standard subscription types to reth's implementation
let eth_pubsub = EthPubSub::new(ctx.registry.eth_api().clone(), fb.clone());
ctx.modules.replace_configured(eth_pubsub.into_rpc())?;

let fb_for_task = fb.clone();
let mut receiver = receiver
.lock()
.expect("flashblock receiver mutex poisoned")
.take()
.expect("flashblock receiver should only be initialized once");
});

// If process_canonical is enabled, spawn a task to process canonical blocks
if process_canonical {
let fb_for_canonical = fb.clone();
let mut canonical_stream =
BroadcastStream::new(ctx.provider().subscribe_to_canonical_state());
tokio::spawn(async move {
while let Some((payload, tx)) = receiver.recv().await {
fb_for_task.on_flashblock_received(payload);
let _ = tx.send(());
while let Some(Ok(notification)) = canonical_stream.next().await {
let committed = notification.committed();
for block in committed.blocks_iter() {
fb_for_canonical.on_canonical_block_received(block.clone());
}
}
});
}

let api_ext = EthApiExt::new(
ctx.registry.eth_api().clone(),
ctx.registry.eth_handlers().filter.clone(),
fb.clone(),
);
ctx.modules.replace_configured(api_ext.into_rpc())?;

// Register eth_subscribe subscription endpoint for flashblocks
// Uses replace_configured since eth_subscribe already exists from reth's standard module
// Pass eth_api to enable proxying standard subscription types to reth's implementation
let eth_pubsub = EthPubSub::new(ctx.registry.eth_api().clone(), fb.clone());
ctx.modules.replace_configured(eth_pubsub.into_rpc())?;

let fb_for_task = fb.clone();
let mut receiver = receiver
.lock()
.expect("flashblock receiver mutex poisoned")
.take()
.expect("flashblock receiver should only be initialized once");
tokio::spawn(async move {
while let Some((payload, tx)) = receiver.recv().await {
fb_for_task.on_flashblock_received(payload);
let _ = tx.send(());
}
});

Ok(())
})
Ok(())
})
}
}

Expand Down
4 changes: 2 additions & 2 deletions crates/client/txpool/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ base-client-node.workspace = true

# reth
reth.workspace = true
reth-exex.workspace = true
reth-tracing.workspace = true
reth-transaction-pool.workspace = true

Expand All @@ -29,18 +28,19 @@ jsonrpsee.workspace = true

# async
tokio.workspace = true
tokio-stream.workspace = true
futures.workspace = true

# misc
lru.workspace = true
eyre.workspace = true
chrono.workspace = true
metrics.workspace = true
derive_more = { workspace = true, features = ["display"] }
tracing.workspace = true
serde.workspace = true

[dev-dependencies]
eyre.workspace = true
httpmock.workspace = true
serde_json.workspace = true
reth-transaction-pool = { workspace = true, features = ["test-utils"] }
42 changes: 0 additions & 42 deletions crates/client/txpool/src/exex.rs

This file was deleted.

25 changes: 16 additions & 9 deletions crates/client/txpool/src/extension.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
//! Contains the [`TxPoolExtension`] which wires up the transaction pool features
//! (tracing ExEx and status RPC) on the Base node builder.
//! (tracing subscription and status RPC) on the Base node builder.

use base_client_node::{BaseNodeExtension, FromExtensionConfig, OpBuilder};
use reth::providers::CanonStateSubscriptions;
use tokio_stream::wrappers::BroadcastStream;
use tracing::info;

use crate::{TransactionStatusApiImpl, TransactionStatusApiServer, tracex_exex};
use crate::{TransactionStatusApiImpl, TransactionStatusApiServer, tracex_subscription};

/// Transaction pool configuration.
#[derive(Debug, Clone)]
Expand Down Expand Up @@ -36,20 +38,25 @@ impl BaseNodeExtension for TxPoolExtension {
fn apply(self: Box<Self>, builder: OpBuilder) -> OpBuilder {
let config = self.config;

// Install the tracing ExEx if enabled
// Extend with RPC modules and optionally start tracing subscription
let sequencer_rpc = config.sequencer_rpc;
let tracing_enabled = config.tracing_enabled;
let logs_enabled = config.tracing_logs_enabled;
let builder =
builder.install_exex_if(config.tracing_enabled, "tracex", move |ctx| async move {
Ok(tracex_exex(ctx, logs_enabled))
});

// Extend with RPC modules
let sequencer_rpc = config.sequencer_rpc;
builder.extend_rpc_modules(move |ctx| {
info!(message = "Starting Transaction Status RPC");
let proxy_api = TransactionStatusApiImpl::new(sequencer_rpc, ctx.pool().clone())
.expect("Failed to create transaction status proxy");
ctx.modules.merge_configured(proxy_api.into_rpc())?;

// Start the tracing subscription if enabled
if tracing_enabled {
let canonical_stream =
BroadcastStream::new(ctx.provider().subscribe_to_canonical_state());
let pool = ctx.pool().clone();
tokio::spawn(tracex_subscription(canonical_stream, pool, logs_enabled));
}

Ok(())
})
}
Expand Down
4 changes: 2 additions & 2 deletions crates/client/txpool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
mod events;
pub use events::{EventLog, Pool, TxEvent};

mod exex;
pub use exex::tracex_exex;
mod subscription;
pub use subscription::tracex_subscription;

mod rpc;
pub use rpc::{
Expand Down
Loading
Loading