Skip to content

Commit 67f95ba

Browse files
committed
Faster pipelines
Signed-off-by: Nicholas Gates <[email protected]>
1 parent 80ca709 commit 67f95ba

File tree

5 files changed

+79
-25
lines changed

5 files changed

+79
-25
lines changed

encodings/fastlanes/benches/pipeline_v2_bitpacking_basic.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,6 @@ const BENCH_PARAMS: &[(usize, f64)] = &[
2626
(10_000, 1.0),
2727
(100_000, 0.5),
2828
(100_000, 1.0),
29-
(1_000_000, 0.5),
30-
(1_000_000, 1.0),
31-
(10_000_000, 0.5),
32-
(10_000_000, 1.0),
3329
];
3430

3531
#[divan::bench(args = BENCH_PARAMS)]

encodings/fastlanes/src/bitpacking/array/bitpack_pipeline.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@ impl PipelinedNode for BitPackedArray {
5757
Ok(Box::new(AlignedBitPackedKernel::<T>::new(
5858
packed_bit_width,
5959
packed_buffer,
60+
// FIXME(ngates): if we make sure the mask has offset zero, we know that split_off
61+
// inside the kernel is free.
6062
self.validity.to_mask(self.len()).into_mut(),
6163
)) as Box<dyn Kernel>)
6264
})
@@ -180,7 +182,7 @@ impl<BP: PhysicalPType<Physical: BitPacking>> Kernel for AlignedBitPackedKernel<
180182
BitPacking::unchecked_unpack(
181183
self.packed_bit_width,
182184
packed_bytes,
183-
transmute(elements.as_mut()),
185+
transmute::<&mut [BP], &mut [BP::Physical]>(elements.as_mut()),
184186
);
185187
}
186188

vortex-buffer/src/bit/buf.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -568,7 +568,9 @@ mod tests {
568568
fn test_slice_offset_calculation() {
569569
let buf = BitBuffer::collect_bool(16, |_| true);
570570
let sliced = buf.slice(10..16);
571-
assert_eq!(sliced.offset(), 10);
571+
assert_eq!(sliced.len(), 6);
572+
// Ensure the offset is modulo 8
573+
assert_eq!(sliced.offset(), 2);
572574
}
573575

574576
#[rstest]

vortex-buffer/src/bit/buf_mut.rs

Lines changed: 63 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -437,35 +437,43 @@ impl BitBufferMut {
437437
///
438438
/// Unlike bytes, if the split position is not on a byte-boundary this operation will copy
439439
/// data into the result type, and mutate self.
440+
#[must_use = "consider BitBufferMut::truncate if you don't need the other half"]
440441
pub fn split_off(&mut self, at: usize) -> Self {
441-
assert!(at <= self.len, "index {at} exceeds len {}", self.len);
442+
assert!(
443+
at <= self.capacity(),
444+
"index {at} exceeds capacity {}",
445+
self.capacity()
446+
);
442447

443-
let new_offset = self.offset;
444-
let new_len = self.len - at;
448+
// The length of the tail is any bits after `at`
449+
let tail_len = self.len.saturating_sub(at);
450+
let byte_pos = (self.offset + at).div_ceil(8);
445451

446452
// If we are splitting on a byte boundary, we can just slice the buffer
447-
if (self.offset + at) % 8 == 0 {
448-
let byte_pos = (self.offset + at) / 8;
449-
let new_buffer = self.buffer.split_off(byte_pos);
450-
self.len = at;
453+
// Or if `at > self.len`, then the tail is empty anyway and we can just return as much
454+
// of the existing capacity as possible.
455+
if at > self.len() || ((self.offset + at) % 8 == 0) {
456+
let tail_buffer = self.buffer.split_off(byte_pos);
457+
self.len = self.len.min(at);
458+
459+
// Return the tail buffer
451460
return Self {
452-
buffer: new_buffer,
453-
offset: new_offset,
454-
len: new_len,
461+
buffer: tail_buffer,
462+
offset: 0,
463+
len: tail_len,
455464
};
456465
}
457466

458-
// Otherwise, we need to copy bits into a new buffer
459-
let mut new_buffer = BitBufferMut::with_capacity(new_len);
460-
for i in 0..new_len {
461-
let value = self.value(at + i);
462-
new_buffer.append(value);
463-
}
467+
// Otherwise, we truncate ourselves, and copy any bits into a new tail buffer.
468+
// Note that in this case we do not preserve the capacity.
469+
let u64_cap = tail_len.div_ceil(8);
470+
let mut tail_buffer_u64 = BufferMut::<u64>::with_capacity(u64_cap);
471+
tail_buffer_u64.extend(
472+
BitChunks::new(self.buffer.as_slice(), self.offset + at, tail_len).iter_padded(),
473+
);
464474

465-
// Truncate self to the split position
466475
self.truncate(at);
467-
468-
new_buffer
476+
BitBufferMut::from_buffer(tail_buffer_u64.into_byte_buffer(), 0, tail_len)
469477
}
470478

471479
/// Absorbs a mutable buffer that was previously split off.
@@ -1058,4 +1066,40 @@ mod tests {
10581066
assert_eq!(bit_buf.value(i), i % 2 == 0);
10591067
}
10601068
}
1069+
1070+
#[test]
1071+
fn test_split_off() {
1072+
// Test splitting at various positions and across a byte boundary
1073+
for i in 0..10 {
1074+
let buf = bitbuffer![0 1 0 1 0 1 0 1 0 1];
1075+
1076+
let mut buf_mut = buf.clone().into_mut();
1077+
assert_eq!(buf_mut.len(), 10);
1078+
1079+
let tail = buf_mut.split_off(i);
1080+
assert_eq!(buf_mut.len(), i);
1081+
assert_eq!(buf_mut.freeze(), buf.slice(0..i));
1082+
1083+
assert_eq!(tail.len(), 10 - i);
1084+
assert_eq!(tail.freeze(), buf.slice(i..10));
1085+
}
1086+
}
1087+
1088+
#[test]
1089+
fn test_split_off_with_offset() {
1090+
// Test splitting at various positions and across a byte boundary
1091+
for i in 0..10 {
1092+
let buf = bitbuffer![0 1 0 1 0 1 0 1 0 1 0 1].slice(2..);
1093+
1094+
let mut buf_mut = buf.clone().into_mut();
1095+
assert_eq!(buf_mut.len(), 10);
1096+
1097+
let tail = buf_mut.split_off(i);
1098+
assert_eq!(buf_mut.len(), i);
1099+
assert_eq!(buf_mut.freeze(), buf.slice(0..i));
1100+
1101+
assert_eq!(tail.len(), 10 - i);
1102+
assert_eq!(tail.freeze(), buf.slice(i..10));
1103+
}
1104+
}
10611105
}

vortex-buffer/src/buffer_mut.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -391,6 +391,16 @@ impl<T> BufferMut<T> {
391391
self.length += other.length;
392392
}
393393

394+
/// Return the [`ByteBufferMut`] for this [`BufferMut`].
395+
pub fn into_byte_buffer(self) -> ByteBufferMut {
396+
ByteBufferMut {
397+
bytes: self.bytes,
398+
length: self.length * size_of::<T>(),
399+
alignment: self.alignment,
400+
_marker: Default::default(),
401+
}
402+
}
403+
394404
/// Freeze the `BufferMut` into a `Buffer`.
395405
pub fn freeze(self) -> Buffer<T> {
396406
Buffer {

0 commit comments

Comments
 (0)