Skip to content

Commit ab24dab

Browse files
committed
Robustify potential Bytes alignment
1 parent 8baf41e commit ab24dab

File tree

1 file changed

+51
-14
lines changed

1 file changed

+51
-14
lines changed

timely/examples/columnar.rs

Lines changed: 51 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,13 @@ fn main() {
2323

2424
use columnar::Len;
2525

26+
let config = timely::Config {
27+
communication: timely::CommunicationConfig::ProcessBinary(7),
28+
worker: timely::WorkerConfig::default(),
29+
};
30+
2631
// initializes and runs a timely dataflow.
27-
timely::execute_from_args(std::env::args(), |worker| {
32+
timely::execute(config, |worker| {
2833
let mut input = <InputHandleCore<_, CapacityContainerBuilder<Container>>>::new();
2934
let mut probe = ProbeHandle::new();
3035

@@ -117,6 +122,11 @@ mod container {
117122
Typed(C),
118123
/// The binary variant of the container.
119124
Bytes(Bytes),
125+
/// Relocated, aligned binary data, if `Bytes` doesn't work for some reason.
126+
///
127+
/// Reasons could include misalignment, cloning of data, or wanting
128+
/// to release the `Bytes` as a scarce resource.
129+
Align(Box<[u64]>),
120130
}
121131

122132
impl<C: Default> Default for Column<C> {
@@ -128,6 +138,7 @@ mod container {
128138
match self {
129139
Column::Typed(t) => Column::Typed(t.clone()),
130140
Column::Bytes(_) => unimplemented!(),
141+
Column::Align(a) => Column::Align(a.clone()),
131142
}
132143
}
133144
}
@@ -144,24 +155,26 @@ mod container {
144155
fn len(&self) -> usize {
145156
match self {
146157
Column::Typed(t) => t.len(),
147-
Column::Bytes(b) => <C::Borrowed<'_> as FromBytes>::from_bytes(&mut decode(bytemuck::cast_slice(&b[..]))).len(),
158+
Column::Bytes(b) => <C::Borrowed<'_> as FromBytes>::from_bytes(&mut decode(bytemuck::cast_slice(b))).len(),
159+
Column::Align(a) => <C::Borrowed<'_> as FromBytes>::from_bytes(&mut decode(a)).len(),
148160
}
149161
}
150-
// Perhpas this should be an enum that allows the bytes to be un-set, but .. not sure what this should do.
162+
// This sets the `Bytes` variant to be an empty `Typed` variant, appropriate for pushing into.
151163
fn clear(&mut self) {
152164
match self {
153165
Column::Typed(t) => t.clear(),
154-
Column::Bytes(_) => unimplemented!(),
166+
Column::Bytes(_) => *self = Column::Typed(C::default()),
167+
Column::Align(_) => *self = Column::Typed(C::default()),
155168
}
156-
// unimplemented!()
157169
}
158170

159171
type ItemRef<'a> = <C::Borrowed<'a> as Index>::Ref where Self: 'a;
160172
type Iter<'a> = IterOwn<C::Borrowed<'a>>;
161173
fn iter<'a>(&'a self) -> Self::Iter<'a> {
162174
match self {
163175
Column::Typed(t) => <C::Borrowed<'a> as FromBytes>::from_bytes(&mut t.as_bytes().map(|(_, x)| x)).into_iter(),
164-
Column::Bytes(b) => <C::Borrowed<'a> as FromBytes>::from_bytes(&mut decode(bytemuck::cast_slice(&b[..]))).into_iter()
176+
Column::Bytes(b) => <C::Borrowed<'a> as FromBytes>::from_bytes(&mut decode(bytemuck::cast_slice(b))).into_iter(),
177+
Column::Align(a) => <C::Borrowed<'a> as FromBytes>::from_bytes(&mut decode(a)).into_iter(),
165178
}
166179
}
167180

@@ -170,7 +183,8 @@ mod container {
170183
fn drain<'a>(&'a mut self) -> Self::DrainIter<'a> {
171184
match self {
172185
Column::Typed(t) => <C::Borrowed<'a> as FromBytes>::from_bytes(&mut t.as_bytes().map(|(_, x)| x)).into_iter(),
173-
Column::Bytes(b) => <C::Borrowed<'a> as FromBytes>::from_bytes(&mut decode(bytemuck::cast_slice(&b[..]))).into_iter()
186+
Column::Bytes(b) => <C::Borrowed<'a> as FromBytes>::from_bytes(&mut decode(bytemuck::cast_slice(b))).into_iter(),
187+
Column::Align(a) => <C::Borrowed<'a> as FromBytes>::from_bytes(&mut decode(a)).into_iter(),
174188
}
175189
}
176190
}
@@ -191,35 +205,58 @@ mod container {
191205
fn push_into(&mut self, item: T) {
192206
match self {
193207
Column::Typed(t) => t.push(item),
194-
Column::Bytes(_) => unimplemented!(),
208+
Column::Align(_) | Column::Bytes(_) => {
209+
// We really oughtn't be calling this in this case.
210+
// We could convert to owned, but need more constraints on `C`.
211+
unimplemented!("Pushing into Column::Bytes without first clearing");
212+
}
195213
}
196214
}
197215
}
198216

199217
use timely::dataflow::channels::ContainerBytes;
200218
impl<C: columnar::bytes::AsBytes> ContainerBytes for Column<C> {
201219
fn from_bytes(bytes: timely::bytes::arc::Bytes) -> Self {
202-
Self::Bytes(bytes)
220+
// Our expectation / hope is that `bytes` is `u64` aligned and sized.
221+
// If the alignment is borked, we can relocate. IF the size is borked,
222+
// not sure what we do in that case.
223+
assert!(bytes.len() % 8 == 0);
224+
if let Ok(_) = bytemuck::try_cast_slice::<_, u64>(&bytes) {
225+
Self::Bytes(bytes)
226+
}
227+
else {
228+
let mut alloc: Vec<u64> = vec![0; bytes.len() / 8];
229+
bytemuck::cast_slice_mut(&mut alloc[..]).copy_from_slice(&bytes[..]);
230+
Self::Align(alloc.into())
231+
}
203232
}
204233

205234
fn length_in_bytes(&self) -> usize {
206235
match self {
207236
// We'll need one u64 for the length, then the length rounded up to a multiple of 8.
208237
Column::Typed(t) => t.as_bytes().map(|(_, x)| 8 * (1 + (x.len()/8) + if x.len() % 8 == 0 { 0 } else { 1 })).sum(),
209238
Column::Bytes(b) => b.len(),
239+
Column::Align(a) => 8 * a.len(),
210240
}
211241
}
212242

213243
fn into_bytes<W: ::std::io::Write>(&self, writer: &mut W) {
214244
match self {
215245
Column::Typed(t) => {
216-
for (_align, _bytes) in t.as_bytes() {
217-
// Each byte slice is a u64 length in bytes,
218-
// followed by bytes padded to a multiple of eight bytes.
219-
// writer.write_all(&)
246+
// Columnar data is serialized as a sequence of `u64` values, with each `[u8]` slice
247+
// serialize as first its length in bytes, and then as many `u64` values as needed.
248+
// Padding should be added, but only for alignment; no specific values are required.
249+
for (align, bytes) in t.as_bytes() {
250+
assert!(align <= 8);
251+
let length: u64 = bytes.len().try_into().unwrap();
252+
writer.write_all(bytemuck::cast_slice(std::slice::from_ref(&length))).unwrap();
253+
writer.write_all(bytes).unwrap();
254+
let padding: usize = ((8 - (length % 8)) % 8).try_into().unwrap();
255+
writer.write_all(&[0; 8][..padding]).unwrap();
220256
}
221257
},
222-
Column::Bytes(b) => writer.write_all(&b[..]).unwrap(),
258+
Column::Bytes(b) => writer.write_all(b).unwrap(),
259+
Column::Align(a) => writer.write_all(bytemuck::cast_slice(a)).unwrap(),
223260
}
224261
}
225262
}

0 commit comments

Comments
 (0)