Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions crates/dapf/src/acceptance/load_testing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,7 @@ pub async fn execute_single_combination_from_env(
&measurment,
VERSION,
system_now.0,
Some(vec![]),
vec![messages::Extension::Taskprov],
t.replay_reports,
)
Expand Down
4 changes: 4 additions & 0 deletions crates/dapf/src/acceptance/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -655,6 +655,10 @@ impl Test {
measurement.as_ref(),
version,
now.0,
match version {
DapVersion::Draft09 => None,
DapVersion::Latest => Some(vec![]),
},
vec![messages::Extension::Taskprov],
self.replay_reports,
)
Expand Down
114 changes: 114 additions & 0 deletions crates/daphne-server/tests/e2e/e2e.rs
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,10 @@ async fn leader_upload(version: DapVersion) {
report_metadata: ReportMetadata {
id: ReportId([1; 16]),
time: t.now,
public_extensions: match version {
DapVersion::Draft09 => None,
DapVersion::Latest => Some(Vec::new()),
},
},
public_share: b"public share".to_vec(),
encrypted_input_shares: [
Expand Down Expand Up @@ -424,6 +428,10 @@ async fn leader_upload_taskprov() {
t.now,
&task_id,
DapMeasurement::U32Vec(vec![1; 10]),
match version {
DapVersion::Draft09 => None,
DapVersion::Latest => Some(vec![]),
},
vec![Extension::Taskprov],
version,
)
Expand Down Expand Up @@ -451,6 +459,10 @@ async fn leader_upload_taskprov() {
t.now,
&task_id,
DapMeasurement::U32Vec(vec![1; 10]),
match version {
DapVersion::Draft09 => None,
DapVersion::Latest => Some(vec![]),
},
vec![Extension::Taskprov],
version,
)
Expand Down Expand Up @@ -516,6 +528,10 @@ async fn leader_upload_taskprov_wrong_version(version: DapVersion) {
t.now,
&task_id,
DapMeasurement::U32Vec(vec![1; 10]),
match version {
DapVersion::Draft09 => None,
DapVersion::Latest => Some(vec![]),
},
vec![Extension::Taskprov],
version,
)
Expand All @@ -541,6 +557,100 @@ async fn leader_upload_taskprov_wrong_version(version: DapVersion) {

async_test_versions!(leader_upload_taskprov_wrong_version);

#[tokio::test]
async fn leader_upload_taksprov_public_errors() {
let version = DapVersion::Latest;
let t = TestRunner::default_with_version(version).await;
let client = t.http_client();
let hpke_config_list = t.get_hpke_configs(version, client).await.unwrap();

let (task_config, task_id, taskprov_advertisement) = DapTaskParameters {
version,
min_batch_size: 10,
query: DapBatchMode::TimeInterval,
leader_url: t.task_config.leader_url.clone(),
helper_url: t.task_config.helper_url.clone(),
..Default::default()
}
.to_config_with_taskprov(
b"cool task".to_vec(),
t.now,
daphne::roles::aggregator::TaskprovConfig {
hpke_collector_config: &t.taskprov_collector_hpke_receiver.config,
vdaf_verify_key_init: &t.taskprov_vdaf_verify_key_init,
},
)
.unwrap();

// Repeated public extension
let report = task_config
.vdaf
.produce_report_with_extensions(
&hpke_config_list,
t.now + 1,
&task_id,
DapMeasurement::U32Vec(vec![1; 10]),
Some(vec![Extension::Taskprov, Extension::Taskprov]),
vec![],
version,
)
.unwrap();
t.leader_request_expect_abort(
client,
None,
&format!("tasks/{}/reports", task_id.to_base64url()),
&http::Method::POST,
DapMediaType::Report,
Some(
&taskprov_advertisement
.serialize_to_header_value(version)
.unwrap(),
),
report.get_encoded_with_param(&version).unwrap(),
400,
"invalidMessage",
)
.await
.unwrap();

// Unsupported public extension
let report = task_config
.vdaf
.produce_report_with_extensions(
&hpke_config_list,
t.now + 1,
&task_id,
DapMeasurement::U32Vec(vec![1; 10]),
Some(vec![
Extension::Taskprov,
Extension::NotImplemented {
typ: 3,
payload: b"ignore".to_vec(),
},
]),
vec![],
version,
)
.unwrap();
t.leader_request_expect_abort(
client,
None,
&format!("tasks/{}/reports", task_id.to_base64url()),
&http::Method::POST,
DapMediaType::Report,
Some(
&taskprov_advertisement
.serialize_to_header_value(version)
.unwrap(),
),
report.get_encoded_with_param(&version).unwrap(),
400,
"unsupportedExtension",
)
.await
.unwrap();
}

async fn internal_leader_process(version: DapVersion) {
let t = TestRunner::default_with_version(version).await;
let path = t.upload_path();
Expand Down Expand Up @@ -1408,6 +1518,10 @@ async fn leader_collect_taskprov_ok(version: DapVersion) {
now,
&task_id,
DapMeasurement::U32Vec(vec![1; 10]),
match version {
DapVersion::Draft09 => None,
DapVersion::Latest => Some(vec![]),
},
extensions,
version,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,24 @@ struct PartialDapTaskConfig @0xdcc9bf18fc62d406 {
vdafVerifyKey @4 :VdafVerifyKey;
}

struct PublicExtensionsList @0x8b3c98c0ddd0043e {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mendess I just ran capnp id and pasted the output here. So this is really just a random ID that needs to be unique?


union {
# Each extension is encoded according to the DAP spec in
# tag-length-value form.
list @0 :List(Data);

# draft09 compatibility: Previously DAP had no extensions in the
# report.
none @1 :Void;
}
}

struct ReportMetadata @0xefba178ad4584bc4 {

id @0 :Base.ReportId;
time @1 :Base.Time;
id @0 :Base.ReportId;
time @1 :Base.Time;
publicExtensions @2 :PublicExtensionsList;
}

struct PrepareInit @0x8192568cb3d03f59 {
Expand Down
88 changes: 82 additions & 6 deletions crates/daphne-service-utils/src/compute_offload/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,17 @@ use crate::{
hpke_receiver_config::{self, hpke_config},
initialize_reports,
initialized_reports::{self, initialized_report},
partial_dap_task_config, prepare_init, report_metadata, time_range,
partial_dap_task_config, prepare_init, public_extensions_list, report_metadata, time_range,
},
};
use daphne::{
constants::DapAggregatorRole,
hpke::{HpkeConfig, HpkeReceiverConfig},
messages::{self, HpkeCiphertext, PrepareInit, ReportMetadata, ReportShare, TaskId},
messages::{self, Extension, HpkeCiphertext, PrepareInit, ReportMetadata, ReportShare, TaskId},
vdaf::{VdafConfig, VdafPrepShare, VdafPrepState},
InitializedReport, PartialDapTaskConfigForReportInit, WithPeerPrepShare,
};
use prio::codec::{Encode, ParameterizedDecode, ParameterizedEncode};
use prio::codec::{Decode, Encode, ParameterizedDecode, ParameterizedEncode};
use std::{borrow::Cow, ops::Range};

pub struct InitializeReports<'s> {
Expand Down Expand Up @@ -318,9 +318,27 @@ impl CapnprotoPayloadEncode for ReportMetadata {
type Builder<'a> = report_metadata::Builder<'a>;

fn encode_to_builder(&self, mut builder: Self::Builder<'_>) {
let Self { id, time } = self;
let Self {
id,
time,
public_extensions,
} = self;
id.encode_to_builder(builder.reborrow().init_id());
builder.set_time(*time);
if let Some(ref extensions) = public_extensions {
let mut e = builder
.init_public_extensions()
.init_list(usize_to_capnp_len(extensions.len()));
for (i, data) in extensions
.iter()
.enumerate()
.map(|(i, ext)| (usize_to_capnp_len(i), ext.get_encoded().unwrap()))
{
e.reborrow().set(i, &data);
}
} else {
builder.init_public_extensions().set_none(());
}
}
}

Expand All @@ -331,9 +349,25 @@ impl CapnprotoPayloadDecode for ReportMetadata {
where
Self: Sized,
{
let id = <_>::decode_from_reader(reader.get_id()?)?;
let time = reader.get_time();
let public_extensions = match reader.get_public_extensions()?.which()? {
public_extensions_list::List(list) => Some(
list?
.into_iter()
.map(|data| {
Extension::get_decoded(data?)
.map_err(|e| capnp::Error::failed(e.to_string()))
})
.collect::<Result<Vec<_>, capnp::Error>>()?,
),
public_extensions_list::None(()) => None,
};

Ok(Self {
id: <_>::decode_from_reader(reader.get_id()?)?,
time: reader.get_time(),
id,
time,
public_extensions,
})
}
}
Expand Down Expand Up @@ -486,3 +520,45 @@ fn to_capnp<E: ToString>(e: E) -> capnp::Error {
extra: e.to_string(),
}
}

#[cfg(test)]
mod test {
use super::*;
use crate::capnproto::{CapnprotoPayloadDecodeExt, CapnprotoPayloadEncodeExt};

#[test]
fn report_metadata_roundtrip() {
let report_metadata = ReportMetadata {
id: messages::ReportId(rand::random()),
time: rand::random(),
public_extensions: Some(vec![
Extension::Taskprov,
Extension::NotImplemented {
typ: 23,
payload: b"some extension payload".to_vec(),
},
]),
};

assert_eq!(
report_metadata,
ReportMetadata::decode_from_bytes(&report_metadata.encode_to_bytes()).unwrap()
);
}

#[test]
fn report_metadata_roundtrip_draft09() {
let report_metadata = ReportMetadata {
id: messages::ReportId(rand::random()),
time: rand::random(),
// draft09 compatibility: Previously there was no extensions field in the report
// metadata.
public_extensions: None,
};

assert_eq!(
report_metadata,
ReportMetadata::decode_from_bytes(&report_metadata.encode_to_bytes()).unwrap()
);
}
}
20 changes: 19 additions & 1 deletion crates/daphne/src/error/aborts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ pub enum DapAbort {
/// Unrecognized DAP task. Sent in response to a request indicating an unrecognized task ID.
#[error("unrecognizedTask")]
UnrecognizedTask { task_id: TaskId },

/// Unsupported Extension. Sent in response to a report upload with an unsupported extension.
#[error("unsupportedExtension")]
UnsupportedExtension { detail: String, task_id: TaskId },
}

impl DapAbort {
Expand All @@ -116,7 +120,8 @@ impl DapAbort {
| Self::InvalidBatchSize { detail, task_id }
| Self::BatchModeMismatch { detail, task_id }
| Self::UnauthorizedRequest { detail, task_id }
| Self::InvalidMessage { detail, task_id } => (
| Self::InvalidMessage { detail, task_id }
| Self::UnsupportedExtension { detail, task_id } => (
Some(task_id),
Some(detail),
None,
Expand Down Expand Up @@ -259,6 +264,16 @@ impl DapAbort {
})
}

pub fn unsupported_extension(
task_id: &TaskId,
unknown_extensions: &[u16],
) -> Result<Self, DapError> {
Ok(Self::UnsupportedExtension {
detail: format!("{unknown_extensions:?}"),
task_id: *task_id,
})
}

fn title_and_type(&self) -> (&'static str, Option<String>) {
let (title, dap_abort_type) = match self {
Self::BatchInvalid { .. } => ("Batch boundary check failed", Some(self.to_string())),
Expand Down Expand Up @@ -300,6 +315,9 @@ impl DapAbort {
Some(self.to_string()),
),
Self::BadRequest(..) => ("Bad request", None),
Self::UnsupportedExtension { .. } => {
("Unsupported extensions in report", Some(self.to_string()))
}
};

(
Expand Down
8 changes: 8 additions & 0 deletions crates/daphne/src/hpke.rs
Original file line number Diff line number Diff line change
Expand Up @@ -612,6 +612,10 @@ mod test {
report_metadata: &ReportMetadata {
id: ReportId(rand::random()),
time: rand::random(),
public_extensions: match version {
DapVersion::Draft09 => None,
DapVersion::Latest => Some(Vec::new()),
},
},
};
let plaintext = b"plaintext";
Expand Down Expand Up @@ -703,6 +707,10 @@ mod test {
let report_metadata = &ReportMetadata {
id: ReportId(rand::random()),
time: rand::random(),
public_extensions: match version {
DapVersion::Draft09 => None,
DapVersion::Latest => Some(Vec::new()),
},
};
let public_share = &vec![rand::random(); (0..100).choose(&mut rand::thread_rng()).unwrap()];

Expand Down
Loading
Loading