Skip to content

Commit a056cd3

Browse files
committed
Add support for public extensions in Reports.
1 parent 13833c5 commit a056cd3

File tree

9 files changed

+546
-19
lines changed

9 files changed

+546
-19
lines changed

crates/daphne-server/tests/e2e/e2e.rs

Lines changed: 258 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -304,6 +304,10 @@ async fn leader_upload(version: DapVersion) {
304304
report_metadata: ReportMetadata {
305305
id: ReportId([1; 16]),
306306
time: t.now,
307+
public_extensions: match version {
308+
DapVersion::Draft09 => None,
309+
DapVersion::Latest => Some(Vec::new()),
310+
},
307311
},
308312
public_share: b"public share".to_vec(),
309313
encrypted_input_shares: [
@@ -541,6 +545,150 @@ async fn leader_upload_taskprov_wrong_version(version: DapVersion) {
541545

542546
async_test_versions!(leader_upload_taskprov_wrong_version);
543547

548+
#[tokio::test]
549+
async fn leader_upload_taskprov_public() {
550+
let version = DapVersion::Latest;
551+
let t = TestRunner::default_with_version(version).await;
552+
let client = t.http_client();
553+
let hpke_config_list = t.get_hpke_configs(version, client).await.unwrap();
554+
555+
let (task_config, task_id, taskprov_advertisement) = DapTaskParameters {
556+
version,
557+
min_batch_size: 10,
558+
query: DapBatchMode::TimeInterval,
559+
leader_url: t.task_config.leader_url.clone(),
560+
helper_url: t.task_config.helper_url.clone(),
561+
..Default::default()
562+
}
563+
.to_config_with_taskprov(
564+
b"cool task".to_vec(),
565+
t.now,
566+
daphne::roles::aggregator::TaskprovConfig {
567+
hpke_collector_config: &t.taskprov_collector_hpke_receiver.config,
568+
vdaf_verify_key_init: &t.taskprov_vdaf_verify_key_init,
569+
},
570+
)
571+
.unwrap();
572+
573+
let mut report = task_config
574+
.vdaf
575+
.produce_report(
576+
&hpke_config_list,
577+
t.now + 1,
578+
&task_id,
579+
DapMeasurement::U32Vec(vec![1; 10]),
580+
version,
581+
)
582+
.unwrap();
583+
report.report_metadata.public_extensions = Some(vec![Extension::Taskprov]);
584+
t.leader_request_expect_ok(
585+
client,
586+
&format!("tasks/{}/reports", task_id.to_base64url()),
587+
&http::Method::POST,
588+
DapMediaType::Report,
589+
Some(
590+
&taskprov_advertisement
591+
.serialize_to_header_value(version)
592+
.unwrap(),
593+
),
594+
report.get_encoded_with_param(&version).unwrap(),
595+
)
596+
.await
597+
.unwrap();
598+
}
599+
600+
#[tokio::test]
601+
async fn leader_upload_taksprov_public_errors() {
602+
let version = DapVersion::Latest;
603+
let t = TestRunner::default_with_version(version).await;
604+
let client = t.http_client();
605+
let hpke_config_list = t.get_hpke_configs(version, client).await.unwrap();
606+
607+
let (task_config, task_id, taskprov_advertisement) = DapTaskParameters {
608+
version,
609+
min_batch_size: 10,
610+
query: DapBatchMode::TimeInterval,
611+
leader_url: t.task_config.leader_url.clone(),
612+
helper_url: t.task_config.helper_url.clone(),
613+
..Default::default()
614+
}
615+
.to_config_with_taskprov(
616+
b"cool task".to_vec(),
617+
t.now,
618+
daphne::roles::aggregator::TaskprovConfig {
619+
hpke_collector_config: &t.taskprov_collector_hpke_receiver.config,
620+
vdaf_verify_key_init: &t.taskprov_vdaf_verify_key_init,
621+
},
622+
)
623+
.unwrap();
624+
625+
// Repeated public extension
626+
let mut report = task_config
627+
.vdaf
628+
.produce_report(
629+
&hpke_config_list,
630+
t.now + 1,
631+
&task_id,
632+
DapMeasurement::U32Vec(vec![1; 10]),
633+
version,
634+
)
635+
.unwrap();
636+
report.report_metadata.public_extensions = Some(vec![Extension::Taskprov, Extension::Taskprov]);
637+
t.leader_request_expect_abort(
638+
client,
639+
None,
640+
&format!("tasks/{}/reports", task_id.to_base64url()),
641+
&http::Method::POST,
642+
DapMediaType::Report,
643+
Some(
644+
&taskprov_advertisement
645+
.serialize_to_header_value(version)
646+
.unwrap(),
647+
),
648+
report.get_encoded_with_param(&version).unwrap(),
649+
400,
650+
"invalidMessage",
651+
)
652+
.await
653+
.unwrap();
654+
655+
// Unsupported public extension
656+
let mut report = task_config
657+
.vdaf
658+
.produce_report(
659+
&hpke_config_list,
660+
t.now + 1,
661+
&task_id,
662+
DapMeasurement::U32Vec(vec![1; 10]),
663+
version,
664+
)
665+
.unwrap();
666+
report.report_metadata.public_extensions = Some(vec![
667+
Extension::Taskprov,
668+
Extension::NotImplemented {
669+
typ: 3,
670+
payload: b"ignore".to_vec(),
671+
},
672+
]);
673+
t.leader_request_expect_abort(
674+
client,
675+
None,
676+
&format!("tasks/{}/reports", task_id.to_base64url()),
677+
&http::Method::POST,
678+
DapMediaType::Report,
679+
Some(
680+
&taskprov_advertisement
681+
.serialize_to_header_value(version)
682+
.unwrap(),
683+
),
684+
report.get_encoded_with_param(&version).unwrap(),
685+
400,
686+
"unsupportedExtension",
687+
)
688+
.await
689+
.unwrap();
690+
}
691+
544692
async fn internal_leader_process(version: DapVersion) {
545693
let t = TestRunner::default_with_version(version).await;
546694
let path = t.upload_path();
@@ -1354,6 +1502,116 @@ async fn leader_selected() {
13541502
.unwrap();
13551503
}
13561504

1505+
#[tokio::test]
1506+
async fn leader_collect_taskprov_repeated_abort() {
1507+
let version = DapVersion::Latest;
1508+
const DAP_TASKPROV_COLLECTOR_TOKEN: &str = "I-am-the-collector";
1509+
let t = TestRunner::default_with_version(version).await;
1510+
let batch_interval = t.batch_interval();
1511+
1512+
let client = t.http_client();
1513+
let hpke_config_list = t.get_hpke_configs(version, client).await.unwrap();
1514+
1515+
let (task_config, task_id, taskprov_advertisement) = DapTaskParameters {
1516+
version,
1517+
min_batch_size: 10,
1518+
query: DapBatchMode::TimeInterval,
1519+
leader_url: t.task_config.leader_url.clone(),
1520+
helper_url: t.task_config.helper_url.clone(),
1521+
..Default::default()
1522+
}
1523+
.to_config_with_taskprov(
1524+
b"cool task".to_vec(),
1525+
t.now,
1526+
daphne::roles::aggregator::TaskprovConfig {
1527+
hpke_collector_config: &t.taskprov_collector_hpke_receiver.config,
1528+
vdaf_verify_key_init: &t.taskprov_vdaf_verify_key_init,
1529+
},
1530+
)
1531+
.unwrap();
1532+
1533+
let path = TestRunner::upload_path_for_task(&task_id);
1534+
let method = &Method::POST;
1535+
// The reports are uploaded in the background.
1536+
let mut rng = thread_rng();
1537+
for _ in 0..t.task_config.min_batch_size {
1538+
let extensions = vec![Extension::Taskprov];
1539+
let now = rng.gen_range(TestRunner::report_interval(&batch_interval));
1540+
t.leader_request_expect_ok(
1541+
client,
1542+
&path,
1543+
method,
1544+
DapMediaType::Report,
1545+
Some(
1546+
&taskprov_advertisement
1547+
.serialize_to_header_value(version)
1548+
.unwrap(),
1549+
),
1550+
{
1551+
let mut report = task_config
1552+
.vdaf
1553+
.produce_report_with_extensions(
1554+
&hpke_config_list,
1555+
now,
1556+
&task_id,
1557+
DapMeasurement::U32Vec(vec![1; 10]),
1558+
extensions,
1559+
version,
1560+
)
1561+
.unwrap();
1562+
report.report_metadata.public_extensions = Some(vec![Extension::Taskprov]);
1563+
report.get_encoded_with_param(&version).unwrap()
1564+
},
1565+
)
1566+
.await
1567+
.unwrap();
1568+
}
1569+
1570+
let agg_param = DapAggregationParam::Empty;
1571+
1572+
// Get the collect URI.
1573+
let collect_req = CollectionReq {
1574+
query: Query::TimeInterval { batch_interval },
1575+
agg_param: agg_param.get_encoded().unwrap(),
1576+
};
1577+
let collect_uri = t
1578+
.leader_post_collect_using_token(
1579+
client,
1580+
DAP_TASKPROV_COLLECTOR_TOKEN,
1581+
Some(&taskprov_advertisement),
1582+
Some(&task_id),
1583+
collect_req.get_encoded_with_param(&t.version).unwrap(),
1584+
)
1585+
.await
1586+
.unwrap();
1587+
println!("collect_uri: {collect_uri}");
1588+
1589+
// Poll the collect URI before the CollectResp is ready.
1590+
let resp = t
1591+
.poll_collection_url_using_token(client, &collect_uri, DAP_TASKPROV_COLLECTOR_TOKEN)
1592+
.await
1593+
.unwrap();
1594+
#[expect(clippy::format_in_format_args)]
1595+
{
1596+
assert_eq!(
1597+
resp.status(),
1598+
202,
1599+
"response: {} {}",
1600+
format!("{resp:?}"),
1601+
resp.text().await.unwrap()
1602+
);
1603+
}
1604+
1605+
// The reports are aggregated in the background.
1606+
let agg_telem = t.internal_process(client).await.unwrap();
1607+
assert_eq!(
1608+
agg_telem.reports_processed, task_config.min_batch_size,
1609+
"reports processed"
1610+
);
1611+
assert_eq!(agg_telem.reports_aggregated, 0, "reports aggregated");
1612+
assert_eq!(agg_telem.reports_collected, 0, "reports collected");
1613+
}
1614+
13571615
async fn leader_collect_taskprov_ok(version: DapVersion) {
13581616
const DAP_TASKPROV_COLLECTOR_TOKEN: &str = "I-am-the-collector";
13591617
let t = TestRunner::default_with_version(version).await;

crates/daphne/src/error/aborts.rs

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,10 @@ pub enum DapAbort {
9595
/// Unrecognized DAP task. Sent in response to a request indicating an unrecognized task ID.
9696
#[error("unrecognizedTask")]
9797
UnrecognizedTask { task_id: TaskId },
98+
99+
/// Unsupported Extension. Sent in response to a report upload with an unsupported extension.
100+
#[error("unsupportedExtension")]
101+
UnsupportedExtension { detail: String, task_id: TaskId },
98102
}
99103

100104
impl DapAbort {
@@ -116,7 +120,8 @@ impl DapAbort {
116120
| Self::InvalidBatchSize { detail, task_id }
117121
| Self::BatchModeMismatch { detail, task_id }
118122
| Self::UnauthorizedRequest { detail, task_id }
119-
| Self::InvalidMessage { detail, task_id } => (
123+
| Self::InvalidMessage { detail, task_id }
124+
| Self::UnsupportedExtension { detail, task_id } => (
120125
Some(task_id),
121126
Some(detail),
122127
None,
@@ -259,6 +264,20 @@ impl DapAbort {
259264
})
260265
}
261266

267+
pub fn unsupported_extension(
268+
task_id: &TaskId,
269+
unknown_extensions: &[u16],
270+
) -> Result<Self, DapError> {
271+
let detail = serde_json::to_string(&unknown_extensions);
272+
match detail {
273+
Ok(s) => Ok(Self::UnsupportedExtension {
274+
detail: s,
275+
task_id: *task_id,
276+
}),
277+
Err(x) => Err(fatal_error!(err = %x,)),
278+
}
279+
}
280+
262281
fn title_and_type(&self) -> (&'static str, Option<String>) {
263282
let (title, dap_abort_type) = match self {
264283
Self::BatchInvalid { .. } => ("Batch boundary check failed", Some(self.to_string())),
@@ -300,6 +319,9 @@ impl DapAbort {
300319
Some(self.to_string()),
301320
),
302321
Self::BadRequest(..) => ("Bad request", None),
322+
Self::UnsupportedExtension { .. } => {
323+
("Unsupported extensions in report", Some(self.to_string()))
324+
}
303325
};
304326

305327
(

crates/daphne/src/hpke.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -612,6 +612,10 @@ mod test {
612612
report_metadata: &ReportMetadata {
613613
id: ReportId(rand::random()),
614614
time: rand::random(),
615+
public_extensions: match version {
616+
DapVersion::Draft09 => None,
617+
DapVersion::Latest => Some(Vec::new()),
618+
},
615619
},
616620
};
617621
let plaintext = b"plaintext";
@@ -703,6 +707,10 @@ mod test {
703707
let report_metadata = &ReportMetadata {
704708
id: ReportId(rand::random()),
705709
time: rand::random(),
710+
public_extensions: match version {
711+
DapVersion::Draft09 => None,
712+
DapVersion::Latest => Some(Vec::new()),
713+
},
706714
};
707715
let public_share = &vec![rand::random(); (0..100).choose(&mut rand::thread_rng()).unwrap()];
708716

0 commit comments

Comments
 (0)