Skip to content

Commit aef8291

Browse files
authored
Add max delay to reconstruction (#7976)
#7697 If we're three seconds into the current slot just trigger reconstruction. I don't know what the correct reconstruction deadline number is, but it should probably be at least half a second before the attestation deadline Co-Authored-By: Eitan Seri- Levi <[email protected]> Co-Authored-By: Eitan Seri-Levi <[email protected]>
1 parent fb77ce9 commit aef8291

File tree

3 files changed

+253
-6
lines changed

3 files changed

+253
-6
lines changed

beacon_node/beacon_processor/src/scheduler/work_reprocessing_queue.rs

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,9 @@ pub const BACKFILL_SCHEDULE_IN_SLOT: [(u32, u32); 3] = [
8282
(4, 5),
8383
];
8484

85+
/// Trigger reconstruction if we are this many seconds into the current slot
86+
pub const RECONSTRUCTION_DEADLINE: Duration = Duration::from_millis(3000);
87+
8588
/// Messages that the scheduler can receive.
8689
#[derive(AsRefStr)]
8790
pub enum ReprocessQueueMessage {
@@ -172,6 +175,7 @@ pub struct QueuedBackfillBatch(pub AsyncFn);
172175

173176
pub struct QueuedColumnReconstruction {
174177
pub block_root: Hash256,
178+
pub slot: Slot,
175179
pub process_fn: AsyncFn,
176180
}
177181

@@ -749,16 +753,26 @@ impl<S: SlotClock> ReprocessQueue<S> {
749753
}
750754
}
751755
InboundEvent::Msg(DelayColumnReconstruction(request)) => {
756+
let mut reconstruction_delay = QUEUED_RECONSTRUCTION_DELAY;
757+
if let Some(seconds_from_current_slot) =
758+
self.slot_clock.seconds_from_current_slot_start()
759+
&& let Some(current_slot) = self.slot_clock.now()
760+
&& seconds_from_current_slot >= RECONSTRUCTION_DEADLINE
761+
&& current_slot == request.slot
762+
{
763+
// If we are at least `RECONSTRUCTION_DEADLINE` seconds into the current slot,
764+
// and the reconstruction request is for the current slot, process reconstruction immediately.
765+
reconstruction_delay = Duration::from_secs(0);
766+
}
752767
match self.queued_column_reconstructions.entry(request.block_root) {
753768
Entry::Occupied(key) => {
754-
// Push back the reattempted reconstruction
755769
self.column_reconstructions_delay_queue
756-
.reset(key.get(), QUEUED_RECONSTRUCTION_DELAY)
770+
.reset(key.get(), reconstruction_delay);
757771
}
758772
Entry::Vacant(vacant) => {
759773
let delay_key = self
760774
.column_reconstructions_delay_queue
761-
.insert(request, QUEUED_RECONSTRUCTION_DELAY);
775+
.insert(request, reconstruction_delay);
762776
vacant.insert(delay_key);
763777
}
764778
}

beacon_node/network/src/network_beacon_processor/gossip_methods.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1064,6 +1064,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
10641064
work: Work::Reprocess(ReprocessQueueMessage::DelayColumnReconstruction(
10651065
QueuedColumnReconstruction {
10661066
block_root,
1067+
slot: *slot,
10671068
process_fn: Box::pin(async move {
10681069
cloned_self
10691070
.attempt_data_column_reconstruction(block_root, true)

beacon_node/network/src/network_beacon_processor/tests.rs

Lines changed: 235 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -94,12 +94,20 @@ impl TestRig {
9494
// This allows for testing voluntary exits without building out a massive chain.
9595
let mut spec = test_spec::<E>();
9696
spec.shard_committee_period = 2;
97-
Self::new_parametric(chain_length, BeaconProcessorConfig::default(), spec).await
97+
Self::new_parametric(chain_length, BeaconProcessorConfig::default(), false, spec).await
98+
}
99+
100+
pub async fn new_supernode(chain_length: u64) -> Self {
101+
// This allows for testing voluntary exits without building out a massive chain.
102+
let mut spec = test_spec::<E>();
103+
spec.shard_committee_period = 2;
104+
Self::new_parametric(chain_length, BeaconProcessorConfig::default(), true, spec).await
98105
}
99106

100107
pub async fn new_parametric(
101108
chain_length: u64,
102109
beacon_processor_config: BeaconProcessorConfig,
110+
import_data_columns: bool,
103111
spec: ChainSpec,
104112
) -> Self {
105113
let spec = Arc::new(spec);
@@ -108,6 +116,7 @@ impl TestRig {
108116
.deterministic_keypairs(VALIDATOR_COUNT)
109117
.fresh_ephemeral_store()
110118
.mock_execution_layer()
119+
.import_all_data_columns(import_data_columns)
111120
.chain_config(<_>::default())
112121
.build();
113122

@@ -601,6 +610,40 @@ impl TestRig {
601610
.await
602611
}
603612

613+
pub async fn assert_event_journal_completes_with_timeout(
614+
&mut self,
615+
expected: &[WorkType],
616+
timeout: Duration,
617+
) {
618+
self.assert_event_journal_with_timeout(
619+
&expected
620+
.iter()
621+
.map(Into::<&'static str>::into)
622+
.chain(std::iter::once(WORKER_FREED))
623+
.chain(std::iter::once(NOTHING_TO_DO))
624+
.collect::<Vec<_>>(),
625+
timeout,
626+
)
627+
.await
628+
}
629+
630+
pub async fn assert_event_journal_does_not_complete_with_timeout(
631+
&mut self,
632+
expected: &[WorkType],
633+
timeout: Duration,
634+
) {
635+
self.assert_not_in_event_journal_with_timeout(
636+
&expected
637+
.iter()
638+
.map(Into::<&'static str>::into)
639+
.chain(std::iter::once(WORKER_FREED))
640+
.chain(std::iter::once(NOTHING_TO_DO))
641+
.collect::<Vec<_>>(),
642+
timeout,
643+
)
644+
.await
645+
}
646+
604647
pub async fn assert_event_journal_completes(&mut self, expected: &[WorkType]) {
605648
self.assert_event_journal(
606649
&expected
@@ -651,6 +694,37 @@ impl TestRig {
651694
assert_eq!(events, expected);
652695
}
653696

697+
/// Assert that the `BeaconProcessor` event journal is not as `expected`.
698+
pub async fn assert_not_in_event_journal_with_timeout(
699+
&mut self,
700+
expected: &[&str],
701+
timeout: Duration,
702+
) {
703+
let mut events = Vec::with_capacity(expected.len());
704+
705+
let drain_future = async {
706+
while let Some(event) = self.work_journal_rx.recv().await {
707+
events.push(event);
708+
709+
// Break as soon as we collect the desired number of events.
710+
if events.len() >= expected.len() {
711+
break;
712+
}
713+
}
714+
};
715+
716+
// Panic if we don't time out.
717+
tokio::select! {
718+
_ = tokio::time::sleep(timeout) => {},
719+
_ = drain_future => panic!(
720+
"Got events before timeout. Expected no events but got {:?}",
721+
events
722+
),
723+
}
724+
725+
assert_ne!(events, expected);
726+
}
727+
654728
/// Listen for network messages and collect them for a specified duration or until reaching a count.
655729
///
656730
/// Returns None if no messages were received, or Some(Vec) containing the received messages.
@@ -743,6 +817,159 @@ fn junk_message_id() -> MessageId {
743817
MessageId::new(&[])
744818
}
745819

820+
// Test that column reconstruction is delayed for columns that arrive
821+
// at the beginning of the slot.
822+
#[tokio::test]
823+
async fn data_column_reconstruction_at_slot_start() {
824+
if test_spec::<E>().fulu_fork_epoch.is_none() {
825+
return;
826+
};
827+
828+
let mut rig = TestRig::new_supernode(SMALL_CHAIN).await;
829+
830+
let slot_start = rig
831+
.chain
832+
.slot_clock
833+
.start_of(rig.next_block.slot())
834+
.unwrap();
835+
836+
rig.chain
837+
.slot_clock
838+
.set_current_time(slot_start - rig.chain.spec.maximum_gossip_clock_disparity());
839+
840+
assert_eq!(
841+
rig.chain.slot().unwrap(),
842+
rig.next_block.slot() - 1,
843+
"chain should be at the correct slot"
844+
);
845+
846+
let num_data_columns = rig.next_data_columns.as_ref().map(|c| c.len()).unwrap_or(0);
847+
for i in 0..num_data_columns {
848+
rig.enqueue_gossip_data_columns(i);
849+
rig.assert_event_journal_completes(&[WorkType::GossipDataColumnSidecar])
850+
.await;
851+
}
852+
853+
if num_data_columns > 0 {
854+
// Reconstruction is delayed by 100ms, we should not be able to complete
855+
// reconstruction up to this point
856+
rig.assert_event_journal_does_not_complete_with_timeout(
857+
&[WorkType::ColumnReconstruction],
858+
Duration::from_millis(100),
859+
)
860+
.await;
861+
862+
// We've waited at least 150ms, reconstruction can now be triggered
863+
rig.assert_event_journal_completes_with_timeout(
864+
&[WorkType::ColumnReconstruction],
865+
Duration::from_millis(200),
866+
)
867+
.await;
868+
}
869+
}
870+
871+
// Test that column reconstruction happens immediately for columns that arrive at the
872+
// reconstruction deadline.
873+
#[tokio::test]
874+
async fn data_column_reconstruction_at_deadline() {
875+
if test_spec::<E>().fulu_fork_epoch.is_none() {
876+
return;
877+
};
878+
879+
let mut rig = TestRig::new_supernode(SMALL_CHAIN).await;
880+
881+
let slot_start = rig
882+
.chain
883+
.slot_clock
884+
.start_of(rig.next_block.slot())
885+
.unwrap();
886+
887+
rig.chain
888+
.slot_clock
889+
.set_current_time(slot_start - rig.chain.spec.maximum_gossip_clock_disparity());
890+
891+
assert_eq!(
892+
rig.chain.slot().unwrap(),
893+
rig.next_block.slot() - 1,
894+
"chain should be at the correct slot"
895+
);
896+
897+
// We push the slot clock to 3 seconds into the slot, this is the deadline to trigger reconstruction.
898+
rig.chain
899+
.slot_clock
900+
.set_current_time(slot_start + Duration::from_secs(3));
901+
902+
let num_data_columns = rig.next_data_columns.as_ref().map(|c| c.len()).unwrap_or(0);
903+
for i in 0..num_data_columns {
904+
rig.enqueue_gossip_data_columns(i);
905+
rig.assert_event_journal_completes(&[WorkType::GossipDataColumnSidecar])
906+
.await;
907+
}
908+
909+
// Since we're at the reconstruction deadline, reconstruction should be triggered immediately
910+
if num_data_columns > 0 {
911+
rig.assert_event_journal_completes_with_timeout(
912+
&[WorkType::ColumnReconstruction],
913+
Duration::from_millis(50),
914+
)
915+
.await;
916+
}
917+
}
918+
919+
// Test the column reconstruction is delayed for columns that arrive for a previous slot.
920+
#[tokio::test]
921+
async fn data_column_reconstruction_at_next_slot() {
922+
if test_spec::<E>().fulu_fork_epoch.is_none() {
923+
return;
924+
};
925+
926+
let mut rig = TestRig::new_supernode(SMALL_CHAIN).await;
927+
928+
let slot_start = rig
929+
.chain
930+
.slot_clock
931+
.start_of(rig.next_block.slot())
932+
.unwrap();
933+
934+
rig.chain
935+
.slot_clock
936+
.set_current_time(slot_start - rig.chain.spec.maximum_gossip_clock_disparity());
937+
938+
assert_eq!(
939+
rig.chain.slot().unwrap(),
940+
rig.next_block.slot() - 1,
941+
"chain should be at the correct slot"
942+
);
943+
944+
// We push the slot clock to the next slot.
945+
rig.chain
946+
.slot_clock
947+
.set_current_time(slot_start + Duration::from_secs(12));
948+
949+
let num_data_columns = rig.next_data_columns.as_ref().map(|c| c.len()).unwrap_or(0);
950+
for i in 0..num_data_columns {
951+
rig.enqueue_gossip_data_columns(i);
952+
rig.assert_event_journal_completes(&[WorkType::GossipDataColumnSidecar])
953+
.await;
954+
}
955+
956+
if num_data_columns > 0 {
957+
// Since we are in the next slot reconstruction for the previous slot should be delayed again
958+
rig.assert_event_journal_does_not_complete_with_timeout(
959+
&[WorkType::ColumnReconstruction],
960+
Duration::from_millis(100),
961+
)
962+
.await;
963+
964+
// We've waited at least 150ms, reconstruction can now be triggered
965+
rig.assert_event_journal_completes_with_timeout(
966+
&[WorkType::ColumnReconstruction],
967+
Duration::from_millis(200),
968+
)
969+
.await;
970+
}
971+
}
972+
746973
/// Blocks that arrive early should be queued for later processing.
747974
#[tokio::test]
748975
async fn import_gossip_block_acceptably_early() {
@@ -1359,8 +1586,13 @@ async fn test_backfill_sync_processing_rate_limiting_disabled() {
13591586
enable_backfill_rate_limiting: false,
13601587
..Default::default()
13611588
};
1362-
let mut rig =
1363-
TestRig::new_parametric(SMALL_CHAIN, beacon_processor_config, test_spec::<E>()).await;
1589+
let mut rig = TestRig::new_parametric(
1590+
SMALL_CHAIN,
1591+
beacon_processor_config,
1592+
false,
1593+
test_spec::<E>(),
1594+
)
1595+
.await;
13641596

13651597
for _ in 0..3 {
13661598
rig.enqueue_backfill_batch();

0 commit comments

Comments
 (0)