@@ -36,6 +36,8 @@ const LATEST_CERTIFIED_HEIGHT: &str = "state_manager_latest_certified_height";
3636const LAST_MANIFEST_HEIGHT : & str = "state_manager_last_computed_manifest_height" ;
3737const REPLICATED_STATE_PURGE_HEIGHT_DISK : & str = "replicated_state_purge_height_disk" ;
3838
39+ const METRIC_PROCESS_BATCH_PHASE_DURATION : & str = "mr_process_batch_phase_duration_seconds" ;
40+
3941const GIB : u64 = 1 << 30 ;
4042
4143pub async fn rejoin_test (
@@ -263,29 +265,27 @@ async fn deploy_busy_canister(agent: &Agent, effective_canister_id: PrincipalId,
263265 . expect ( "Failed to set up a busy canister." ) ;
264266}
265267
266- pub async fn rejoin_test_long_rounds (
267- env : TestEnv ,
268+ async fn deploy_canisters_for_long_rounds (
269+ logger : & slog:: Logger ,
270+ nodes : Vec < IcNodeSnapshot > ,
268271 num_canisters : usize ,
269- dkg_interval : u64 ,
270- rejoin_node : IcNodeSnapshot ,
271- agent_node : IcNodeSnapshot ,
272272) {
273- let logger = env . logger ( ) ;
274- let agent = agent_node . build_default_agent_async ( ) . await ;
273+ let init_node = nodes [ 0 ] . clone ( ) ;
274+ let agent = init_node . build_default_agent_async ( ) . await ;
275275 let ic00 = ManagementCanister :: create ( & agent) ;
276276
277277 let num_seed_canisters = 4 ;
278278 info ! (
279279 logger,
280280 "Deploying {} seed canisters on a node {} ..." ,
281281 num_seed_canisters,
282- agent_node . get_public_url( )
282+ init_node . get_public_url( )
283283 ) ;
284284 let mut create_seed_canisters_futs = vec ! [ ] ;
285285 for _ in 0 ..num_seed_canisters {
286286 create_seed_canisters_futs. push ( deploy_seed_canister (
287287 & ic00,
288- agent_node . effective_canister_id ( ) ,
288+ init_node . effective_canister_id ( ) ,
289289 ) ) ;
290290 }
291291 let seed_canisters = join_all ( create_seed_canisters_futs) . await ;
@@ -312,24 +312,62 @@ pub async fn rejoin_test_long_rounds(
312312 }
313313
314314 // We deploy 8 "busy" canisters: this way,
315- // there are 2 canisters per each of the 4 scheduler cores
315+ // there are 2 canisters per each of the 4 scheduler threads
316316 // and thus every thread executes 2 x 1.8B = 3.6B instructions.
317317 let num_busy_canisters = 8 ;
318318 info ! (
319319 logger,
320320 "Deploying {} busy canisters on a node {} ..." ,
321321 num_busy_canisters,
322- agent_node . get_public_url( )
322+ init_node . get_public_url( )
323323 ) ;
324324 let mut create_busy_canisters_futs = vec ! [ ] ;
325325 for _ in 0 ..num_busy_canisters {
326326 create_busy_canisters_futs. push ( deploy_busy_canister (
327327 & agent,
328- agent_node . effective_canister_id ( ) ,
329- & logger,
328+ init_node . effective_canister_id ( ) ,
329+ logger,
330330 ) ) ;
331331 }
332332 join_all ( create_busy_canisters_futs) . await ;
333+ }
334+
335+ pub async fn rejoin_test_long_rounds (
336+ env : TestEnv ,
337+ nodes : Vec < IcNodeSnapshot > ,
338+ num_canisters : usize ,
339+ dkg_interval : u64 ,
340+ ) {
341+ let logger = env. logger ( ) ;
342+ deploy_canisters_for_long_rounds ( & logger, nodes. clone ( ) , num_canisters) . await ;
343+
344+ // Sort nodes by their average duration to process a batch.
345+ let mut average_process_batch_durations = vec ! [ ] ;
346+ for node in & nodes {
347+ let duration = average_process_batch_duration ( & logger, node. clone ( ) ) . await ;
348+ average_process_batch_durations. push ( duration) ;
349+ }
350+ let mut paired: Vec < _ > = average_process_batch_durations
351+ . into_iter ( )
352+ . zip ( nodes. into_iter ( ) )
353+ . collect ( ) ;
354+ paired. sort_by ( |( k1, _) , ( k2, _) | k1. total_cmp ( k2) ) ;
355+ let sorted_nodes: Vec < _ > = paired. into_iter ( ) . map ( |( _, v) | v) . collect ( ) ;
356+
357+ // The fastest node will be the reference node used to check
358+ // the latest certified height of the subnet.
359+ let reference_node = sorted_nodes[ 0 ] . clone ( ) ;
360+
361+ // The restarted node will be the slowest node
362+ // required for consensus in terms of batch processing time:
363+ // this way, the restarted node cannot catch up with the subnet
364+ // without additional measures (to be implemented in the future).
365+ // E.g., for `n = 13`, we have `f = 4` and the nodes at indices
366+ // `0`, `1`, ..., `n - (f + 1)` are required for consensus,
367+ // i.e., we restart the node at (0-based) index
368+ // `n - (f + 1) = n - (n / 3 + 1)`.
369+ let n = sorted_nodes. len ( ) ;
370+ let rejoin_node = sorted_nodes[ n - ( n / 3 + 1 ) ] . clone ( ) ;
333371
334372 info ! (
335373 logger,
@@ -345,15 +383,15 @@ pub async fn rejoin_test_long_rounds(
345383 // This way, the restarted node starts from that CUP
346384 // and we can assert it to catch up until the next CUP.
347385 info ! ( logger, "Waiting for a CUP ..." ) ;
348- let agent_node_status = agent_node
386+ let reference_node_status = reference_node
349387 . status_async ( )
350388 . await
351- . expect ( "Failed to get status of agent_node " ) ;
352- let latest_certified_height = agent_node_status
389+ . expect ( "Failed to get status of reference_node " ) ;
390+ let latest_certified_height = reference_node_status
353391 . certified_height
354- . expect ( "Failed to get certified height of agent_node " )
392+ . expect ( "Failed to get certified height of reference_node " )
355393 . get ( ) ;
356- wait_for_cup ( & logger, latest_certified_height, agent_node . clone ( ) ) . await ;
394+ wait_for_cup ( & logger, latest_certified_height, reference_node . clone ( ) ) . await ;
357395
358396 info ! ( logger, "Start the killed node again ..." ) ;
359397 rejoin_node. vm ( ) . start ( ) ;
@@ -362,7 +400,7 @@ pub async fn rejoin_test_long_rounds(
362400 let last_cup_height = wait_for_cup (
363401 & logger,
364402 latest_certified_height + dkg_interval + 1 ,
365- agent_node . clone ( ) ,
403+ reference_node . clone ( ) ,
366404 )
367405 . await ;
368406
@@ -449,6 +487,37 @@ pub async fn assert_state_sync_has_happened(
449487 panic ! ( "Couldn't verify that a state sync has happened after {NUM_RETRIES} attempts." ) ;
450488}
451489
490+ async fn average_process_batch_duration ( log : & slog:: Logger , node : IcNodeSnapshot ) -> f64 {
491+ let label_sum = format ! ( "{METRIC_PROCESS_BATCH_PHASE_DURATION}_sum" ) ;
492+ let label_count = format ! ( "{METRIC_PROCESS_BATCH_PHASE_DURATION}_count" ) ;
493+ let metrics = fetch_metrics :: < f64 > ( log, node. clone ( ) , vec ! [ & label_sum, & label_count] ) . await ;
494+ let sums: Vec < _ > = metrics
495+ . iter ( )
496+ . filter_map ( |( k, v) | {
497+ if k. starts_with ( & label_sum) {
498+ Some ( v)
499+ } else {
500+ None
501+ }
502+ } )
503+ . collect ( ) ;
504+ let counts: Vec < _ > = metrics
505+ . iter ( )
506+ . filter_map ( |( k, v) | {
507+ if k. starts_with ( & label_count) {
508+ Some ( v)
509+ } else {
510+ None
511+ }
512+ } )
513+ . collect ( ) ;
514+ assert_eq ! ( sums. len( ) , counts. len( ) ) ;
515+ sums. iter ( )
516+ . zip ( counts. iter ( ) )
517+ . map ( |( x, y) | x[ 0 ] / y[ 0 ] )
518+ . sum ( )
519+ }
520+
452521pub async fn fetch_metrics < T > (
453522 log : & slog:: Logger ,
454523 node : IcNodeSnapshot ,
@@ -468,7 +537,10 @@ where
468537 let metrics_result = metrics. fetch :: < T > ( ) . await ;
469538 match metrics_result {
470539 Ok ( result) => {
471- if labels. iter ( ) . all ( |& label| result. contains_key ( label) ) {
540+ if labels
541+ . iter ( )
542+ . all ( |& label| result. iter ( ) . any ( |( k, _) | k. starts_with ( label) ) )
543+ {
472544 info ! ( log, "Metrics successfully scraped {:?}." , result) ;
473545 return result;
474546 } else {
0 commit comments