Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
733 changes: 409 additions & 324 deletions Cargo.lock

Large diffs are not rendered by default.

5 changes: 3 additions & 2 deletions crates/daphne-server/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2024 Cloudflare, Inc. All rights reserved.
# Copyright (c) 2025 Cloudflare, Inc. All rights reserved.
# SPDX-License-Identifier: BSD-3-Clause

[package]
Expand All @@ -17,7 +17,7 @@ all-features = true

[dependencies]
daphne = { path = "../daphne" }
daphne-service-utils = { path = "../daphne-service-utils", features = ["durable_requests"] }
daphne-service-utils = { path = "../daphne-service-utils", features = ["durable_requests", "compute-offload"] }
either.workspace = true
futures.workspace = true
hex.workspace = true
Expand All @@ -33,6 +33,7 @@ tokio.workspace = true
tower = { workspace = true, features = ["util"] }
tracing.workspace = true
url.workspace = true
rayon.workspace = true

[dependencies.axum]
workspace = true
Expand Down
7 changes: 7 additions & 0 deletions crates/daphne-server/docker/example-service.Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,10 @@ RUN sed -i 's/localhost/leader_storage/g' configuration.toml
COPY --from=builder /dap/target/debug/examples/service .

ENTRYPOINT ["./service"]

FROM debian:bookworm AS compute-offload

COPY ./crates/daphne-server/examples/configuration-cpu-offload.toml configuration.toml
COPY --from=builder /dap/target/debug/examples/service .

ENTRYPOINT ["./service"]
34 changes: 34 additions & 0 deletions crates/daphne-server/examples/configuration-cpu-offload.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# Copyright (c) 2024 Cloudflare, Inc. All rights reserved.
# SPDX-License-Identifier: BSD-3-Clause

port = 5000

# None of these settings are relevant and can be deleted later when
# daphne-server stops being an aggregator
[storage_proxy]
url = "http://localhost:4001"
# SECRET: This is a test secret. In production, we'll generate and securely provision the token.
auth_token = 'this-is-the-storage-proxy-auth-token'

[service]
env = "oxy"
role = "helper"
max_batch_duration = 360000
min_batch_interval_start = 259200
max_batch_interval_end = 259200
supported_hpke_kems = ["x25519_hkdf_sha256"]
default_version = "v09"
report_storage_epoch_duration = 300000
base_url = "http://127.0.0.1:8788"
default_num_agg_span_shards = 4

[service.taskprov]
peer_auth.leader.expected_token = "I-am-the-leader" # SECRET
vdaf_verify_key_init = "b029a72fa327931a5cb643dcadcaafa098fcbfac07d990cb9e7c9a8675fafb18" # SECRET
hpke_collector_config = """{
"id": 23,
"kem_id": "p256_hkdf_sha256",
"kdf_id": "hkdf_sha256",
"aead_id": "aes128_gcm",
"public_key": "047dab625e0d269abcc28c611bebf5a60987ddf7e23df0e0aa343e5774ad81a1d0160d9252b82b4b5c52354205f5ec945645cb79facff8d85c9c31b490cdf35466"
}"""
8 changes: 4 additions & 4 deletions crates/daphne-server/src/roles/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,19 @@

use axum::async_trait;
use daphne::{
messages::{request::AggregationJobRequestHash, AggregationJobId, TaskId},
roles::DapHelper,
messages::{AggregationJobId, TaskId},
roles::{helper::AggregationJobRequestHash, DapHelper},
DapError, DapVersion,
};

#[async_trait]
impl DapHelper for crate::App {
async fn assert_agg_job_is_immutable(
async fn assert_agg_job_is_legal(
&self,
_id: AggregationJobId,
_version: DapVersion,
_task_id: &TaskId,
_req: &AggregationJobRequestHash,
_req_hash: &AggregationJobRequestHash,
) -> Result<(), DapError> {
// the server implementation can't check for this
Ok(())
Expand Down
84 changes: 84 additions & 0 deletions crates/daphne-server/src/router/compute_offload.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Copyright (c) 2025 Cloudflare, Inc. All rights reserved.
// SPDX-License-Identifier: BSD-3-Clause

use crate::App;
use axum::{async_trait, extract::FromRequest, response::IntoResponse, routing::post};
use daphne::{error::DapAbort, InitializedReport};
use daphne_service_utils::{
capnproto::{CapnprotoPayloadDecode, CapnprotoPayloadDecodeExt, CapnprotoPayloadEncodeExt},
compute_offload::{InitializeReports, InitializedReports},
};
use http::StatusCode;
use prio::codec::ParameterizedDecode;
use rayon::iter::{IntoParallelIterator as _, ParallelIterator};

pub(super) fn add_routes(router: super::Router<App>) -> super::Router<App> {
router.route(
"/compute_offload/initialize_reports",
post(initialize_reports),
)
}

struct CapnprotoExtractor<T>(T);

#[async_trait]
impl<S, T> FromRequest<S> for CapnprotoExtractor<T>
where
T: CapnprotoPayloadDecode,
{
type Rejection = StatusCode;

async fn from_request(
req: http::Request<axum::body::Body>,
_state: &S,
) -> Result<Self, Self::Rejection> {
let bytes = axum::body::to_bytes(req.into_body(), usize::MAX)
.await
.map_err(|_| StatusCode::BAD_REQUEST)?;
let t = T::decode_from_bytes(&bytes).map_err(|_| StatusCode::BAD_REQUEST)?;

Ok(CapnprotoExtractor(t))
}
}

#[tracing::instrument(skip_all, fields(%task_id, report_count = prep_inits.len()))]
async fn initialize_reports(
CapnprotoExtractor(InitializeReports {
hpke_keys,
valid_report_range,
task_id,
task_config,
agg_param,
prep_inits,
}): CapnprotoExtractor<InitializeReports<'static>>,
) -> impl IntoResponse {
tracing::info!("initializing reports");
let initialized_reports = prep_inits
.into_par_iter()
.map(|prep_init| {
InitializedReport::from_leader(
&hpke_keys.as_ref(),
valid_report_range.clone(),
&task_id,
&task_config,
prep_init.report_share,
prep_init.payload,
&daphne::DapAggregationParam::get_decoded_with_param(&task_config.vdaf, &agg_param)
.map_err(|e| DapAbort::from_codec_error(e, task_id))?,
)
})
.collect::<Result<Vec<_>, _>>();

match initialized_reports {
Ok(reports) => {
let body = InitializedReports {
vdaf: task_config.vdaf.into_owned(),
reports,
}
.encode_to_bytes();

(StatusCode::OK, body).into_response()
}
Err(error) => (StatusCode::BAD_REQUEST, axum::Json(error)).into_response(),
}
}
3 changes: 2 additions & 1 deletion crates/daphne-server/src/router/extractor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,11 @@ use daphne::{
error::DapAbort,
fatal_error,
messages::{
request::{CollectionPollReq, HashedAggregationJobReq, RequestBody},
request::{CollectionPollReq, RequestBody},
taskprov::TaskprovAdvertisement,
AggregateShareReq, AggregationJobInitReq, CollectionReq, Report, TaskId,
},
roles::helper::HashedAggregationJobReq,
DapError, DapRequest, DapRequestMeta, DapVersion,
};
use daphne_service_utils::{bearer_token::BearerToken, http_headers};
Expand Down
7 changes: 5 additions & 2 deletions crates/daphne-server/src/router/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,11 @@ use axum::{
routing::{post, put},
};
use daphne::{
messages::{request::HashedAggregationJobReq, AggregateShareReq},
roles::{helper, DapHelper},
messages::AggregateShareReq,
roles::{
helper::{self, HashedAggregationJobReq},
DapHelper,
},
};
use http::StatusCode;

Expand Down
5 changes: 4 additions & 1 deletion crates/daphne-server/src/router/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
// Copyright (c) 2024 Cloudflare, Inc. All rights reserved.
// Copyright (c) 2025 Cloudflare, Inc. All rights reserved.
// SPDX-License-Identifier: BSD-3-Clause

mod aggregator;
mod compute_offload;
mod extractor;
mod helper;
mod leader;
Expand Down Expand Up @@ -99,6 +100,8 @@ pub fn new(role: DapAggregatorRole, aggregator: App) -> axum::Router<()> {
DapAggregatorRole::Helper => helper::add_helper_routes(router),
};

let router = compute_offload::add_routes(router);

#[cfg(feature = "test-utils")]
let router = test_routes::add_test_routes(router, role);

Expand Down
2 changes: 2 additions & 0 deletions crates/daphne-service-utils/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ daphne = { path = "../daphne", default-features = false }
prio_draft09 = { workspace = true, optional = true }
prio = { workspace = true, optional = true }
serde.workspace = true
serde_json = { workspace = true, optional = true }
url = { workspace = true, optional = true }

[dev-dependencies]
Expand All @@ -28,6 +29,7 @@ capnpc = { workspace = true, optional = true }

[features]
test-utils = ["dep:url", "daphne/prometheus", "daphne/test-utils"]
compute-offload = ["dep:capnp", "dep:capnpc", "dep:serde_json", "dep:prio"]
durable_requests = [
"dep:capnp",
"dep:capnpc",
Expand Down
23 changes: 16 additions & 7 deletions crates/daphne-service-utils/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,20 @@
// SPDX-License-Identifier: BSD-3-Clause

fn main() {
#[cfg(feature = "durable_requests")]
::capnpc::CompilerCommand::new()
.file("./src/capnproto/base.capnp")
.file("./src/durable_requests/durable_request.capnp")
.file("./src/durable_requests/bindings/aggregation_job_store.capnp")
.run()
.expect("compiling schema");
#[cfg(any(feature = "durable_requests", feature = "compute-offload"))]
{
let mut compiler = ::capnpc::CompilerCommand::new();

compiler.file("./src/capnproto/base.capnp");

#[cfg(feature = "durable_requests")]
compiler
.file("./src/durable_requests/durable_request.capnp")
.file("./src/durable_requests/bindings/aggregation_job_store.capnp");

#[cfg(feature = "compute-offload")]
compiler.file("./src/compute_offload/compute_offload.capnp");

compiler.run().expect("compiling schema");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
# Copyright (c) 2025 Cloudflare, Inc. All rights reserved.
# SPDX-License-Identifier: BSD-3-Clause

@0xd932f3d934afce3b;

# Utilities

using Base = import "../capnproto/base.capnp";

using VdafConfig = Text; # json encoded
using VdafVerifyKey = Base.U8L32;

struct TimeRange @0xf0d27aaa9b1959f7 {
start @0 :UInt64;
end @1 :UInt64;
}

# Top level message
struct InitializeReports @0x90aadb2f44c9fb78 {
hpkeKeys @0 :List(HpkeReceiverConfig);
validReportRange @1 :TimeRange;
taskId @2 :Base.TaskId;
taskConfig @3 :PartialDapTaskConfig;
aggParam @4 :Data; # encoded
prepInits @5 :List(PrepareInit);
}

struct HpkeReceiverConfig @0xeec9b4a50458edb7 {
struct HpkeConfig @0xa546066418a5cdc7 {
enum HpkeKemId @0xf4bbeaed8d1fd18a {
p256HkdfSha256 @0; x25519HkdfSha256 @1;
}
enum HpkeKdfId @0x9336afc63df27ba3 { hkdfSha256 @0; }
enum HpkeAeadId @0xd68d403e118c806c { aes128Gcm @0; }

id @0 :UInt8;
kemId @1 :HpkeKemId;
kdfId @2 :HpkeKdfId;
aeadId @3 :HpkeAeadId;
publicKey @4 :Data;
}

config @0 :HpkeConfig;
privateKey @1 :Data;
}

struct PartialDapTaskConfig @0xdcc9bf18fc62d406 {

version @0 :Base.DapVersion;
methodIsTaskprov @1 :Bool;
notAfter @2 :Base.Time;
vdaf @3 :VdafConfig;
vdafVerifyKey @4 :VdafVerifyKey;
}

struct ReportMetadata @0xefba178ad4584bc4 {

id @0 :Base.ReportId;
time @1 :Base.Time;
}

struct PrepareInit @0x8192568cb3d03f59 {

struct HpkeCiphertext @0xf0813319decf7eaf {
configId @0 :UInt8;
enc @1 :Data;
payload @2 :Data;
}

struct ReportShare @0xb4134aa2db41ef60 {
reportMetadata @0 :ReportMetadata;
publicShare @1 :Data;
encryptedInputShare @2 :HpkeCiphertext;
}

reportShare @0 :ReportShare;
payload @1 :Data;
}



struct InitializedReports {
struct InitializedReport {
using VdafPrepShare = Data;
using VdafPrepState = Data;

enum ReportError {
reserved @0;
batchCollected @1;
reportReplayed @2;
reportDropped @3;
hpkeUnknownConfigId @4;
hpkeDecryptError @5;
vdafPrepError @6;
batchSaturated @7;
taskExpired @8;
invalidMessage @9;
reportTooEarly @10;
taskNotStarted @11;
}


union {
ready :group {
metadata @0 :ReportMetadata;
publicShare @1 :Data;
prepShare @2 :VdafPrepShare;
prepState @3 :VdafPrepState;
peerPrepShare @4 :Data;
}
rejected :group {
metadata @5 :ReportMetadata;
failure @6 :ReportError;
}
}
}

vdafConfig @0 :VdafConfig;
reports @1 :List(InitializedReport);
}
Loading
Loading