Skip to content

Commit a850e35

Browse files
committed
Optimize P2P blob fetching:
- fetch only missing blobs from peers and prioritize EL blob retrieval - reuse `received_blob_sidecars` both in `p2p` and `eth1_api` to avoid excessive blob fetching
1 parent 4cf0013 commit a850e35

File tree

27 files changed

+471
-217
lines changed

27 files changed

+471
-217
lines changed

Cargo.lock

Lines changed: 20 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -313,6 +313,7 @@ crossbeam-skiplist = '0.1'
313313
crossbeam-utils = '0.8'
314314
ctr = { version = '0.9', features = ['zeroize'] }
315315
darling = '0.20'
316+
dashmap = '6.1'
316317
dedicated_executor = { path = 'dedicated_executor' }
317318
delay_map = '0.4'
318319
derivative = '2'

eth1_api/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,12 @@ workspace = true
1010
anyhow = { workspace = true }
1111
arc-swap = { workspace = true }
1212
bls = { workspace = true }
13+
dashmap = { workspace = true }
1314
dedicated_executor = { workspace = true }
1415
derive_more = { workspace = true }
1516
either = { workspace = true }
1617
enum-iterator = { workspace = true }
18+
eth2_libp2p = { workspace = true }
1719
ethereum-types = { workspace = true }
1820
execution_engine = { workspace = true }
1921
features = { workspace = true }

eth1_api/src/eth1_execution_engine.rs

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,21 +3,24 @@ use std::sync::Arc;
33
use anyhow::Result;
44
use derive_more::Constructor;
55
use either::Either;
6-
use execution_engine::{ExecutionEngine, PayloadAttributes, PayloadId, PayloadStatusV1};
6+
use eth2_libp2p::PeerId;
7+
use execution_engine::{
8+
ExecutionEngine, ExecutionServiceMessage, PayloadAttributes, PayloadId, PayloadStatusV1,
9+
};
710
use futures::channel::{mpsc::UnboundedSender, oneshot::Sender};
811
use log::{info, warn};
912
use tokio::runtime::{Builder, Handle};
1013
use types::{
1114
combined::{ExecutionPayload, ExecutionPayloadParams, SignedBeaconBlock},
1215
config::Config,
13-
deneb::primitives::BlobIndex,
16+
deneb::containers::BlobIdentifier,
1417
nonstandard::{Phase, TimedPowBlock, WithBlobsAndMev},
1518
phase0::primitives::{ExecutionBlockHash, H256},
1619
preset::Preset,
1720
};
1821
use web3::types::U64;
1922

20-
use crate::{eth1_api::Eth1Api, messages::ExecutionServiceMessage};
23+
use crate::eth1_api::Eth1Api;
2124

2225
#[derive(Constructor)]
2326
pub struct Eth1ExecutionEngine<P: Preset> {
@@ -37,10 +40,16 @@ impl<P: Preset> ExecutionEngine<P> for Eth1ExecutionEngine<P> {
3740
ExecutionServiceMessage::ExchangeCapabilities.send(&self.execution_service_tx);
3841
}
3942

40-
fn get_blobs(&self, block: Arc<SignedBeaconBlock<P>>, blob_indices: Vec<BlobIndex>) {
43+
fn get_blobs(
44+
&self,
45+
block: Arc<SignedBeaconBlock<P>>,
46+
blob_identifiers: Vec<BlobIdentifier>,
47+
peer_id: Option<PeerId>,
48+
) {
4149
ExecutionServiceMessage::GetBlobs {
4250
block,
43-
blob_indices,
51+
blob_identifiers,
52+
peer_id,
4453
}
4554
.send(&self.execution_service_tx);
4655
}
Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
use std::{collections::HashSet, sync::Arc};
2+
3+
use anyhow::Result;
4+
use dashmap::DashMap;
5+
use derive_more::derive::Constructor;
6+
use eth2_libp2p::PeerId;
7+
use execution_engine::BlobAndProofV1;
8+
use fork_choice_control::Wait;
9+
use futures::{
10+
channel::mpsc::{UnboundedReceiver, UnboundedSender},
11+
StreamExt as _,
12+
};
13+
use helper_functions::misc;
14+
use log::{debug, warn};
15+
use types::{
16+
combined::SignedBeaconBlock,
17+
deneb::{containers::BlobIdentifier, primitives::BlobIndex},
18+
phase0::primitives::Slot,
19+
preset::Preset,
20+
traits::SignedBeaconBlock as _,
21+
};
22+
23+
use crate::{
24+
messages::{BlobFetcherToP2p, Eth1ApiToBlobFetcher},
25+
ApiController, Eth1Api,
26+
};
27+
28+
#[derive(Constructor)]
29+
pub struct ExecutionBlobFetcher<P: Preset, W: Wait> {
30+
api: Arc<Eth1Api>,
31+
controller: ApiController<P, W>,
32+
received_blob_sidecars: Arc<DashMap<BlobIdentifier, Slot>>,
33+
p2p_tx: UnboundedSender<BlobFetcherToP2p>,
34+
rx: UnboundedReceiver<Eth1ApiToBlobFetcher<P>>,
35+
}
36+
37+
impl<P: Preset, W: Wait> ExecutionBlobFetcher<P, W> {
38+
pub async fn run(mut self) -> Result<()> {
39+
while let Some(message) = self.rx.next().await {
40+
match message {
41+
Eth1ApiToBlobFetcher::GetBlobs {
42+
block,
43+
blob_identifiers,
44+
peer_id,
45+
} => {
46+
self.get_blobs(block, blob_identifiers, peer_id).await;
47+
}
48+
}
49+
}
50+
51+
Ok(())
52+
}
53+
54+
async fn get_blobs(
55+
&self,
56+
block: Arc<SignedBeaconBlock<P>>,
57+
blob_identifiers: Vec<BlobIdentifier>,
58+
peer_id: Option<PeerId>,
59+
) {
60+
let slot = block.message().slot();
61+
62+
if let Some(body) = block.message().body().post_deneb() {
63+
let missing_blob_indices = blob_identifiers
64+
.iter()
65+
.filter(|identifier| !self.received_blob_sidecars.contains_key(identifier))
66+
.map(|identifier| identifier.index)
67+
.collect::<HashSet<BlobIndex>>();
68+
69+
let kzg_commitments = body
70+
.blob_kzg_commitments()
71+
.iter()
72+
.zip(0..)
73+
.filter(|(_, index)| missing_blob_indices.contains(index))
74+
.collect::<Vec<_>>();
75+
76+
if kzg_commitments.is_empty() {
77+
debug!(
78+
"cannot fetch blobs from EL: all requested blob sidecars have been received"
79+
);
80+
return;
81+
}
82+
83+
let versioned_hashes = kzg_commitments
84+
.iter()
85+
.copied()
86+
.map(|(commitment, _)| misc::kzg_commitment_to_versioned_hash(*commitment))
87+
.collect();
88+
89+
let mut blob_sidecars = vec![];
90+
let block_root = block.message().hash_tree_root();
91+
92+
match self.api.get_blobs::<P>(versioned_hashes).await {
93+
Ok(blobs_and_proofs) => {
94+
let block_header = block.to_header();
95+
96+
for (blob_and_proof, kzg_commitment, index) in blobs_and_proofs
97+
.into_iter()
98+
.zip(kzg_commitments.into_iter())
99+
.filter_map(|(blob_and_proof, (kzg_commitment, index))| {
100+
blob_and_proof
101+
.map(|blob_and_proof| (blob_and_proof, kzg_commitment, index))
102+
})
103+
{
104+
let BlobAndProofV1 { blob, proof } = blob_and_proof;
105+
let blob_identifier = BlobIdentifier { block_root, index };
106+
107+
if self.received_blob_sidecars.contains_key(&blob_identifier) {
108+
debug!(
109+
"received blob from EL is already known: {blob_identifier:?}, \
110+
slot: {slot}"
111+
);
112+
} else {
113+
match misc::construct_blob_sidecar(
114+
&block,
115+
block_header,
116+
index,
117+
blob,
118+
*kzg_commitment,
119+
proof,
120+
) {
121+
Ok(blob_sidecar) => {
122+
debug!(
123+
"received blob sidecar from EL: {blob_identifier:?}, \
124+
slot: {slot}"
125+
);
126+
127+
// Record all blob_sidecars as received first and push to controller
128+
// on a second pass to avoid spawning extra `engine_getBlobs` calls.
129+
self.received_blob_sidecars.insert(blob_identifier, slot);
130+
blob_sidecars.push(Arc::new(blob_sidecar));
131+
}
132+
Err(error) => warn!(
133+
"failed to construct blob sidecar with blob and proof \
134+
received from execution layer: {error:?}"
135+
),
136+
}
137+
}
138+
}
139+
}
140+
Err(error) => warn!("engine_getBlobsV1 call failed: {error}"),
141+
}
142+
143+
for blob_sidecar in blob_sidecars {
144+
self.controller.on_el_blob_sidecar(blob_sidecar);
145+
}
146+
147+
// Request remaining missing blob sidecars from P2P
148+
let missing_blob_identifiers = blob_identifiers
149+
.into_iter()
150+
.filter(|identifier| !self.received_blob_sidecars.contains_key(identifier))
151+
.collect::<Vec<_>>();
152+
153+
debug!("missing blob sidecars after EL: {missing_blob_identifiers:?}");
154+
155+
if !missing_blob_identifiers.is_empty() {
156+
BlobFetcherToP2p::BlobsNeeded(missing_blob_identifiers, slot, peer_id)
157+
.send(&self.p2p_tx);
158+
}
159+
}
160+
}
161+
}

eth1_api/src/execution_service.rs

Lines changed: 24 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,14 @@ use anyhow::Result;
44
use dedicated_executor::DedicatedExecutor;
55
use derive_more::Constructor;
66
use either::Either;
7-
use execution_engine::{ForkChoiceUpdatedResponse, PayloadAttributes, PayloadStatusV1};
7+
use execution_engine::{
8+
ExecutionServiceMessage, ForkChoiceUpdatedResponse, PayloadAttributes, PayloadStatusV1,
9+
};
810
use fork_choice_control::Wait;
9-
use futures::{channel::mpsc::UnboundedReceiver, StreamExt as _};
11+
use futures::{
12+
channel::mpsc::{UnboundedReceiver, UnboundedSender},
13+
StreamExt as _,
14+
};
1015
use log::warn;
1116
use std_ext::ArcExt as _;
1217
use types::{
@@ -17,8 +22,8 @@ use types::{
1722
};
1823

1924
use crate::{
20-
eth1_api::Eth1Api, messages::ExecutionServiceMessage, misc::ApiController,
21-
spawn_blobs_download_task, spawn_exchange_capabilities_task,
25+
eth1_api::Eth1Api, messages::Eth1ApiToBlobFetcher, misc::ApiController,
26+
spawn_exchange_capabilities_task,
2227
};
2328

2429
#[derive(Constructor)]
@@ -27,6 +32,7 @@ pub struct ExecutionService<P: Preset, W: Wait> {
2732
controller: ApiController<P, W>,
2833
dedicated_executor: Arc<DedicatedExecutor>,
2934
rx: UnboundedReceiver<ExecutionServiceMessage<P>>,
35+
blob_fetcher_tx: UnboundedSender<Eth1ApiToBlobFetcher<P>>,
3036
}
3137

3238
impl<P: Preset, W: Wait> ExecutionService<P, W> {
@@ -41,15 +47,22 @@ impl<P: Preset, W: Wait> ExecutionService<P, W> {
4147
}
4248
ExecutionServiceMessage::GetBlobs {
4349
block,
44-
blob_indices,
50+
blob_identifiers,
51+
peer_id,
4552
} => {
46-
spawn_blobs_download_task(
47-
self.api.clone_arc(),
48-
self.controller.clone_arc(),
49-
&self.dedicated_executor,
53+
// Fetch blobs from the EL in a separate task concurrently.
54+
// Blob fetching from the EL should not delay the 'engine_forkchoiceUpdated'
55+
// call, if all required blobs are received via gossip in the meantime.
56+
//
57+
// The message to trigger blob fetching should not be sent directly from
58+
// `Mutator` to `ExecutionBlobFetcher`, as fetching must occur only after
59+
// the execution payload is validated with the `engine_newPayload` call.
60+
Eth1ApiToBlobFetcher::GetBlobs {
5061
block,
51-
blob_indices,
52-
);
62+
blob_identifiers,
63+
peer_id,
64+
}
65+
.send(&self.blob_fetcher_tx);
5366
}
5467
ExecutionServiceMessage::NotifyForkchoiceUpdated {
5568
head_eth1_block_hash,

eth1_api/src/lib.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,11 @@ pub use crate::{
44
eth1_api::Eth1Api,
55
eth1_block::Eth1Block,
66
eth1_execution_engine::Eth1ExecutionEngine,
7+
execution_blob_fetcher::ExecutionBlobFetcher,
78
execution_service::ExecutionService,
8-
messages::{Eth1ApiToMetrics, Eth1ConnectionData, Eth1Metrics, ExecutionServiceMessage},
9+
messages::{BlobFetcherToP2p, Eth1ApiToMetrics, Eth1ConnectionData, Eth1Metrics},
910
misc::{ApiController, RealController},
10-
tasks::{spawn_blobs_download_task, spawn_exchange_capabilities_task},
11+
tasks::spawn_exchange_capabilities_task,
1112
};
1213

1314
mod auth;
@@ -16,6 +17,7 @@ mod endpoints;
1617
mod eth1_api;
1718
mod eth1_block;
1819
mod eth1_execution_engine;
20+
mod execution_blob_fetcher;
1921
mod execution_service;
2022
mod messages;
2123
mod misc;

0 commit comments

Comments
 (0)