@@ -67,11 +67,11 @@ use types::{
67
67
BeaconState , ChainSpec , EthSpec , Hash256 , RelativeEpoch , SignedAggregateAndProof ,
68
68
SingleAttestation , Slot , SubnetId ,
69
69
} ;
70
+ use work_reprocessing_queue:: IgnoredRpcBlock ;
70
71
use work_reprocessing_queue:: {
71
72
spawn_reprocess_scheduler, QueuedAggregate , QueuedLightClientUpdate , QueuedRpcBlock ,
72
73
QueuedUnaggregate , ReadyWork ,
73
74
} ;
74
- use work_reprocessing_queue:: { IgnoredRpcBlock , QueuedSamplingRequest } ;
75
75
76
76
mod metrics;
77
77
pub mod scheduler;
@@ -112,12 +112,9 @@ pub struct BeaconProcessorQueueLengths {
112
112
gossip_proposer_slashing_queue : usize ,
113
113
gossip_attester_slashing_queue : usize ,
114
114
unknown_light_client_update_queue : usize ,
115
- unknown_block_sampling_request_queue : usize ,
116
115
rpc_block_queue : usize ,
117
116
rpc_blob_queue : usize ,
118
117
rpc_custody_column_queue : usize ,
119
- rpc_verify_data_column_queue : usize ,
120
- sampling_result_queue : usize ,
121
118
column_reconstruction_queue : usize ,
122
119
chain_segment_queue : usize ,
123
120
backfill_chain_segment : usize ,
@@ -183,9 +180,6 @@ impl BeaconProcessorQueueLengths {
183
180
rpc_blob_queue : 1024 ,
184
181
// TODO(das): Placeholder values
185
182
rpc_custody_column_queue : 1000 ,
186
- rpc_verify_data_column_queue : 1000 ,
187
- unknown_block_sampling_request_queue : 16384 ,
188
- sampling_result_queue : 1000 ,
189
183
column_reconstruction_queue : 64 ,
190
184
chain_segment_queue : 64 ,
191
185
backfill_chain_segment : 64 ,
@@ -487,10 +481,6 @@ impl<E: EthSpec> From<ReadyWork> for WorkEvent<E> {
487
481
process_fn,
488
482
} ,
489
483
} ,
490
- ReadyWork :: SamplingRequest ( QueuedSamplingRequest { process_fn, .. } ) => Self {
491
- drop_during_sync : true ,
492
- work : Work :: UnknownBlockSamplingRequest { process_fn } ,
493
- } ,
494
484
ReadyWork :: BackfillSync ( QueuedBackfillBatch ( process_fn) ) => Self {
495
485
drop_during_sync : false ,
496
486
work : Work :: ChainSegmentBackfill ( process_fn) ,
@@ -582,9 +572,6 @@ pub enum Work<E: EthSpec> {
582
572
parent_root : Hash256 ,
583
573
process_fn : BlockingFn ,
584
574
} ,
585
- UnknownBlockSamplingRequest {
586
- process_fn : BlockingFn ,
587
- } ,
588
575
GossipAggregateBatch {
589
576
aggregates : Vec < GossipAggregatePackage < E > > ,
590
577
process_batch : Box < dyn FnOnce ( Vec < GossipAggregatePackage < E > > ) + Send + Sync > ,
@@ -611,8 +598,6 @@ pub enum Work<E: EthSpec> {
611
598
process_fn : AsyncFn ,
612
599
} ,
613
600
RpcCustodyColumn ( AsyncFn ) ,
614
- RpcVerifyDataColumn ( AsyncFn ) ,
615
- SamplingResult ( AsyncFn ) ,
616
601
ColumnReconstruction ( AsyncFn ) ,
617
602
IgnoredRpcBlock {
618
603
process_fn : BlockingFn ,
@@ -652,7 +637,6 @@ pub enum WorkType {
652
637
GossipAggregate ,
653
638
UnknownBlockAggregate ,
654
639
UnknownLightClientOptimisticUpdate ,
655
- UnknownBlockSamplingRequest ,
656
640
GossipAggregateBatch ,
657
641
GossipBlock ,
658
642
GossipBlobSidecar ,
@@ -668,8 +652,6 @@ pub enum WorkType {
668
652
RpcBlock ,
669
653
RpcBlobs ,
670
654
RpcCustodyColumn ,
671
- RpcVerifyDataColumn ,
672
- SamplingResult ,
673
655
ColumnReconstruction ,
674
656
IgnoredRpcBlock ,
675
657
ChainSegment ,
@@ -720,8 +702,6 @@ impl<E: EthSpec> Work<E> {
720
702
Work :: RpcBlock { .. } => WorkType :: RpcBlock ,
721
703
Work :: RpcBlobs { .. } => WorkType :: RpcBlobs ,
722
704
Work :: RpcCustodyColumn { .. } => WorkType :: RpcCustodyColumn ,
723
- Work :: RpcVerifyDataColumn { .. } => WorkType :: RpcVerifyDataColumn ,
724
- Work :: SamplingResult { .. } => WorkType :: SamplingResult ,
725
705
Work :: ColumnReconstruction ( _) => WorkType :: ColumnReconstruction ,
726
706
Work :: IgnoredRpcBlock { .. } => WorkType :: IgnoredRpcBlock ,
727
707
Work :: ChainSegment { .. } => WorkType :: ChainSegment ,
@@ -741,7 +721,6 @@ impl<E: EthSpec> Work<E> {
741
721
Work :: LightClientUpdatesByRangeRequest ( _) => WorkType :: LightClientUpdatesByRangeRequest ,
742
722
Work :: UnknownBlockAttestation { .. } => WorkType :: UnknownBlockAttestation ,
743
723
Work :: UnknownBlockAggregate { .. } => WorkType :: UnknownBlockAggregate ,
744
- Work :: UnknownBlockSamplingRequest { .. } => WorkType :: UnknownBlockSamplingRequest ,
745
724
Work :: UnknownLightClientOptimisticUpdate { .. } => {
746
725
WorkType :: UnknownLightClientOptimisticUpdate
747
726
}
@@ -884,14 +863,8 @@ impl<E: EthSpec> BeaconProcessor<E> {
884
863
let mut rpc_block_queue = FifoQueue :: new ( queue_lengths. rpc_block_queue ) ;
885
864
let mut rpc_blob_queue = FifoQueue :: new ( queue_lengths. rpc_blob_queue ) ;
886
865
let mut rpc_custody_column_queue = FifoQueue :: new ( queue_lengths. rpc_custody_column_queue ) ;
887
- let mut rpc_verify_data_column_queue =
888
- FifoQueue :: new ( queue_lengths. rpc_verify_data_column_queue ) ;
889
- // TODO(das): the sampling_request_queue is never read
890
- let mut sampling_result_queue = FifoQueue :: new ( queue_lengths. sampling_result_queue ) ;
891
866
let mut column_reconstruction_queue =
892
867
FifoQueue :: new ( queue_lengths. column_reconstruction_queue ) ;
893
- let mut unknown_block_sampling_request_queue =
894
- FifoQueue :: new ( queue_lengths. unknown_block_sampling_request_queue ) ;
895
868
let mut chain_segment_queue = FifoQueue :: new ( queue_lengths. chain_segment_queue ) ;
896
869
let mut backfill_chain_segment = FifoQueue :: new ( queue_lengths. backfill_chain_segment ) ;
897
870
let mut gossip_block_queue = FifoQueue :: new ( queue_lengths. gossip_block_queue ) ;
@@ -1058,13 +1031,8 @@ impl<E: EthSpec> BeaconProcessor<E> {
1058
1031
Some ( item)
1059
1032
} else if let Some ( item) = rpc_custody_column_queue. pop ( ) {
1060
1033
Some ( item)
1061
- // TODO(das): decide proper prioritization for sampling columns
1062
1034
} else if let Some ( item) = rpc_custody_column_queue. pop ( ) {
1063
1035
Some ( item)
1064
- } else if let Some ( item) = rpc_verify_data_column_queue. pop ( ) {
1065
- Some ( item)
1066
- } else if let Some ( item) = sampling_result_queue. pop ( ) {
1067
- Some ( item)
1068
1036
// Check delayed blocks before gossip blocks, the gossip blocks might rely
1069
1037
// on the delayed ones.
1070
1038
} else if let Some ( item) = delayed_block_queue. pop ( ) {
@@ -1224,9 +1192,6 @@ impl<E: EthSpec> BeaconProcessor<E> {
1224
1192
Some ( item)
1225
1193
} else if let Some ( item) = dcbrange_queue. pop ( ) {
1226
1194
Some ( item)
1227
- // Prioritize sampling requests after block syncing requests
1228
- } else if let Some ( item) = unknown_block_sampling_request_queue. pop ( ) {
1229
- Some ( item)
1230
1195
// Check slashings after all other consensus messages so we prioritize
1231
1196
// following head.
1232
1197
//
@@ -1379,10 +1344,6 @@ impl<E: EthSpec> BeaconProcessor<E> {
1379
1344
Work :: RpcCustodyColumn { .. } => {
1380
1345
rpc_custody_column_queue. push ( work, work_id)
1381
1346
}
1382
- Work :: RpcVerifyDataColumn ( _) => {
1383
- rpc_verify_data_column_queue. push ( work, work_id)
1384
- }
1385
- Work :: SamplingResult ( _) => sampling_result_queue. push ( work, work_id) ,
1386
1347
Work :: ColumnReconstruction ( _) => {
1387
1348
column_reconstruction_queue. push ( work, work_id)
1388
1349
}
@@ -1425,9 +1386,6 @@ impl<E: EthSpec> BeaconProcessor<E> {
1425
1386
Work :: UnknownLightClientOptimisticUpdate { .. } => {
1426
1387
unknown_light_client_update_queue. push ( work, work_id)
1427
1388
}
1428
- Work :: UnknownBlockSamplingRequest { .. } => {
1429
- unknown_block_sampling_request_queue. push ( work, work_id)
1430
- }
1431
1389
Work :: ApiRequestP0 { .. } => api_request_p0_queue. push ( work, work_id) ,
1432
1390
Work :: ApiRequestP1 { .. } => api_request_p1_queue. push ( work, work_id) ,
1433
1391
} ;
@@ -1451,9 +1409,6 @@ impl<E: EthSpec> BeaconProcessor<E> {
1451
1409
WorkType :: UnknownLightClientOptimisticUpdate => {
1452
1410
unknown_light_client_update_queue. len ( )
1453
1411
}
1454
- WorkType :: UnknownBlockSamplingRequest => {
1455
- unknown_block_sampling_request_queue. len ( )
1456
- }
1457
1412
WorkType :: GossipAggregateBatch => 0 , // No queue
1458
1413
WorkType :: GossipBlock => gossip_block_queue. len ( ) ,
1459
1414
WorkType :: GossipBlobSidecar => gossip_blob_queue. len ( ) ,
@@ -1473,8 +1428,6 @@ impl<E: EthSpec> BeaconProcessor<E> {
1473
1428
WorkType :: RpcBlock => rpc_block_queue. len ( ) ,
1474
1429
WorkType :: RpcBlobs | WorkType :: IgnoredRpcBlock => rpc_blob_queue. len ( ) ,
1475
1430
WorkType :: RpcCustodyColumn => rpc_custody_column_queue. len ( ) ,
1476
- WorkType :: RpcVerifyDataColumn => rpc_verify_data_column_queue. len ( ) ,
1477
- WorkType :: SamplingResult => sampling_result_queue. len ( ) ,
1478
1431
WorkType :: ColumnReconstruction => column_reconstruction_queue. len ( ) ,
1479
1432
WorkType :: ChainSegment => chain_segment_queue. len ( ) ,
1480
1433
WorkType :: ChainSegmentBackfill => backfill_chain_segment. len ( ) ,
@@ -1600,8 +1553,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
1600
1553
} ) ,
1601
1554
Work :: UnknownBlockAttestation { process_fn }
1602
1555
| Work :: UnknownBlockAggregate { process_fn }
1603
- | Work :: UnknownLightClientOptimisticUpdate { process_fn, .. }
1604
- | Work :: UnknownBlockSamplingRequest { process_fn } => {
1556
+ | Work :: UnknownLightClientOptimisticUpdate { process_fn, .. } => {
1605
1557
task_spawner. spawn_blocking ( process_fn)
1606
1558
}
1607
1559
Work :: DelayedImportBlock {
@@ -1612,8 +1564,6 @@ impl<E: EthSpec> BeaconProcessor<E> {
1612
1564
Work :: RpcBlock { process_fn }
1613
1565
| Work :: RpcBlobs { process_fn }
1614
1566
| Work :: RpcCustodyColumn ( process_fn)
1615
- | Work :: RpcVerifyDataColumn ( process_fn)
1616
- | Work :: SamplingResult ( process_fn)
1617
1567
| Work :: ColumnReconstruction ( process_fn) => task_spawner. spawn_async ( process_fn) ,
1618
1568
Work :: IgnoredRpcBlock { process_fn } => task_spawner. spawn_blocking ( process_fn) ,
1619
1569
Work :: GossipBlock ( work)
0 commit comments