Skip to content

Commit 766f574

Browse files
committed
add extra memcpy at the end of pipeline
Signed-off-by: Connor Tsui <connor.tsui20@gmail.com>
1 parent 7aec2a6 commit 766f574

File tree

1 file changed

+86
-9
lines changed

1 file changed

+86
-9
lines changed

vortex/benches/pipeline.rs

Lines changed: 86 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ struct BenchmarkBuffers {
172172
/// Output buffer for batch decompression.
173173
alp_decoded: Vec<f32>,
174174
/// Output buffer for pipeline decompression.
175-
alp_decoded_pipeline: Vec<f32>,
175+
pipeline_output: Vec<f32>,
176176
/// Output buffer for in-place batch decompression.
177177
alp_decoded_inplace_batch: Vec<f32>,
178178
/// Output buffer for in-place pipeline decompression.
@@ -216,7 +216,7 @@ fn setup(size: usize) -> (InputData, BenchmarkBuffers) {
216216
bitpacked_output: vec![0u32; size],
217217
for_decoded: vec![0i32; size],
218218
alp_decoded: vec![0.0f32; size],
219-
alp_decoded_pipeline: vec![0.0f32; size],
219+
pipeline_output: vec![0.0f32; size],
220220
alp_decoded_inplace_batch: vec![0.0f32; size],
221221
alp_decoded_inplace_pipeline: vec![0.0f32; size],
222222
};
@@ -343,6 +343,70 @@ fn decompress_pipeline(
343343
}
344344
}
345345

346+
/// Pipeline decompression that processes data chunk by chunk with an extra copy per chunk.
347+
///
348+
/// This version intentionally adds an extra copy step so its cost can be measured against the pipeline without it.
349+
/// Each chunk is ALP-decoded into the first `N` elements of `alp_buffer` and then copied into `output` with `copy_from_slice`.
350+
fn decompress_pipeline_extra_copy(
351+
bitpacked: &[u32],
352+
reference: i32,
353+
exponents: Exponents,
354+
unpack_buffer: &mut [u32],
355+
for_buffer: &mut [i32],
356+
alp_buffer: &mut [f32],
357+
output: &mut [f32],
358+
) {
359+
debug_assert!(bitpacked.len().is_multiple_of(S));
360+
debug_assert_eq!(output.len(), bitpacked.len() * T / W);
361+
debug_assert!(unpack_buffer.len() >= N);
362+
debug_assert!(for_buffer.len() >= N);
363+
debug_assert!(alp_buffer.len() >= N);
364+
365+
// Use only the first N elements of the pre-allocated buffers. This slicing is bounds-checked (it panics if a buffer is shorter than N), so each chunk is exactly N elements long.
366+
let unpack_chunk = &mut unpack_buffer[..N];
367+
let for_chunk = &mut for_buffer[..N];
368+
let alp_chunk = &mut alp_buffer[..N];
369+
370+
let mut input_offset = 0;
371+
let mut output_offset = 0;
372+
373+
// Process one chunk per iteration: `S` packed input words in, `N` decoded values out.
374+
while input_offset < bitpacked.len() {
375+
// Stage 1: Bitpacking decompression.
376+
// SAFETY: `input_offset` advances in steps of `S` and `bitpacked.len()` must be a multiple of `S`, so the range is in bounds. NOTE(review): that precondition is only checked by `debug_assert!`, so release builds rely on the caller; `unchecked_unpack` presumably needs an `N`-element output, which `unpack_chunk` is — TODO confirm.
377+
unsafe {
378+
let input = bitpacked.get_unchecked(input_offset..input_offset + S);
379+
BitPacking::unchecked_unpack(W, input, unpack_chunk);
380+
}
381+
382+
// Stage 2: FoR decompression (wrapping-add `reference` to each unpacked value).
383+
// SAFETY: `unpack_chunk` and `for_chunk` were both sliced to exactly `N` elements above, so every `i < N` is in bounds.
384+
unsafe {
385+
for i in 0..N {
386+
let unpacked = *unpack_chunk.get_unchecked(i) as i32;
387+
*for_chunk.get_unchecked_mut(i) = unpacked.wrapping_add(reference);
388+
}
389+
}
390+
391+
// Stage 3: ALP decompression into intermediate buffer.
392+
// SAFETY: `for_chunk` and `alp_chunk` were both sliced to exactly `N` elements above, so every `i < N` is in bounds.
393+
unsafe {
394+
for i in 0..N {
395+
let for_decoded = *for_chunk.get_unchecked(i);
396+
*alp_chunk.get_unchecked_mut(i) = f32::decode_single(for_decoded, exponents);
397+
}
398+
}
399+
400+
// Stage 4: Copy from intermediate ALP buffer to final output (the extra copy being measured).
401+
// SAFETY: Relies on `output.len() == bitpacked.len() * T / W`, which is only checked by `debug_assert_eq!`; assumes every `S` input words correspond to `N` output values — TODO confirm.
402+
let output_chunk = unsafe { output.get_unchecked_mut(output_offset..output_offset + N) };
403+
output_chunk.copy_from_slice(alp_chunk);
404+
405+
input_offset += S;
406+
output_offset += N;
407+
}
408+
}
409+
346410
/// In-place pipeline decompression that processes data chunk by chunk directly in the output buffer.
347411
///
348412
/// Combines the benefits of pipeline processing with minimal memory usage.
@@ -641,7 +705,24 @@ mod decompress_benchmarks {
641705
input_data.exponents,
642706
&mut buffers.bitpacked_output,
643707
&mut buffers.for_decoded,
644-
&mut buffers.alp_decoded_pipeline,
708+
&mut buffers.pipeline_output,
709+
);
710+
});
711+
}
712+
713+
/// Benchmarks `decompress_pipeline_extra_copy` for each size in `BENCHMARK_SIZES`.
///
/// Input data and all buffers are created once by `setup`, outside the closure passed to `bench_local`.
#[divan::bench(consts = BENCHMARK_SIZES)]
714+
fn pipeline_extra_copy<const SIZE: usize>(bencher: Bencher) {
715+
let (input_data, mut buffers) = setup(SIZE);
716+
717+
bencher.bench_local(|| {
718+
decompress_pipeline_extra_copy(
719+
&input_data.bitpacked,
720+
input_data.reference,
721+
input_data.exponents,
722+
&mut buffers.bitpacked_output,
723+
&mut buffers.for_decoded,
724+
// `alp_decoded` is reused here as the intermediate ALP buffer; the final result goes to `pipeline_output`.
&mut buffers.alp_decoded,
725+
&mut buffers.pipeline_output,
645726
);
646727
});
647728
}
@@ -715,13 +796,9 @@ mod correctness_verification {
715796
input_data.exponents,
716797
&mut buffers.bitpacked_output,
717798
&mut buffers.for_decoded,
718-
&mut buffers.alp_decoded_pipeline,
719-
);
720-
compare_outputs(
721-
"pipeline",
722-
&buffers.alp_decoded,
723-
&buffers.alp_decoded_pipeline,
799+
&mut buffers.pipeline_output,
724800
);
801+
compare_outputs("pipeline", &buffers.alp_decoded, &buffers.pipeline_output);
725802

726803
// Run in-place batch decompression and compare with batch.
727804
decompress_in_place_batch(

0 commit comments

Comments
 (0)