@@ -24,7 +24,7 @@ use daphne::{
2424 hpke:: { HpkeConfig , HpkeKemId , HpkeReceiverConfig } ,
2525 messages:: {
2626 self , taskprov:: TaskprovAdvertisement , AggregateShareReq , AggregationJobId , Base64Encode ,
27- BatchId , BatchSelector , PartialBatchSelector , TaskId ,
27+ BatchId , BatchSelector , PartialBatchSelector , ReadyAggregationJobResp , TaskId ,
2828 } ,
2929 metrics:: DaphneMetrics ,
3030 testing:: report_generator:: ReportGenerator ,
@@ -511,7 +511,7 @@ impl Test {
511511 let _guard = load_control. wait ( ) . await ;
512512 info ! ( "Starting AggregationJobInitReq" ) ;
513513 let start = Instant :: now ( ) ;
514- let agg_job_resp = self
514+ let mut agg_job_resp = self
515515 . http_client
516516 . submit_aggregation_job_init_req (
517517 self . helper_url . join ( & format ! (
@@ -528,14 +528,42 @@ impl Test {
528528 )
529529 . await ?;
530530 let duration = start. elapsed ( ) ;
531- info ! ( "Finished AggregationJobInitReq in {duration:#?}" ) ;
531+ info ! ( "Finished submitting AggregationJobInitReq in {duration:#?}" ) ;
532+ let mut poll_count = 1 ;
533+ let ready = loop {
534+ agg_job_resp = match agg_job_resp {
535+ messages:: AggregationJobResp :: Ready { transitions } => {
536+ if poll_count != 1 {
537+ info ! (
538+ "Finished polling for AggregationJobResp after {:#?}" ,
539+ start. elapsed( )
540+ ) ;
541+ }
542+ break ReadyAggregationJobResp { transitions } ;
543+ }
544+ messages:: AggregationJobResp :: Processing => {
545+ if poll_count == 1 {
546+ info ! ( "Polling for AggregationJobResp" ) ;
547+ }
548+ tokio:: time:: sleep ( Duration :: from_millis ( poll_count * 200 ) ) . await ;
549+ poll_count += 1 ;
550+ self . http_client
551+ . poll_aggregation_job_init (
552+ self . helper_url
553+ . join ( & format ! ( "tasks/{task_id}/aggregation_jobs/{agg_job_id}" ) ) ?,
554+ task_config. version ,
555+ functions:: helper:: Options {
556+ taskprov_advertisement : taskprov_advertisement. as_ref ( ) ,
557+ bearer_token : self . bearer_token . as_ref ( ) ,
558+ } ,
559+ )
560+ . await ?
561+ }
562+ } ;
563+ } ;
532564
533- let agg_share_span = task_config. consume_agg_job_resp (
534- task_id,
535- agg_job_state,
536- agg_job_resp,
537- self . metrics ( ) ,
538- ) ?;
565+ let agg_share_span =
566+ task_config. consume_agg_job_resp ( task_id, agg_job_state, ready, self . metrics ( ) ) ?;
539567
540568 let aggregated_report_count = agg_share_span
541569 . iter ( )
0 commit comments