Skip to content

Commit 93cf70c

Browse files
committed
Implement an aggregator in the worker
1 parent f839d01 commit 93cf70c

File tree

27 files changed

+4653
-501
lines changed

27 files changed

+4653
-501
lines changed

Cargo.lock

Lines changed: 342 additions & 408 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ hpke-rs = "0.2.0"
5656
hpke-rs-crypto = "0.2.0"
5757
hpke-rs-rust-crypto = "0.2.0"
5858
http = "1"
59+
http-body-util = "0.1.2"
5960
mappable-rc = "0.1.1"
6061
matchit = "0.7.3"
6162
p256 = { version = "0.13.2", features = ["ecdsa-core", "ecdsa", "pem"] }
@@ -76,6 +77,7 @@ serde = { version = "1.0.203", features = ["derive"] }
7677
serde-wasm-bindgen = "0.6.5"
7778
serde_json = "1.0.118"
7879
serde_yaml = "0.9.33"
80+
static_assertions = "1"
7981
strum = { version = "0.26.3", features = ["derive"] }
8082
subtle = "2.6.1"
8183
thiserror = "1.0.61"
@@ -87,7 +89,7 @@ tracing-core = "0.1.32"
8789
tracing-subscriber = "0.3.18"
8890
url = { version = "2.5.4", features = ["serde"] }
8991
webpki = "0.22.4"
90-
worker = { version = "0.4", features = ["http"] }
92+
worker = { version = "0.5", features = ["http"] }
9193
x509-parser = "0.15.1"
9294

9395
[workspace.dependencies.sentry]

Makefile

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,16 @@ helper:
2121
-c ./crates/daphne-server/examples/configuration-helper.toml
2222
h: helper
2323

24+
helper-worker:
25+
cd ./crates/daphne-worker-test/ && \
26+
wrangler dev -c wrangler.aggregator.toml --port 8788 -e helper
27+
hw: helper-worker
28+
29+
leader-worker:
30+
cd ./crates/daphne-worker-test/ && \
31+
wrangler dev -c wrangler.aggregator.toml --port 8788 -e leader
32+
lw: leader-worker
33+
2434
storage-proxy:
2535
docker compose -f ./crates/daphne-worker-test/docker-compose-storage-proxy.yaml up --build
2636
s: storage-proxy

crates/daphne-worker-test/src/lib.rs

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
// Copyright (c) 2022 Cloudflare, Inc. All rights reserved.
22
// SPDX-License-Identifier: BSD-3-Clause
33

4-
use daphne_worker::initialize_tracing;
4+
use daphne_worker::{aggregator::App, initialize_tracing};
55
use tracing::info;
66
use worker::{event, Env, HttpRequest};
77

@@ -26,5 +26,29 @@ pub async fn main(
2626

2727
info!(method = ?req.method(), "{}", req.uri().path());
2828

29-
Ok(daphne_worker::storage_proxy::handle_request(req, env, &prometheus::Registry::new()).await)
29+
let registry = prometheus::Registry::new();
30+
let response = match env
31+
.var("DAP_WORKER_MODE")
32+
.map(|t| t.to_string())
33+
.ok()
34+
.as_deref()
35+
{
36+
Some("storage-proxy") | None => {
37+
daphne_worker::storage_proxy::handle_request(req, env, &registry).await
38+
}
39+
Some("aggregator") => {
40+
daphne_worker::aggregator::handle_dap_request(
41+
App::new(env, &registry, None).unwrap(),
42+
req,
43+
)
44+
.await
45+
}
46+
Some(invalid) => {
47+
return Err(worker::Error::RustError(format!(
48+
"{invalid} is not a valid DAP_WORKER_MODE"
49+
)))
50+
}
51+
};
52+
53+
Ok(response)
3054
}
Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
# Copyright (c) 2024 Cloudflare, Inc. All rights reserved.
2+
# SPDX-License-Identifier: BSD-3-Clause
3+
4+
main = "build/worker/shim.mjs"
5+
compatibility_date = "2023-12-21"
6+
7+
# Don't ask to send metrics to Cloudflare. The worker may be run from a container.
8+
send_metrics = false
9+
10+
# Before starting the worker, run `worker-build`.
11+
[build]
12+
command = "worker-build --dev"
13+
14+
[[rules]]
15+
globs = ["**/*.wasm"]
16+
type = "CompiledWasm"
17+
fallthrough = false
18+
19+
# NOTE: Variables marked as SECRET need to be provisioned securely in
20+
# production. In particular, they will not be passed as environment variables
21+
# as they are here. See
22+
# https://developers.cloudflare.com/workers/wrangler/commands/#secret.
23+
24+
[env.helper]
25+
name = "daphne-helper-aggregator"
26+
27+
[env.helper.vars]
28+
DAP_DEPLOYMENT = "dev"
29+
DAP_WORKER_MODE = "aggregator"
30+
DAP_DURABLE_HELPER_STATE_STORE_GC_AFTER_SECS = "30"
31+
DAP_DURABLE_AGGREGATE_STORE_GC_AFTER_SECS = "30"
32+
33+
# SECRET
34+
TASKPROV_SECRETS_ENABLED = "true"
35+
TASKPROV_SECRETS_VDAF_VERIFY_KEY_INIT = "b029a72fa327931a5cb643dcadcaafa098fcbfac07d990cb9e7c9a8675fafb18"
36+
TASKPROV_SECRETS_PEER_AUTH_EXPECT_LEADER_TOKEN = "I-am-the-leader"
37+
38+
[env.helper.vars.SERVICE_CONFIG]
39+
env = "oxy"
40+
role = "helper"
41+
max_batch_duration = 360000
42+
min_batch_interval_start = 259200
43+
max_batch_interval_end = 259200
44+
supported_hpke_kems = ["x25519_hkdf_sha256"]
45+
default_version = "v09"
46+
report_storage_epoch_duration = 300000
47+
base_url = "http://127.0.0.1:8788"
48+
default_num_agg_span_shards = 4
49+
50+
[env.helper.vars.TASKPROV_HPKE_COLLECTOR_CONFIG]
51+
id = 23
52+
kem_id = "p256_hkdf_sha256"
53+
kdf_id = "hkdf_sha256"
54+
aead_id = "aes128_gcm"
55+
public_key = "047dab625e0d269abcc28c611bebf5a60987ddf7e23df0e0aa343e5774ad81a1d0160d9252b82b4b5c52354205f5ec945645cb79facff8d85c9c31b490cdf35466"
56+
57+
[dev]
58+
ip = "0.0.0.0"
59+
60+
[env.helper.durable_objects]
61+
bindings = [
62+
{ name = "DAP_AGGREGATE_STORE", class_name = "AggregateStore" },
63+
{ name = "DAP_TEST_STATE_CLEANER", class_name = "TestStateCleaner" },
64+
]
65+
66+
67+
[[env.helper.kv_namespaces]]
68+
binding = "DAP_CONFIG"
69+
# KV bindings are in a looked up in a namespace identified by a 16-byte id number.
70+
# This number is assigned by calling
71+
#
72+
# wrangler kv:namespace create <NAME>
73+
#
74+
# for some unique name you specify, and it returns a unique id number to use.
75+
# Here we should use something like "leader" for the <NAME>.
76+
id = "<assign-one-for-the-leader>"
77+
# A "preview id" is an id used when running in "wrangler dev" mode locally, and
78+
# can just be made up. We generated the number below with the following python
79+
# code:
80+
#
81+
# import secrets
82+
# print(secrets.token_hex(16))
83+
#
84+
preview_id = "24c4dc92d5cf4680e508fe18eb8f0281"
85+
86+
[env.leader]
87+
name = "daphne-leader-aggregator"
88+
89+
[env.leader.vars]
90+
DAP_DEPLOYMENT = "dev"
91+
DAP_WORKER_MODE = "aggregator"
92+
DAP_DURABLE_HELPER_STATE_STORE_GC_AFTER_SECS = "30"
93+
DAP_DURABLE_AGGREGATE_STORE_GC_AFTER_SECS = "30"
94+
95+
# SECRET
96+
TASKPROV_SECRETS_ENABLED = "true"
97+
TASKPROV_SECRETS_VDAF_VERIFY_KEY_INIT = "b029a72fa327931a5cb643dcadcaafa098fcbfac07d990cb9e7c9a8675fafb18"
98+
TASKPROV_SECRETS_PEER_AUTH_EXPECT_COLLECTOR_TOKEN = "I-am-the-collector"
99+
TASKPROV_SECRETS_SELF_BEARER_TOKEN = "I-am-the-leader"
100+
101+
[env.leader.vars.SERVICE_CONFIG]
102+
env = "oxy"
103+
role = "leader"
104+
max_batch_duration = 360000
105+
min_batch_interval_start = 259200
106+
max_batch_interval_end = 259200
107+
supported_hpke_kems = ["x25519_hkdf_sha256"]
108+
default_version = "v09"
109+
report_storage_epoch_duration = 300000
110+
base_url = "http://127.0.0.1:8787"
111+
default_num_agg_span_shards = 4
112+
113+
[env.leader.vars.TASKPROV_HPKE_COLLECTOR_CONFIG]
114+
id = 23
115+
kem_id = "p256_hkdf_sha256"
116+
kdf_id = "hkdf_sha256"
117+
aead_id = "aes128_gcm"
118+
public_key = "047dab625e0d269abcc28c611bebf5a60987ddf7e23df0e0aa343e5774ad81a1d0160d9252b82b4b5c52354205f5ec945645cb79facff8d85c9c31b490cdf35466"
119+
120+
[env.leader.durable_objects]
121+
bindings = [
122+
{ name = "DAP_AGGREGATE_STORE", class_name = "AggregateStore" },
123+
{ name = "DAP_TEST_STATE_CLEANER", class_name = "TestStateCleaner" },
124+
]
125+
126+
[[env.leader.kv_namespaces]]
127+
binding = "DAP_CONFIG"
128+
# KV bindings are in a looked up in a namespace identified by a 16-byte id number.
129+
# This number is assigned by calling
130+
#
131+
# wrangler kv:namespace create <NAME>
132+
#
133+
# for some unique name you specify, and it returns a unique id number to use.
134+
# Here we should use something like "leader" for the <NAME>.
135+
id = "<assign-one-for-the-leader>"
136+
# A "preview id" is an id used when running in "wrangler dev" mode locally, and
137+
# can just be made up. We generated the number below with the following python
138+
# code:
139+
#
140+
# import secrets
141+
# print(secrets.token_hex(16))
142+
#
143+
preview_id = "24c4dc92d5cf4680e508fe18eb8f0281"
144+
145+
[[migrations]]
146+
tag = "v1"
147+
new_classes = [
148+
"AggregateStore",
149+
"GarbageCollector",
150+
"HelperStateStore",
151+
]
152+
153+
[[migrations]]
154+
tag = "v2"
155+
renamed_classes = [
156+
{ from = "GarbageCollector", to = "TestStateCleaner" },
157+
]

crates/daphne-worker/Cargo.toml

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,32 +16,44 @@ description = "Workers backend for Daphne"
1616
crate-type = ["cdylib", "rlib"]
1717

1818
[dependencies]
19-
axum.workspace = true
19+
async-trait = { workspace = true }
2020
axum-extra = { workspace = true, features = ["typed-header"] }
2121
bytes.workspace = true
2222
chrono = { workspace = true, default-features = false, features = ["clock", "wasmbind"] }
2323
constcat.workspace = true
2424
daphne = { path = "../daphne", features = ["prometheus"] }
25-
futures = { workspace = true, optional = true }
25+
either = { workspace = true }
26+
futures = { workspace = true }
2627
# We don't use getrandom directly but this allows us to enable the 'js' feature
2728
# of getrandom in the crates we depend on, that depend on getrandom
2829
getrandom = { workspace = true, features = ["js"] }
2930
headers.workspace = true
3031
hex.workspace = true
32+
http-body-util.workspace = true
3133
http.workspace = true
32-
prio_draft09.workspace = true
34+
mappable-rc.workspace = true
35+
p256 = { workspace = true }
3336
prio.workspace = true
37+
prio_draft09.workspace = true
3438
prometheus.workspace = true
3539
rand.workspace = true
40+
reqwest.workspace = true
3641
serde-wasm-bindgen.workspace = true
3742
serde.workspace = true
3843
serde_json.workspace = true
44+
static_assertions.workspace = true
45+
thiserror.workspace = true
46+
tower-service.workspace = true
47+
tower.workspace = true
3948
tracing-core.workspace = true
4049
tracing-subscriber = { workspace = true, features = ["env-filter", "json"]}
4150
tracing.workspace = true
4251
url.workspace = true
4352
worker.workspace = true
44-
tower-service.workspace = true
53+
54+
[dependencies.axum]
55+
workspace = true
56+
features = ["query", "json", "http1", "http2"]
4557

4658
[dependencies.daphne-service-utils]
4759
path = "../daphne-service-utils"
@@ -50,10 +62,13 @@ features = ["durable_requests"]
5062
[dev-dependencies]
5163
daphne = { path = "../daphne", features = ["test-utils"] }
5264
paste.workspace = true
65+
rcgen.workspace = true
5366
reqwest.workspace = true # used in doc tests
67+
tokio.workspace = true
68+
webpki.workspace = true
5469

5570
[features]
56-
test-utils = ["daphne-service-utils/test-utils", "dep:futures"]
71+
test-utils = ["daphne-service-utils/test-utils"]
5772

5873
[lints]
5974
workspace = true

crates/daphne-worker/clippy.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,6 @@
22
# SPDX-License-Identifier: BSD-3-Clause
33

44
disallowed-methods = [
5-
{ path = "std::time::Instant::now", reason = "not implemented in wasm" },
5+
{ path = "std::time::Instant::now", reason = "not implemented in wasm. Use worker::Date::now()" },
6+
{ path = "std::time::SystemTime::now", reason = "not implemented in wasm. Use worker::Date::now()" },
67
]

0 commit comments

Comments
 (0)