Skip to content
8 changes: 6 additions & 2 deletions beacon_node/beacon_processor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,11 @@ pub enum Work<E: EthSpec> {
IgnoredRpcBlock {
process_fn: BlockingFn,
},
ChainSegment(AsyncFn),
ChainSegment {
process_fn: AsyncFn,
/// (chain_id, batch_epoch) for test observability
process_id: (u32, u64),
},
ChainSegmentBackfill(BlockingFn),
Status(BlockingFn),
BlocksByRangeRequest(AsyncFn),
Expand Down Expand Up @@ -1423,7 +1427,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
} => task_spawner.spawn_blocking(move || {
process_batch(aggregates);
}),
Work::ChainSegment(process_fn) => task_spawner.spawn_async(async move {
Work::ChainSegment { process_fn, .. } => task_spawner.spawn_async(async move {
process_fn.await;
}),
Work::UnknownBlockAttestation { process_fn }
Expand Down
7 changes: 5 additions & 2 deletions beacon_node/network/src/network_beacon_processor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -619,11 +619,14 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
// Back-sync batches are dispatched with a different `Work` variant so
// they can be rate-limited.
let work = match process_id {
ChainSegmentProcessId::RangeBatchId(_, _) => {
ChainSegmentProcessId::RangeBatchId(chain_id, epoch) => {
let process_fn = async move {
processor.process_chain_segment(process_id, blocks).await;
};
Work::ChainSegment(Box::pin(process_fn))
Work::ChainSegment {
process_fn: Box::pin(process_fn),
process_id: (chain_id, epoch.as_u64()),
}
}
ChainSegmentProcessId::BackSyncBatchId(_) => {
let process_fn =
Expand Down
Loading
Loading