Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/daphneci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ jobs:
uses: actions-rs/toolchain@v1
with:
profile: minimal
toolchain: stable
toolchain: 1.83.0
components: clippy, rustfmt
override: true
- name: Machete
Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 18 additions & 3 deletions crates/daphne-server/src/roles/helper.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,23 @@
// Copyright (c) 2024 Cloudflare, Inc. All rights reserved.
// Copyright (c) 2025 Cloudflare, Inc. All rights reserved.
// SPDX-License-Identifier: BSD-3-Clause

use axum::async_trait;
use daphne::roles::DapHelper;
use daphne::{
messages::{AggregationJobId, AggregationJobInitReq, TaskId},
roles::DapHelper,
DapError, DapVersion,
};

#[async_trait]
impl DapHelper for crate::App {}
impl DapHelper for crate::App {
    /// Assert that a replayed `AggregationJobInitReq` does not change the
    /// parameters of an existing aggregation job.
    ///
    /// NOTE(review): this backend performs no check and always returns
    /// `Ok(())` — it keeps no record of previously seen aggregation job
    /// requests, so immutability cannot be verified here.
    async fn assert_agg_job_is_immutable(
        &self,
        _id: AggregationJobId,
        _version: DapVersion,
        _task_id: &TaskId,
        _req: &AggregationJobInitReq,
    ) -> Result<(), DapError> {
        // the server implementation can't check for this
        Ok(())
    }
}
11 changes: 9 additions & 2 deletions crates/daphne-service-utils/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2024 Cloudflare, Inc. All rights reserved.
# Copyright (c) 2025 Cloudflare, Inc. All rights reserved.
# SPDX-License-Identifier: BSD-3-Clause

[package]
Expand All @@ -18,6 +18,7 @@ prio_draft09 = { workspace = true, optional = true }
prio = { workspace = true, optional = true }
serde.workspace = true
url = { workspace = true, optional = true }
ring = { workspace = true, optional = true }

[dev-dependencies]
daphne = { path = "../daphne", default-features = false, features = ["prometheus"] }
Expand All @@ -28,7 +29,13 @@ capnpc = { workspace = true, optional = true }

[features]
test-utils = ["dep:url", "daphne/prometheus", "daphne/test-utils"]
durable_requests = ["dep:capnp", "dep:capnpc", "dep:prio_draft09", "dep:prio"]
durable_requests = [
"dep:capnp",
"dep:capnpc",
"dep:prio_draft09",
"dep:prio",
"dep:ring"
]
experimental = ["daphne/experimental"]

[lints]
Expand Down
3 changes: 2 additions & 1 deletion crates/daphne-service-utils/build.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
// Copyright (c) 2024 Cloudflare, Inc. All rights reserved.
// Copyright (c) 2025 Cloudflare, Inc. All rights reserved.
// SPDX-License-Identifier: BSD-3-Clause

/// Build script: compile the Cap'n Proto schemas into generated Rust sources
/// (written to `OUT_DIR`) when the `durable_requests` feature is enabled.
/// Without that feature this is a no-op.
fn main() {
    #[cfg(feature = "durable_requests")]
    ::capnpc::CompilerCommand::new()
        .file("./src/capnproto/base.capnp")
        .file("./src/durable_requests/durable_request.capnp")
        .file("./src/durable_requests/bindings/aggregation_job_store.capnp")
        .run()
        .expect("compiling schema");
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# Copyright (c) 2025 Cloudflare, Inc. All rights reserved.
# SPDX-License-Identifier: BSD-3-Clause

@0xa11edd1197dbcf0b;

using Base = import "../../capnproto/base.capnp";

# Body of the "new job" request sent to the aggregation job store.
struct NewJobRequest {
  # Identifier of the aggregation job being created.
  id @0 :Base.AggregationJobId;
  # Hash of the AggregationJobInitReq; stored so that replays which change
  # the job's parameters can be detected.
  aggJobHash @1 :Data;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
// Copyright (c) 2025 Cloudflare, Inc. All rights reserved.
// SPDX-License-Identifier: BSD-3-Clause

use crate::{
aggregation_job_store_capnp::new_job_request,
capnproto::{CapnprotoPayloadDecode, CapnprotoPayloadEncode},
durable_requests::ObjectIdFrom,
};
use daphne::{
messages::{AggregationJobId, AggregationJobInitReq, PartialBatchSelector, TaskId},
DapVersion,
};
use serde::{Deserialize, Serialize};
use std::{ops::Deref, slice};

// Durable object binding for the aggregation job store. Instances are named
// by DAP version and task id, so each task gets its own object.
super::define_do_binding! {
    const BINDING = "AGGREGATION_JOB_STORE";

    enum Command {
        NewJob = "/new-job",
        ListJobIds = "/job-ids",
    }

    fn name((version, task_id): (DapVersion, &'n TaskId)) -> ObjectIdFrom {
        ObjectIdFrom::Name(format!("{version}/task/{task_id}"))
    }
}

/// SHA-256 digest of an `AggregationJobInitReq`, used to detect whether a
/// replayed request carries different parameters than the original.
#[derive(Debug)]
pub struct AggregationJobReqHash(Vec<u8>);

impl Deref for AggregationJobReqHash {
    type Target = [u8];

    /// Expose the digest as a plain byte slice.
    fn deref(&self) -> &Self::Target {
        self.0.as_slice()
    }
}

impl From<&AggregationJobInitReq> for AggregationJobReqHash {
    /// Fold every field of the request into a single SHA-256 digest.
    ///
    /// The exhaustive destructuring (instead of hashing a serialized form)
    /// means adding a field to `AggregationJobInitReq` without including it
    /// here becomes a compile error.
    ///
    /// NOTE(review): fields are concatenated into the hash with no length
    /// prefixes or domain separation, so inputs that shift bytes between
    /// adjacent variable-length fields could in principle collide — confirm
    /// this is acceptable for the immutability check this hash backs.
    fn from(req: &AggregationJobInitReq) -> Self {
        let AggregationJobInitReq {
            agg_param,
            part_batch_sel,
            prep_inits,
        } = req;

        let mut context = ring::digest::Context::new(&ring::digest::SHA256);
        context.update(agg_param);
        // A single zero byte tags the time-interval case; otherwise the
        // batch id bytes themselves are hashed.
        context.update(match part_batch_sel {
            PartialBatchSelector::TimeInterval => &[0],
            PartialBatchSelector::LeaderSelectedByBatchId { batch_id } => batch_id.as_ref(),
        });
        for p in prep_inits {
            let daphne::messages::PrepareInit {
                report_share:
                    daphne::messages::ReportShare {
                        report_metadata: daphne::messages::ReportMetadata { id, time },
                        public_share,
                        encrypted_input_share:
                            daphne::messages::HpkeCiphertext {
                                config_id,
                                enc,
                                payload: cypher_text_payload,
                            },
                    },
                payload,
            } = p;

            context.update(payload);
            context.update(public_share);
            context.update(id.as_ref());
            context.update(&time.to_be_bytes());
            context.update(cypher_text_payload);
            // `config_id` is a single byte; `slice::from_ref` views it as a
            // one-element slice without copying.
            context.update(slice::from_ref(config_id));
            context.update(enc);
        }
        Self(context.finish().as_ref().to_vec())
    }
}

/// Body of a `Command::NewJob` request: the aggregation job being created
/// together with the hash of its `AggregationJobInitReq`.
#[derive(Debug)]
pub struct NewJobRequest {
    // Unique identifier of the aggregation job.
    pub id: AggregationJobId,
    // Digest of the init request; kept so replays that change the job's
    // parameters can be rejected.
    pub agg_job_hash: AggregationJobReqHash,
}

impl CapnprotoPayloadEncode for NewJobRequest {
    type Builder<'a> = new_job_request::Builder<'a>;

    /// Serialize this request into its generated Cap'n Proto builder.
    fn encode_to_builder(&self, mut builder: Self::Builder<'_>) {
        // `reborrow` lets us initialize the nested id field and then keep
        // using the same builder for the hash.
        self.id.encode_to_builder(builder.reborrow().init_id());
        builder.set_agg_job_hash(&self.agg_job_hash.0);
    }
}

impl CapnprotoPayloadDecode for NewJobRequest {
    type Reader<'a> = new_job_request::Reader<'a>;

    /// Deserialize a request from its generated Cap'n Proto reader.
    fn decode_from_reader(reader: Self::Reader<'_>) -> capnp::Result<Self>
    where
        Self: Sized,
    {
        Ok(Self {
            id: <_>::decode_from_reader(reader.get_id()?)?,
            // The hash is opaque bytes; copy them out of the message buffer.
            agg_job_hash: AggregationJobReqHash(reader.get_agg_job_hash()?.to_vec()),
        })
    }
}

/// The durable object's reply to a `Command::NewJob` request.
#[derive(Debug, Serialize, Deserialize)]
pub enum NewJobResponse {
    // The request was accepted.
    Ok,
    /// Request would change an existing aggregation job's parameters.
    IllegalJobParameters,
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2024 Cloudflare, Inc. All rights reserved.
// Copyright (c) 2025 Cloudflare, Inc. All rights reserved.
// SPDX-License-Identifier: BSD-3-Clause

//! This module defines the durable objects' binding and methods as implementors of the
Expand All @@ -7,6 +7,7 @@
//! It also defines types that are used as the body of requests sent to these objects.

mod aggregate_store;
pub mod aggregation_job_store;
#[cfg(feature = "test-utils")]
mod test_state_cleaner;

Expand Down
13 changes: 12 additions & 1 deletion crates/daphne-service-utils/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2024 Cloudflare, Inc. All rights reserved.
// Copyright (c) 2025 Cloudflare, Inc. All rights reserved.
// SPDX-License-Identifier: BSD-3-Clause

#![cfg_attr(not(test), deny(unused_crate_dependencies))]
Expand Down Expand Up @@ -32,3 +32,14 @@ mod durable_request_capnp {
"/src/durable_requests/durable_request_capnp.rs"
));
}

#[cfg(feature = "durable_requests")]
mod aggregation_job_store_capnp {
#![allow(dead_code)]
#![allow(clippy::pedantic)]
#![allow(clippy::needless_lifetimes)]
include!(concat!(
env!("OUT_DIR"),
"/src/durable_requests/bindings/aggregation_job_store_capnp.rs"
));
}
10 changes: 9 additions & 1 deletion crates/daphne-worker-test/src/durable.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2024 Cloudflare, Inc. All rights reserved.
// Copyright (c) 2025 Cloudflare, Inc. All rights reserved.
// SPDX-License-Identifier: BSD-3-Clause

use daphne_worker::durable::{self, instantiate_durable_object};
Expand All @@ -10,3 +10,11 @@ instantiate_durable_object! {
daphne_worker::tracing_utils::initialize_tracing(env);
}
}

// Expose the aggregation job store durable object to the Workers runtime
// under the `AggregationJobStore` class name.
instantiate_durable_object! {
    struct AggregationJobStore < durable::AggregationJobStore;

    fn init_user_data(_state: State, env: Env) {
        daphne_worker::tracing_utils::initialize_tracing(env);
    }
}
3 changes: 3 additions & 0 deletions crates/daphne-worker-test/wrangler.aggregator.toml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ ip = "0.0.0.0"
bindings = [
{ name = "DAP_AGGREGATE_STORE", class_name = "AggregateStore" },
{ name = "DAP_TEST_STATE_CLEANER", class_name = "TestStateCleaner" },
{ name = "AGGREGATION_JOB_STORE", class_name = "AggregationJobStore" },
]


Expand Down Expand Up @@ -128,6 +129,7 @@ public_key = "047dab625e0d269abcc28c611bebf5a60987ddf7e23df0e0aa343e5774ad81a1d0
bindings = [
{ name = "DAP_AGGREGATE_STORE", class_name = "AggregateStore" },
{ name = "DAP_TEST_STATE_CLEANER", class_name = "TestStateCleaner" },
{ name = "AGGREGATION_JOB_STORE", class_name = "AggregationJobStore" },
]

[[env.leader.kv_namespaces]]
Expand All @@ -154,4 +156,5 @@ tag = "v1"
new_classes = [
"AggregateStore",
"GarbageCollector",
"AggregationJobStore",
]
40 changes: 38 additions & 2 deletions crates/daphne-worker/src/aggregator/roles/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,42 @@
// SPDX-License-Identifier: BSD-3-Clause

use crate::aggregator::App;
use daphne::roles::DapHelper;
use daphne::{
error::DapAbort,
fatal_error,
messages::{AggregationJobId, AggregationJobInitReq, TaskId},
roles::DapHelper,
DapError, DapVersion,
};
use daphne_service_utils::durable_requests::bindings::aggregation_job_store;

impl DapHelper for App {}
#[axum::async_trait]
impl DapHelper for App {
    /// Check that a (possibly replayed) `AggregationJobInitReq` does not
    /// change the parameters of an existing aggregation job.
    ///
    /// Sends the job id plus a hash of the request to the
    /// `AGGREGATION_JOB_STORE` durable object, which replies with whether the
    /// parameters are legal for this job.
    async fn assert_agg_job_is_immutable(
        &self,
        id: AggregationJobId,
        version: DapVersion,
        task_id: &TaskId,
        req: &AggregationJobInitReq,
    ) -> Result<(), DapError> {
        let response = self
            .durable()
            .with_retry()
            .request(aggregation_job_store::Command::NewJob, (version, task_id))
            .encode(&aggregation_job_store::NewJobRequest {
                id,
                // Hash the whole init request (see `AggregationJobReqHash`).
                agg_job_hash: req.into(),
            })
            .send::<aggregation_job_store::NewJobResponse>()
            .await
            .map_err(|e| fatal_error!(err = ?e, "failed to store aggregation job hash"))?;

        match response {
            aggregation_job_store::NewJobResponse::Ok => Ok(()),
            // A replay tried to alter the job: surface a client error (abort)
            // rather than a fatal server error.
            aggregation_job_store::NewJobResponse::IllegalJobParameters => Err(
                DapAbort::BadRequest("aggregation job replay changes parameters".to_string())
                    .into(),
            ),
        }
    }
}
Loading
Loading