Skip to content

Commit 13dd33c

Browse files
committed
WIP
1 parent ab85fa3 commit 13dd33c

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

57 files changed

+2365
-331
lines changed

Cargo.lock

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,8 +89,9 @@ tracing = "0.1.40"
8989
tracing-core = "0.1.32"
9090
tracing-subscriber = "0.3.18"
9191
url = { version = "2.5.4", features = ["serde"] }
92+
wasm-bindgen = "0.2.99"
9293
webpki = "0.22.4"
93-
worker = { version = "0.5", features = ["http"] }
94+
worker = "0.5"
9495
x509-parser = "0.15.1"
9596

9697
[workspace.dependencies.sentry]

Makefile

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,16 @@ helper:
2121
-c ./crates/daphne-server/examples/configuration-helper.toml
2222
h: helper
2323

24+
compute-offload:
25+
RUST_LOG=hyper=off,debug cargo run \
26+
--profile release-symbols \
27+
--features test-utils \
28+
--example service \
29+
-- \
30+
-c ./crates/daphne-server/examples/configuration-cpu-offload.toml
31+
co: compute-offload
32+
33+
2434
helper-worker:
2535
cd ./crates/daphne-worker-test/ && \
2636
wrangler dev -c wrangler.aggregator.toml --port 8788 -e helper

crates/dapf/src/acceptance/mod.rs

Lines changed: 25 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use daphne::{
2424
hpke::{HpkeConfig, HpkeKemId, HpkeReceiverConfig},
2525
messages::{
2626
self, taskprov::TaskprovAdvertisement, AggregateShareReq, AggregationJobId, Base64Encode,
27-
BatchId, BatchSelector, PartialBatchSelector, TaskId,
27+
BatchId, BatchSelector, PartialBatchSelector, ReadyAggregationJobResp, TaskId,
2828
},
2929
metrics::DaphneMetrics,
3030
testing::report_generator::ReportGenerator,
@@ -508,7 +508,7 @@ impl Test {
508508
let _guard = load_control.wait().await;
509509
info!("Starting AggregationJobInitReq");
510510
let start = Instant::now();
511-
let agg_job_resp = self
511+
let mut agg_job_resp = self
512512
.http_client
513513
.submit_aggregation_job_init_req(
514514
self.helper_url.join(&format!(
@@ -525,14 +525,30 @@ impl Test {
525525
)
526526
.await?;
527527
let duration = start.elapsed();
528-
info!("Finished AggregationJobInitReq in {duration:#?}");
528+
info!("Finished submitting AggregationJobInitReq in {duration:#?}");
529+
let ready = loop {
530+
agg_job_resp = match agg_job_resp {
531+
messages::AggregationJobResp::Ready { transitions } => {
532+
break ReadyAggregationJobResp { transitions }
533+
}
534+
messages::AggregationJobResp::Processing => {
535+
self.http_client
536+
.poll_aggregation_job_init(
537+
self.helper_url
538+
.join(&format!("tasks/{task_id}/aggregation_jobs/{agg_job_id}"))?,
539+
task_config.version,
540+
functions::helper::Options {
541+
taskprov_advertisement: taskprov_advertisement.as_ref(),
542+
bearer_token: self.bearer_token.as_ref(),
543+
},
544+
)
545+
.await?
546+
}
547+
};
548+
};
529549

530-
let agg_share_span = task_config.consume_agg_job_resp(
531-
task_id,
532-
agg_job_state,
533-
agg_job_resp,
534-
self.metrics(),
535-
)?;
550+
let agg_share_span =
551+
task_config.consume_agg_job_resp(task_id, agg_job_state, ready, self.metrics())?;
536552

537553
let aggregated_report_count = agg_share_span
538554
.iter()

crates/dapf/src/functions/helper.rs

Lines changed: 55 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
// Copyright (c) 2024 Cloudflare, Inc. All rights reserved.
22
// SPDX-License-Identifier: BSD-3-Clause
33

4-
use anyhow::{anyhow, Context as _};
4+
use anyhow::{anyhow, Context};
55
use daphne::{
66
constants::DapMediaType,
77
error::aborts::ProblemDetails,
@@ -12,7 +12,7 @@ use daphne::{
1212
DapVersion,
1313
};
1414
use daphne_service_utils::{bearer_token::BearerToken, http_headers};
15-
use prio::codec::{ParameterizedDecode as _, ParameterizedEncode as _};
15+
use prio::codec::{ParameterizedDecode, ParameterizedEncode as _};
1616
use reqwest::header;
1717
use url::Url;
1818

@@ -43,30 +43,30 @@ impl HttpClient {
4343
.send()
4444
.await
4545
.context("sending AggregationJobInitReq")?;
46-
if resp.status() == 400 {
47-
let text = resp.text().await?;
48-
let problem_details: ProblemDetails =
49-
serde_json::from_str(&text).with_context(|| {
50-
format!("400 Bad Request: failed to parse problem details document: {text:?}")
51-
})?;
52-
Err(anyhow!("400 Bad Request: {problem_details:?}"))
53-
} else if resp.status() == 500 {
54-
Err(anyhow::anyhow!(
55-
"500 Internal Server Error: {}",
56-
resp.text().await?
57-
))
58-
} else if !resp.status().is_success() {
59-
Err(response_to_anyhow(resp).await).context("while running an AggregationJobInitReq")
60-
} else {
61-
AggregationJobResp::get_decoded_with_param(
62-
&version,
63-
&resp
64-
.bytes()
65-
.await
66-
.context("transfering bytes from the AggregateInitReq")?,
67-
)
68-
.with_context(|| "failed to parse response to AggregateInitReq from Helper")
69-
}
46+
handle_response(resp, &version).await
47+
}
48+
49+
pub async fn poll_aggregation_job_init(
50+
&self,
51+
url: Url,
52+
version: DapVersion,
53+
opts: Options<'_>,
54+
) -> anyhow::Result<AggregationJobResp> {
55+
let resp = self
56+
.get(url)
57+
.headers(construct_request_headers(
58+
DapMediaType::AggregationJobInitReq
59+
.as_str_for_version(version)
60+
.with_context(|| {
61+
format!("AggregationJobInitReq media type is not defined for {version}")
62+
})?,
63+
version,
64+
opts,
65+
)?)
66+
.send()
67+
.await
68+
.context("polling aggregation job init req")?;
69+
handle_response(resp, &version).await
7070
}
7171

7272
pub async fn get_aggregate_share(
@@ -144,3 +144,32 @@ fn construct_request_headers(
144144
}
145145
Ok(headers)
146146
}
147+
148+
async fn handle_response<R, P>(resp: reqwest::Response, params: &P) -> anyhow::Result<R>
149+
where
150+
R: ParameterizedDecode<P>,
151+
{
152+
if resp.status() == 400 {
153+
let text = resp.text().await?;
154+
let problem_details: ProblemDetails = serde_json::from_str(&text).with_context(|| {
155+
format!("400 Bad Request: failed to parse problem details document: {text:?}")
156+
})?;
157+
Err(anyhow!("400 Bad Request: {problem_details:?}"))
158+
} else if resp.status() == 500 {
159+
Err(anyhow::anyhow!(
160+
"500 Internal Server Error: {}",
161+
resp.text().await?
162+
))
163+
} else if !resp.status().is_success() {
164+
Err(response_to_anyhow(resp).await).context("while running an AggregationJobInitReq")
165+
} else {
166+
R::get_decoded_with_param(
167+
params,
168+
&resp
169+
.bytes()
170+
.await
171+
.context("transfering bytes from the AggregateInitReq")?,
172+
)
173+
.with_context(|| "failed to parse response to AggregateInitReq from Helper")
174+
}
175+
}

crates/daphne-server/src/roles/aggregator.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2024 Cloudflare, Inc. All rights reserved.
1+
// Copyright (c) 2025 Cloudflare, Inc. All rights reserved.
22
// SPDX-License-Identifier: BSD-3-Clause
33

44
use std::{future::ready, num::NonZeroUsize, ops::Range, time::SystemTime};
@@ -79,6 +79,7 @@ impl DapAggregator for crate::App {
7979
#[tracing::instrument(skip(self))]
8080
async fn get_agg_share(
8181
&self,
82+
_version: DapVersion,
8283
task_id: &TaskId,
8384
batch_sel: &BatchSelector,
8485
) -> Result<DapAggregateShare, DapError> {

crates/daphne-server/src/roles/helper.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@
33

44
use axum::async_trait;
55
use daphne::{
6-
messages::{AggregationJobId, TaskId},
6+
fatal_error,
7+
messages::{AggregationJobId, AggregationJobResp, TaskId},
78
roles::{helper::AggregationJobRequestHash, DapHelper},
89
DapError, DapVersion,
910
};
@@ -20,4 +21,13 @@ impl DapHelper for crate::App {
2021
// the server implementation can't check for this
2122
Ok(())
2223
}
24+
25+
async fn poll_aggregated(
26+
&self,
27+
_version: DapVersion,
28+
_task_id: &TaskId,
29+
_agg_job_id: &AggregationJobId,
30+
) -> Result<AggregationJobResp, DapError> {
31+
Err(fatal_error!(err = "polling not implemented"))
32+
}
2333
}

crates/daphne-server/src/roles/leader.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,10 @@ impl DapLeader for crate::App {
121121
{
122122
self.send_http(meta, Method::PUT, url, payload).await
123123
}
124+
125+
async fn send_http_get(&self, meta: DapRequestMeta, url: Url) -> Result<DapResponse, DapError> {
126+
self.send_http(meta, Method::PUT, url, ()).await
127+
}
124128
}
125129

126130
impl crate::App {

crates/daphne-server/src/router/extractor.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,12 +72,12 @@ impl DecodeFromDapHttpBody for HashedAggregationJobReq {
7272
}
7373
}
7474

75-
/// Using `()` ignores the body of a request.
7675
impl DecodeFromDapHttpBody for CollectionPollReq {
7776
fn decode_from_http_body(_bytes: Bytes, _meta: &DapRequestMeta) -> Result<Self, DapAbort> {
7877
Ok(Self)
7978
}
8079
}
80+
8181
/// Using `()` ignores the body of a request.
8282
impl DecodeFromDapHttpBody for () {
8383
fn decode_from_http_body(_bytes: Bytes, _meta: &DapRequestMeta) -> Result<Self, DapAbort> {

crates/daphne-service-utils/build.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,10 @@ fn main() {
1111
#[cfg(feature = "durable_requests")]
1212
compiler
1313
.file("./src/durable_requests/durable_request.capnp")
14-
.file("./src/durable_requests/bindings/aggregation_job_store.capnp");
14+
.file("./src/durable_requests/bindings/aggregation_job_store.capnp")
15+
.file("./src/durable_requests/bindings/aggregate_store_v2.capnp")
16+
.file("./src/durable_requests/bindings/agg_job_response_store.capnp")
17+
.file("./src/durable_requests/bindings/replay_checker.capnp");
1518

1619
#[cfg(feature = "compute-offload")]
1720
compiler.file("./src/compute_offload/compute_offload.capnp");

0 commit comments

Comments
 (0)