@@ -301,6 +301,10 @@ async fn leader_upload(version: DapVersion) {
301301 report_metadata : ReportMetadata {
302302 id : ReportId ( [ 1 ; 16 ] ) ,
303303 time : t. now ,
304+ public_extensions : match version {
305+ DapVersion :: Draft09 => None ,
306+ DapVersion :: Latest => Some ( Vec :: new ( ) ) ,
307+ } ,
304308 } ,
305309 public_share : b"public share" . to_vec ( ) ,
306310 encrypted_input_shares : [
@@ -533,6 +537,150 @@ async fn leader_upload_taskprov_wrong_version(version: DapVersion) {
533537
534538async_test_versions ! ( leader_upload_taskprov_wrong_version) ;
535539
540+ #[ tokio:: test]
541+ async fn leader_upload_taskprov_public ( ) {
542+ let version = DapVersion :: Latest ;
543+ let t = TestRunner :: default_with_version ( version) . await ;
544+ let client = t. http_client ( ) ;
545+ let hpke_config_list = t. get_hpke_configs ( version, client) . await . unwrap ( ) ;
546+
547+ let ( task_config, task_id, taskprov_advertisement) = DapTaskParameters {
548+ version,
549+ min_batch_size : 10 ,
550+ query : DapBatchMode :: TimeInterval ,
551+ leader_url : t. task_config . leader_url . clone ( ) ,
552+ helper_url : t. task_config . helper_url . clone ( ) ,
553+ ..Default :: default ( )
554+ }
555+ . to_config_with_taskprov (
556+ b"cool task" . to_vec ( ) ,
557+ t. now ,
558+ daphne:: roles:: aggregator:: TaskprovConfig {
559+ hpke_collector_config : & t. taskprov_collector_hpke_receiver . config ,
560+ vdaf_verify_key_init : & t. taskprov_vdaf_verify_key_init ,
561+ } ,
562+ )
563+ . unwrap ( ) ;
564+
565+ let mut report = task_config
566+ . vdaf
567+ . produce_report (
568+ & hpke_config_list,
569+ t. now + 1 ,
570+ & task_id,
571+ DapMeasurement :: U32Vec ( vec ! [ 1 ; 10 ] ) ,
572+ version,
573+ )
574+ . unwrap ( ) ;
575+ report. report_metadata . public_extensions = Some ( vec ! [ Extension :: Taskprov ] ) ;
576+ t. leader_request_expect_ok (
577+ client,
578+ & format ! ( "tasks/{}/reports" , task_id. to_base64url( ) ) ,
579+ & http:: Method :: POST ,
580+ DapMediaType :: Report ,
581+ Some (
582+ & taskprov_advertisement
583+ . serialize_to_header_value ( version)
584+ . unwrap ( ) ,
585+ ) ,
586+ report. get_encoded_with_param ( & version) . unwrap ( ) ,
587+ )
588+ . await
589+ . unwrap ( ) ;
590+ }
591+
592+ #[ tokio:: test]
593+ async fn leader_upload_taksprov_public_errors ( ) {
594+ let version = DapVersion :: Latest ;
595+ let t = TestRunner :: default_with_version ( version) . await ;
596+ let client = t. http_client ( ) ;
597+ let hpke_config_list = t. get_hpke_configs ( version, client) . await . unwrap ( ) ;
598+
599+ let ( task_config, task_id, taskprov_advertisement) = DapTaskParameters {
600+ version,
601+ min_batch_size : 10 ,
602+ query : DapBatchMode :: TimeInterval ,
603+ leader_url : t. task_config . leader_url . clone ( ) ,
604+ helper_url : t. task_config . helper_url . clone ( ) ,
605+ ..Default :: default ( )
606+ }
607+ . to_config_with_taskprov (
608+ b"cool task" . to_vec ( ) ,
609+ t. now ,
610+ daphne:: roles:: aggregator:: TaskprovConfig {
611+ hpke_collector_config : & t. taskprov_collector_hpke_receiver . config ,
612+ vdaf_verify_key_init : & t. taskprov_vdaf_verify_key_init ,
613+ } ,
614+ )
615+ . unwrap ( ) ;
616+
617+ // Repeated public extension
618+ let mut report = task_config
619+ . vdaf
620+ . produce_report (
621+ & hpke_config_list,
622+ t. now + 1 ,
623+ & task_id,
624+ DapMeasurement :: U32Vec ( vec ! [ 1 ; 10 ] ) ,
625+ version,
626+ )
627+ . unwrap ( ) ;
628+ report. report_metadata . public_extensions = Some ( vec ! [ Extension :: Taskprov , Extension :: Taskprov ] ) ;
629+ t. leader_request_expect_abort (
630+ client,
631+ None ,
632+ & format ! ( "tasks/{}/reports" , task_id. to_base64url( ) ) ,
633+ & http:: Method :: POST ,
634+ DapMediaType :: Report ,
635+ Some (
636+ & taskprov_advertisement
637+ . serialize_to_header_value ( version)
638+ . unwrap ( ) ,
639+ ) ,
640+ report. get_encoded_with_param ( & version) . unwrap ( ) ,
641+ 400 ,
642+ "invalidMessage" ,
643+ )
644+ . await
645+ . unwrap ( ) ;
646+
647+ // Unsupported public extension
648+ let mut report = task_config
649+ . vdaf
650+ . produce_report (
651+ & hpke_config_list,
652+ t. now + 1 ,
653+ & task_id,
654+ DapMeasurement :: U32Vec ( vec ! [ 1 ; 10 ] ) ,
655+ version,
656+ )
657+ . unwrap ( ) ;
658+ report. report_metadata . public_extensions = Some ( vec ! [
659+ Extension :: Taskprov ,
660+ Extension :: NotImplemented {
661+ typ: 3 ,
662+ payload: b"ignore" . to_vec( ) ,
663+ } ,
664+ ] ) ;
665+ t. leader_request_expect_abort (
666+ client,
667+ None ,
668+ & format ! ( "tasks/{}/reports" , task_id. to_base64url( ) ) ,
669+ & http:: Method :: POST ,
670+ DapMediaType :: Report ,
671+ Some (
672+ & taskprov_advertisement
673+ . serialize_to_header_value ( version)
674+ . unwrap ( ) ,
675+ ) ,
676+ report. get_encoded_with_param ( & version) . unwrap ( ) ,
677+ 400 ,
678+ "unsupportedExtension" ,
679+ )
680+ . await
681+ . unwrap ( ) ;
682+ }
683+
536684async fn internal_leader_process ( version : DapVersion ) {
537685 let t = TestRunner :: default_with_version ( version) . await ;
538686 let path = t. upload_path ( ) ;
@@ -1348,6 +1496,118 @@ async fn leader_selected() {
13481496 . unwrap ( ) ;
13491497}
13501498
1499+ #[ tokio:: test]
1500+ async fn leader_collect_taskprov_repeated_abort ( ) {
1501+ let version = DapVersion :: Latest ;
1502+ const DAP_TASKPROV_COLLECTOR_TOKEN : & str = "I-am-the-collector" ;
1503+ let t = TestRunner :: default_with_version ( version) . await ;
1504+ let batch_interval = t. batch_interval ( ) ;
1505+
1506+ let client = t. http_client ( ) ;
1507+ let hpke_config_list = t. get_hpke_configs ( version, client) . await . unwrap ( ) ;
1508+
1509+ let ( task_config, task_id, taskprov_advertisement) = DapTaskParameters {
1510+ version,
1511+ min_batch_size : 10 ,
1512+ query : DapBatchMode :: TimeInterval ,
1513+ leader_url : t. task_config . leader_url . clone ( ) ,
1514+ helper_url : t. task_config . helper_url . clone ( ) ,
1515+ ..Default :: default ( )
1516+ }
1517+ . to_config_with_taskprov (
1518+ b"cool task" . to_vec ( ) ,
1519+ t. now ,
1520+ daphne:: roles:: aggregator:: TaskprovConfig {
1521+ hpke_collector_config : & t. taskprov_collector_hpke_receiver . config ,
1522+ vdaf_verify_key_init : & t. taskprov_vdaf_verify_key_init ,
1523+ } ,
1524+ )
1525+ . unwrap ( ) ;
1526+
1527+ let path = TestRunner :: upload_path_for_task ( & task_id) ;
1528+ let method = match version {
1529+ DapVersion :: Draft09 => & Method :: PUT ,
1530+ DapVersion :: Latest => & Method :: POST ,
1531+ } ;
1532+
1533+ // The reports are uploaded in the background.
1534+ for _ in 0 ..t. task_config . min_batch_size {
1535+ let extensions = vec ! [ Extension :: Taskprov ] ;
1536+ t. leader_request_expect_ok (
1537+ client,
1538+ & path,
1539+ method,
1540+ DapMediaType :: Report ,
1541+ Some (
1542+ & taskprov_advertisement
1543+ . serialize_to_header_value ( version)
1544+ . unwrap ( ) ,
1545+ ) ,
1546+ {
1547+ let mut report = task_config
1548+ . vdaf
1549+ . produce_report_with_extensions (
1550+ & hpke_config_list,
1551+ t. now + 1 ,
1552+ & task_id,
1553+ DapMeasurement :: U32Vec ( vec ! [ 1 ; 10 ] ) ,
1554+ extensions,
1555+ version,
1556+ )
1557+ . unwrap ( ) ;
1558+ report. report_metadata . public_extensions = Some ( vec ! [ Extension :: Taskprov ] ) ;
1559+ report. get_encoded_with_param ( & version) . unwrap ( )
1560+ } ,
1561+ )
1562+ . await
1563+ . unwrap ( ) ;
1564+ }
1565+
1566+ let agg_param = DapAggregationParam :: Empty ;
1567+
1568+ // Get the collect URI.
1569+ let collect_req = CollectionReq {
1570+ query : Query :: TimeInterval { batch_interval } ,
1571+ agg_param : agg_param. get_encoded ( ) . unwrap ( ) ,
1572+ } ;
1573+ let collect_uri = t
1574+ . leader_post_collect_using_token (
1575+ client,
1576+ DAP_TASKPROV_COLLECTOR_TOKEN ,
1577+ Some ( & taskprov_advertisement) ,
1578+ Some ( & task_id) ,
1579+ collect_req. get_encoded_with_param ( & t. version ) . unwrap ( ) ,
1580+ )
1581+ . await
1582+ . unwrap ( ) ;
1583+ println ! ( "collect_uri: {collect_uri}" ) ;
1584+
1585+ // Poll the collect URI before the CollectResp is ready.
1586+ let resp = t
1587+ . poll_collection_url_using_token ( client, & collect_uri, DAP_TASKPROV_COLLECTOR_TOKEN )
1588+ . await
1589+ . unwrap ( ) ;
1590+ #[ expect( clippy:: format_in_format_args) ]
1591+ {
1592+ assert_eq ! (
1593+ resp. status( ) ,
1594+ 400 ,
1595+ "response: {} {}" ,
1596+ format!( "{resp:?}" ) ,
1597+ resp. text( ) . await . unwrap( )
1598+ ) ;
1599+ }
1600+
1601+ // The reports are aggregated in the background.
1602+ let agg_telem = t. internal_process ( client) . await . unwrap ( ) ;
1603+ assert_eq ! (
1604+ agg_telem. reports_processed, task_config. min_batch_size,
1605+ "reports processed"
1606+ ) ;
1607+ assert_eq ! ( agg_telem. reports_aggregated, 0 , "reports aggregated" ) ;
1608+ assert_eq ! ( agg_telem. reports_collected, 0 , "reports collected" ) ;
1609+ }
1610+
13511611async fn leader_collect_taskprov_ok ( version : DapVersion ) {
13521612 const DAP_TASKPROV_COLLECTOR_TOKEN : & str = "I-am-the-collector" ;
13531613 let t = TestRunner :: default_with_version ( version) . await ;
0 commit comments