Skip to content

Commit b415fa0

Browse files
committed
Allow containers to specify their own serialization
1 parent b411957 commit b415fa0

File tree

3 files changed

+70
-8
lines changed

3 files changed

+70
-8
lines changed

timely/src/dataflow/channels/mod.rs

Lines changed: 68 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -61,17 +61,79 @@ impl<T, C: Container> Message<T, C> {
6161
impl<T, C> crate::communication::Bytesable for Message<T, C>
6262
where
6363
T: Serialize + for<'a> Deserialize<'a>,
64-
C: Serialize + for<'a> Deserialize<'a>,
64+
// C: Serialize + for<'a> Deserialize<'a>,
65+
C: ContainerBytes,
6566
{
66-
fn from_bytes(bytes: crate::bytes::arc::Bytes) -> Self {
67-
::bincode::deserialize(&bytes[..]).expect("bincode::deserialize() failed")
67+
fn from_bytes(mut bytes: crate::bytes::arc::Bytes) -> Self {
68+
let mut slice = &bytes[..];
69+
let from: usize = ::bincode::deserialize(&mut slice).expect("bincode::deserialize() failed");
70+
let seq: usize = ::bincode::deserialize(&mut slice).expect("bincode::deserialize() failed");
71+
let time: T = ::bincode::deserialize(&mut slice).expect("bincode::deserialize() failed");
72+
let bytes_read = bytes.len() - slice.len();
73+
bytes.extract_to(bytes_read);
74+
let data: C = ContainerBytes::from_bytes(bytes);
75+
Self { time, data, from, seq }
6876
}
6977

7078
fn length_in_bytes(&self) -> usize {
71-
::bincode::serialized_size(&self).expect("bincode::serialized_size() failed") as usize
79+
::bincode::serialized_size(&self.from).expect("bincode::serialized_size() failed") as usize +
80+
::bincode::serialized_size(&self.seq).expect("bincode::serialized_size() failed") as usize +
81+
::bincode::serialized_size(&self.time).expect("bincode::serialized_size() failed") as usize +
82+
self.data.length_in_bytes()
7283
}
7384

7485
fn into_bytes<W: ::std::io::Write>(&self, writer: &mut W) {
75-
::bincode::serialize_into(writer, &self).expect("bincode::serialize_into() failed");
86+
::bincode::serialize_into(&mut *writer, &self.from).expect("bincode::serialize_into() failed");
87+
::bincode::serialize_into(&mut *writer, &self.seq).expect("bincode::serialize_into() failed");
88+
::bincode::serialize_into(&mut *writer, &self.time).expect("bincode::serialize_into() failed");
89+
self.data.into_bytes(&mut *writer);
7690
}
77-
}
91+
}
92+
93+
94+
/// A container-oriented version of `Bytesable` that can be implemented here for `Vec<T>` and other containers.
95+
pub trait ContainerBytes {
96+
/// Wrap bytes as `Self`.
97+
fn from_bytes(bytes: crate::bytes::arc::Bytes) -> Self;
98+
99+
/// The number of bytes required to serialize the data.
100+
fn length_in_bytes(&self) -> usize;
101+
102+
/// Writes the binary representation into `writer`.
103+
fn into_bytes<W: ::std::io::Write>(&self, writer: &mut W);
104+
}
105+
106+
mod implementations {
107+
108+
use serde::{Serialize, Deserialize};
109+
use crate::dataflow::channels::ContainerBytes;
110+
111+
impl<T: Serialize + for<'a> Deserialize<'a>> ContainerBytes for Vec<T> {
112+
fn from_bytes(bytes: crate::bytes::arc::Bytes) -> Self {
113+
::bincode::deserialize(&bytes[..]).expect("bincode::deserialize() failed")
114+
}
115+
116+
fn length_in_bytes(&self) -> usize {
117+
::bincode::serialized_size(&self).expect("bincode::serialized_size() failed") as usize
118+
}
119+
120+
fn into_bytes<W: ::std::io::Write>(&self, writer: &mut W) {
121+
::bincode::serialize_into(writer, &self).expect("bincode::serialize_into() failed");
122+
}
123+
}
124+
125+
use crate::container::flatcontainer::FlatStack;
126+
impl<T: Serialize + for<'a> Deserialize<'a> + crate::container::flatcontainer::Region> ContainerBytes for FlatStack<T> {
127+
fn from_bytes(bytes: crate::bytes::arc::Bytes) -> Self {
128+
::bincode::deserialize(&bytes[..]).expect("bincode::deserialize() failed")
129+
}
130+
131+
fn length_in_bytes(&self) -> usize {
132+
::bincode::serialized_size(&self).expect("bincode::serialized_size() failed") as usize
133+
}
134+
135+
fn into_bytes<W: ::std::io::Write>(&self, writer: &mut W) {
136+
::bincode::serialize_into(writer, &self).expect("bincode::serialize_into() failed");
137+
}
138+
}
139+
}

timely/src/dataflow/channels/pact.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ where
6868
// Exchange uses a `Box<Pushable>` because it cannot know what type of pushable will return from the allocator.
6969
impl<T: Timestamp, C, H: 'static> ParallelizationContract<T, C> for ExchangeCore<C, H>
7070
where
71-
C: ExchangeData + PushPartitioned,
71+
C: ExchangeData + PushPartitioned + crate::dataflow::channels::ContainerBytes,
7272
for<'a> H: FnMut(&C::Item<'a>) -> u64
7373
{
7474
type Pusher = ExchangePusher<T, C, LogPusher<T, C, Box<dyn Push<Message<T, C>>>>, H>;

timely/src/dataflow/operators/core/exchange.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ pub trait Exchange<C: PushPartitioned> {
3030

3131
impl<G: Scope, C> Exchange<C> for StreamCore<G, C>
3232
where
33-
C: PushPartitioned + ExchangeData,
33+
C: PushPartitioned + ExchangeData + crate::dataflow::channels::ContainerBytes,
3434
{
3535
fn exchange<F>(&self, route: F) -> StreamCore<G, C>
3636
where

0 commit comments

Comments
 (0)