Skip to content

Commit e365920

Browse files
RomanHodulakmattsse
authored andcommitted
refactor(optimism): Extract pending block building responsibility out of FlashBlockService (paradigmxyz#18247)
Co-authored-by: Matthias Seitz <[email protected]>
1 parent 9f02330 commit e365920

File tree

6 files changed

+285
-149
lines changed

6 files changed

+285
-149
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/optimism/flashblocks/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ reth-revm.workspace = true
2222
reth-rpc-eth-types.workspace = true
2323
reth-errors.workspace = true
2424
reth-storage-api.workspace = true
25+
reth-tasks.workspace = true
2526

2627
# alloy
2728
alloy-eips = { workspace = true, features = ["serde"] }

crates/optimism/flashblocks/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ pub use ws::{WsConnect, WsFlashBlockStream};
1010
mod payload;
1111
mod sequence;
1212
mod service;
13+
mod worker;
1314
mod ws;
1415

1516
/// Receiver of the most recent [`PendingBlock`] built out of [`FlashBlock`]s.
Lines changed: 149 additions & 148 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,26 @@
1-
use crate::{sequence::FlashBlockSequence, ExecutionPayloadBaseV1, FlashBlock};
2-
use alloy_eips::BlockNumberOrTag;
1+
use crate::{
2+
sequence::FlashBlockSequence,
3+
worker::{BuildArgs, FlashBlockBuilder},
4+
ExecutionPayloadBaseV1, FlashBlock,
5+
};
6+
use alloy_eips::eip2718::WithEncoded;
37
use alloy_primitives::B256;
48
use futures_util::{FutureExt, Stream, StreamExt};
5-
use reth_chain_state::{
6-
CanonStateNotification, CanonStateNotifications, CanonStateSubscriptions, ExecutedBlock,
7-
};
8-
use reth_errors::RethError;
9-
use reth_evm::{
10-
execute::{BlockBuilder, BlockBuilderOutcome},
11-
ConfigureEvm,
9+
use reth_chain_state::{CanonStateNotification, CanonStateNotifications, CanonStateSubscriptions};
10+
use reth_evm::ConfigureEvm;
11+
use reth_primitives_traits::{
12+
AlloyBlockHeader, BlockTy, HeaderTy, NodePrimitives, ReceiptTy, Recovered,
1213
};
13-
use reth_execution_types::ExecutionOutcome;
14-
use reth_primitives_traits::{AlloyBlockHeader, BlockTy, HeaderTy, NodePrimitives, ReceiptTy};
15-
use reth_revm::{cached::CachedReads, database::StateProviderDatabase, db::State};
16-
use reth_rpc_eth_types::{EthApiError, PendingBlock};
17-
use reth_storage_api::{noop::NoopProvider, BlockReaderIdExt, StateProviderFactory};
14+
use reth_revm::cached::CachedReads;
15+
use reth_rpc_eth_types::PendingBlock;
16+
use reth_storage_api::{BlockReaderIdExt, StateProviderFactory};
17+
use reth_tasks::TaskExecutor;
1818
use std::{
1919
pin::Pin,
20-
sync::Arc,
21-
task::{Context, Poll},
22-
time::{Duration, Instant},
20+
task::{ready, Context, Poll},
21+
time::Instant,
2322
};
24-
use tokio::pin;
23+
use tokio::{pin, sync::oneshot};
2524
use tracing::{debug, trace, warn};
2625

2726
/// The `FlashBlockService` maintains an in-memory [`PendingBlock`] built out of a sequence of
@@ -37,9 +36,10 @@ pub struct FlashBlockService<
3736
current: Option<PendingBlock<N>>,
3837
blocks: FlashBlockSequence<N::SignedTx>,
3938
rebuild: bool,
40-
evm_config: EvmConfig,
41-
provider: Provider,
39+
builder: FlashBlockBuilder<EvmConfig, Provider>,
4240
canon_receiver: CanonStateNotifications<N>,
41+
spawner: TaskExecutor,
42+
job: Option<BuildJob<N>>,
4343
/// Cached state reads for the current block.
4444
/// Current `PendingBlock` is built out of a sequence of `FlashBlocks`, and executed again when
4545
/// fb received on top of the same block. Avoid redundant I/O across multiple executions
@@ -50,28 +50,33 @@ pub struct FlashBlockService<
5050
impl<N, S, EvmConfig, Provider> FlashBlockService<N, S, EvmConfig, Provider>
5151
where
5252
N: NodePrimitives,
53-
S: Stream<Item = eyre::Result<FlashBlock>> + Unpin,
54-
EvmConfig: ConfigureEvm<Primitives = N, NextBlockEnvCtx: From<ExecutionPayloadBaseV1> + Unpin>,
53+
S: Stream<Item = eyre::Result<FlashBlock>> + Unpin + 'static,
54+
EvmConfig: ConfigureEvm<Primitives = N, NextBlockEnvCtx: From<ExecutionPayloadBaseV1> + Unpin>
55+
+ Clone
56+
+ 'static,
5557
Provider: StateProviderFactory
5658
+ CanonStateSubscriptions<Primitives = N>
5759
+ BlockReaderIdExt<
5860
Header = HeaderTy<N>,
5961
Block = BlockTy<N>,
6062
Transaction = N::SignedTx,
6163
Receipt = ReceiptTy<N>,
62-
> + Unpin,
64+
> + Unpin
65+
+ Clone
66+
+ 'static,
6367
{
6468
/// Constructs a new `FlashBlockService` that receives [`FlashBlock`]s from `rx` stream.
65-
pub fn new(rx: S, evm_config: EvmConfig, provider: Provider) -> Self {
69+
pub fn new(rx: S, evm_config: EvmConfig, provider: Provider, spawner: TaskExecutor) -> Self {
6670
Self {
6771
rx,
6872
current: None,
6973
blocks: FlashBlockSequence::new(),
70-
evm_config,
7174
canon_receiver: provider.subscribe_to_canonical_state(),
72-
provider,
73-
cached_state: None,
75+
builder: FlashBlockBuilder::new(evm_config, provider),
7476
rebuild: false,
77+
spawner,
78+
job: None,
79+
cached_state: None,
7580
}
7681
}
7782

@@ -88,86 +93,35 @@ where
8893
warn!("Flashblock service has stopped");
8994
}
9095

91-
/// Returns the cached reads at the given head hash.
96+
/// Returns the [`BuildArgs`] made purely out of [`FlashBlock`]s that were received earlier.
9297
///
93-
/// Returns a new cache instance if this is new `head` hash.
94-
fn cached_reads(&mut self, head: B256) -> CachedReads {
95-
if let Some((tracked, cache)) = self.cached_state.take() {
96-
if tracked == head {
97-
return cache
98-
}
99-
}
100-
101-
// instantiate a new cache instance
102-
CachedReads::default()
103-
}
104-
105-
/// Updates the cached reads at the given head hash
106-
fn update_cached_reads(&mut self, head: B256, cached_reads: CachedReads) {
107-
self.cached_state = Some((head, cached_reads));
108-
}
109-
110-
/// Returns the [`ExecutedBlock`] made purely out of [`FlashBlock`]s that were received earlier.
111-
///
112-
/// Returns None if the flashblock doesn't attach to the latest header.
113-
fn execute(&mut self) -> eyre::Result<Option<PendingBlock<N>>> {
114-
trace!("Attempting new flashblock");
115-
116-
let latest = self
117-
.provider
118-
.latest_header()?
119-
.ok_or(EthApiError::HeaderNotFound(BlockNumberOrTag::Latest.into()))?;
120-
let latest_hash = latest.hash();
121-
122-
let Some(attrs) = self.blocks.payload_base() else {
123-
trace!(flashblock_number = ?self.blocks.block_number(), count = %self.blocks.count(), "Missing flashblock payload base");
124-
return Ok(None)
98+
/// Returns `None` if the flashblock have no `base` or the base is not a child block of latest.
99+
fn build_args(
100+
&mut self,
101+
) -> Option<BuildArgs<impl IntoIterator<Item = WithEncoded<Recovered<N::SignedTx>>>>> {
102+
let Some(base) = self.blocks.payload_base() else {
103+
trace!(
104+
flashblock_number = ?self.blocks.block_number(),
105+
count = %self.blocks.count(),
106+
"Missing flashblock payload base"
107+
);
108+
109+
return None
125110
};
126111

127-
if attrs.parent_hash != latest_hash {
128-
trace!(flashblock_parent = ?attrs.parent_hash, local_latest=?latest.num_hash(),"Skipping non consecutive flashblock");
129-
// doesn't attach to the latest block
130-
return Ok(None)
131-
}
132-
133-
let state_provider = self.provider.history_by_block_hash(latest.hash())?;
134-
135-
let mut request_cache = self.cached_reads(latest_hash);
136-
let cached_db = request_cache.as_db_mut(StateProviderDatabase::new(&state_provider));
137-
let mut state = State::builder().with_database(cached_db).with_bundle_update().build();
138-
139-
let mut builder = self
140-
.evm_config
141-
.builder_for_next_block(&mut state, &latest, attrs.into())
142-
.map_err(RethError::other)?;
143-
144-
builder.apply_pre_execution_changes()?;
145-
146-
for tx in self.blocks.ready_transactions() {
147-
let _gas_used = builder.execute_transaction(tx)?;
112+
// attempt an initial consecutive check
113+
if let Some(latest) = self.builder.provider().latest_header().ok().flatten() {
114+
if latest.hash() != base.parent_hash {
115+
trace!(flashblock_parent=?base.parent_hash, flashblock_number=base.block_number, local_latest=?latest.num_hash(), "Skipping non consecutive build attempt");
116+
return None;
117+
}
148118
}
149119

150-
let BlockBuilderOutcome { execution_result, block, hashed_state, .. } =
151-
builder.finish(NoopProvider::default())?;
152-
153-
let execution_outcome = ExecutionOutcome::new(
154-
state.take_bundle(),
155-
vec![execution_result.receipts],
156-
block.number(),
157-
vec![execution_result.requests],
158-
);
159-
160-
// update cached reads
161-
self.update_cached_reads(latest_hash, request_cache);
162-
163-
Ok(Some(PendingBlock::with_executed_block(
164-
Instant::now() + Duration::from_secs(1),
165-
ExecutedBlock {
166-
recovered_block: block.into(),
167-
execution_output: Arc::new(execution_outcome),
168-
hashed_state: Arc::new(hashed_state),
169-
},
170-
)))
120+
Some(BuildArgs {
121+
base,
122+
transactions: self.blocks.ready_transactions().collect::<Vec<_>>(),
123+
cached_state: self.cached_state.take(),
124+
})
171125
}
172126

173127
/// Takes out `current` [`PendingBlock`] if `state` is not preceding it.
@@ -180,72 +134,119 @@ where
180134
impl<N, S, EvmConfig, Provider> Stream for FlashBlockService<N, S, EvmConfig, Provider>
181135
where
182136
N: NodePrimitives,
183-
S: Stream<Item = eyre::Result<FlashBlock>> + Unpin,
184-
EvmConfig: ConfigureEvm<Primitives = N, NextBlockEnvCtx: From<ExecutionPayloadBaseV1> + Unpin>,
137+
S: Stream<Item = eyre::Result<FlashBlock>> + Unpin + 'static,
138+
EvmConfig: ConfigureEvm<Primitives = N, NextBlockEnvCtx: From<ExecutionPayloadBaseV1> + Unpin>
139+
+ Clone
140+
+ 'static,
185141
Provider: StateProviderFactory
186142
+ CanonStateSubscriptions<Primitives = N>
187143
+ BlockReaderIdExt<
188144
Header = HeaderTy<N>,
189145
Block = BlockTy<N>,
190146
Transaction = N::SignedTx,
191147
Receipt = ReceiptTy<N>,
192-
> + Unpin,
148+
> + Unpin
149+
+ Clone
150+
+ 'static,
193151
{
194152
type Item = eyre::Result<Option<PendingBlock<N>>>;
195153

196154
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
197155
let this = self.get_mut();
198156

199-
// consume new flashblocks while they're ready
200-
while let Poll::Ready(Some(result)) = this.rx.poll_next_unpin(cx) {
201-
match result {
202-
Ok(flashblock) => match this.blocks.insert(flashblock) {
203-
Ok(_) => this.rebuild = true,
204-
Err(err) => debug!(%err, "Failed to prepare flashblock"),
205-
},
206-
Err(err) => return Poll::Ready(Some(Err(err))),
157+
loop {
158+
// drive pending build job to completion
159+
let result = match this.job.as_mut() {
160+
Some((now, rx)) => {
161+
let result = ready!(rx.poll_unpin(cx));
162+
result.ok().map(|res| (*now, res))
163+
}
164+
None => None,
165+
};
166+
// reset job
167+
this.job.take();
168+
169+
if let Some((now, result)) = result {
170+
match result {
171+
Ok(Some((new_pending, cached_reads))) => {
172+
// built a new pending block
173+
this.current = Some(new_pending.clone());
174+
// cache reads
175+
this.cached_state = Some((new_pending.parent_hash(), cached_reads));
176+
this.rebuild = false;
177+
178+
trace!(
179+
parent_hash = %new_pending.block().parent_hash(),
180+
block_number = new_pending.block().number(),
181+
flash_blocks = this.blocks.count(),
182+
elapsed = ?now.elapsed(),
183+
"Built new block with flashblocks"
184+
);
185+
186+
return Poll::Ready(Some(Ok(Some(new_pending))));
187+
}
188+
Ok(None) => {
189+
// nothing to do because tracked flashblock doesn't attach to latest
190+
}
191+
Err(err) => {
192+
// we can ignore this error
193+
debug!(%err, "failed to execute flashblock");
194+
}
195+
}
207196
}
208-
}
209197

210-
if let Poll::Ready(Ok(state)) = {
211-
let fut = this.canon_receiver.recv();
212-
pin!(fut);
213-
fut.poll_unpin(cx)
214-
} {
215-
if let Some(current) = this.on_new_tip(state) {
216-
trace!(
217-
parent_hash = %current.block().parent_hash(),
218-
block_number = current.block().number(),
219-
"Clearing current flashblock on new canonical block"
220-
);
221-
222-
return Poll::Ready(Some(Ok(None)))
198+
// consume new flashblocks while they're ready
199+
while let Poll::Ready(Some(result)) = this.rx.poll_next_unpin(cx) {
200+
match result {
201+
Ok(flashblock) => match this.blocks.insert(flashblock) {
202+
Ok(_) => this.rebuild = true,
203+
Err(err) => debug!(%err, "Failed to prepare flashblock"),
204+
},
205+
Err(err) => return Poll::Ready(Some(Err(err))),
206+
}
223207
}
224-
}
225208

226-
if !this.rebuild && this.current.is_some() {
227-
return Poll::Pending
228-
}
229-
230-
let now = Instant::now();
231-
// try to build a block on top of latest
232-
match this.execute() {
233-
Ok(Some(new_pending)) => {
234-
// built a new pending block
235-
this.current = Some(new_pending.clone());
236-
this.rebuild = false;
237-
trace!(parent_hash=%new_pending.block().parent_hash(), block_number=new_pending.block().number(), flash_blocks=this.blocks.count(), elapsed=?now.elapsed(), "Built new block with flashblocks");
238-
return Poll::Ready(Some(Ok(Some(new_pending))));
209+
// update on new head block
210+
if let Poll::Ready(Ok(state)) = {
211+
let fut = this.canon_receiver.recv();
212+
pin!(fut);
213+
fut.poll_unpin(cx)
214+
} {
215+
if let Some(current) = this.on_new_tip(state) {
216+
trace!(
217+
parent_hash = %current.block().parent_hash(),
218+
block_number = current.block().number(),
219+
"Clearing current flashblock on new canonical block"
220+
);
221+
222+
return Poll::Ready(Some(Ok(None)))
223+
}
239224
}
240-
Ok(None) => {
241-
// nothing to do because tracked flashblock doesn't attach to latest
225+
226+
if !this.rebuild && this.current.is_some() {
227+
return Poll::Pending
242228
}
243-
Err(err) => {
244-
// we can ignore this error
245-
debug!(%err, "failed to execute flashblock");
229+
230+
// try to build a block on top of latest
231+
if let Some(args) = this.build_args() {
232+
let now = Instant::now();
233+
234+
let (tx, rx) = oneshot::channel();
235+
let builder = this.builder.clone();
236+
237+
this.spawner.spawn_blocking(async move {
238+
let _ = tx.send(builder.execute(args));
239+
});
240+
this.job.replace((now, rx));
241+
242+
// continue and poll the spawned job
243+
continue
246244
}
247-
}
248245

249-
Poll::Pending
246+
return Poll::Pending
247+
}
250248
}
251249
}
250+
251+
type BuildJob<N> =
252+
(Instant, oneshot::Receiver<eyre::Result<Option<(PendingBlock<N>, CachedReads)>>>);

0 commit comments

Comments
 (0)