Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/daphneci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ jobs:
uses: actions-rs/toolchain@v1
with:
profile: minimal
toolchain: stable
toolchain: 1.83.0
components: clippy, rustfmt
override: true
- name: Machete
Expand Down
21 changes: 18 additions & 3 deletions crates/daphne-server/src/roles/helper.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,23 @@
// Copyright (c) 2024 Cloudflare, Inc. All rights reserved.
// Copyright (c) 2025 Cloudflare, Inc. All rights reserved.
// SPDX-License-Identifier: BSD-3-Clause

use axum::async_trait;
use daphne::roles::DapHelper;
use daphne::{
messages::{request::AggregationJobRequestHash, AggregationJobId, TaskId},
roles::DapHelper,
DapError, DapVersion,
};

#[async_trait]
impl DapHelper for crate::App {}
impl DapHelper for crate::App {
async fn assert_agg_job_is_immutable(
&self,
_id: AggregationJobId,
_version: DapVersion,
_task_id: &TaskId,
_req: &AggregationJobRequestHash,
) -> Result<(), DapError> {
// the server implementation can't check for this
Ok(())
}
}
13 changes: 12 additions & 1 deletion crates/daphne-server/src/router/extractor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use daphne::{
error::DapAbort,
fatal_error,
messages::{
request::{CollectionPollReq, RequestBody},
request::{CollectionPollReq, HashedAggregationJobReq, RequestBody},
taskprov::TaskprovAdvertisement,
AggregateShareReq, AggregationJobInitReq, CollectionReq, Report, TaskId,
},
Expand Down Expand Up @@ -60,6 +60,17 @@ impl_decode_from_dap_http_body!(
CollectionReq,
);

impl DecodeFromDapHttpBody for HashedAggregationJobReq {
fn decode_from_http_body(bytes: Bytes, meta: &DapRequestMeta) -> Result<Self, DapAbort> {
let mut cursor = Cursor::new(bytes.as_ref());
// Check that media type matches.
meta.get_checked_media_type(DapMediaType::AggregationJobInitReq)?;
// Decode the body
HashedAggregationJobReq::decode_with_param(&meta.version, &mut cursor)
.map_err(|e| DapAbort::from_codec_error(e, meta.task_id))
}
}
Comment on lines +63 to +72
Copy link
Contributor

@cjpatton cjpatton Jan 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hashing with HashedAggregationJobReq::decode_with_param() is a little awkward because you have to keep track of the start and end of the message in the cursor. Also, and this is very nit picky: HashedAggregationJobReq is not a DAP protocol message, so arguably doesn't belong in the messages module.

I think I would have done this differently:

impl DecodeFromDapHttpBody for HashedAggregationJobReq {
    fn decode_from_http_body(bytes: Bytes, meta: &DapRequestMeta) -> Result<Self, DapAbort> {
        // Check that media type matches.
        meta.get_checked_media_type(DapMediaType::AggregationJobInitReq)?;
        // Decode the body
        let agg_job_req = AggregationJobReq::get_decoded_with_param(&meta.version, bytes.as_ref())
            .map_err(|e| DapAbort::from_codec_error(e, meta.task_id))?;
        Ok(Self{
             agg_job_req:
             hash: sha256(bytes.as_ref()),
        })
    }
}

Note that this also enforces that there is nothing left to read in bytes.


/// Using `()` ignores the body of a request.
impl DecodeFromDapHttpBody for CollectionPollReq {
fn decode_from_http_body(_bytes: Bytes, _meta: &DapRequestMeta) -> Result<Self, DapAbort> {
Expand Down
4 changes: 2 additions & 2 deletions crates/daphne-server/src/router/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use axum::{
routing::{post, put},
};
use daphne::{
messages::{AggregateShareReq, AggregationJobInitReq},
messages::{request::HashedAggregationJobReq, AggregateShareReq},
roles::{helper, DapHelper},
};
use http::StatusCode;
Expand Down Expand Up @@ -38,7 +38,7 @@ pub(super) fn add_helper_routes(router: super::Router<App>) -> super::Router<App
)]
async fn agg_job(
State(app): State<Arc<App>>,
DapRequestExtractor(req): DapRequestExtractor<FROM_LEADER, AggregationJobInitReq>,
DapRequestExtractor(req): DapRequestExtractor<FROM_LEADER, HashedAggregationJobReq>,
) -> AxumDapResponse {
let timer = std::time::Instant::now();

Expand Down
9 changes: 7 additions & 2 deletions crates/daphne-service-utils/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2024 Cloudflare, Inc. All rights reserved.
# Copyright (c) 2025 Cloudflare, Inc. All rights reserved.
# SPDX-License-Identifier: BSD-3-Clause

[package]
Expand Down Expand Up @@ -28,7 +28,12 @@ capnpc = { workspace = true, optional = true }

[features]
test-utils = ["dep:url", "daphne/prometheus", "daphne/test-utils"]
durable_requests = ["dep:capnp", "dep:capnpc", "dep:prio_draft09", "dep:prio"]
durable_requests = [
"dep:capnp",
"dep:capnpc",
"dep:prio_draft09",
"dep:prio",
]
experimental = ["daphne/experimental"]

[lints]
Expand Down
3 changes: 2 additions & 1 deletion crates/daphne-service-utils/build.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
// Copyright (c) 2024 Cloudflare, Inc. All rights reserved.
// Copyright (c) 2025 Cloudflare, Inc. All rights reserved.
// SPDX-License-Identifier: BSD-3-Clause

fn main() {
#[cfg(feature = "durable_requests")]
::capnpc::CompilerCommand::new()
.file("./src/capnproto/base.capnp")
.file("./src/durable_requests/durable_request.capnp")
.file("./src/durable_requests/bindings/aggregation_job_store.capnp")
.run()
.expect("compiling schema");
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# Copyright (c) 2025 Cloudflare, Inc. All rights reserved.
# SPDX-License-Identifier: BSD-3-Clause

@0xa11edd1197dbcf0b;

using Base = import "../../capnproto/base.capnp";

struct NewJobRequest {
id @0 :Base.AggregationJobId;
aggJobHash @1 :Data;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Copyright (c) 2025 Cloudflare, Inc. All rights reserved.
// SPDX-License-Identifier: BSD-3-Clause

use crate::{
aggregation_job_store_capnp::new_job_request,
capnproto::{CapnprotoPayloadDecode, CapnprotoPayloadEncode},
durable_requests::ObjectIdFrom,
};
use daphne::{
messages::{AggregationJobId, TaskId},
DapVersion,
};
use serde::{Deserialize, Serialize};
use std::borrow::Cow;

super::define_do_binding! {
const BINDING = "AGGREGATION_JOB_STORE";

enum Command {
NewJob = "/new-job",
ListJobIds = "/job-ids",
}

fn name((version, task_id): (DapVersion, &'n TaskId)) -> ObjectIdFrom {
ObjectIdFrom::Name(format!("{version}/task/{task_id}"))
}
}

#[derive(Debug)]
pub struct NewJobRequest<'h> {
pub id: AggregationJobId,
pub agg_job_hash: Cow<'h, [u8]>,
}

impl CapnprotoPayloadEncode for NewJobRequest<'_> {
type Builder<'a> = new_job_request::Builder<'a>;

fn encode_to_builder(&self, mut builder: Self::Builder<'_>) {
self.id.encode_to_builder(builder.reborrow().init_id());
builder.set_agg_job_hash(&self.agg_job_hash);
}
}

impl CapnprotoPayloadDecode for NewJobRequest<'static> {
type Reader<'a> = new_job_request::Reader<'a>;

fn decode_from_reader(reader: Self::Reader<'_>) -> capnp::Result<Self>
where
Self: Sized,
{
Ok(Self {
id: <_>::decode_from_reader(reader.get_id()?)?,
agg_job_hash: reader.get_agg_job_hash()?.to_vec().into(),
})
}
}

#[derive(Debug, Serialize, Deserialize)]
pub enum NewJobResponse {
Ok,
/// Request would change an existing aggregation job's parameters.
IllegalJobParameters,
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2024 Cloudflare, Inc. All rights reserved.
// Copyright (c) 2025 Cloudflare, Inc. All rights reserved.
// SPDX-License-Identifier: BSD-3-Clause

//! This module defines the durable objects' binding and methods as implementors of the
Expand All @@ -7,6 +7,7 @@
//! It also defines types that are used as the body of requests sent to these objects.

mod aggregate_store;
pub mod aggregation_job_store;
#[cfg(feature = "test-utils")]
mod test_state_cleaner;

Expand Down
13 changes: 12 additions & 1 deletion crates/daphne-service-utils/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2024 Cloudflare, Inc. All rights reserved.
// Copyright (c) 2025 Cloudflare, Inc. All rights reserved.
// SPDX-License-Identifier: BSD-3-Clause

#![cfg_attr(not(test), deny(unused_crate_dependencies))]
Expand Down Expand Up @@ -32,3 +32,14 @@ mod durable_request_capnp {
"/src/durable_requests/durable_request_capnp.rs"
));
}

#[cfg(feature = "durable_requests")]
mod aggregation_job_store_capnp {
#![allow(dead_code)]
#![allow(clippy::pedantic)]
#![allow(clippy::needless_lifetimes)]
include!(concat!(
env!("OUT_DIR"),
"/src/durable_requests/bindings/aggregation_job_store_capnp.rs"
));
}
10 changes: 9 additions & 1 deletion crates/daphne-worker-test/src/durable.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2024 Cloudflare, Inc. All rights reserved.
// Copyright (c) 2025 Cloudflare, Inc. All rights reserved.
// SPDX-License-Identifier: BSD-3-Clause

use daphne_worker::durable::{self, instantiate_durable_object};
Expand All @@ -10,3 +10,11 @@ instantiate_durable_object! {
daphne_worker::tracing_utils::initialize_tracing(env);
}
}

instantiate_durable_object! {
struct AggregationJobStore < durable::AggregationJobStore;

fn init_user_data(_state: State, env: Env) {
daphne_worker::tracing_utils::initialize_tracing(env);
}
}
3 changes: 3 additions & 0 deletions crates/daphne-worker-test/wrangler.aggregator.toml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ ip = "0.0.0.0"
bindings = [
{ name = "DAP_AGGREGATE_STORE", class_name = "AggregateStore" },
{ name = "DAP_TEST_STATE_CLEANER", class_name = "TestStateCleaner" },
{ name = "AGGREGATION_JOB_STORE", class_name = "AggregationJobStore" },
]


Expand Down Expand Up @@ -128,6 +129,7 @@ public_key = "047dab625e0d269abcc28c611bebf5a60987ddf7e23df0e0aa343e5774ad81a1d0
bindings = [
{ name = "DAP_AGGREGATE_STORE", class_name = "AggregateStore" },
{ name = "DAP_TEST_STATE_CLEANER", class_name = "TestStateCleaner" },
{ name = "AGGREGATION_JOB_STORE", class_name = "AggregationJobStore" },
]

[[env.leader.kv_namespaces]]
Expand All @@ -154,4 +156,5 @@ tag = "v1"
new_classes = [
"AggregateStore",
"GarbageCollector",
"AggregationJobStore",
]
41 changes: 39 additions & 2 deletions crates/daphne-worker/src/aggregator/roles/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,43 @@
// SPDX-License-Identifier: BSD-3-Clause

use crate::aggregator::App;
use daphne::roles::DapHelper;
use daphne::{
error::DapAbort,
fatal_error,
messages::{request::AggregationJobRequestHash, AggregationJobId, TaskId},
roles::DapHelper,
DapError, DapVersion,
};
use daphne_service_utils::durable_requests::bindings::aggregation_job_store;
use std::borrow::Cow;

impl DapHelper for App {}
#[axum::async_trait]
impl DapHelper for App {
async fn assert_agg_job_is_immutable(
&self,
id: AggregationJobId,
version: DapVersion,
task_id: &TaskId,
req: &AggregationJobRequestHash,
) -> Result<(), DapError> {
let response = self
.durable()
.with_retry()
.request(aggregation_job_store::Command::NewJob, (version, task_id))
.encode(&aggregation_job_store::NewJobRequest {
id,
agg_job_hash: Cow::Borrowed(req.get()),
})
.send::<aggregation_job_store::NewJobResponse>()
.await
.map_err(|e| fatal_error!(err = ?e, "failed to store aggregation job hash"))?;

match response {
aggregation_job_store::NewJobResponse::Ok => Ok(()),
aggregation_job_store::NewJobResponse::IllegalJobParameters => Err(
DapAbort::BadRequest("aggregation job replay changes parameters".to_string())
.into(),
),
}
}
}
17 changes: 12 additions & 5 deletions crates/daphne-worker/src/aggregator/router/extractor.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
// Copyright (c) 2025 Cloudflare, Inc. All rights reserved.
// SPDX-License-Identifier: BSD-3-Clause

use std::io::Cursor;

use axum::{
async_trait,
body::Bytes,
Expand All @@ -13,7 +11,7 @@ use daphne::{
error::DapAbort,
fatal_error,
messages::{
request::{CollectionPollReq, RequestBody},
request::{CollectionPollReq, HashedAggregationJobReq, RequestBody},
taskprov::TaskprovAdvertisement,
AggregateShareReq, AggregationJobInitReq, CollectionReq, Report, TaskId,
},
Expand Down Expand Up @@ -41,11 +39,10 @@ macro_rules! impl_decode_from_dap_http_body {
bytes: Bytes,
meta: &DapRequestMeta,
) -> Result<Self, DapAbort> {
let mut cursor = Cursor::new(bytes.as_ref());
// Check that media type matches.
meta.get_checked_media_type(DapMediaType::$type)?;
// Decode the body
$type::decode_with_param(&meta.version, &mut cursor)
$type::get_decoded_with_param(&meta.version, bytes.as_ref())
.map_err(|e| DapAbort::from_codec_error(e, meta.task_id))
}
}
Expand All @@ -60,6 +57,16 @@ impl_decode_from_dap_http_body!(
CollectionReq,
);

impl DecodeFromDapHttpBody for HashedAggregationJobReq {
fn decode_from_http_body(bytes: Bytes, meta: &DapRequestMeta) -> Result<Self, DapAbort> {
// Check that media type matches.
meta.get_checked_media_type(DapMediaType::AggregationJobInitReq)?;
// Decode the body
HashedAggregationJobReq::get_decoded_with_param(&meta.version, bytes.as_ref())
.map_err(|e| DapAbort::from_codec_error(e, meta.task_id))
}
}

/// Using `()` ignores the body of a request.
impl DecodeFromDapHttpBody for CollectionPollReq {
fn decode_from_http_body(_bytes: Bytes, _meta: &DapRequestMeta) -> Result<Self, DapAbort> {
Expand Down
4 changes: 2 additions & 2 deletions crates/daphne-worker/src/aggregator/router/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use axum::{
routing::{post, put},
};
use daphne::{
messages::{AggregateShareReq, AggregationJobInitReq},
messages::{request::HashedAggregationJobReq, AggregateShareReq},
roles::{helper, DapHelper},
};
use http::StatusCode;
Expand Down Expand Up @@ -39,7 +39,7 @@ pub(super) fn add_helper_routes(router: super::Router<App>) -> super::Router<App
#[worker::send]
async fn agg_job(
State(app): State<Arc<App>>,
DapRequestExtractor(req): DapRequestExtractor<FROM_LEADER, AggregationJobInitReq>,
DapRequestExtractor(req): DapRequestExtractor<FROM_LEADER, HashedAggregationJobReq>,
) -> AxumDapResponse {
let now = worker::Date::now();

Expand Down
Loading
Loading