Skip to content

Commit 595155c

Browse files
committed
Make CapnprotoPayload{En,De}code composable.
1 parent 4f5e776 commit 595155c

File tree

3 files changed

+30
-29
lines changed

3 files changed

+30
-29
lines changed

crates/daphne-server/src/storage_proxy_connection/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ impl<'d, B: DurableMethod + Debug, P: AsRef<[u8]>> RequestBuilder<'d, B, P> {
9696

9797
impl<'d, B: DurableMethod> RequestBuilder<'d, B, [u8; 0]> {
9898
pub fn encode<T: CapnprotoPayloadEncode>(self, payload: &T) -> RequestBuilder<'d, B, Vec<u8>> {
99-
self.with_body(payload.encode_to_bytes().unwrap())
99+
self.with_body(payload.encode_to_bytes())
100100
}
101101

102102
pub fn with_body<T: AsRef<[u8]>>(self, payload: T) -> RequestBuilder<'d, B, T> {

crates/daphne-service-utils/src/capnproto_payload.rs

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,22 @@
11
// Copyright (c) 2024 Cloudflare, Inc. All rights reserved.
22
// SPDX-License-Identifier: BSD-3-Clause
33

4+
use capnp::traits::{FromPointerBuilder, FromPointerReader};
5+
46
pub trait CapnprotoPayloadEncode {
5-
fn encode_to_builder(&self) -> capnp::message::Builder<capnp::message::HeapAllocator>;
7+
type Builder<'a>: FromPointerBuilder<'a>;
8+
9+
fn encode_to_builder(&self, builder: Self::Builder<'_>);
610
}
711

812
pub trait CapnprotoPayloadEncodeExt {
9-
fn encode_to_bytes(&self) -> capnp::Result<Vec<u8>>;
13+
fn encode_to_bytes(&self) -> Vec<u8>;
1014
}
1115

1216
pub trait CapnprotoPayloadDecode {
13-
fn decode_from_reader(
14-
reader: capnp::message::Reader<capnp::serialize::OwnedSegments>,
15-
) -> capnp::Result<Self>
17+
type Reader<'a>: FromPointerReader<'a>;
18+
19+
fn decode_from_reader(reader: Self::Reader<'_>) -> capnp::Result<Self>
1620
where
1721
Self: Sized;
1822
}
@@ -27,11 +31,12 @@ impl<T> CapnprotoPayloadEncodeExt for T
2731
where
2832
T: CapnprotoPayloadEncode,
2933
{
30-
fn encode_to_bytes(&self) -> capnp::Result<Vec<u8>> {
34+
fn encode_to_bytes(&self) -> Vec<u8> {
35+
let mut message = capnp::message::Builder::new_default();
36+
self.encode_to_builder(message.init_root::<T::Builder<'_>>());
3137
let mut buf = Vec::new();
32-
let message = self.encode_to_builder();
33-
capnp::serialize_packed::write_message(&mut buf, &message)?;
34-
Ok(buf)
38+
capnp::serialize_packed::write_message(&mut buf, &message).expect("infalible");
39+
buf
3540
}
3641
}
3742

@@ -49,6 +54,7 @@ where
4954
capnp::message::ReaderOptions::new(),
5055
)?;
5156

57+
let reader = reader.get_root::<T::Reader<'_>>()?;
5258
T::decode_from_reader(reader)
5359
}
5460
}

crates/daphne-service-utils/src/durable_requests/bindings/aggregate_store.rs

Lines changed: 14 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -69,16 +69,16 @@ pub struct AggregateStoreMergeOptions {
6969
}
7070

7171
impl CapnprotoPayloadEncode for AggregateStoreMergeReq {
72-
fn encode_to_builder(&self) -> capnp::message::Builder<capnp::message::HeapAllocator> {
72+
type Builder<'a> = aggregate_store_merge_req::Builder<'a>;
73+
74+
fn encode_to_builder(&self, mut builder: Self::Builder<'_>) {
7375
let Self {
7476
contained_reports,
7577
agg_share_delta,
7678
options,
7779
} = self;
78-
let mut message = capnp::message::Builder::new_default();
79-
let mut request = message.init_root::<aggregate_store_merge_req::Builder>();
8080
{
81-
let mut contained_reports = request.reborrow().init_contained_reports(
81+
let mut contained_reports = builder.reborrow().init_contained_reports(
8282
contained_reports
8383
.len()
8484
.try_into()
@@ -94,7 +94,7 @@ impl CapnprotoPayloadEncode for AggregateStoreMergeReq {
9494
}
9595
}
9696
{
97-
let mut agg_share_delta_packet = request.reborrow().init_agg_share_delta();
97+
let mut agg_share_delta_packet = builder.reborrow().init_agg_share_delta();
9898
agg_share_delta_packet.set_report_count(agg_share_delta.report_count);
9999
agg_share_delta_packet.set_min_time(agg_share_delta.min_time);
100100
agg_share_delta_packet.set_max_time(agg_share_delta.max_time);
@@ -157,20 +157,18 @@ impl CapnprotoPayloadEncode for AggregateStoreMergeReq {
157157
let AggregateStoreMergeOptions {
158158
skip_replay_protection,
159159
} = options;
160-
let mut options_packet = request.init_options();
160+
let mut options_packet = builder.init_options();
161161
options_packet.set_skip_replay_protection(*skip_replay_protection);
162162
}
163-
message
164163
}
165164
}
166165

167166
impl CapnprotoPayloadDecode for AggregateStoreMergeReq {
168-
fn decode_from_reader(
169-
reader: capnp::message::Reader<capnp::serialize::OwnedSegments>,
170-
) -> capnp::Result<Self> {
171-
let request = reader.get_root::<aggregate_store_merge_req::Reader>()?;
167+
type Reader<'a> = aggregate_store_merge_req::Reader<'a>;
168+
169+
fn decode_from_reader(reader: Self::Reader<'_>) -> capnp::Result<Self> {
172170
let agg_share_delta = {
173-
let agg_share_delta = request.get_agg_share_delta()?;
171+
let agg_share_delta = reader.get_agg_share_delta()?;
174172
let data = {
175173
macro_rules! make_decode {
176174
($func_name:ident, $agg_share_type:ident, $field_trait:ident, $field_error:ident) => {
@@ -238,8 +236,7 @@ impl CapnprotoPayloadDecode for AggregateStoreMergeReq {
238236
}
239237
};
240238
let contained_reports = {
241-
request
242-
.reborrow()
239+
reader
243240
.get_contained_reports()?
244241
.into_iter()
245242
.map(|report| {
@@ -257,7 +254,7 @@ impl CapnprotoPayloadDecode for AggregateStoreMergeReq {
257254
contained_reports,
258255
agg_share_delta,
259256
options: AggregateStoreMergeOptions {
260-
skip_replay_protection: request.get_options()?.get_skip_replay_protection(),
257+
skip_replay_protection: reader.get_options()?.get_skip_replay_protection(),
261258
},
262259
})
263260
}
@@ -352,8 +349,7 @@ mod test {
352349
},
353350
};
354351
let other =
355-
AggregateStoreMergeReq::decode_from_bytes(&this.encode_to_bytes().unwrap())
356-
.unwrap();
352+
AggregateStoreMergeReq::decode_from_bytes(&this.encode_to_bytes()).unwrap();
357353
assert_eq!(this, other);
358354
}
359355
}
@@ -411,8 +407,7 @@ mod test {
411407
},
412408
};
413409
let other =
414-
AggregateStoreMergeReq::decode_from_bytes(&this.encode_to_bytes().unwrap())
415-
.unwrap();
410+
AggregateStoreMergeReq::decode_from_bytes(&this.encode_to_bytes()).unwrap();
416411
assert_eq!(this, other);
417412
}
418413
}

0 commit comments

Comments
 (0)