Skip to content

Commit 5979140

Browse files
committed
Reject reports earlier than task_config.not_before
1 parent afbafd0 commit 5979140

File tree

12 files changed

+98
-10
lines changed

12 files changed

+98
-10
lines changed

crates/dapf/src/main.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -905,6 +905,10 @@ async fn handle_test_routes(action: TestAction, http_client: HttpClient) -> anyh
905905
|| 604_800u64,
906906
"task should expire in",
907907
)?,
908+
task_commencement: SystemTime::now()
909+
.duration_since(SystemTime::UNIX_EPOCH)
910+
.unwrap()
911+
.as_secs(),
908912
};
909913

910914
print_json(&internal_task);

crates/daphne-server/src/roles/mod.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,6 @@ mod test_utils {
111111
fatal_error,
112112
hpke::{HpkeConfig, HpkeReceiverConfig},
113113
messages::decode_base64url_vec,
114-
roles::DapAggregator,
115114
vdaf::{Prio3Config, VdafConfig},
116115
DapBatchMode, DapError, DapTaskConfig, DapVersion,
117116
};
@@ -304,7 +303,7 @@ mod test_utils {
304303
leader_url: cmd.leader,
305304
helper_url: cmd.helper,
306305
time_precision: cmd.time_precision,
307-
not_before: self.get_current_time(),
306+
not_before: cmd.task_commencement,
308307
not_after: cmd.task_expiration,
309308
min_batch_size: cmd.min_batch_size,
310309
query,

crates/daphne-server/src/storage_proxy_connection/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ pub struct RequestBuilder<'d, B: DurableMethod, P: AsRef<[u8]>> {
5757
request: DurableRequest<P>,
5858
}
5959

60-
impl<'d, B: DurableMethod + Debug, P: AsRef<[u8]>> RequestBuilder<'d, B, P> {
60+
impl<B: DurableMethod + Debug, P: AsRef<[u8]>> RequestBuilder<'_, B, P> {
6161
#[tracing::instrument(skip_all, fields(path = ?self.path))]
6262
pub async fn send<R>(self) -> Result<R, Error>
6363
where

crates/daphne-server/tests/e2e/e2e.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1490,6 +1490,12 @@ async fn leader_collect_taskprov_ok(version: DapVersion) {
14901490
)
14911491
.unwrap();
14921492

1493+
println!("Now - not_before: {}", t.now - task_config.not_before);
1494+
println!(
1495+
"Now - batch_interval.start: {}",
1496+
t.now - batch_interval.start
1497+
);
1498+
println!("t.now: {}", t.now);
14931499
let path = TestRunner::upload_path_for_task(&task_id);
14941500
let method = match version {
14951501
DapVersion::Draft09 => &Method::PUT,
@@ -1500,7 +1506,9 @@ async fn leader_collect_taskprov_ok(version: DapVersion) {
15001506
let mut rng = thread_rng();
15011507
for _ in 0..t.task_config.min_batch_size {
15021508
let extensions = vec![Extension::Taskprov];
1509+
println!("Report interval: {:?}", TestRunner::report_interval(&batch_interval));
15031510
let now = rng.gen_range(TestRunner::report_interval(&batch_interval));
1511+
println!("\tnow: {}", now);
15041512
t.leader_request_expect_ok(
15051513
client,
15061514
&path,

crates/daphne-server/tests/e2e/test_runner.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ impl TestRunner {
113113
version,
114114
leader_url: leader_url.clone(),
115115
helper_url: helper_url.clone(),
116-
not_before: now,
116+
not_before: now - (now % TIME_PRECISION) - 1,
117117
not_after: now + 604_800, // one week from now
118118
time_precision: TIME_PRECISION,
119119
min_batch_size: MIN_BATCH_SIZE,
@@ -236,6 +236,7 @@ impl TestRunner {
236236
"time_precision": t.task_config.time_precision,
237237
"collector_hpke_config": collector_hpke_config_base64url.clone(),
238238
"task_expiration": t.task_config.not_after,
239+
"task_commencement": t.task_config.not_before,
239240
});
240241
let add_task_path = format!("{}/internal/test/add_task", version.as_ref());
241242
let res: InternalTestCommandResult = t
@@ -263,6 +264,7 @@ impl TestRunner {
263264
"time_precision": t.task_config.time_precision,
264265
"collector_hpke_config": collector_hpke_config_base64url.clone(),
265266
"task_expiration": t.task_config.not_after,
267+
"task_commencement": t.task_config.not_before,
266268
});
267269
let res: InternalTestCommandResult = t
268270
.helper_post_internal(&add_task_path, &helper_add_task_cmd)

crates/daphne-service-utils/src/test_route_types.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,5 +113,6 @@ pub struct InternalTestAddTask {
113113
pub max_batch_size: Option<NonZeroU32>,
114114
pub time_precision: Duration,
115115
pub collector_hpke_config: String, // base64url
116+
pub task_commencement: Time,
116117
pub task_expiration: Time,
117118
}

crates/daphne/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -585,7 +585,7 @@ impl DapTaskParameters {
585585
)
586586
.unwrap()
587587
.into_opted_in(&taskprov::OptInParam {
588-
not_before: now,
588+
not_before: now - (now % self.time_precision) - 1,
589589
num_agg_span_shards: self.num_agg_span_shards,
590590
});
591591

crates/daphne/src/protocol/mod.rs

Lines changed: 61 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -274,7 +274,7 @@ mod test {
274274
test_versions! { produce_agg_job_req_skip_hpke_decrypt_err }
275275

276276
fn produce_agg_job_req_skip_time_too_stale(version: DapVersion) {
277-
let t = AggregationJobTest::new(TEST_VDAF, HpkeKemId::X25519HkdfSha256, version);
277+
let mut t = AggregationJobTest::new(TEST_VDAF, HpkeKemId::X25519HkdfSha256, version);
278278
let reports = vec![t
279279
.task_config
280280
.vdaf
@@ -287,6 +287,7 @@ mod test {
287287
)
288288
.unwrap()];
289289

290+
t.task_config.not_before = t.valid_report_time_range().start - 2;
290291
let (agg_job_state, _agg_job_init_req) =
291292
t.produce_agg_job_req(&DapAggregationParam::Empty, reports);
292293
assert_eq!(agg_job_state.report_count(), 0);
@@ -298,6 +299,30 @@ mod test {
298299

299300
test_versions! { produce_agg_job_req_skip_time_too_stale }
300301

302+
fn produce_agg_job_req_skip_time_before_not_before(version: DapVersion) {
303+
let t = AggregationJobTest::new(TEST_VDAF, HpkeKemId::X25519HkdfSha256, version);
304+
let reports = vec![t
305+
.task_config
306+
.vdaf
307+
.produce_report(
308+
&t.client_hpke_config_list,
309+
t.task_config.not_before - 1,
310+
&t.task_id,
311+
DapMeasurement::U32Vec(vec![1; 10]),
312+
t.task_config.version,
313+
)
314+
.unwrap()];
315+
let (agg_job_state, _agg_job_init_req) =
316+
t.produce_agg_job_req(&DapAggregationParam::Empty, reports);
317+
assert_eq!(agg_job_state.report_count(), 0);
318+
319+
assert_metrics_include!(t.leader_registry, {
320+
r#"report_counter{env="test_leader",host="leader.com",status="rejected_task_not_started"}"#: 1,
321+
});
322+
}
323+
324+
test_versions! {produce_agg_job_req_skip_time_before_not_before}
325+
301326
fn produce_agg_job_req_skip_time_too_early(version: DapVersion) {
302327
let t = AggregationJobTest::new(TEST_VDAF, HpkeKemId::X25519HkdfSha256, version);
303328
let reports = vec![t
@@ -403,6 +428,7 @@ mod test {
403428
// Temporarily overwrite the valid report time range so that the Leader accepts the
404429
// out-of-range report and produces the request.
405430
let tmp = t.valid_report_range.clone();
431+
t.task_config.not_before = t.valid_report_time_range().start - 2;
406432
t.valid_report_range = 0..u64::MAX;
407433
let (_, agg_job_init_req) =
408434
t.produce_agg_job_req(&DapAggregationParam::Empty, reports.clone());
@@ -420,6 +446,40 @@ mod test {
420446

421447
test_versions! { handle_agg_job_req_skip_time_too_stale }
422448

449+
fn handle_agg_job_req_skip_time_before_not_before(version: DapVersion) {
450+
let mut t = AggregationJobTest::new(TEST_VDAF, HpkeKemId::X25519HkdfSha256, version);
451+
let reports = vec![t
452+
.task_config
453+
.vdaf
454+
.produce_report(
455+
&t.client_hpke_config_list,
456+
t.task_config.not_before - 1,
457+
&t.task_id,
458+
DapMeasurement::U32Vec(vec![1; 10]),
459+
t.task_config.version,
460+
)
461+
.unwrap()];
462+
463+
let agg_job_init_req = {
464+
// Temporarily overwrite the task_config not_before time so that the Leader accepts the
465+
// out-of-range report and produces the request.
466+
t.task_config.not_before -= 2;
467+
let (_, agg_job_init_req) =
468+
t.produce_agg_job_req(&DapAggregationParam::Empty, reports.clone());
469+
t.task_config.not_before += 2;
470+
agg_job_init_req
471+
};
472+
let (_agg_span, agg_job_resp) = t.handle_agg_job_req(agg_job_init_req);
473+
474+
assert_eq!(agg_job_resp.transitions.len(), 1);
475+
assert_matches!(
476+
agg_job_resp.transitions[0].var,
477+
TransitionVar::Failed(ReportError::TaskNotStarted)
478+
);
479+
}
480+
481+
test_versions! {handle_agg_job_req_skip_time_before_not_before}
482+
423483
fn handle_agg_job_req_skip_time_too_early(version: DapVersion) {
424484
let mut t = AggregationJobTest::new(TEST_VDAF, HpkeKemId::X25519HkdfSha256, version);
425485
let reports = vec![t

crates/daphne/src/protocol/report_init.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ impl InitializedReport<WithPeerPrepShare> {
100100
impl<'s> From<&'s DapTaskConfig> for PartialDapTaskConfigForReportInit<'s> {
101101
fn from(config: &'s DapTaskConfig) -> Self {
102102
PartialDapTaskConfigForReportInit {
103+
not_before: config.not_before,
103104
not_after: config.not_after,
104105
method_is_taskprov: config.method_is_taskprov(),
105106
version: config.version,
@@ -112,6 +113,7 @@ impl<'s> From<&'s DapTaskConfig> for PartialDapTaskConfigForReportInit<'s> {
112113
impl<'s> From<&'s PartialDapTaskConfigForReportInit<'_>> for PartialDapTaskConfigForReportInit<'s> {
113114
fn from(config: &'s PartialDapTaskConfigForReportInit<'_>) -> Self {
114115
Self {
116+
not_before: config.not_before,
115117
not_after: config.not_after,
116118
method_is_taskprov: config.method_is_taskprov,
117119
version: config.version,
@@ -122,6 +124,7 @@ impl<'s> From<&'s PartialDapTaskConfigForReportInit<'_>> for PartialDapTaskConfi
122124
}
123125

124126
pub struct PartialDapTaskConfigForReportInit<'s> {
127+
pub not_before: messages::Time,
125128
pub not_after: messages::Time,
126129
pub method_is_taskprov: bool,
127130
pub version: DapVersion,
@@ -145,19 +148,24 @@ impl<P> InitializedReport<P> {
145148
let task_config = task_config.into();
146149
macro_rules! reject {
147150
($failure:ident) => {
151+
{tracing::warn!("Rejected - {}\nTimestamp - {}", ReportError::$failure, report_share.report_metadata.time);
148152
return Ok(InitializedReport::Rejected {
149153
metadata: report_share.report_metadata,
150154
report_err: ReportError::$failure,
151-
})
155+
})}
152156
};
153157
}
158+
159+
tracing::info!("valid_report_range: {}..{}", valid_report_range.start, valid_report_range.end);
154160
match report_share.report_metadata.time {
155161
t if t >= task_config.not_after => reject!(TaskExpired),
162+
t if t < task_config.not_before => {tracing::warn!("Reject TaskNotStarted"); reject!(TaskNotStarted)},
156163
t if t < valid_report_range.start => reject!(ReportDropped),
157164
t if valid_report_range.end < t => reject!(ReportTooEarly),
158165
_ => {}
159166
}
160167

168+
tracing::warn!("All tests pass");
161169
match (
162170
&report_share.report_metadata.public_extensions,
163171
task_config.version,

crates/daphne/src/roles/leader/mod.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,6 @@ pub async fn handle_upload_req<A: DapLeader>(
216216
.into());
217217
}
218218

219-
// Check that the report was generated after the task's `not_before` time.
220219
if report.report_metadata.time
221220
< task_config.as_ref().not_before - task_config.as_ref().time_precision
222221
{
@@ -226,6 +225,13 @@ pub async fn handle_upload_req<A: DapLeader>(
226225
.into());
227226
}
228227

228+
// Check that the report was generated after the task's `not_before` time.
229+
println!(
230+
"report_metadata.time - task_config.not_before: {}",
231+
report.report_metadata.time as i128 - task_config.as_ref().not_before as i128
232+
);
233+
println!("report_metadata.time: {}", report.report_metadata.time);
234+
229235
if let Some(public_extensions) = &report.report_metadata.public_extensions {
230236
// We can be sure at this point that the ReportMetadata is well formed
231237
// because the decoding / error checking happens in the extractor.

0 commit comments

Comments
 (0)