Skip to content

Commit 167397c

Browse files
committed
Begin debugging
1 parent 5979140 commit 167397c

File tree

9 files changed

+169
-7
lines changed

9 files changed

+169
-7
lines changed

crates/daphne-server/src/roles/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -294,6 +294,7 @@ mod test_utils {
294294
}
295295
};
296296

297+
tracing::warn!("Add task: {}..{}", cmd.task_commencement, cmd.task_expiration);
297298
if self
298299
.kv()
299300
.put_if_not_exists_with_expiration::<kv::prefix::TaskConfig>(

crates/daphne-server/src/router/test_routes.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use daphne::{
1414
constants::DapAggregatorRole,
1515
hpke::HpkeReceiverConfig,
1616
messages::{Base64Encode, TaskId},
17-
roles::{leader, DapLeader},
17+
roles::{leader, DapAggregator, DapLeader},
1818
DapVersion,
1919
};
2020
use daphne_service_utils::test_route_types::{InternalTestAddTask, InternalTestEndpointForTask};
@@ -129,6 +129,9 @@ async fn add_task(
129129
Path(version): Path<DapVersion>,
130130
Json(cmd): Json<InternalTestAddTask>,
131131
) -> impl IntoResponse {
132+
tracing::warn!("TaskID: {:?}", cmd.task_id);
133+
tracing::warn!("task conf range: {}..{}", cmd.task_commencement, cmd.task_expiration);
134+
tracing::warn!("Valid time range: {:?}", app.valid_report_time_range());
132135
match app.internal_add_task(version, cmd).await {
133136
Ok(()) => (
134137
StatusCode::OK,

crates/daphne-server/tests/e2e/e2e.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1469,9 +1469,6 @@ async fn leader_collect_taskprov_ok(version: DapVersion) {
14691469
let t = TestRunner::default_with_version(version).await;
14701470
let batch_interval = t.batch_interval();
14711471

1472-
let client = t.http_client();
1473-
let hpke_config_list = t.get_hpke_configs(version, client).await.unwrap();
1474-
14751472
let (task_config, task_id, taskprov_advertisement) = DapTaskParameters {
14761473
version,
14771474
min_batch_size: 10,
@@ -1490,6 +1487,13 @@ async fn leader_collect_taskprov_ok(version: DapVersion) {
14901487
)
14911488
.unwrap();
14921489

1490+
t.setup_endpoints(&task_id, version).await;
1491+
1492+
let client = &t.http_client();
1493+
let hpke_config_list = t.get_hpke_configs_task_id(version, client, &task_id).await.unwrap();
1494+
println!("Generated TaskID: {}",t.task_id);
1495+
1496+
println!("TaskID: {:?}", task_id);
14931497
println!("Now - not_before: {}", t.now - task_config.not_before);
14941498
println!(
14951499
"Now - batch_interval.start: {}",
@@ -1626,7 +1630,7 @@ async fn leader_collect_taskprov_ok(version: DapVersion) {
16261630
assert_eq!(resp.status(), 200);
16271631
assert_eq!(
16281632
resp.bytes().await.unwrap(),
1629-
collection.get_encoded_with_param(&t.version).unwrap()
1633+
collection.get_encoded_with_param(&version).unwrap()
16301634
);
16311635
}
16321636

crates/daphne-server/tests/e2e/test_runner.rs

Lines changed: 144 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ impl TestRunner {
145145
let leader_bearer_token = hex::encode(rng.gen::<[u8; 16]>());
146146
let collector_bearer_token = hex::encode(rng.gen::<[u8; 16]>());
147147
let t = Self {
148-
global_config,
148+
global_config,
149149
task_id,
150150
now,
151151
task_config,
@@ -219,6 +219,8 @@ impl TestRunner {
219219
//
220220
// First, delete the data from the previous test.
221221
t.internal_delete_all(&t.batch_interval()).await.unwrap();
222+
println!("Task_config.not_before: {}", t.task_config.not_before);
223+
println!("t.task_id: {:}", t.task_id);
222224

223225
// Configure the Leader with the task.
224226
let leader_add_task_cmd = json!({
@@ -309,9 +311,127 @@ impl TestRunner {
309311
);
310312

311313
println!("############ starting test ############");
314+
312315
t
313316
}
314317

318+
pub async fn setup_endpoints(&self, task_id: &TaskId, version: DapVersion){
319+
// Configure the endpoints.
320+
//
321+
// First, delete the data from the previous test.
322+
self.internal_delete_all(&self.batch_interval()).await.unwrap();
323+
println!("Task_config.not_before: {}", self.task_config.not_before);
324+
println!("self.task_id: {:}", self.task_id);
325+
326+
let vdaf_verify_key_base64url = encode_base64url(self.task_config.vdaf_verify_key.as_ref());
327+
328+
let (batch_mode, max_batch_size) = match self.task_config.query {
329+
DapBatchMode::TimeInterval => (1, None),
330+
DapBatchMode::LeaderSelected { max_batch_size } => (2, Some(max_batch_size)),
331+
};
332+
333+
let collector_hpke_config_base64url =
334+
encode_base64url(self.collector_hpke_receiver.config.get_encoded().unwrap());
335+
336+
let vdaf = json!({
337+
"type": "Prio2",
338+
"dimension": assert_matches!(
339+
self.task_config.vdaf,
340+
VdafConfig::Prio2{ dimension } => format!("{dimension}")
341+
),
342+
});
343+
// Configure the Leader with the task.
344+
let leader_add_task_cmd = json!({
345+
"task_id": task_id.to_base64url(),
346+
"leader": self.leader_url,
347+
"helper": self.helper_url,
348+
"vdaf": vdaf.clone(),
349+
"leader_authentication_token": self.leader_bearer_token.clone(),
350+
"collector_authentication_token": self.collector_bearer_token.clone(),
351+
"role": "leader",
352+
"vdaf_verify_key": vdaf_verify_key_base64url,
353+
"batch_mode": batch_mode,
354+
"min_batch_size": self.task_config.min_batch_size,
355+
"max_batch_size": max_batch_size,
356+
"time_precision": self.task_config.time_precision,
357+
"collector_hpke_config": collector_hpke_config_base64url.clone(),
358+
"task_expiration": self.task_config.not_after,
359+
"task_commencement": self.task_config.not_before,
360+
});
361+
let add_task_path = format!("{}/internal/test/add_task", version.as_ref());
362+
let res: InternalTestCommandResult = self
363+
.leader_post_internal(&add_task_path, &leader_add_task_cmd)
364+
.await
365+
.unwrap();
366+
assert_eq!(
367+
res.status, "success",
368+
"response status: {}, error: {:?}",
369+
res.status, res.error
370+
);
371+
372+
// Configure the Helper with the task.
373+
let helper_add_task_cmd = json!({
374+
"task_id": task_id.to_base64url(),
375+
"leader": self.leader_url,
376+
"helper": self.helper_url,
377+
"vdaf": vdaf.clone(),
378+
"leader_authentication_token": self.leader_bearer_token.clone(),
379+
"role": "helper",
380+
"vdaf_verify_key": vdaf_verify_key_base64url,
381+
"batch_mode": batch_mode,
382+
"min_batch_size": self.task_config.min_batch_size,
383+
"max_batch_size": max_batch_size,
384+
"time_precision": self.task_config.time_precision,
385+
"collector_hpke_config": collector_hpke_config_base64url.clone(),
386+
"task_expiration": self.task_config.not_after,
387+
"task_commencement": self.task_config.not_before,
388+
});
389+
let res: InternalTestCommandResult = self
390+
.helper_post_internal(&add_task_path, &helper_add_task_cmd)
391+
.await
392+
.unwrap();
393+
assert_eq!(
394+
res.status, "success",
395+
"response status: {}, error: {:?}",
396+
res.status, res.error
397+
);
398+
399+
let gen_config = || {
400+
HpkeReceiverConfig::gen(0, HpkeKemId::X25519HkdfSha256)
401+
.expect("failed to generate receiver config")
402+
};
403+
let res: InternalTestCommandResult = self
404+
.helper_post_internal(
405+
&format!("{version}/internal/test/add_hpke_config"),
406+
&gen_config(),
407+
)
408+
.await
409+
.unwrap();
410+
411+
assert_eq!(
412+
res.status, "success",
413+
"response status: {}, error {:?}",
414+
res.status, res.error
415+
);
416+
417+
let res: InternalTestCommandResult = self
418+
.leader_post_internal(
419+
&format!("{version}/internal/test/add_hpke_config"),
420+
&gen_config(),
421+
)
422+
.await
423+
.unwrap();
424+
425+
assert_eq!(
426+
res.status, "success",
427+
"response status: {}, error {:?}",
428+
res.status, res.error
429+
);
430+
431+
println!("############ starting test ############");
432+
}
433+
434+
315435
pub fn http_client(&self) -> &reqwest::Client {
316436
&self.http_client
317437
}
@@ -352,6 +472,29 @@ impl TestRunner {
352472
])
353473
}
354474

475+
pub async fn get_hpke_configs_task_id(
476+
&self,
477+
_version: DapVersion,
478+
client: &reqwest::Client,
479+
task_id: &TaskId
480+
) -> anyhow::Result<[HpkeConfig; 2]> {
481+
let raw_leader_hpke_config = get_raw_hpke_config(client, task_id.as_ref(), &self.leader_url, "leader", &self.version).await?;
482+
let raw_helper_hpke_config = get_raw_hpke_config(client, task_id.as_ref(), &self.helper_url, "helper", &self.version).await?;
483+
484+
let mut leader_hpke_config_list = HpkeConfigList::get_decoded(&raw_leader_hpke_config)?;
485+
let mut helper_hpke_config_list = HpkeConfigList::get_decoded(&raw_helper_hpke_config)?;
486+
if leader_hpke_config_list.hpke_configs.len() != 1
487+
|| helper_hpke_config_list.hpke_configs.len() != 1
488+
{
489+
panic!("only a length 1 HpkeConfList is currently supported by the test suite")
490+
}
491+
Ok([
492+
leader_hpke_config_list.hpke_configs.pop().unwrap(),
493+
helper_hpke_config_list.hpke_configs.pop().unwrap(),
494+
])
495+
}
496+
497+
355498
pub async fn leader_get_raw_hpke_config(
356499
&self,
357500
client: &reqwest::Client,

crates/daphne/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -575,6 +575,7 @@ impl DapTaskParameters {
575575
};
576576

577577
let task_id = taskprov_advertisement.compute_task_id(self.version);
578+
println!("Computed TaskID: {}", task_id);
578579

579580
// Compute the DAP task config.
580581
let task_config = taskprov::DapTaskConfigNeedsOptIn::try_from_taskprov_advertisement(

crates/daphne/src/protocol/aggregator.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ impl DapTaskConfig {
100100
where
101101
S: Iterator<Item = Report>,
102102
{
103+
tracing::warn!("DapTaskConfig times:{}..{}", self.not_before, self.not_after);
103104
let (report_count_hint, _upper_bound) = reports.size_hint();
104105

105106
let mut states = Vec::with_capacity(report_count_hint);

crates/daphne/src/protocol/report_init.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,11 +63,13 @@ impl InitializedReport<()> {
6363
report_share: ReportShare,
6464
agg_param: &DapAggregationParam,
6565
) -> Result<Self, DapError> {
66+
let tc: PartialDapTaskConfigForReportInit = task_config.into().clone();
67+
tracing::warn!("DapTaskConfig times:{}..{}", tc.not_before, tc.not_after);
6668
Self::initialize(
6769
decrypter,
6870
valid_report_range,
6971
task_id,
70-
task_config,
72+
tc.clone(),
7173
report_share,
7274
(),
7375
agg_param,
@@ -123,6 +125,7 @@ impl<'s> From<&'s PartialDapTaskConfigForReportInit<'_>> for PartialDapTaskConfi
123125
}
124126
}
125127

128+
#[derive(Clone)]
126129
pub struct PartialDapTaskConfigForReportInit<'s> {
127130
pub not_before: messages::Time,
128131
pub not_after: messages::Time,
@@ -156,7 +159,10 @@ impl<P> InitializedReport<P> {
156159
};
157160
}
158161

162+
tracing::info!("report timestamp: {}", report_share.report_metadata.time);
159163
tracing::info!("valid_report_range: {}..{}", valid_report_range.start, valid_report_range.end);
164+
tracing::info!("task_config.range: {}..{}", task_config.not_before, task_config.not_after);
165+
160166
match report_share.report_metadata.time {
161167
t if t >= task_config.not_after => reject!(TaskExpired),
162168
t if t < task_config.not_before => {tracing::warn!("Reject TaskNotStarted"); reject!(TaskNotStarted)},

crates/daphne/src/roles/leader/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -543,6 +543,7 @@ pub async fn process<A: DapLeader>(
543543

544544
tracing::debug!("RUNNING read_work_stream");
545545

546+
tracing::warn!("Aggregator valid time range: {:?}", aggregator.valid_report_time_range());
546547
let mut agg_jobs = HashMap::new();
547548
let mut pending_coll_jobs = Vec::new();
548549
for work_item in aggregator.dequeue_work(num_items).await? {
@@ -565,6 +566,7 @@ pub async fn process<A: DapLeader>(
565566
return Ok(0);
566567
}
567568

569+
tracing::warn!("Retrieved time range: {}..{}", task_config.not_before, task_config.not_after);
568570
tracing::debug!(
569571
"RUNNING run_agg_job FOR TID {task_id} AND {part_batch_sel:?} AND {host}"
570572
);

crates/daphne/src/testing/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -689,6 +689,7 @@ impl DapAggregator for InMemoryAggregator {
689689
task_id: &TaskId,
690690
task_config: DapTaskConfig,
691691
) -> Result<(), DapError> {
692+
tracing::warn!("Put taskprov config");
692693
let mut tasks = self.tasks.lock().expect("tasks: lock failed");
693694
tasks.deref_mut().insert(*task_id, task_config);
694695
Ok(())

0 commit comments

Comments
 (0)