Skip to content

Commit e7babd9

Browse files
committed
wip test
1 parent 919159f commit e7babd9

File tree

8 files changed

+179
-80
lines changed

8 files changed

+179
-80
lines changed

crates/dapf/src/acceptance/mod.rs

Lines changed: 25 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use daphne::{
2424
hpke::{HpkeConfig, HpkeKemId, HpkeReceiverConfig},
2525
messages::{
2626
self, taskprov::TaskprovAdvertisement, AggregateShareReq, AggregationJobId, Base64Encode,
27-
BatchId, BatchSelector, PartialBatchSelector, TaskId,
27+
BatchId, BatchSelector, PartialBatchSelector, ReadyAggregationJobResp, TaskId,
2828
},
2929
metrics::DaphneMetrics,
3030
testing::report_generator::ReportGenerator,
@@ -508,7 +508,7 @@ impl Test {
508508
let _guard = load_control.wait().await;
509509
info!("Starting AggregationJobInitReq");
510510
let start = Instant::now();
511-
let agg_job_resp = self
511+
let mut agg_job_resp = self
512512
.http_client
513513
.submit_aggregation_job_init_req(
514514
self.helper_url.join(&format!(
@@ -525,14 +525,30 @@ impl Test {
525525
)
526526
.await?;
527527
let duration = start.elapsed();
528-
info!("Finished AggregationJobInitReq in {duration:#?}");
528+
info!("Finished submitting AggregationJobInitReq in {duration:#?}");
529+
let ready = loop {
530+
agg_job_resp = match agg_job_resp {
531+
messages::AggregationJobResp::Ready { transitions } => {
532+
break ReadyAggregationJobResp { transitions }
533+
}
534+
messages::AggregationJobResp::Processing => {
535+
self.http_client
536+
.poll_aggregation_job_init(
537+
self.helper_url
538+
.join(&format!("tasks/{task_id}/aggregation_jobs/{agg_job_id}"))?,
539+
task_config.version,
540+
functions::helper::Options {
541+
taskprov_advertisement: taskprov_advertisement.as_ref(),
542+
bearer_token: self.bearer_token.as_ref(),
543+
},
544+
)
545+
.await?
546+
}
547+
};
548+
};
529549

530-
let agg_share_span = task_config.consume_agg_job_resp(
531-
task_id,
532-
agg_job_state,
533-
agg_job_resp,
534-
self.metrics(),
535-
)?;
550+
let agg_share_span =
551+
task_config.consume_agg_job_resp(task_id, agg_job_state, ready, self.metrics())?;
536552

537553
let aggregated_report_count = agg_share_span
538554
.iter()

crates/dapf/src/functions/helper.rs

Lines changed: 55 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
// Copyright (c) 2024 Cloudflare, Inc. All rights reserved.
22
// SPDX-License-Identifier: BSD-3-Clause
33

4-
use anyhow::{anyhow, Context as _};
4+
use anyhow::{anyhow, Context};
55
use daphne::{
66
constants::DapMediaType,
77
error::aborts::ProblemDetails,
@@ -12,7 +12,7 @@ use daphne::{
1212
DapVersion,
1313
};
1414
use daphne_service_utils::{bearer_token::BearerToken, http_headers};
15-
use prio::codec::{ParameterizedDecode as _, ParameterizedEncode as _};
15+
use prio::codec::{ParameterizedDecode, ParameterizedEncode as _};
1616
use reqwest::header;
1717
use url::Url;
1818

@@ -43,30 +43,30 @@ impl HttpClient {
4343
.send()
4444
.await
4545
.context("sending AggregationJobInitReq")?;
46-
if resp.status() == 400 {
47-
let text = resp.text().await?;
48-
let problem_details: ProblemDetails =
49-
serde_json::from_str(&text).with_context(|| {
50-
format!("400 Bad Request: failed to parse problem details document: {text:?}")
51-
})?;
52-
Err(anyhow!("400 Bad Request: {problem_details:?}"))
53-
} else if resp.status() == 500 {
54-
Err(anyhow::anyhow!(
55-
"500 Internal Server Error: {}",
56-
resp.text().await?
57-
))
58-
} else if !resp.status().is_success() {
59-
Err(response_to_anyhow(resp).await).context("while running an AggregationJobInitReq")
60-
} else {
61-
AggregationJobResp::get_decoded_with_param(
62-
&version,
63-
&resp
64-
.bytes()
65-
.await
66-
.context("transfering bytes from the AggregateInitReq")?,
67-
)
68-
.with_context(|| "failed to parse response to AggregateInitReq from Helper")
69-
}
46+
handle_response(resp, &version).await
47+
}
48+
49+
pub async fn poll_aggregation_job_init(
50+
&self,
51+
url: Url,
52+
version: DapVersion,
53+
opts: Options<'_>,
54+
) -> anyhow::Result<AggregationJobResp> {
55+
let resp = self
56+
.get(url)
57+
.headers(construct_request_headers(
58+
DapMediaType::AggregationJobInitReq
59+
.as_str_for_version(version)
60+
.with_context(|| {
61+
format!("AggregationJobInitReq media type is not defined for {version}")
62+
})?,
63+
version,
64+
opts,
65+
)?)
66+
.send()
67+
.await
68+
.context("polling aggregation job init req")?;
69+
handle_response(resp, &version).await
7070
}
7171

7272
pub async fn get_aggregate_share(
@@ -144,3 +144,32 @@ fn construct_request_headers(
144144
}
145145
Ok(headers)
146146
}
147+
148+
async fn handle_response<R, P>(resp: reqwest::Response, params: &P) -> anyhow::Result<R>
149+
where
150+
R: ParameterizedDecode<P>,
151+
{
152+
if resp.status() == 400 {
153+
let text = resp.text().await?;
154+
let problem_details: ProblemDetails = serde_json::from_str(&text).with_context(|| {
155+
format!("400 Bad Request: failed to parse problem details document: {text:?}")
156+
})?;
157+
Err(anyhow!("400 Bad Request: {problem_details:?}"))
158+
} else if resp.status() == 500 {
159+
Err(anyhow::anyhow!(
160+
"500 Internal Server Error: {}",
161+
resp.text().await?
162+
))
163+
} else if !resp.status().is_success() {
164+
Err(response_to_anyhow(resp).await).context("while running an AggregationJobInitReq")
165+
} else {
166+
R::get_decoded_with_param(
167+
params,
168+
&resp
169+
.bytes()
170+
.await
171+
.context("transfering bytes from the AggregateInitReq")?,
172+
)
173+
.with_context(|| "failed to parse response to AggregateInitReq from Helper")
174+
}
175+
}

crates/daphne-server/src/roles/leader.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,10 @@ impl DapLeader for crate::App {
121121
{
122122
self.send_http(meta, Method::PUT, url, payload).await
123123
}
124+
125+
async fn send_http_get(&self, meta: DapRequestMeta, url: Url) -> Result<DapResponse, DapError> {
126+
self.send_http(meta, Method::PUT, url, ()).await
127+
}
124128
}
125129

126130
impl crate::App {

crates/daphne-worker/src/aggregator/roles/leader.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,10 @@ impl DapLeader for App {
125125
{
126126
self.send_http(meta, Method::PUT, url, payload).await
127127
}
128+
129+
async fn send_http_get(&self, meta: DapRequestMeta, url: Url) -> Result<DapResponse, DapError> {
130+
self.send_http(meta, Method::GET, url, ()).await
131+
}
128132
}
129133

130134
impl App {

crates/daphne/src/protocol/aggregator.rs

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,9 @@ use crate::{
1111
fatal_error,
1212
hpke::{info_and_aad, HpkeConfig, HpkeDecrypter},
1313
messages::{
14-
self, encode_u32_bytes, AggregationJobInitReq, AggregationJobResp, Base64Encode,
15-
BatchSelector, HpkeCiphertext, PartialBatchSelector, PrepareInit, ReadyAggregationJobResp,
16-
Report, ReportError, ReportId, ReportShare, TaskId, Transition, TransitionVar,
14+
self, encode_u32_bytes, AggregationJobInitReq, Base64Encode, BatchSelector, HpkeCiphertext,
15+
PartialBatchSelector, PrepareInit, ReadyAggregationJobResp, Report, ReportError, ReportId,
16+
ReportShare, TaskId, Transition, TransitionVar,
1717
},
1818
metrics::{DaphneMetrics, ReportStatus},
1919
protocol::{decode_ping_pong_framed, PingPongMessageType},
@@ -364,17 +364,14 @@ impl DapTaskConfig {
364364
&self,
365365
task_id: &TaskId,
366366
state: DapAggregationJobState,
367-
agg_job_resp: AggregationJobResp,
367+
agg_job_resp: ReadyAggregationJobResp,
368368
metrics: &dyn DaphneMetrics,
369369
) -> Result<DapAggregateSpan<DapAggregateShare>, DapError> {
370-
let AggregationJobResp::Ready { transitions } = agg_job_resp else {
371-
todo!("polling from the leader not implemented yet")
372-
};
373-
if transitions.len() != state.seq.len() {
370+
if agg_job_resp.transitions.len() != state.seq.len() {
374371
return Err(DapAbort::InvalidMessage {
375372
detail: format!(
376373
"aggregation job response has {} reports; expected {}",
377-
transitions.len(),
374+
agg_job_resp.transitions.len(),
378375
state.seq.len(),
379376
),
380377
task_id: *task_id,
@@ -383,7 +380,7 @@ impl DapTaskConfig {
383380
}
384381

385382
let mut agg_span = DapAggregateSpan::default();
386-
for (helper, leader) in zip(transitions, state.seq) {
383+
for (helper, leader) in zip(&agg_job_resp.transitions, state.seq) {
387384
if helper.report_id != leader.report_id {
388385
return Err(DapAbort::InvalidMessage {
389386
detail: format!(

crates/daphne/src/protocol/mod.rs

Lines changed: 21 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -552,17 +552,15 @@ mod test {
552552
]);
553553
let (leader_state, agg_job_init_req) =
554554
t.produce_agg_job_req(&DapAggregationParam::Empty, reports);
555-
let (_agg_span, mut agg_job_resp) = t.handle_agg_job_req(agg_job_init_req);
555+
let (_agg_span, agg_job_resp) = t.handle_agg_job_req(agg_job_init_req);
556556

557-
let AggregationJobResp::Ready { transitions } = &mut agg_job_resp else {
558-
panic!("expected a ready response, got processing")
559-
};
557+
let mut agg_job_resp = agg_job_resp.unwrap_ready();
560558

561559
// Helper sends transitions out of order.
562-
transitions.swap(0, 1);
560+
agg_job_resp.transitions.swap(0, 1);
563561

564562
assert_matches!(
565-
t.consume_agg_job_resp_expect_err(leader_state, agg_job_resp),
563+
t.consume_agg_job_resp_expect_err(leader_state, agg_job_resp,),
566564
DapError::Abort(DapAbort::InvalidMessage { .. })
567565
);
568566
}
@@ -577,15 +575,13 @@ mod test {
577575
]);
578576
let (leader_state, agg_job_init_req) =
579577
t.produce_agg_job_req(&DapAggregationParam::Empty, reports);
580-
let (_agg_span, mut agg_job_resp) = t.handle_agg_job_req(agg_job_init_req);
578+
let (_agg_span, agg_job_resp) = t.handle_agg_job_req(agg_job_init_req);
581579

582-
let AggregationJobResp::Ready { transitions } = &mut agg_job_resp else {
583-
panic!("expected a ready response, got processing")
584-
};
580+
let mut agg_job_resp = agg_job_resp.unwrap_ready();
585581

586582
// Helper sends a transition twice.
587-
let repeated_transition = transitions[0].clone();
588-
transitions.push(repeated_transition);
583+
let repeated_transition = agg_job_resp.transitions[0].clone();
584+
agg_job_resp.transitions.push(repeated_transition);
589585

590586
assert_matches!(
591587
t.consume_agg_job_resp_expect_err(leader_state, agg_job_resp),
@@ -604,13 +600,12 @@ mod test {
604600
]);
605601
let (leader_state, agg_job_init_req) =
606602
t.produce_agg_job_req(&DapAggregationParam::Empty, reports);
607-
let (_agg_span, mut agg_job_resp) = t.handle_agg_job_req(agg_job_init_req);
603+
let (_agg_span, agg_job_resp) = t.handle_agg_job_req(agg_job_init_req);
604+
605+
let mut agg_job_resp = agg_job_resp.unwrap_ready();
608606

609-
let AggregationJobResp::Ready { transitions } = &mut agg_job_resp else {
610-
panic!("expected a ready response, got processing")
611-
};
612607
// Helper sent a transition with an unrecognized report ID.
613-
transitions.push(Transition {
608+
agg_job_resp.transitions.push(Transition {
614609
report_id: ReportId(rng.gen()),
615610
var: TransitionVar::Continued(b"whatever".to_vec()),
616611
});
@@ -628,14 +623,13 @@ mod test {
628623
let reports = t.produce_reports(vec![DapMeasurement::U32Vec(vec![1; 10])]);
629624
let (leader_state, agg_job_init_req) =
630625
t.produce_agg_job_req(&DapAggregationParam::Empty, reports);
631-
let (_helper_agg_span, mut agg_job_resp) = t.handle_agg_job_req(agg_job_init_req);
626+
let (_helper_agg_span, agg_job_resp) = t.handle_agg_job_req(agg_job_init_req);
627+
628+
let mut agg_job_resp = agg_job_resp.unwrap_ready();
632629

633-
let AggregationJobResp::Ready { transitions } = &mut agg_job_resp else {
634-
panic!("expected a ready response, got processing")
635-
};
636630
// Helper sent a transition with an unrecognized report ID. Simulate this by flipping the
637631
// first bit of the report ID.
638-
transitions[0].report_id.0[0] ^= 1;
632+
agg_job_resp.transitions[0].report_id.0[0] ^= 1;
639633

640634
assert_matches!(
641635
t.consume_agg_job_resp_expect_err(leader_state, agg_job_resp),
@@ -660,6 +654,7 @@ mod test {
660654

661655
let (leader_agg_span, helper_agg_span) = {
662656
let (helper_agg_span, agg_job_resp) = t.handle_agg_job_req(agg_job_init_req);
657+
let agg_job_resp = agg_job_resp.unwrap_ready();
663658
let leader_agg_span = t.consume_agg_job_resp(leader_state, agg_job_resp);
664659

665660
(leader_agg_span, helper_agg_span)
@@ -713,12 +708,11 @@ mod test {
713708
.collect::<Vec<_>>();
714709
let (helper_agg_span, agg_job_resp) = t.handle_agg_job_req(agg_job_init_req);
715710

716-
let AggregationJobResp::Ready { transitions } = &agg_job_resp else {
717-
panic!("expected a ready response, got processing")
718-
};
711+
let agg_job_resp = agg_job_resp.unwrap_ready();
712+
719713
assert_eq!(2, helper_agg_span.report_count());
720-
assert_eq!(3, transitions.len());
721-
for (transition, prep_init_id) in zip(transitions, prep_init_ids) {
714+
assert_eq!(3, agg_job_resp.transitions.len());
715+
for (transition, prep_init_id) in zip(&agg_job_resp.transitions, prep_init_ids) {
722716
assert_eq!(transition.report_id, prep_init_id);
723717
}
724718

0 commit comments

Comments
 (0)