9
9
//! sync as failed, log an error and attempt to retry once a new peer joins the node.
10
10
11
11
use crate :: network_beacon_processor:: ChainSegmentProcessId ;
12
+ use crate :: sync:: block_sidecar_coupling:: CouplingError ;
12
13
use crate :: sync:: manager:: BatchProcessResult ;
13
14
use crate :: sync:: network_context:: {
14
15
RangeRequestId , RpcRequestSendError , RpcResponseError , SyncNetworkContext ,
@@ -28,7 +29,7 @@ use std::collections::{
28
29
} ;
29
30
use std:: sync:: Arc ;
30
31
use tracing:: { debug, error, info, warn} ;
31
- use types:: { Epoch , EthSpec } ;
32
+ use types:: { ColumnIndex , Epoch , EthSpec } ;
32
33
33
34
/// Blocks are downloaded in batches from peers. This constant specifies how many epochs worth of
34
35
/// blocks per batch are requested _at most_. A batch may request fewer blocks to account for
@@ -209,9 +210,11 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
209
210
. network_globals
210
211
. peers
211
212
. read ( )
212
- . synced_peers ( )
213
+ . synced_peers_for_epoch ( self . to_be_downloaded , None )
213
214
. next ( )
214
215
. is_some ( )
216
+ // backfill can't progress if we do not have peers in the required subnets post peerdas.
217
+ && self . good_peers_on_sampling_subnets ( self . to_be_downloaded , network)
215
218
{
216
219
// If there are peers to resume with, begin the resume.
217
220
debug ! ( start_epoch = ?self . current_start, awaiting_batches = self . batches. len( ) , processing_target = ?self . processing_target, "Resuming backfill sync" ) ;
@@ -305,6 +308,46 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
305
308
err : RpcResponseError ,
306
309
) -> Result < ( ) , BackFillError > {
307
310
if let Some ( batch) = self . batches . get_mut ( & batch_id) {
311
+ if let RpcResponseError :: BlockComponentCouplingError ( coupling_error) = & err {
312
+ match coupling_error {
313
+ CouplingError :: DataColumnPeerFailure {
314
+ error,
315
+ faulty_peers,
316
+ action,
317
+ exceeded_retries,
318
+ } => {
319
+ debug ! ( ?batch_id, error, "Block components coupling error" ) ;
320
+ // Note: we don't fail the batch here because a `CouplingError` is
321
+ // recoverable by requesting from other honest peers.
322
+ let mut failed_columns = HashSet :: new ( ) ;
323
+ let mut failed_peers = HashSet :: new ( ) ;
324
+ for ( column, peer) in faulty_peers {
325
+ failed_columns. insert ( * column) ;
326
+ failed_peers. insert ( * peer) ;
327
+ }
328
+ for peer in failed_peers. iter ( ) {
329
+ network. report_peer ( * peer, * action, "failed to return columns" ) ;
330
+ }
331
+
332
+ // Only retry the failed columns if this is a peer failure **and** retries have not been exceeded
333
+ if !* exceeded_retries {
334
+ return self . retry_partial_batch (
335
+ network,
336
+ batch_id,
337
+ request_id,
338
+ failed_columns,
339
+ failed_peers,
340
+ ) ;
341
+ }
342
+ }
343
+ CouplingError :: BlobPeerFailure ( msg) => {
344
+ tracing:: debug!( ?batch_id, msg, "Blob peer failure" ) ;
345
+ }
346
+ CouplingError :: InternalError ( msg) => {
347
+ error ! ( ?batch_id, msg, "Block components coupling internal error" ) ;
348
+ }
349
+ }
350
+ }
308
351
// A batch could be retried without the peer failing the request (disconnecting/
309
352
// sending an error /timeout) if the peer is removed from the chain for other
310
353
// reasons. Check that this block belongs to the expected peer
@@ -834,12 +877,16 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
834
877
network : & mut SyncNetworkContext < T > ,
835
878
batch_id : BatchId ,
836
879
) -> Result < ( ) , BackFillError > {
880
+ if matches ! ( self . state( ) , BackFillState :: Paused ) {
881
+ return Err ( BackFillError :: Paused ) ;
882
+ }
837
883
if let Some ( batch) = self . batches . get_mut ( & batch_id) {
884
+ debug ! ( ?batch_id, "Sending backfill batch" ) ;
838
885
let synced_peers = self
839
886
. network_globals
840
887
. peers
841
888
. read ( )
842
- . synced_peers ( )
889
+ . synced_peers_for_epoch ( batch_id , None )
843
890
. cloned ( )
844
891
. collect :: < HashSet < _ > > ( ) ;
845
892
@@ -898,6 +945,53 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
898
945
Ok ( ( ) )
899
946
}
900
947
948
+ /// Retries partial column requests within the batch by creating new requests for the failed columns.
949
+ pub fn retry_partial_batch (
950
+ & mut self ,
951
+ network : & mut SyncNetworkContext < T > ,
952
+ batch_id : BatchId ,
953
+ id : Id ,
954
+ failed_columns : HashSet < ColumnIndex > ,
955
+ mut failed_peers : HashSet < PeerId > ,
956
+ ) -> Result < ( ) , BackFillError > {
957
+ if let Some ( batch) = self . batches . get_mut ( & batch_id) {
958
+ failed_peers. extend ( & batch. failed_peers ( ) ) ;
959
+ let req = batch. to_blocks_by_range_request ( ) . 0 ;
960
+
961
+ let synced_peers = network
962
+ . network_globals ( )
963
+ . peers
964
+ . read ( )
965
+ . synced_peers_for_epoch ( batch_id, None )
966
+ . cloned ( )
967
+ . collect :: < HashSet < _ > > ( ) ;
968
+
969
+ match network. retry_columns_by_range (
970
+ id,
971
+ & synced_peers,
972
+ & failed_peers,
973
+ req,
974
+ & failed_columns,
975
+ ) {
976
+ Ok ( _) => {
977
+ debug ! (
978
+ ?batch_id,
979
+ id, "Retried column requests from different peers"
980
+ ) ;
981
+ return Ok ( ( ) ) ;
982
+ }
983
+ Err ( e) => {
984
+ debug ! ( ?batch_id, id, e, "Failed to retry partial batch" ) ;
985
+ }
986
+ }
987
+ } else {
988
+ return Err ( BackFillError :: InvalidSyncState (
989
+ "Batch should exist to be retried" . to_string ( ) ,
990
+ ) ) ;
991
+ }
992
+ Ok ( ( ) )
993
+ }
994
+
901
995
/// When resuming a chain, this function searches for batches that need to be re-downloaded and
902
996
/// transitions their state to redownload the batch.
903
997
fn resume_batches ( & mut self , network : & mut SyncNetworkContext < T > ) -> Result < ( ) , BackFillError > {
@@ -973,6 +1067,11 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
973
1067
return None ;
974
1068
}
975
1069
1070
+ if !self . good_peers_on_sampling_subnets ( self . to_be_downloaded , network) {
1071
+ debug ! ( "Waiting for peers to be available on custody column subnets" ) ;
1072
+ return None ;
1073
+ }
1074
+
976
1075
let batch_id = self . to_be_downloaded ;
977
1076
// this batch could have been included already being an optimistic batch
978
1077
match self . batches . entry ( batch_id) {
@@ -1005,6 +1104,36 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
1005
1104
}
1006
1105
}
1007
1106
1107
+ /// Checks all sampling column subnets for peers. Returns `true` if there is at least one peer in
1108
+ /// every sampling column subnet.
1109
+ ///
1110
+ /// Returns `true` if peerdas isn't enabled for the epoch.
1111
+ fn good_peers_on_sampling_subnets (
1112
+ & self ,
1113
+ epoch : Epoch ,
1114
+ network : & SyncNetworkContext < T > ,
1115
+ ) -> bool {
1116
+ if network. chain . spec . is_peer_das_enabled_for_epoch ( epoch) {
1117
+ // Require peers on all sampling column subnets before sending batches
1118
+ let peers_on_all_custody_subnets = network
1119
+ . network_globals ( )
1120
+ . sampling_subnets ( )
1121
+ . iter ( )
1122
+ . all ( |subnet_id| {
1123
+ let peer_count = network
1124
+ . network_globals ( )
1125
+ . peers
1126
+ . read ( )
1127
+ . good_range_sync_custody_subnet_peers ( * subnet_id)
1128
+ . count ( ) ;
1129
+ peer_count > 0
1130
+ } ) ;
1131
+ peers_on_all_custody_subnets
1132
+ } else {
1133
+ true
1134
+ }
1135
+ }
1136
+
1008
1137
/// Resets the start epoch based on the beacon chain.
1009
1138
///
1010
1139
/// This errors if the beacon chain indicates that backfill sync has already completed or is
0 commit comments