Skip to content

Commit 7305206

Browse files
committed
Align Bytesable writes to u64
1 parent 12a07d9 commit 7305206

File tree

2 files changed

+24
-10
lines changed

2 files changed

+24
-10
lines changed

timely/src/dataflow/channels/mod.rs

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,9 @@ impl<T, C: Container> Message<T, C> {
5757
}
5858

5959
// Instructions for serialization of `Message`.
60-
// Intended to swap out the constraint on `C` for `C: Bytesable`.
60+
//
61+
// Serialization of each field is meant to be `u64` aligned, so that each has tha ability
62+
// to be decoded using safe transmutation, e.g. `bytemuck`.
6163
impl<T, C> crate::communication::Bytesable for Message<T, C>
6264
where
6365
T: Serialize + for<'a> Deserialize<'a>,
@@ -69,24 +71,29 @@ where
6971
let from: usize = slice.read_u64::<byteorder::LittleEndian>().unwrap().try_into().unwrap();
7072
let seq: usize = slice.read_u64::<byteorder::LittleEndian>().unwrap().try_into().unwrap();
7173
let time: T = ::bincode::deserialize_from(&mut slice).expect("bincode::deserialize() failed");
72-
let bytes_read = bytes.len() - slice.len();
74+
let time_size = ::bincode::serialized_size(&time).expect("bincode::serialized_size() failed") as usize;
75+
// We expect to find the `data` payload at `8 + 8 + round_up(time_size)`;
76+
let bytes_read = 8 + 8 + ((time_size + 7) & !7);
77+
// let bytes_read = bytes.len() - slice.len();
7378
bytes.extract_to(bytes_read);
7479
let data: C = ContainerBytes::from_bytes(bytes);
7580
Self { time, data, from, seq }
7681
}
7782

7883
fn length_in_bytes(&self) -> usize {
84+
let time_size = ::bincode::serialized_size(&self.time).expect("bincode::serialized_size() failed") as usize;
7985
// 16 comes from the two `u64` fields: `from` and `seq`.
80-
16 +
81-
::bincode::serialized_size(&self.time).expect("bincode::serialized_size() failed") as usize +
82-
self.data.length_in_bytes()
86+
16 + ((time_size + 7) & !7) + self.data.length_in_bytes()
8387
}
8488

8589
fn into_bytes<W: ::std::io::Write>(&self, writer: &mut W) {
8690
use byteorder::WriteBytesExt;
8791
writer.write_u64::<byteorder::LittleEndian>(self.from.try_into().unwrap()).unwrap();
8892
writer.write_u64::<byteorder::LittleEndian>(self.seq.try_into().unwrap()).unwrap();
8993
::bincode::serialize_into(&mut *writer, &self.time).expect("bincode::serialize_into() failed");
94+
let time_size = ::bincode::serialized_size(&self.time).expect("bincode::serialized_size() failed") as usize;
95+
let time_slop = ((time_size + 7) & !7) - time_size;
96+
writer.write(&[0u8; 8][..time_slop]).unwrap();
9097
self.data.into_bytes(&mut *writer);
9198
}
9299
}

timely/src/lib.rs

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -108,8 +108,8 @@ impl<T: Clone+'static> Data for T { }
108108
///
109109
/// The `ExchangeData` trait extends `Data` with any requirements imposed by the `timely_communication`
110110
/// `Data` trait, which describes requirements for communication along channels.
111-
pub trait ExchangeData: Data + encoding::Data { }
112-
impl<T: Data + encoding::Data> ExchangeData for T { }
111+
pub trait ExchangeData: Data + encoding::Data + columnar::Columnar { }
112+
impl<T: Data + encoding::Data + columnar::Columnar> ExchangeData for T { }
113113

114114
#[doc = include_str!("../../README.md")]
115115
#[cfg(doctest)]
@@ -141,18 +141,25 @@ mod encoding {
141141
}
142142
}
143143

144+
// We will pad out anything we write to make the result `u64` aligned.
144145
impl<T: Data> Bytesable for Bincode<T> {
145146
fn from_bytes(bytes: Bytes) -> Self {
146147
let typed = ::bincode::deserialize(&bytes[..]).expect("bincode::deserialize() failed");
148+
let typed_size = ::bincode::serialized_size(&typed).expect("bincode::serialized_size() failed") as usize;
149+
assert_eq!(bytes.len(), (typed_size + 7) & !7);
147150
Bincode { payload: typed }
148151
}
149152

150153
fn length_in_bytes(&self) -> usize {
151-
::bincode::serialized_size(&self.payload).expect("bincode::serialized_size() failed") as usize
154+
let typed_size = ::bincode::serialized_size(&self.payload).expect("bincode::serialized_size() failed") as usize;
155+
(typed_size + 7) & !7
152156
}
153157

154-
fn into_bytes<W: ::std::io::Write>(&self, writer: &mut W) {
155-
::bincode::serialize_into(writer, &self.payload).expect("bincode::serialize_into() failed");
158+
fn into_bytes<W: ::std::io::Write>(&self, mut writer: &mut W) {
159+
let typed_size = ::bincode::serialized_size(&self.payload).expect("bincode::serialized_size() failed") as usize;
160+
let typed_slop = ((typed_size + 7) & !7) - typed_size;
161+
::bincode::serialize_into(&mut writer, &self.payload).expect("bincode::serialize_into() failed");
162+
writer.write(&[0u8; 8][..typed_slop]).unwrap();
156163
}
157164
}
158165

0 commit comments

Comments
 (0)