Skip to content

Commit a7f9e64

Browse files
committed
Implement the async worker
1 parent bfe5b34 commit a7f9e64

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

60 files changed

+2547
-427
lines changed

Cargo.lock

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,8 +90,9 @@ tracing = "0.1.40"
9090
tracing-core = "0.1.32"
9191
tracing-subscriber = "0.3.18"
9292
url = { version = "2.5.4", features = ["serde"] }
93+
wasm-bindgen = "0.2.99"
9394
webpki = "0.22.4"
94-
worker = { version = "0.5", features = ["http"] }
95+
worker = "0.5"
9596
x509-parser = "0.15.1"
9697

9798
[workspace.dependencies.sentry]

Makefile

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,16 @@ helper:
2121
-c ./crates/daphne-server/examples/configuration-helper.toml
2222
h: helper
2323

24+
compute-offload:
25+
RUST_LOG=hyper=off,debug cargo run \
26+
--profile release-symbols \
27+
--features test-utils \
28+
--example service \
29+
-- \
30+
-c ./crates/daphne-server/examples/configuration-cpu-offload.toml
31+
co: compute-offload
32+
33+
2434
helper-worker:
2535
cd ./crates/daphne-worker-test/ && \
2636
wrangler dev -c wrangler.aggregator.toml --port 8788 -e helper

crates/dapf/src/acceptance/mod.rs

Lines changed: 37 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use daphne::{
2727
BatchId, BatchSelector, PartialBatchSelector, TaskId,
2828
},
2929
metrics::DaphneMetrics,
30+
protocol::ReadyAggregationJobResp,
3031
testing::report_generator::ReportGenerator,
3132
vdaf::VdafConfig,
3233
DapAggregateShare, DapAggregateSpan, DapAggregationParam, DapBatchMode, DapMeasurement,
@@ -511,7 +512,7 @@ impl Test {
511512
let _guard = load_control.wait().await;
512513
info!("Starting AggregationJobInitReq");
513514
let start = Instant::now();
514-
let agg_job_resp = self
515+
let mut agg_job_resp = self
515516
.http_client
516517
.submit_aggregation_job_init_req(
517518
self.helper_url.join(&format!(
@@ -528,14 +529,42 @@ impl Test {
528529
)
529530
.await?;
530531
let duration = start.elapsed();
531-
info!("Finished AggregationJobInitReq in {duration:#?}");
532+
info!("Finished submitting AggregationJobInitReq in {duration:#?}");
533+
let mut poll_count = 1;
534+
let ready = loop {
535+
agg_job_resp = match agg_job_resp {
536+
messages::AggregationJobResp::Ready { prep_resps } => {
537+
if poll_count != 1 {
538+
info!(
539+
"Finished polling for AggregationJobResp after {:#?}",
540+
start.elapsed()
541+
);
542+
}
543+
break ReadyAggregationJobResp { prep_resps };
544+
}
545+
messages::AggregationJobResp::Processing => {
546+
if poll_count == 1 {
547+
info!("Polling for AggregationJobResp");
548+
}
549+
tokio::time::sleep(Duration::from_millis(poll_count * 200)).await;
550+
poll_count += 1;
551+
self.http_client
552+
.poll_aggregation_job_init(
553+
self.helper_url
554+
.join(&format!("tasks/{task_id}/aggregation_jobs/{agg_job_id}"))?,
555+
task_config.version,
556+
functions::helper::Options {
557+
taskprov_advertisement: taskprov_advertisement.as_ref(),
558+
bearer_token: self.bearer_token.as_ref(),
559+
},
560+
)
561+
.await?
562+
}
563+
};
564+
};
532565

533-
let agg_share_span = task_config.consume_agg_job_resp(
534-
task_id,
535-
agg_job_state,
536-
agg_job_resp.unwrap_ready(), // TODO: implement polling
537-
self.metrics(),
538-
)?;
566+
let agg_share_span =
567+
task_config.consume_agg_job_resp(task_id, agg_job_state, ready, self.metrics())?;
539568

540569
let aggregated_report_count = agg_share_span
541570
.iter()

crates/daphne-server/src/roles/aggregator.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2024 Cloudflare, Inc. All rights reserved.
1+
// Copyright (c) 2025 Cloudflare, Inc. All rights reserved.
22
// SPDX-License-Identifier: BSD-3-Clause
33

44
use std::{future::ready, num::NonZeroUsize, ops::Range, time::SystemTime};
@@ -79,6 +79,7 @@ impl DapAggregator for crate::App {
7979
#[tracing::instrument(skip(self))]
8080
async fn get_agg_share(
8181
&self,
82+
_version: DapVersion,
8283
task_id: &TaskId,
8384
batch_sel: &BatchSelector,
8485
) -> Result<DapAggregateShare, DapError> {
@@ -115,6 +116,7 @@ impl DapAggregator for crate::App {
115116
#[tracing::instrument(skip(self))]
116117
async fn mark_collected(
117118
&self,
119+
_version: DapVersion,
118120
task_id: &TaskId,
119121
batch_sel: &BatchSelector,
120122
) -> Result<(), DapError> {
@@ -255,6 +257,7 @@ impl DapAggregator for crate::App {
255257

256258
async fn is_batch_overlapping(
257259
&self,
260+
_version: DapVersion,
258261
task_id: &TaskId,
259262
batch_sel: &BatchSelector,
260263
) -> Result<bool, DapError> {
@@ -288,7 +291,12 @@ impl DapAggregator for crate::App {
288291
)
289292
}
290293

291-
async fn batch_exists(&self, task_id: &TaskId, batch_id: &BatchId) -> Result<bool, DapError> {
294+
async fn batch_exists(
295+
&self,
296+
_version: DapVersion,
297+
task_id: &TaskId,
298+
batch_id: &BatchId,
299+
) -> Result<bool, DapError> {
292300
let task_config = self
293301
.get_task_config_for(task_id)
294302
.await?

crates/daphne-server/src/roles/helper.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@
33

44
use axum::async_trait;
55
use daphne::{
6-
messages::{AggregationJobId, TaskId},
6+
fatal_error,
7+
messages::{AggregationJobId, AggregationJobResp, TaskId},
78
roles::{helper::AggregationJobRequestHash, DapHelper},
89
DapError, DapVersion,
910
};
@@ -20,4 +21,13 @@ impl DapHelper for crate::App {
2021
// the server implementation can't check for this
2122
Ok(())
2223
}
24+
25+
async fn poll_aggregated(
26+
&self,
27+
_version: DapVersion,
28+
_task_id: &TaskId,
29+
_agg_job_id: &AggregationJobId,
30+
) -> Result<AggregationJobResp, DapError> {
31+
Err(fatal_error!(err = "polling not implemented"))
32+
}
2333
}

crates/daphne-server/src/roles/leader.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,10 @@ impl DapLeader for crate::App {
121121
{
122122
self.send_http(meta, Method::PUT, url, payload).await
123123
}
124+
125+
async fn send_http_get(&self, meta: DapRequestMeta, url: Url) -> Result<DapResponse, DapError> {
126+
self.send_http(meta, Method::PUT, url, ()).await
127+
}
124128
}
125129

126130
impl crate::App {

crates/daphne-server/src/router/extractor.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,12 +72,12 @@ impl DecodeFromDapHttpBody for HashedAggregationJobReq {
7272
}
7373
}
7474

75-
/// Using `()` ignores the body of a request.
7675
impl DecodeFromDapHttpBody for CollectionPollReq {
7776
fn decode_from_http_body(_bytes: Bytes, _meta: &DapRequestMeta) -> Result<Self, DapAbort> {
7877
Ok(Self)
7978
}
8079
}
80+
8181
/// Using `()` ignores the body of a request.
8282
impl DecodeFromDapHttpBody for () {
8383
fn decode_from_http_body(_bytes: Bytes, _meta: &DapRequestMeta) -> Result<Self, DapAbort> {

crates/daphne-service-utils/build.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,10 @@ fn main() {
1111
#[cfg(feature = "durable_requests")]
1212
compiler
1313
.file("./src/durable_requests/durable_request.capnp")
14-
.file("./src/durable_requests/bindings/aggregation_job_store.capnp");
14+
.file("./src/durable_requests/bindings/aggregation_job_store.capnp")
15+
.file("./src/durable_requests/bindings/aggregate_store_v2.capnp")
16+
.file("./src/durable_requests/bindings/agg_job_response_store.capnp")
17+
.file("./src/durable_requests/bindings/replay_checker.capnp");
1518

1619
#[cfg(feature = "compute-offload")]
1720
compiler.file("./src/compute_offload/compute_offload.capnp");

crates/daphne-service-utils/src/capnproto/base.capnp

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,29 @@ struct U8L16 @0x9e3f65b13f71cfcb {
2222
snd @1 :UInt64;
2323
}
2424

25-
struct PartialBatchSelector {
25+
struct PartialBatchSelector @0xae86084e56c22fc0 {
2626
union {
2727
timeInterval @0 :Void;
2828
leaderSelectedByBatchId @1 :BatchId;
2929
}
3030
}
3131

32+
enum ReportError @0xa76428617779e659 {
33+
reserved @0;
34+
batchCollected @1;
35+
reportReplayed @2;
36+
reportDropped @3;
37+
hpkeUnknownConfigId @4;
38+
hpkeDecryptError @5;
39+
vdafPrepError @6;
40+
batchSaturated @7;
41+
taskExpired @8;
42+
invalidMessage @9;
43+
reportTooEarly @10;
44+
taskNotStarted @11;
45+
}
46+
47+
3248
using ReportId = U8L16;
3349
using BatchId = U8L32;
3450
using TaskId = U8L32;

0 commit comments

Comments
 (0)