Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
ffc2b97
Serve rpc by range and by root:
eserilev Feb 24, 2026
e966428
fix spelling issues
eserilev Feb 24, 2026
45912fb
Delete dead code
eserilev Feb 24, 2026
5127054
Merge branch 'unstable' of https://github.com/sigp/lighthouse into gl…
eserilev Feb 27, 2026
ff0c05e
Merge branch 'unstable' of https://github.com/sigp/lighthouse into gl…
eserilev Mar 3, 2026
0212e84
Use max request payloads
eserilev Mar 3, 2026
8378f7d
Fixes based on feedback
eserilev Mar 4, 2026
81e105d
more fixes
eserilev Mar 4, 2026
fbc5f0d
add kzg commitment var list filling to gloas block full impl
eserilev Mar 8, 2026
6cc4cd9
Add warn logs
eserilev Mar 8, 2026
5b4e3ad
add missing validation
eserilev Mar 8, 2026
e1dce7b
Merge branch 'unstable' of https://github.com/sigp/lighthouse into gl…
eserilev Mar 11, 2026
86ff82e
Ensure payloads are canonical
eserilev Mar 12, 2026
c651a32
Merge branch 'unstable' of https://github.com/sigp/lighthouse into gl…
eserilev Mar 12, 2026
30bfd23
Add some tests
eserilev Mar 12, 2026
7120d4b
spec.max_request_payloads
eserilev Mar 12, 2026
bfbb28b
fixes
eserilev Mar 12, 2026
d397e0f
Fixes
eserilev Mar 12, 2026
99d9925
Update envelope streamer
eserilev Mar 14, 2026
df931fd
Merge branch 'unstable' of https://github.com/sigp/lighthouse into gl…
eserilev Mar 14, 2026
3e6afae
avoid wrapping canonical_head in Arc, impl CanonicalHeadReader on Bea…
dapplion Mar 16, 2026
ed8c549
remove dead BeaconBlockGloas::full() impl
dapplion Mar 16, 2026
9c2ff0d
Merge branch 'unstable' of https://github.com/sigp/lighthouse into gl…
eserilev Mar 16, 2026
be9e32f
cleanup
eserilev Mar 16, 2026
0769ffc
Refactor to use adapter pattern
eserilev Mar 17, 2026
54c062d
Merge branch 'unstable' of https://github.com/sigp/lighthouse into gl…
eserilev Mar 17, 2026
a9bc8a1
Fix comments
eserilev Mar 17, 2026
54dfa0c
ssz size calc refactor
eserilev Mar 17, 2026
12e86c3
Resolve merge conflicts
eserilev Mar 18, 2026
9f72751
Merge branch 'unstable' into gloas-serve-envelope-rpc
eserilev Mar 19, 2026
ba57a85
debug message instead of unreachable
eserilev Mar 24, 2026
9c28db2
Reject envelopes by range req if start slot is pre-gloas
eserilev Mar 24, 2026
0e4e813
Rename to envrange and envroot
eserilev Mar 24, 2026
2ab336d
Comments and logs
eserilev Mar 24, 2026
79e9a7c
Revert
eserilev Mar 24, 2026
34fa9f3
Merge branch 'unstable' of https://github.com/sigp/lighthouse into gl…
eserilev Mar 24, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 53 additions & 0 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use crate::early_attester_cache::EarlyAttesterCache;
use crate::errors::{BeaconChainError as Error, BlockProductionError};
use crate::events::ServerSentEventHandler;
use crate::execution_payload::{NotifyExecutionLayer, PreparePayloadHandle, get_execution_payload};
use crate::execution_payload_envelope_streamer::PayloadEnvelopeStreamer;
use crate::fetch_blobs::EngineGetBlobsOutput;
use crate::fork_choice_signal::{ForkChoiceSignalRx, ForkChoiceSignalTx};
use crate::graffiti_calculator::{GraffitiCalculator, GraffitiSettings};
Expand Down Expand Up @@ -1124,6 +1125,58 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.map_or_else(|| self.get_blobs(block_root), Ok)
}

/// Returns the execution payload envelopes at the given roots, if any.
///
/// Will also check any associated caches. The expected use for this function is *only* for
/// returning payload envelopes requested from P2P peers.
///
/// ## Errors
///
/// May return a database error.
#[allow(clippy::type_complexity)]
pub fn get_payload_envelopes_checking_caches(
    self: &Arc<Self>,
    block_roots: Vec<Hash256>,
) -> Result<
    impl Stream<
        Item = (
            Hash256,
            Arc<Result<Option<Arc<SignedExecutionPayloadEnvelope<T::EthSpec>>>, Error>>,
        ),
    >,
    Error,
> {
    // Build a streamer that consults caches (`CheckCaches::Yes`) and hand back its
    // stream of (block_root, envelope_result) pairs. Fails only if the execution
    // layer is missing (see `PayloadEnvelopeStreamer::new`).
    Ok(PayloadEnvelopeStreamer::<T>::new(
        self.execution_layer.clone(),
        self.store.clone(),
        self.task_executor.clone(),
        CheckCaches::Yes,
    )?
    .launch_stream(block_roots))
}

/// Returns the execution payload envelopes at the given roots, if any, *without*
/// consulting any associated caches (lookups go straight to the streamer's store path).
///
/// ## Errors
///
/// May return a database error.
#[allow(clippy::type_complexity)]
pub fn get_payload_envelopes(
    self: &Arc<Self>,
    block_roots: Vec<Hash256>,
) -> Result<
    impl Stream<
        Item = (
            Hash256,
            Arc<Result<Option<Arc<SignedExecutionPayloadEnvelope<T::EthSpec>>>, Error>>,
        ),
    >,
    Error,
> {
    // Same as `get_payload_envelopes_checking_caches` but with `CheckCaches::No`.
    Ok(PayloadEnvelopeStreamer::<T>::new(
        self.execution_layer.clone(),
        self.store.clone(),
        self.task_executor.clone(),
        CheckCaches::No,
    )?
    .launch_stream(block_roots))
}

pub fn get_data_columns_checking_all_caches(
&self,
block_root: Hash256,
Expand Down
138 changes: 138 additions & 0 deletions beacon_node/beacon_chain/src/execution_payload_envelope_streamer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
use std::sync::Arc;

use bls::Hash256;
use execution_layer::ExecutionLayer;
use futures::Stream;
use task_executor::TaskExecutor;
use tokio::sync::mpsc::{self, UnboundedSender};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tracing::debug;
use types::{EthSpec, SignedExecutionPayloadEnvelope};

use crate::{BeaconChainError, BeaconChainTypes, BeaconStore, beacon_block_streamer::CheckCaches};

type PayloadEnvelopeResult<E> =
Result<Option<Arc<SignedExecutionPayloadEnvelope<E>>>, BeaconChainError>;

pub struct PayloadEnvelopeStreamer<T: BeaconChainTypes> {
execution_layer: ExecutionLayer<T::EthSpec>,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why don't we start with using the beacon chain here instead and do beacon_chain.execution_layer when we need to access it? That way we get to keep pretty similar logic to the beacon block streamer.
We need the beacon_chain to access the db anyway

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If testing is the concern, we can take a similar approach to the fetch_blobs adapter.

Here, it seems like we need ExecutionLayer, CanonicalHead and the store all of which are present in the BeaconChain

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've gone with the adapter pattern

store: BeaconStore<T>,
task_executor: TaskExecutor,
_check_caches: CheckCaches,
}

// TODO(gloas) eventually we'll need to expand this to support loading blinded payload envelopes from the db
// and fetching the execution payload from the EL. See BlockStreamer impl as an example
impl<T: BeaconChainTypes> PayloadEnvelopeStreamer<T> {
/// Creates a new streamer wrapped in an `Arc` (required because `launch_stream`
/// moves `Arc<Self>` into a spawned task).
///
/// ## Errors
///
/// Returns `BeaconChainError::ExecutionLayerMissing` if no execution layer is
/// configured (`execution_layer_opt` is `None`).
pub fn new(
    execution_layer_opt: Option<ExecutionLayer<T::EthSpec>>,
    store: BeaconStore<T>,
    task_executor: TaskExecutor,
    check_caches: CheckCaches,
) -> Result<Arc<Self>, BeaconChainError> {
    // `execution_layer_opt` is owned, so take the value directly; the previous
    // `.as_ref().ok_or(..)?.clone()` performed a needless clone.
    let execution_layer = execution_layer_opt.ok_or(BeaconChainError::ExecutionLayerMissing)?;

    Ok(Arc::new(Self {
        execution_layer,
        store,
        task_executor,
        _check_caches: check_caches,
    }))
}

// TODO(gloas) simply a stub impl for now. Should check some exec payload envelope cache
// and return the envelope if it exists in the cache
// TODO(gloas) simply a stub impl for now. Should check some exec payload envelope cache
// and return the envelope if it exists in the cache (respecting `self._check_caches`,
// e.g. only looking when it is `CheckCaches::Yes`).
fn check_payload_envelope_cache(
    &self,
    _beacon_block_root: Hash256,
) -> Option<Arc<SignedExecutionPayloadEnvelope<T::EthSpec>>> {
    // No cache is wired up yet, so every lookup is a miss.
    // if self.check_caches == CheckCaches::Yes
    None
}

// used when the execution engine doesn't support the payload bodies methods
// Slow path used when the execution engine doesn't support the payload bodies
// methods: resolve each requested root one at a time (cache first, then store)
// and push the result down `sender`.
async fn stream_payload_envelopes_fallback(
    self: Arc<Self>,
    beacon_block_roots: Vec<Hash256>,
    sender: UnboundedSender<(Hash256, Arc<PayloadEnvelopeResult<T::EthSpec>>)>,
) {
    debug!("Using slower fallback method of eth_getBlockByHash()");
    for root in beacon_block_roots {
        let envelope_result = match self.check_payload_envelope_cache(root) {
            // Cache hit: serve it without touching the database.
            Some(envelope) => Ok(Some(envelope)),
            // TODO(gloas) we'll want to use the execution layer directly to call
            // the engine api method eth_getBlockByHash()
            None => self
                .store
                .get_payload_envelope(&root)
                .map(|maybe_envelope| maybe_envelope.map(Arc::new))
                .map_err(BeaconChainError::DBError),
        };

        // A send error means the receiving stream was dropped; stop early.
        if sender.send((root, Arc::new(envelope_result))).is_err() {
            break;
        }
    }
}

// Entry point for the spawned task: query the engine's capabilities, then either
// stream envelopes via the fallback path or fan the capability error out to
// every requested root.
pub async fn stream(
    self: Arc<Self>,
    beacon_block_roots: Vec<Hash256>,
    sender: UnboundedSender<(Hash256, Arc<PayloadEnvelopeResult<T::EthSpec>>)>,
) {
    let capabilities_result = self
        .execution_layer
        .get_engine_capabilities(None)
        .await
        .map_err(Box::new)
        .map_err(BeaconChainError::EngineGetCapabilititesFailed);

    match capabilities_result {
        // TODO(gloas) should check engine capabilities for get_payload_bodies_by_range_v1
        Ok(_engine_capabilities) => {
            self.stream_payload_envelopes_fallback(beacon_block_roots, sender)
                .await;
        }
        // Couldn't even fetch capabilities: report the same error for every root.
        Err(e) => send_errors(beacon_block_roots, sender, e).await,
    }
}

// Spawns the streaming task on the executor and returns the receiving half as a
// `Stream` of (block_root, envelope_result) pairs for the caller to consume.
pub fn launch_stream(
    self: Arc<Self>,
    beacon_block_roots: Vec<Hash256>,
) -> impl Stream<Item = (Hash256, Arc<PayloadEnvelopeResult<T::EthSpec>>)> {
    debug!(
        envelopes = beacon_block_roots.len(),
        "Launching a PayloadEnvelopeStreamer"
    );
    let (tx, rx) = mpsc::unbounded_channel();
    // Clone the executor handle first: `spawn` takes the future by value and
    // `self` is moved into it.
    let task_executor = self.task_executor.clone();
    task_executor.spawn(
        self.stream(beacon_block_roots, tx),
        "get_payload_envelopes_sender",
    );
    UnboundedReceiverStream::new(rx)
}
}

// Fans a single `BeaconChainError` out to every requested root: each root gets
// a clone of the same `Arc`'d `Err` so the error value is allocated only once.
async fn send_errors<E: EthSpec>(
    beacon_block_roots: Vec<Hash256>,
    sender: UnboundedSender<(Hash256, Arc<PayloadEnvelopeResult<E>>)>,
    beacon_chain_error: BeaconChainError,
) {
    let shared_error: Arc<PayloadEnvelopeResult<E>> = Arc::new(Err(beacon_chain_error));
    for root in beacon_block_roots {
        // Receiver gone — no point sending the remaining errors.
        if sender.send((root, Arc::clone(&shared_error))).is_err() {
            break;
        }
    }
}
1 change: 1 addition & 0 deletions beacon_node/beacon_chain/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ mod early_attester_cache;
mod errors;
pub mod events;
pub mod execution_payload;
pub mod execution_payload_envelope_streamer;
pub mod fetch_blobs;
pub mod fork_choice_signal;
pub mod graffiti_calculator;
Expand Down
31 changes: 28 additions & 3 deletions beacon_node/beacon_processor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,8 @@ pub enum Work<E: EthSpec> {
Status(BlockingFn),
BlocksByRangeRequest(AsyncFn),
BlocksByRootsRequest(AsyncFn),
PayloadEnvelopesByRangeRequest(AsyncFn),
PayloadEnvelopesByRootRequest(AsyncFn),
BlobsByRangeRequest(BlockingFn),
BlobsByRootsRequest(BlockingFn),
DataColumnsByRootsRequest(BlockingFn),
Expand Down Expand Up @@ -464,6 +466,8 @@ pub enum WorkType {
Status,
BlocksByRangeRequest,
BlocksByRootsRequest,
PayloadEnvelopesByRangeRequest,
PayloadEnvelopesByRootRequest,
BlobsByRangeRequest,
BlobsByRootsRequest,
DataColumnsByRootsRequest,
Expand Down Expand Up @@ -522,6 +526,8 @@ impl<E: EthSpec> Work<E> {
Work::Status(_) => WorkType::Status,
Work::BlocksByRangeRequest(_) => WorkType::BlocksByRangeRequest,
Work::BlocksByRootsRequest(_) => WorkType::BlocksByRootsRequest,
Work::PayloadEnvelopesByRangeRequest(_) => WorkType::PayloadEnvelopesByRangeRequest,
Work::PayloadEnvelopesByRootRequest(_) => WorkType::PayloadEnvelopesByRootRequest,
Work::BlobsByRangeRequest(_) => WorkType::BlobsByRangeRequest,
Work::BlobsByRootsRequest(_) => WorkType::BlobsByRootsRequest,
Work::DataColumnsByRootsRequest(_) => WorkType::DataColumnsByRootsRequest,
Expand Down Expand Up @@ -969,6 +975,12 @@ impl<E: EthSpec> BeaconProcessor<E> {
Some(item)
} else if let Some(item) = work_queues.dcbrange_queue.pop() {
Some(item)
} else if let Some(item) = work_queues.payload_envelopes_brange_queue.pop()
{
Some(item)
} else if let Some(item) = work_queues.payload_envelopes_broots_queue.pop()
{
Some(item)
// Check slashings after all other consensus messages so we prioritize
// following head.
//
Expand Down Expand Up @@ -1155,6 +1167,12 @@ impl<E: EthSpec> BeaconProcessor<E> {
Work::BlocksByRootsRequest { .. } => {
work_queues.block_broots_queue.push(work, work_id)
}
Work::PayloadEnvelopesByRangeRequest { .. } => work_queues
.payload_envelopes_brange_queue
.push(work, work_id),
Work::PayloadEnvelopesByRootRequest { .. } => work_queues
.payload_envelopes_broots_queue
.push(work, work_id),
Work::BlobsByRangeRequest { .. } => {
work_queues.blob_brange_queue.push(work, work_id)
}
Expand Down Expand Up @@ -1270,6 +1288,12 @@ impl<E: EthSpec> BeaconProcessor<E> {
WorkType::Status => work_queues.status_queue.len(),
WorkType::BlocksByRangeRequest => work_queues.block_brange_queue.len(),
WorkType::BlocksByRootsRequest => work_queues.block_broots_queue.len(),
WorkType::PayloadEnvelopesByRangeRequest => {
work_queues.payload_envelopes_brange_queue.len()
}
WorkType::PayloadEnvelopesByRootRequest => {
work_queues.payload_envelopes_broots_queue.len()
}
WorkType::BlobsByRangeRequest => work_queues.blob_brange_queue.len(),
WorkType::BlobsByRootsRequest => work_queues.blob_broots_queue.len(),
WorkType::DataColumnsByRootsRequest => work_queues.dcbroots_queue.len(),
Expand Down Expand Up @@ -1456,9 +1480,10 @@ impl<E: EthSpec> BeaconProcessor<E> {
| Work::DataColumnsByRangeRequest(process_fn) => {
task_spawner.spawn_blocking(process_fn)
}
Work::BlocksByRangeRequest(work) | Work::BlocksByRootsRequest(work) => {
task_spawner.spawn_async(work)
}
Work::BlocksByRangeRequest(work)
| Work::BlocksByRootsRequest(work)
| Work::PayloadEnvelopesByRangeRequest(work)
| Work::PayloadEnvelopesByRootRequest(work) => task_spawner.spawn_async(work),
Work::ChainSegmentBackfill(process_fn) => {
if self.config.enable_backfill_rate_limiting {
task_spawner.spawn_blocking_with_rayon(RayonPoolType::LowPriority, process_fn)
Expand Down
12 changes: 12 additions & 0 deletions beacon_node/beacon_processor/src/scheduler/work_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,8 @@ pub struct BeaconProcessorQueueLengths {
blob_brange_queue: usize,
dcbroots_queue: usize,
dcbrange_queue: usize,
payload_envelopes_brange_queue: usize,
payload_envelopes_broots_queue: usize,
gossip_bls_to_execution_change_queue: usize,
gossip_execution_payload_queue: usize,
gossip_execution_payload_bid_queue: usize,
Expand Down Expand Up @@ -204,6 +206,8 @@ impl BeaconProcessorQueueLengths {
blob_brange_queue: 1024,
dcbroots_queue: 1024,
dcbrange_queue: 1024,
payload_envelopes_brange_queue: 1024,
payload_envelopes_broots_queue: 1024,
gossip_bls_to_execution_change_queue: 16384,
// TODO(EIP-7732): verify 1024 is preferable. I used same value as `gossip_block_queue` and `gossip_blob_queue`
gossip_execution_payload_queue: 1024,
Expand Down Expand Up @@ -253,6 +257,8 @@ pub struct WorkQueues<E: EthSpec> {
pub status_queue: FifoQueue<Work<E>>,
pub block_brange_queue: FifoQueue<Work<E>>,
pub block_broots_queue: FifoQueue<Work<E>>,
pub payload_envelopes_brange_queue: FifoQueue<Work<E>>,
pub payload_envelopes_broots_queue: FifoQueue<Work<E>>,
pub blob_broots_queue: FifoQueue<Work<E>>,
pub blob_brange_queue: FifoQueue<Work<E>>,
pub dcbroots_queue: FifoQueue<Work<E>>,
Expand Down Expand Up @@ -323,6 +329,10 @@ impl<E: EthSpec> WorkQueues<E> {
let blob_brange_queue = FifoQueue::new(queue_lengths.blob_brange_queue);
let dcbroots_queue = FifoQueue::new(queue_lengths.dcbroots_queue);
let dcbrange_queue = FifoQueue::new(queue_lengths.dcbrange_queue);
let payload_envelopes_brange_queue =
FifoQueue::new(queue_lengths.payload_envelopes_brange_queue);
let payload_envelopes_broots_queue =
FifoQueue::new(queue_lengths.payload_envelopes_broots_queue);

let gossip_bls_to_execution_change_queue =
FifoQueue::new(queue_lengths.gossip_bls_to_execution_change_queue);
Expand Down Expand Up @@ -382,6 +392,8 @@ impl<E: EthSpec> WorkQueues<E> {
blob_brange_queue,
dcbroots_queue,
dcbrange_queue,
payload_envelopes_brange_queue,
payload_envelopes_broots_queue,
gossip_bls_to_execution_change_queue,
gossip_execution_payload_queue,
gossip_execution_payload_bid_queue,
Expand Down
6 changes: 6 additions & 0 deletions beacon_node/lighthouse_network/src/peer_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -590,6 +590,8 @@ impl<E: EthSpec> PeerManager<E> {
Protocol::BlocksByRange => PeerAction::MidToleranceError,
Protocol::BlocksByRoot => PeerAction::MidToleranceError,
Protocol::BlobsByRange => PeerAction::MidToleranceError,
Protocol::PayloadEnvelopesByRange => PeerAction::MidToleranceError,
Protocol::PayloadEnvelopesByRoot => PeerAction::MidToleranceError,
// Lighthouse does not currently make light client requests; therefore, this
// is an unexpected scenario. We do not ban the peer for rate limiting.
Protocol::LightClientBootstrap => return,
Expand All @@ -615,6 +617,8 @@ impl<E: EthSpec> PeerManager<E> {
Protocol::Ping => PeerAction::Fatal,
Protocol::BlocksByRange => return,
Protocol::BlocksByRoot => return,
Protocol::PayloadEnvelopesByRange => return,
Protocol::PayloadEnvelopesByRoot => return,
Protocol::BlobsByRange => return,
Protocol::BlobsByRoot => return,
Protocol::DataColumnsByRoot => return,
Expand All @@ -638,6 +642,8 @@ impl<E: EthSpec> PeerManager<E> {
Protocol::Ping => PeerAction::LowToleranceError,
Protocol::BlocksByRange => PeerAction::MidToleranceError,
Protocol::BlocksByRoot => PeerAction::MidToleranceError,
Protocol::PayloadEnvelopesByRange => PeerAction::MidToleranceError,
Protocol::PayloadEnvelopesByRoot => PeerAction::MidToleranceError,
Protocol::BlobsByRange => PeerAction::MidToleranceError,
Protocol::BlobsByRoot => PeerAction::MidToleranceError,
Protocol::DataColumnsByRoot => PeerAction::MidToleranceError,
Expand Down
Loading