Skip to content

Commit 9bf4244

Browse files
committed
Align ContainerBytes implementations with padding
1 parent ee58086 commit 9bf4244

File tree

2 files changed

+63
-8
lines changed

2 files changed

+63
-8
lines changed

timely/src/dataflow/channels/mod.rs

Lines changed: 62 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,6 @@ where
7474
let time_size = ::bincode::serialized_size(&time).expect("bincode::serialized_size() failed") as usize;
7575
// We expect to find the `data` payload at `8 + 8 + round_up(time_size)`;
7676
let bytes_read = 8 + 8 + ((time_size + 7) & !7);
77-
// let bytes_read = bytes.len() - slice.len();
7877
bytes.extract_to(bytes_read);
7978
let data: C = ContainerBytes::from_bytes(bytes);
8079
Self { time, data, from, seq }
@@ -113,6 +112,8 @@ pub trait ContainerBytes {
113112

114113
mod implementations {
115114

115+
use std::io::Write;
116+
116117
use serde::{Serialize, Deserialize};
117118
use crate::dataflow::channels::ContainerBytes;
118119

@@ -122,11 +123,16 @@ mod implementations {
122123
}
123124

124125
fn length_in_bytes(&self) -> usize {
125-
::bincode::serialized_size(&self).expect("bincode::serialized_size() failed") as usize
126+
let length = ::bincode::serialized_size(&self).expect("bincode::serialized_size() failed") as usize;
127+
(length + 7) & !7
126128
}
127129

128-
fn into_bytes<W: ::std::io::Write>(&self, writer: &mut W) {
129-
::bincode::serialize_into(writer, &self).expect("bincode::serialize_into() failed");
130+
fn into_bytes<W: Write>(&self, writer: &mut W) {
131+
let mut counter = WriteCounter::new(writer);
132+
::bincode::serialize_into(&mut counter, &self).expect("bincode::serialize_into() failed");
133+
let written = counter.count;
134+
let written_slop = ((written + 7) & !7) - written;
135+
counter.write(&[0u8; 8][..written_slop]).unwrap();
130136
}
131137
}
132138

@@ -137,11 +143,60 @@ mod implementations {
137143
}
138144

139145
fn length_in_bytes(&self) -> usize {
140-
::bincode::serialized_size(&self).expect("bincode::serialized_size() failed") as usize
146+
let length = ::bincode::serialized_size(&self).expect("bincode::serialized_size() failed") as usize;
147+
(length + 7) & !7
148+
}
149+
150+
fn into_bytes<W: Write>(&self, writer: &mut W) {
151+
let mut counter = WriteCounter::new(writer);
152+
::bincode::serialize_into(&mut counter, &self).expect("bincode::serialize_into() failed");
153+
let written = counter.count;
154+
let written_slop = ((written + 7) & !7) - written;
155+
counter.write(&[0u8; 8][..written_slop]).unwrap();
156+
}
157+
}
158+
159+
use write_counter::WriteCounter;
160+
/// A `Write` wrapper that counts the bytes written.
161+
mod write_counter {
162+
163+
use ::std::io::{Write, IoSlice, Result};
164+
use std::fmt::Arguments;
165+
166+
/// A write wrapper that tracks the bytes written.
167+
pub struct WriteCounter<W> {
168+
inner: W,
169+
pub count: usize,
170+
}
171+
172+
impl<W> WriteCounter<W> {
173+
/// Creates a new counter wrapper from a writer.
174+
pub fn new(inner: W) -> Self {
175+
Self { inner, count: 0 }
176+
}
141177
}
142178

143-
fn into_bytes<W: ::std::io::Write>(&self, writer: &mut W) {
144-
::bincode::serialize_into(writer, &self).expect("bincode::serialize_into() failed");
179+
impl<W: Write> Write for WriteCounter<W> {
180+
fn write(&mut self, buf: &[u8]) -> Result<usize> {
181+
let written = self.inner.write(buf)?;
182+
self.count += written;
183+
Ok(written)
184+
}
185+
fn flush(&mut self) -> Result<()> {
186+
self.inner.flush()
187+
}
188+
fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> Result<usize> {
189+
let written = self.inner.write_vectored(bufs)?;
190+
self.count += written;
191+
Ok(written)
192+
}
193+
fn write_all(&mut self, buf: &[u8]) -> Result<()> {
194+
self.count += buf.len();
195+
self.inner.write_all(buf)
196+
}
197+
fn write_fmt(&mut self, _fmt: Arguments<'_>) -> Result<()> {
198+
unimplemented!()
199+
}
145200
}
146201
}
147202
}

timely/src/dataflow/operators/core/partition.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ pub trait Partition<G: Scope, C: Container> {
1212
/// Produces `parts` output streams, containing records produced and assigned by `route`.
1313
///
1414
/// # Examples
15-
/// ```ignore
15+
/// ```
1616
/// use timely::dataflow::operators::ToStream;
1717
/// use timely::dataflow::operators::core::{Partition, Inspect};
1818
/// use timely_container::CapacityContainerBuilder;

0 commit comments

Comments
 (0)