78 changes: 71 additions & 7 deletions beacon_node/network/src/network_beacon_processor/rpc_methods.rs
@@ -279,22 +279,42 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
.collect::<Vec<_>>();
let mut send_blob_count = 0;

let fulu_start_slot = self
.chain
.spec
.fulu_fork_epoch
.map(|epoch| epoch.start_slot(T::EthSpec::slots_per_epoch()));

let mut blob_list_results = HashMap::new();
for id in request.blob_ids.as_slice() {
let BlobIdentifier {
block_root: root,
index,
} = id;

// First attempt to get the blobs from the RPC cache.
if let Ok(Some(blob)) = self.chain.data_availability_checker.get_blob(id) {
// Check whether the requested blob is from a Fulu slot; if so, skip this blob id and proceed to the next
[Review comment: @jimmygchen (Member), Jul 28, 2025]
I think if the block is after the Fulu slot, this condition will never be satisfied: the result will always be Ok(None), as we never store blobs from Fulu. I don't think we need to handle it here, as it should be unreachable.

[Reply: Member Author]
Revised in bf20880

if let Some(fulu_slot) = fulu_start_slot {
if blob.slot() >= fulu_slot {
[Review comment: Member]
This can be simplified slightly using fulu_start_slot.is_some_and(|fulu_slot| blob.slot() >= fulu_slot).

[Reply: Member Author]
The fulu_slot is used in the logging below, so after that change the compiler reports that fulu_slot cannot be found.

Should I remove fulu_slot from the logging?
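For illustration, a minimal sketch of the two forms under discussion, using plain u64 slots as stand-ins for the real Slot type; it shows why the compact is_some_and form loses the fulu_slot binding that the log line needs:

/// Verbose form: binds `fulu_slot`, which stays in scope for logging.
fn should_skip_verbose(fulu_start_slot: Option<u64>, blob_slot: u64) -> bool {
    if let Some(fulu_slot) = fulu_start_slot {
        if blob_slot >= fulu_slot {
            // `fulu_slot` can still be referenced in a log line here.
            return true;
        }
    }
    false
}

/// Compact form: shorter, but `fulu_slot` only exists inside the closure,
/// so a log statement outside the closure cannot reference it.
fn should_skip_compact(fulu_start_slot: Option<u64>, blob_slot: u64) -> bool {
    fulu_start_slot.is_some_and(|fulu_slot| blob_slot >= fulu_slot)
}

fn main() {
    assert!(should_skip_verbose(Some(32), 40));
    assert!(should_skip_compact(Some(32), 40));
    assert!(!should_skip_compact(None, 40));
}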

debug!(
%peer_id,
request_root = %root,
blob_slot = %blob.slot(),
%fulu_slot,
"BlobsByRoot request is at or after Fulu slot, returning empty response"
);
continue;
}
}

self.send_response(
peer_id,
inbound_request_id,
Response::BlobsByRoot(Some(blob)),
);
send_blob_count += 1;
} else {
let BlobIdentifier {
block_root: root,
index,
} = id;

let blob_list_result = match blob_list_results.entry(root) {
Entry::Vacant(entry) => {
entry.insert(self.chain.get_blobs_checking_early_attester_cache(root))
@@ -306,6 +326,20 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
Ok(blobs_sidecar_list) => {
'inner: for blob_sidecar in blobs_sidecar_list.iter() {
if blob_sidecar.index == *index {
// Same logic as above to check for Fulu slot
if let Some(fulu_slot) = fulu_start_slot {
[Review comment: Member]
This will also be unreachable.

The purpose of not serving post-Fulu slots in the byRange method is to save us the work of the block-roots lookup. It doesn't seem as relevant here, as we've already hit the database and incurred the cost.

The spec also says the following, so we can't really penalise these peers during this deprecation period:

"Clients SHOULD NOT penalize peers for requesting blob sidecars from FULU_FORK_EPOCH."

I think we could probably check all the caches to see if we have the block before hitting the database?

[Reply: Member Author]
Revised in bf20880, trying to avoid touching the database
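A hypothetical sketch of the cache-first idea raised above (BlobCache and BlobStore are illustrative stand-ins, not Lighthouse's actual types): consult the in-memory caches for the requested root first, and only pay for a database read on a miss.

use std::collections::HashMap;

type Root = [u8; 32];
type Blob = Vec<u8>; // stand-in payload

struct BlobCache {
    by_root: HashMap<Root, Vec<Blob>>,
}

struct BlobStore {
    db: HashMap<Root, Vec<Blob>>,
}

/// Answer from the cache when possible; fall back to the database only on a
/// cache miss, so requests that memory can answer never touch the database.
fn get_blobs_cache_first(
    cache: &BlobCache,
    store: &BlobStore,
    root: &Root,
) -> Option<Vec<Blob>> {
    if let Some(blobs) = cache.by_root.get(root) {
        return Some(blobs.clone());
    }
    store.db.get(root).cloned()
}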

if blob_sidecar.slot() >= fulu_slot {
[Review comment: Member]
As above.

debug!(
%peer_id,
request_root = %root,
blob_slot = %blob_sidecar.slot(),
%fulu_slot,
"BlobsByRoot request is at or after Fulu slot, returning empty response"
);
break 'inner;
}
}
[Review comment: Member]
I think we can remove this block: if we have the blob, then Fulu is not activated and there's no need to check for the Fulu slot.

[Reply: Member Author]
Removed in 0cbd9fd


self.send_response(
peer_id,
inbound_request_id,
@@ -884,6 +918,36 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
);

let request_start_slot = Slot::from(req.start_slot);
// This count only differs from req.count when request_start_slot + req.count spans the Fulu fork slot
let mut effective_count = req.count;
[Review comment: Member]
You could make the if/else block return the count; that would allow you to remove the mut.

[Reply: Member Author]
Tried to revise this in 32c3ab0; not sure if it is good?

[Review comment: Member]
Yep, that's better!
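For reference, a minimal sketch of the suggested mut-free shape (plain u64 slots in place of the real Slot type), with the conditional evaluating directly to the count:

/// Compute the number of slots to serve without a `mut` binding: the match
/// expression itself evaluates to the effective count.
fn effective_count(
    fulu_start_slot: Option<u64>,
    request_start_slot: u64,
    requested_count: u64,
) -> u64 {
    match fulu_start_slot {
        // Request starts at or after Fulu: serve nothing.
        Some(fulu) if request_start_slot >= fulu => 0,
        // Request spans the fork: truncate at the Fulu start slot.
        Some(fulu) if request_start_slot + requested_count > fulu => {
            fulu - request_start_slot
        }
        // Entirely pre-Fulu, or Fulu not scheduled: serve the full range.
        _ => requested_count,
    }
}

fn main() {
    assert_eq!(effective_count(Some(32), 40, 8), 0); // fully post-Fulu
    assert_eq!(effective_count(Some(32), 16, 32), 16); // spans the fork
    assert_eq!(effective_count(None, 16, 32), 32); // Fulu not scheduled
}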


if let Some(fulu_epoch) = self.chain.spec.fulu_fork_epoch {
let fulu_start_slot = fulu_epoch.start_slot(T::EthSpec::slots_per_epoch());
let request_end_slot = request_start_slot + req.count - 1;

// If the request_start_slot is at or after the Fulu start slot, return an empty response
if request_start_slot >= fulu_start_slot {
debug!(
%peer_id,
%request_start_slot,
%fulu_start_slot,
[Review comment: Member]
I think we can remove all the Fulu start slot logging to reduce the noise. It doesn't give us much during debugging, as there are many easy ways to find it out, e.g. /config/spec, the network config, etc.

[Reply: Member Author]
All Fulu-related logging removed in 0cbd9fd

returned = 0,
"BlobsByRange request is at or after a Fulu slot, returning empty response"
);
return Ok(());
// For the case where the requested slots span across the Fulu fork slot
} else if request_start_slot < fulu_start_slot && request_end_slot >= fulu_start_slot {
effective_count = (fulu_start_slot - request_start_slot).as_u64();
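// Worked example for the line above: request_start_slot = 16, req.count = 32,
// and Fulu starting at slot 32 give effective_count = 32 - 16 = 16, so only
// slots 16..=31 serve blobs.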
debug!(
%peer_id,
%request_start_slot,
%fulu_start_slot,
requested = req.count,
returned = effective_count,
"BlobsByRange request spans across Fulu fork, only serving blobs before Fulu slots"
)
}
}

let data_availability_boundary_slot = match self.chain.data_availability_boundary() {
Some(boundary) => boundary.start_slot(T::EthSpec::slots_per_epoch()),
@@ -921,7 +985,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
}

let block_roots =
self.get_block_roots_for_slot_range(req.start_slot, req.count, "BlobsByRange")?;
self.get_block_roots_for_slot_range(req.start_slot, effective_count, "BlobsByRange")?;

let current_slot = self
.chain
@@ -948,7 +1012,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
// Due to skip slots, blobs could be out of the range, we ensure they
// are in the range before sending
if blob_sidecar.slot() >= request_start_slot
&& blob_sidecar.slot() < request_start_slot + req.count
&& blob_sidecar.slot() < request_start_slot + effective_count
{
blobs_sent += 1;
self.send_network_message(NetworkMessage::SendResponse {
163 changes: 153 additions & 10 deletions beacon_node/network/src/network_beacon_processor/tests.rs
@@ -20,7 +20,7 @@ use beacon_chain::{BeaconChain, WhenSlotSkipped};
use beacon_processor::{work_reprocessing_queue::*, *};
use gossipsub::MessageAcceptance;
use itertools::Itertools;
use lighthouse_network::rpc::methods::{BlobsByRangeRequest, MetaDataV3};
use lighthouse_network::rpc::methods::{BlobsByRangeRequest, BlobsByRootRequest, MetaDataV3};
use lighthouse_network::rpc::InboundRequestId;
use lighthouse_network::{
discv5::enr::{self, CombinedKey},
@@ -34,11 +34,12 @@ use std::iter::Iterator;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc;
use types::blob_sidecar::FixedBlobSidecarList;
use types::blob_sidecar::{BlobIdentifier, FixedBlobSidecarList};
use types::{
AttesterSlashing, BlobSidecar, BlobSidecarList, ChainSpec, DataColumnSidecarList,
DataColumnSubnetId, Epoch, Hash256, MainnetEthSpec, ProposerSlashing, SignedAggregateAndProof,
SignedBeaconBlock, SignedVoluntaryExit, SingleAttestation, Slot, SubnetId,
DataColumnSubnetId, Epoch, Hash256, MainnetEthSpec, ProposerSlashing, RuntimeVariableList,
SignedAggregateAndProof, SignedBeaconBlock, SignedVoluntaryExit, SingleAttestation, Slot,
SubnetId,
};

type E = MainnetEthSpec;
@@ -417,15 +418,22 @@ impl TestRig {
}
}

pub fn enqueue_blobs_by_range_request(&self, count: u64) {
pub fn enqueue_blobs_by_range_request(&self, start_slot: u64, count: u64) {
self.network_beacon_processor
.send_blobs_by_range_request(
PeerId::random(),
InboundRequestId::new_unchecked(42, 24),
BlobsByRangeRequest {
start_slot: 0,
count,
},
BlobsByRangeRequest { start_slot, count },
)
.unwrap();
}

pub fn enqueue_blobs_by_root_request(&self, blob_ids: RuntimeVariableList<BlobIdentifier>) {
self.network_beacon_processor
.send_blobs_by_roots_request(
PeerId::random(),
InboundRequestId::new_unchecked(42, 24),
BlobsByRootRequest { blob_ids },
)
.unwrap();
}
@@ -1325,8 +1333,9 @@ async fn test_blobs_by_range() {
return;
};
let mut rig = TestRig::new(64).await;
let start_slot = 0;
let slot_count = 32;
rig.enqueue_blobs_by_range_request(slot_count);
rig.enqueue_blobs_by_range_request(start_slot, slot_count);

let mut blob_count = 0;
for slot in 0..slot_count {
@@ -1362,3 +1371,137 @@
}
assert_eq!(blob_count, actual_count);
}

#[tokio::test]
async fn test_blobs_by_range_post_fulu_should_return_empty() {
// Only test for Fulu fork
if test_spec::<E>().fulu_fork_epoch.is_none() {
return;
};
let mut rig = TestRig::new(64).await;
let start_slot = 0;
let slot_count = 32;
rig.enqueue_blobs_by_range_request(start_slot, slot_count);

let mut actual_count = 0;

while let Some(next) = rig.network_rx.recv().await {
if let NetworkMessage::SendResponse {
peer_id: _,
response: Response::BlobsByRange(blob),
inbound_request_id: _,
} = next
{
if blob.is_some() {
actual_count += 1;
} else {
break;
}
} else {
panic!("unexpected message {:?}", next);
}
}
// Post-Fulu should return 0 blobs
assert_eq!(0, actual_count);
}

#[tokio::test]
async fn test_blobs_by_range_spans_fulu_fork() {
[Review comment: Member]
I don't think this test would fail before your PR? But I think it's useful that it covers blobs-by-range working before Fulu.

[Reply: Member Author]
Yes, this test wouldn't fail even without this PR's change, which is why I think the test is not robust enough.

I have already started from a mid-epoch slot, and the blob request spans across the fork boundary. I wonder if I am missing something in the test?

// Only test for Electra & Fulu fork transition
if test_spec::<E>().electra_fork_epoch.is_none() {
return;
};
let mut spec = test_spec::<E>();
spec.fulu_fork_epoch = Some(Epoch::new(1));

let mut rig = TestRig::new_parametric(64, BeaconProcessorConfig::default(), spec).await;

let start_slot = 16;
// This will span from epoch 0 (Electra) to epoch 1 (Fulu)
let slot_count = 32;

rig.enqueue_blobs_by_range_request(start_slot, slot_count);

let mut blob_count = 0;
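// The expected count is taken only from pre-Fulu slots: `start_slot..slot_count`
// is 16..32 here, exactly the slots before the Fulu fork at epoch 1. Post-Fulu
// slots store no blob sidecars, so they contribute nothing to the response.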
for slot in start_slot..slot_count {
let root = rig
.chain
.block_root_at_slot(Slot::new(slot), WhenSlotSkipped::None)
.unwrap();
blob_count += root
.map(|root| {
rig.chain
.get_blobs(&root)
.map(|list| list.len())
.unwrap_or(0)
})
.unwrap_or(0);
}

let mut actual_count = 0;

while let Some(next) = rig.network_rx.recv().await {
if let NetworkMessage::SendResponse {
peer_id: _,
response: Response::BlobsByRange(blob),
inbound_request_id: _,
} = next
{
if blob.is_some() {
actual_count += 1;
} else {
break;
}
} else {
panic!("unexpected message {:?}", next);
}
}
assert_eq!(blob_count, actual_count);
}

#[tokio::test]
async fn test_blobs_by_root_post_fulu_should_return_empty() {
// Only test for Fulu fork
[Review comment: Member]
I think it would be worth adding a test to confirm it still works right before the Fulu fork?

[Reply: Member Author]
Added a test in 0cbd9fd

if test_spec::<E>().fulu_fork_epoch.is_none() {
return;
};

let mut rig = TestRig::new(64).await;

// Get the block root of a sample slot, e.g., slot 1
let block_root = rig
.chain
.block_root_at_slot(Slot::new(1), WhenSlotSkipped::None)
.unwrap()
.unwrap();

let blob_ids = vec![BlobIdentifier {
block_root,
index: 0,
}];

let blob_ids_list = RuntimeVariableList::new(blob_ids, 1).unwrap();

rig.enqueue_blobs_by_root_request(blob_ids_list);

let mut actual_count = 0;

while let Some(next) = rig.network_rx.recv().await {
if let NetworkMessage::SendResponse {
peer_id: _,
response: Response::BlobsByRoot(blob),
inbound_request_id: _,
} = next
{
if blob.is_some() {
actual_count += 1;
} else {
break;
}
} else {
panic!("unexpected message {:?}", next);
}
}
// Post-Fulu should return 0 blobs
assert_eq!(0, actual_count);
}