Skip to content

Commit 0447fd6

Browse files
committed
Forbid changing aggregation job parameters
1 parent d3880e1 commit 0447fd6

File tree

19 files changed

+426
-28
lines changed

19 files changed

+426
-28
lines changed

.github/workflows/daphneci.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ jobs:
1919
uses: actions-rs/toolchain@v1
2020
with:
2121
profile: minimal
22-
toolchain: stable
22+
toolchain: 1.83.0
2323
components: clippy, rustfmt
2424
override: true
2525
- name: Machete

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,23 @@
1-
// Copyright (c) 2024 Cloudflare, Inc. All rights reserved.
1+
// Copyright (c) 2025 Cloudflare, Inc. All rights reserved.
22
// SPDX-License-Identifier: BSD-3-Clause
33

44
use axum::async_trait;
5-
use daphne::roles::DapHelper;
5+
use daphne::{
6+
messages::{AggregationJobId, AggregationJobInitReq, TaskId},
7+
roles::DapHelper,
8+
DapError, DapVersion,
9+
};
610

711
#[async_trait]
8-
impl DapHelper for crate::App {}
12+
impl DapHelper for crate::App {
13+
async fn assert_agg_job_is_immutable(
14+
&self,
15+
_id: AggregationJobId,
16+
_version: DapVersion,
17+
_task_id: &TaskId,
18+
_req: &AggregationJobInitReq,
19+
) -> Result<(), DapError> {
20+
// the server implementation can't check for this
21+
Ok(())
22+
}
23+
}

crates/daphne-service-utils/Cargo.toml

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# Copyright (c) 2024 Cloudflare, Inc. All rights reserved.
1+
# Copyright (c) 2025 Cloudflare, Inc. All rights reserved.
22
# SPDX-License-Identifier: BSD-3-Clause
33

44
[package]
@@ -18,6 +18,7 @@ prio_draft09 = { workspace = true, optional = true }
1818
prio = { workspace = true, optional = true }
1919
serde.workspace = true
2020
url = { workspace = true, optional = true }
21+
ring = { workspace = true, optional = true }
2122

2223
[dev-dependencies]
2324
daphne = { path = "../daphne", default-features = false, features = ["prometheus"] }
@@ -28,7 +29,13 @@ capnpc = { workspace = true, optional = true }
2829

2930
[features]
3031
test-utils = ["dep:url", "daphne/prometheus", "daphne/test-utils"]
31-
durable_requests = ["dep:capnp", "dep:capnpc", "dep:prio_draft09", "dep:prio"]
32+
durable_requests = [
33+
"dep:capnp",
34+
"dep:capnpc",
35+
"dep:prio_draft09",
36+
"dep:prio",
37+
"dep:ring"
38+
]
3239
experimental = ["daphne/experimental"]
3340

3441
[lints]
Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
1-
// Copyright (c) 2024 Cloudflare, Inc. All rights reserved.
1+
// Copyright (c) 2025 Cloudflare, Inc. All rights reserved.
22
// SPDX-License-Identifier: BSD-3-Clause
33

44
fn main() {
55
#[cfg(feature = "durable_requests")]
66
::capnpc::CompilerCommand::new()
77
.file("./src/capnproto/base.capnp")
88
.file("./src/durable_requests/durable_request.capnp")
9+
.file("./src/durable_requests/bindings/aggregation_job_store.capnp")
910
.run()
1011
.expect("compiling schema");
1112
}
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
# Copyright (c) 2025 Cloudflare, Inc. All rights reserved.
2+
# SPDX-License-Identifier: BSD-3-Clause
3+
4+
@0xa11edd1197dbcf0b;
5+
6+
using Base = import "../../capnproto/base.capnp";
7+
8+
struct NewJobRequest {
9+
id @0 :Base.AggregationJobId;
10+
aggJobHash @1 :Data;
11+
}
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
// Copyright (c) 2025 Cloudflare, Inc. All rights reserved.
2+
// SPDX-License-Identifier: BSD-3-Clause
3+
4+
use crate::{
5+
aggregation_job_store_capnp::new_job_request,
6+
capnproto::{CapnprotoPayloadDecode, CapnprotoPayloadEncode},
7+
durable_requests::ObjectIdFrom,
8+
};
9+
use daphne::{
10+
messages::{AggregationJobId, AggregationJobInitReq, PartialBatchSelector, TaskId},
11+
DapVersion,
12+
};
13+
use serde::{Deserialize, Serialize};
14+
use std::{ops::Deref, slice};
15+
16+
super::define_do_binding! {
17+
const BINDING = "AGGREGATION_JOB_STORE";
18+
19+
enum Command {
20+
NewJob = "/new-job",
21+
ListJobIds = "/job-ids",
22+
}
23+
24+
fn name((version, task_id): (DapVersion, &'n TaskId)) -> ObjectIdFrom {
25+
ObjectIdFrom::Name(format!("{version}/task/{task_id}"))
26+
}
27+
}
28+
29+
#[derive(Debug)]
30+
pub struct AggregationJobReqHash(Vec<u8>);
31+
32+
impl Deref for AggregationJobReqHash {
33+
type Target = [u8];
34+
fn deref(&self) -> &Self::Target {
35+
&self.0
36+
}
37+
}
38+
39+
impl From<&AggregationJobInitReq> for AggregationJobReqHash {
40+
fn from(req: &AggregationJobInitReq) -> Self {
41+
let AggregationJobInitReq {
42+
agg_param,
43+
part_batch_sel,
44+
prep_inits,
45+
} = req;
46+
47+
let mut context = ring::digest::Context::new(&ring::digest::SHA256);
48+
context.update(agg_param);
49+
context.update(match part_batch_sel {
50+
PartialBatchSelector::TimeInterval => &[0],
51+
PartialBatchSelector::LeaderSelectedByBatchId { batch_id } => batch_id.as_ref(),
52+
});
53+
for p in prep_inits {
54+
let daphne::messages::PrepareInit {
55+
report_share:
56+
daphne::messages::ReportShare {
57+
report_metadata: daphne::messages::ReportMetadata { id, time },
58+
public_share,
59+
encrypted_input_share:
60+
daphne::messages::HpkeCiphertext {
61+
config_id,
62+
enc,
63+
payload: cypher_text_payload,
64+
},
65+
},
66+
payload,
67+
} = p;
68+
69+
context.update(payload);
70+
context.update(public_share);
71+
context.update(id.as_ref());
72+
context.update(&time.to_be_bytes());
73+
context.update(cypher_text_payload);
74+
context.update(slice::from_ref(config_id));
75+
context.update(enc);
76+
}
77+
Self(context.finish().as_ref().to_vec())
78+
}
79+
}
80+
81+
#[derive(Debug)]
82+
pub struct NewJobRequest {
83+
pub id: AggregationJobId,
84+
pub agg_job_hash: AggregationJobReqHash,
85+
}
86+
87+
impl CapnprotoPayloadEncode for NewJobRequest {
88+
type Builder<'a> = new_job_request::Builder<'a>;
89+
90+
fn encode_to_builder(&self, mut builder: Self::Builder<'_>) {
91+
self.id.encode_to_builder(builder.reborrow().init_id());
92+
builder.set_agg_job_hash(&self.agg_job_hash.0);
93+
}
94+
}
95+
96+
impl CapnprotoPayloadDecode for NewJobRequest {
97+
type Reader<'a> = new_job_request::Reader<'a>;
98+
99+
fn decode_from_reader(reader: Self::Reader<'_>) -> capnp::Result<Self>
100+
where
101+
Self: Sized,
102+
{
103+
Ok(Self {
104+
id: <_>::decode_from_reader(reader.get_id()?)?,
105+
agg_job_hash: AggregationJobReqHash(reader.get_agg_job_hash()?.to_vec()),
106+
})
107+
}
108+
}
109+
110+
#[derive(Debug, Serialize, Deserialize)]
111+
pub enum NewJobResponse {
112+
Ok,
113+
/// Request would change an existing aggregation job's parameters.
114+
IllegalJobParameters,
115+
}

crates/daphne-service-utils/src/durable_requests/bindings/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2024 Cloudflare, Inc. All rights reserved.
1+
// Copyright (c) 2025 Cloudflare, Inc. All rights reserved.
22
// SPDX-License-Identifier: BSD-3-Clause
33

44
//! This module defines the durable objects' binding and methods as implementors of the
@@ -7,6 +7,7 @@
77
//! It also defines types that are used as the body of requests sent to these objects.
88
99
mod aggregate_store;
10+
pub mod aggregation_job_store;
1011
#[cfg(feature = "test-utils")]
1112
mod test_state_cleaner;
1213

crates/daphne-service-utils/src/lib.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2024 Cloudflare, Inc. All rights reserved.
1+
// Copyright (c) 2025 Cloudflare, Inc. All rights reserved.
22
// SPDX-License-Identifier: BSD-3-Clause
33

44
#![cfg_attr(not(test), deny(unused_crate_dependencies))]
@@ -32,3 +32,14 @@ mod durable_request_capnp {
3232
"/src/durable_requests/durable_request_capnp.rs"
3333
));
3434
}
35+
36+
#[cfg(feature = "durable_requests")]
37+
mod aggregation_job_store_capnp {
38+
#![allow(dead_code)]
39+
#![allow(clippy::pedantic)]
40+
#![allow(clippy::needless_lifetimes)]
41+
include!(concat!(
42+
env!("OUT_DIR"),
43+
"/src/durable_requests/bindings/aggregation_job_store_capnp.rs"
44+
));
45+
}

crates/daphne-worker-test/src/durable.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2024 Cloudflare, Inc. All rights reserved.
1+
// Copyright (c) 2025 Cloudflare, Inc. All rights reserved.
22
// SPDX-License-Identifier: BSD-3-Clause
33

44
use daphne_worker::durable::{self, instantiate_durable_object};
@@ -10,3 +10,11 @@ instantiate_durable_object! {
1010
daphne_worker::tracing_utils::initialize_tracing(env);
1111
}
1212
}
13+
14+
instantiate_durable_object! {
15+
struct AggregationJobStore < durable::AggregationJobStore;
16+
17+
fn init_user_data(_state: State, env: Env) {
18+
daphne_worker::tracing_utils::initialize_tracing(env);
19+
}
20+
}

0 commit comments

Comments (0)