Skip to content

Commit 7e5cd9e

Browse files
committed
Address comments #2
1 parent 8f8e5b9 commit 7e5cd9e

File tree

5 files changed

+61
-190
lines changed

5 files changed

+61
-190
lines changed

crates/daphne-server/tests/e2e/e2e.rs

Lines changed: 14 additions & 174 deletions
Original file line numberDiff line numberDiff line change
@@ -557,58 +557,6 @@ async fn leader_upload_taskprov_wrong_version(version: DapVersion) {
557557

558558
async_test_versions!(leader_upload_taskprov_wrong_version);
559559

560-
#[tokio::test]
561-
async fn leader_upload_taskprov_public() {
562-
let version = DapVersion::Latest;
563-
let t = TestRunner::default_with_version(version).await;
564-
let client = t.http_client();
565-
let hpke_config_list = t.get_hpke_configs(version, client).await.unwrap();
566-
567-
let (task_config, task_id, taskprov_advertisement) = DapTaskParameters {
568-
version,
569-
min_batch_size: 10,
570-
query: DapBatchMode::TimeInterval,
571-
leader_url: t.task_config.leader_url.clone(),
572-
helper_url: t.task_config.helper_url.clone(),
573-
..Default::default()
574-
}
575-
.to_config_with_taskprov(
576-
b"cool task".to_vec(),
577-
t.now,
578-
daphne::roles::aggregator::TaskprovConfig {
579-
hpke_collector_config: &t.taskprov_collector_hpke_receiver.config,
580-
vdaf_verify_key_init: &t.taskprov_vdaf_verify_key_init,
581-
},
582-
)
583-
.unwrap();
584-
585-
let mut report = task_config
586-
.vdaf
587-
.produce_report(
588-
&hpke_config_list,
589-
t.now + 1,
590-
&task_id,
591-
DapMeasurement::U32Vec(vec![1; 10]),
592-
version,
593-
)
594-
.unwrap();
595-
report.report_metadata.public_extensions = Some(vec![Extension::Taskprov]);
596-
t.leader_request_expect_ok(
597-
client,
598-
&format!("tasks/{}/reports", task_id.to_base64url()),
599-
&http::Method::POST,
600-
DapMediaType::Report,
601-
Some(
602-
&taskprov_advertisement
603-
.serialize_to_header_value(version)
604-
.unwrap(),
605-
),
606-
report.get_encoded_with_param(&version).unwrap(),
607-
)
608-
.await
609-
.unwrap();
610-
}
611-
612560
#[tokio::test]
613561
async fn leader_upload_taksprov_public_errors() {
614562
let version = DapVersion::Latest;
@@ -635,17 +583,18 @@ async fn leader_upload_taksprov_public_errors() {
635583
.unwrap();
636584

637585
// Repeated public extension
638-
let mut report = task_config
586+
let report = task_config
639587
.vdaf
640-
.produce_report(
588+
.produce_report_with_extensions(
641589
&hpke_config_list,
642590
t.now + 1,
643591
&task_id,
644592
DapMeasurement::U32Vec(vec![1; 10]),
593+
Some(vec![Extension::Taskprov, Extension::Taskprov]),
594+
vec![],
645595
version,
646596
)
647597
.unwrap();
648-
report.report_metadata.public_extensions = Some(vec![Extension::Taskprov, Extension::Taskprov]);
649598
t.leader_request_expect_abort(
650599
client,
651600
None,
@@ -665,23 +614,24 @@ async fn leader_upload_taksprov_public_errors() {
665614
.unwrap();
666615

667616
// Unsupported public extension
668-
let mut report = task_config
617+
let report = task_config
669618
.vdaf
670-
.produce_report(
619+
.produce_report_with_extensions(
671620
&hpke_config_list,
672621
t.now + 1,
673622
&task_id,
674623
DapMeasurement::U32Vec(vec![1; 10]),
624+
Some(vec![
625+
Extension::Taskprov,
626+
Extension::NotImplemented {
627+
typ: 3,
628+
payload: b"ignore".to_vec(),
629+
},
630+
]),
631+
vec![],
675632
version,
676633
)
677634
.unwrap();
678-
report.report_metadata.public_extensions = Some(vec![
679-
Extension::Taskprov,
680-
Extension::NotImplemented {
681-
typ: 3,
682-
payload: b"ignore".to_vec(),
683-
},
684-
]);
685635
t.leader_request_expect_abort(
686636
client,
687637
None,
@@ -1514,116 +1464,6 @@ async fn leader_selected() {
15141464
.unwrap();
15151465
}
15161466

1517-
#[tokio::test]
1518-
async fn leader_collect_taskprov_repeated_abort() {
1519-
let version = DapVersion::Latest;
1520-
const DAP_TASKPROV_COLLECTOR_TOKEN: &str = "I-am-the-collector";
1521-
let t = TestRunner::default_with_version(version).await;
1522-
let batch_interval = t.batch_interval();
1523-
1524-
let client = t.http_client();
1525-
let hpke_config_list = t.get_hpke_configs(version, client).await.unwrap();
1526-
1527-
let (task_config, task_id, taskprov_advertisement) = DapTaskParameters {
1528-
version,
1529-
min_batch_size: 10,
1530-
query: DapBatchMode::TimeInterval,
1531-
leader_url: t.task_config.leader_url.clone(),
1532-
helper_url: t.task_config.helper_url.clone(),
1533-
..Default::default()
1534-
}
1535-
.to_config_with_taskprov(
1536-
b"cool task".to_vec(),
1537-
t.now,
1538-
daphne::roles::aggregator::TaskprovConfig {
1539-
hpke_collector_config: &t.taskprov_collector_hpke_receiver.config,
1540-
vdaf_verify_key_init: &t.taskprov_vdaf_verify_key_init,
1541-
},
1542-
)
1543-
.unwrap();
1544-
1545-
let path = TestRunner::upload_path_for_task(&task_id);
1546-
let method = &Method::POST;
1547-
// The reports are uploaded in the background.
1548-
let mut rng = thread_rng();
1549-
for _ in 0..t.task_config.min_batch_size {
1550-
let extensions = vec![Extension::Taskprov];
1551-
let now = rng.gen_range(TestRunner::report_interval(&batch_interval));
1552-
t.leader_request_expect_ok(
1553-
client,
1554-
&path,
1555-
method,
1556-
DapMediaType::Report,
1557-
Some(
1558-
&taskprov_advertisement
1559-
.serialize_to_header_value(version)
1560-
.unwrap(),
1561-
),
1562-
{
1563-
let report = task_config
1564-
.vdaf
1565-
.produce_report_with_extensions(
1566-
&hpke_config_list,
1567-
now,
1568-
&task_id,
1569-
DapMeasurement::U32Vec(vec![1; 10]),
1570-
Some(vec![Extension::Taskprov]),
1571-
extensions,
1572-
version,
1573-
)
1574-
.unwrap();
1575-
report.get_encoded_with_param(&version).unwrap()
1576-
},
1577-
)
1578-
.await
1579-
.unwrap();
1580-
}
1581-
1582-
let agg_param = DapAggregationParam::Empty;
1583-
1584-
// Get the collect URI.
1585-
let collect_req = CollectionReq {
1586-
query: Query::TimeInterval { batch_interval },
1587-
agg_param: agg_param.get_encoded().unwrap(),
1588-
};
1589-
let collect_uri = t
1590-
.leader_post_collect_using_token(
1591-
client,
1592-
DAP_TASKPROV_COLLECTOR_TOKEN,
1593-
Some(&taskprov_advertisement),
1594-
Some(&task_id),
1595-
collect_req.get_encoded_with_param(&t.version).unwrap(),
1596-
)
1597-
.await
1598-
.unwrap();
1599-
println!("collect_uri: {collect_uri}");
1600-
1601-
// Poll the collect URI before the CollectResp is ready.
1602-
let resp = t
1603-
.poll_collection_url_using_token(client, &collect_uri, DAP_TASKPROV_COLLECTOR_TOKEN)
1604-
.await
1605-
.unwrap();
1606-
#[expect(clippy::format_in_format_args)]
1607-
{
1608-
assert_eq!(
1609-
resp.status(),
1610-
202,
1611-
"response: {} {}",
1612-
format!("{resp:?}"),
1613-
resp.text().await.unwrap()
1614-
);
1615-
}
1616-
1617-
// The reports are aggregated in the background.
1618-
let agg_telem = t.internal_process(client).await.unwrap();
1619-
assert_eq!(
1620-
agg_telem.reports_processed, task_config.min_batch_size,
1621-
"reports processed"
1622-
);
1623-
assert_eq!(agg_telem.reports_aggregated, 0, "reports aggregated");
1624-
assert_eq!(agg_telem.reports_collected, 0, "reports collected");
1625-
}
1626-
16271467
async fn leader_collect_taskprov_ok(version: DapVersion) {
16281468
const DAP_TASKPROV_COLLECTOR_TOKEN: &str = "I-am-the-collector";
16291469
let t = TestRunner::default_with_version(version).await;

crates/daphne/src/messages/mod.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -256,7 +256,11 @@ impl ParameterizedEncode<DapVersion> for ReportMetadata {
256256
(DapVersion::Latest, Some(extensions)) => {
257257
encode_u16_items(bytes, version, extensions.as_slice())?;
258258
}
259-
_ => return Err(CodecError::UnexpectedValue),
259+
_ => {
260+
return Err(CodecError::Other(
261+
"encountered incorrectly set public extensions".into(),
262+
))
263+
}
260264
}
261265

262266
Ok(())
@@ -1638,7 +1642,7 @@ mod test {
16381642
};
16391643
assert!(matches!(
16401644
bad_rm.get_encoded_with_param(&version).unwrap_err(),
1641-
CodecError::UnexpectedValue
1645+
CodecError::Other(..)
16421646
));
16431647
let bytes = good_rm.get_encoded_with_param(&version).unwrap();
16441648
assert_eq!(

crates/daphne/src/protocol/client.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
use crate::{
55
constants::DapAggregatorRole,
6+
fatal_error,
67
hpke::{info_and_aad, HpkeConfig},
78
messages::{Extension, PlaintextInputShare, Report, ReportId, ReportMetadata, TaskId, Time},
89
DapError, DapMeasurement, DapVersion, VdafConfig,
@@ -72,13 +73,12 @@ impl VdafConfig {
7273
private_extensions: Vec<Extension>,
7374
version: DapVersion,
7475
) -> Result<Report, DapError> {
75-
match (&public_extensions, version) {
76-
(Some(_), DapVersion::Draft09) | (None, DapVersion::Latest) => {
77-
return Err(DapError::ReportError(
78-
crate::messages::ReportError::InvalidMessage,
79-
))
80-
}
81-
_ => (),
76+
if let (Some(_), DapVersion::Draft09) | (None, DapVersion::Latest) =
77+
(&public_extensions, version)
78+
{
79+
return Err(fatal_error!(
80+
err = format!("public extensions not set correctly for {version:?}")
81+
));
8282
}
8383

8484
let mut plaintext_input_share = PlaintextInputShare {

crates/daphne/src/roles/leader/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -227,8 +227,8 @@ pub async fn handle_upload_req<A: DapLeader>(
227227
}
228228

229229
if let Some(public_extensions) = &report.report_metadata.public_extensions {
230-
// We can be sure at this point that the ReportMetadata is well formed (as
231-
// the decoding / error checking happens in the extractor).
230+
// We can be sure at this point that the ReportMetadata is well formed
231+
// because the decoding / error checking happens in the extractor.
232232
assert_eq!(DapVersion::Latest, task_config.version);
233233
let mut unknown_extensions = Vec::<u16>::new();
234234
if protocol::check_no_duplicates(public_extensions.iter()).is_err() {

crates/daphne/src/roles/mod.rs

Lines changed: 32 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -779,11 +779,36 @@ mod test {
779779
let t = Test::new(version);
780780
let task_id = &t.time_interval_task_id;
781781
let task_config = t.leader.unchecked_get_task_config(task_id).await;
782-
let mut report = t.gen_test_report(task_id).await;
783-
report.report_metadata.public_extensions = Some(vec![Extension::NotImplemented {
784-
typ: 0x01,
785-
payload: vec![0x01],
786-
}]);
782+
783+
// Construct HPKE config list.
784+
let hpke_config_list = [
785+
t.leader
786+
.get_hpke_config_for(task_config.version, Some(task_id))
787+
.await
788+
.unwrap()
789+
.clone(),
790+
t.helper
791+
.get_hpke_config_for(task_config.version, Some(task_id))
792+
.await
793+
.unwrap()
794+
.clone(),
795+
];
796+
797+
let report = task_config
798+
.vdaf
799+
.produce_report_with_extensions(
800+
&hpke_config_list,
801+
t.now,
802+
task_id,
803+
DapMeasurement::U32Vec(vec![1; 10]),
804+
Some(vec![Extension::NotImplemented {
805+
typ: 0x01,
806+
payload: vec![0x01],
807+
}]),
808+
vec![],
809+
task_config.version,
810+
)
811+
.unwrap();
787812

788813
let req = DapRequest {
789814
meta: DapRequestMeta {
@@ -812,6 +837,8 @@ mod test {
812837
let task_id = &t.time_interval_task_id;
813838
let task_config = t.leader.unchecked_get_task_config(task_id).await;
814839
let mut report = t.gen_test_report(task_id).await;
840+
// This change breaks the HPKE decryption, but triggers a failure
841+
// before the HPKE data is checked.
815842
report.report_metadata.public_extensions = Some(vec![]);
816843

817844
let req = DapRequest {

0 commit comments

Comments
 (0)