Skip to content

Commit 543779a

Browse files
committed
Allow containers to specify their own serialization
1 parent b411957 commit 543779a

File tree

3 files changed

+72
-8
lines changed

3 files changed

+72
-8
lines changed

timely/src/dataflow/channels/mod.rs

Lines changed: 70 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -61,17 +61,81 @@ impl<T, C: Container> Message<T, C> {
6161
impl<T, C> crate::communication::Bytesable for Message<T, C>
6262
where
6363
T: Serialize + for<'a> Deserialize<'a>,
64-
C: Serialize + for<'a> Deserialize<'a>,
64+
// C: Serialize + for<'a> Deserialize<'a>,
65+
C: ContainerBytes,
6566
{
66-
fn from_bytes(bytes: crate::bytes::arc::Bytes) -> Self {
67-
::bincode::deserialize(&bytes[..]).expect("bincode::deserialize() failed")
67+
fn from_bytes(mut bytes: crate::bytes::arc::Bytes) -> Self {
68+
// ::bincode::deserialize(&bytes[..]).expect("bincode::deserialize() failed")
69+
let mut slice = &bytes[..];
70+
let from: usize = ::bincode::deserialize(&mut slice).expect("bincode::deserialize() failed");
71+
let seq: usize = ::bincode::deserialize(&mut slice).expect("bincode::deserialize() failed");
72+
let time: T = ::bincode::deserialize(&mut slice).expect("bincode::deserialize() failed");
73+
let bytes_read = bytes.len() - slice.len();
74+
bytes.extract_to(bytes_read);
75+
let data: C = ContainerBytes::from_bytes(bytes);
76+
Self { time, data, from, seq }
6877
}
6978

7079
fn length_in_bytes(&self) -> usize {
71-
::bincode::serialized_size(&self).expect("bincode::serialized_size() failed") as usize
80+
::bincode::serialized_size(&self.from).expect("bincode::serialized_size() failed") as usize +
81+
::bincode::serialized_size(&self.seq).expect("bincode::serialized_size() failed") as usize +
82+
::bincode::serialized_size(&self.time).expect("bincode::serialized_size() failed") as usize +
83+
self.data.length_in_bytes()
7284
}
7385

7486
fn into_bytes<W: ::std::io::Write>(&self, writer: &mut W) {
75-
::bincode::serialize_into(writer, &self).expect("bincode::serialize_into() failed");
87+
// ::bincode::serialize_into(writer, &self).expect("bincode::serialize_into() failed");
88+
::bincode::serialize_into(&mut *writer, &self.from).expect("bincode::serialize_into() failed");
89+
::bincode::serialize_into(&mut *writer, &self.seq).expect("bincode::serialize_into() failed");
90+
::bincode::serialize_into(&mut *writer, &self.time).expect("bincode::serialize_into() failed");
91+
self.data.into_bytes(&mut *writer);
7692
}
77-
}
93+
}
94+
95+
96+
/// A container-oriented version of `Bytesable` that can be implemented here for `Vec<T>` and other containers.
97+
pub trait ContainerBytes {
98+
/// Wrap bytes as `Self`.
99+
fn from_bytes(bytes: crate::bytes::arc::Bytes) -> Self;
100+
101+
/// The number of bytes required to serialize the data.
102+
fn length_in_bytes(&self) -> usize;
103+
104+
/// Writes the binary representation into `writer`.
105+
fn into_bytes<W: ::std::io::Write>(&self, writer: &mut W);
106+
}
107+
108+
mod implementations {
109+
110+
use serde::{Serialize, Deserialize};
111+
use crate::dataflow::channels::ContainerBytes;
112+
113+
impl<T: Serialize + for<'a> Deserialize<'a>> ContainerBytes for Vec<T> {
114+
fn from_bytes(bytes: crate::bytes::arc::Bytes) -> Self {
115+
::bincode::deserialize(&bytes[..]).expect("bincode::deserialize() failed")
116+
}
117+
118+
fn length_in_bytes(&self) -> usize {
119+
::bincode::serialized_size(&self).expect("bincode::serialized_size() failed") as usize
120+
}
121+
122+
fn into_bytes<W: ::std::io::Write>(&self, writer: &mut W) {
123+
::bincode::serialize_into(writer, &self).expect("bincode::serialize_into() failed");
124+
}
125+
}
126+
127+
use crate::container::flatcontainer::FlatStack;
128+
impl<T: Serialize + for<'a> Deserialize<'a> + crate::container::flatcontainer::Region> ContainerBytes for FlatStack<T> {
129+
fn from_bytes(bytes: crate::bytes::arc::Bytes) -> Self {
130+
::bincode::deserialize(&bytes[..]).expect("bincode::deserialize() failed")
131+
}
132+
133+
fn length_in_bytes(&self) -> usize {
134+
::bincode::serialized_size(&self).expect("bincode::serialized_size() failed") as usize
135+
}
136+
137+
fn into_bytes<W: ::std::io::Write>(&self, writer: &mut W) {
138+
::bincode::serialize_into(writer, &self).expect("bincode::serialize_into() failed");
139+
}
140+
}
141+
}

timely/src/dataflow/channels/pact.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ where
6868
// Exchange uses a `Box<Pushable>` because it cannot know what type of pushable will return from the allocator.
6969
impl<T: Timestamp, C, H: 'static> ParallelizationContract<T, C> for ExchangeCore<C, H>
7070
where
71-
C: ExchangeData + PushPartitioned,
71+
C: ExchangeData + PushPartitioned + crate::dataflow::channels::ContainerBytes,
7272
for<'a> H: FnMut(&C::Item<'a>) -> u64
7373
{
7474
type Pusher = ExchangePusher<T, C, LogPusher<T, C, Box<dyn Push<Message<T, C>>>>, H>;

timely/src/dataflow/operators/core/exchange.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ pub trait Exchange<C: PushPartitioned> {
3030

3131
impl<G: Scope, C> Exchange<C> for StreamCore<G, C>
3232
where
33-
C: PushPartitioned + ExchangeData,
33+
C: PushPartitioned + ExchangeData + crate::dataflow::channels::ContainerBytes,
3434
{
3535
fn exchange<F>(&self, route: F) -> StreamCore<G, C>
3636
where

0 commit comments

Comments
 (0)