
Commit 520ffc4

chore: port random access benchmark to layouts (#1246)
`develop` was a643065; `layouts` was this PR: 6dc6c53

```
# critcmp develop layouts
group                                    develop                         layouts
-----                                    -------                         -------
random-access/parquet-tokio-local-disk   1.01     94.0±0.44ms ? ?/sec    1.00     93.3±0.62ms ? ?/sec
random-access/vortex-local-fs            1.00    381.3±10.78µs ? ?/sec   34.91    13.3±0.13ms ? ?/sec
random-access/vortex-tokio-local-disk    1.00    349.5±12.64µs ? ?/sec   29.46    10.3±0.14ms ? ?/sec
```
1 parent a643065 commit 520ffc4
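
The port replaces the footer-based `ChunkedArrayReader` read path with the layouts API throughout the benchmarks. A minimal sketch of the new random-access read, lifted almost verbatim from the `take_vortex` helper this commit adds to `bench-vortex/src/reader.rs` (only the function name here differs from the diff below):

```rust
use vortex::error::VortexResult;
use vortex::sampling_compressor::ALL_ENCODINGS_CONTEXT;
use vortex::serde::io::VortexReadAt;
use vortex::serde::layouts::{LayoutBatchStreamBuilder, LayoutContext, LayoutDeserializer};
use vortex::{Array, IntoCanonical};

// Random access via the layouts API: build a batch stream over any
// `VortexReadAt` source, push the row indices down into the stream,
// read everything back, then canonicalize so lazy encodings can't
// flatter the numbers.
async fn take_rows<R: VortexReadAt + Unpin + 'static>(
    reader: R,
    indices: &[u64],
) -> VortexResult<Array> {
    LayoutBatchStreamBuilder::new(
        reader,
        LayoutDeserializer::new(
            ALL_ENCODINGS_CONTEXT.clone(),
            LayoutContext::default().into(),
        ),
    )
    .with_indices(Array::from(indices.to_vec()))
    .build()
    .await?
    .read_all()
    .await
    .and_then(IntoCanonical::into_canonical)
    .map(Array::from)
}
```

Per the table above, the layouts path is currently ~30× slower on the two vortex random-access benchmarks (34.91× and 29.46×), while the parquet baseline is unchanged.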

File tree: 7 files changed, +61 −143 lines


bench-vortex/benches/compress_noci.rs

Lines changed: 2 additions & 2 deletions
```diff
@@ -26,7 +26,7 @@ use vortex::array::{ChunkedArray, StructArray};
 use vortex::dtype::field::Field;
 use vortex::error::VortexResult;
 use vortex::sampling_compressor::compressors::fsst::FSSTCompressor;
-use vortex::sampling_compressor::{SamplingCompressor, ALL_COMPRESSORS_CONTEXT};
+use vortex::sampling_compressor::{SamplingCompressor, ALL_ENCODINGS_CONTEXT};
 use vortex::serde::layouts::{
     LayoutBatchStreamBuilder, LayoutContext, LayoutDeserializer, LayoutWriter,
 };
@@ -128,7 +128,7 @@ fn vortex_decompress_read(runtime: &Runtime, buf: Arc<Vec<u8>>) -> VortexResult<
     let builder: LayoutBatchStreamBuilder<_> = LayoutBatchStreamBuilder::new(
         buf,
         LayoutDeserializer::new(
-            ALL_COMPRESSORS_CONTEXT.clone(),
+            ALL_ENCODINGS_CONTEXT.clone(),
             LayoutContext::default().into(),
         ),
     );
```

bench-vortex/benches/random_access.rs

Lines changed: 4 additions & 4 deletions
```diff
@@ -33,12 +33,12 @@ fn random_access_vortex(c: &mut Criterion) {
             .iter(|| async { black_box(take_vortex_tokio(&taxi_vortex, &INDICES).await.unwrap()) })
     });
 
-    let local_fs = Arc::new(LocalFileSystem::new()) as Arc<dyn ObjectStore>;
-    let local_fs_path = object_store::path::Path::from_filesystem_path(&taxi_vortex).unwrap();
     group.bench_function("vortex-local-fs", |b| {
+        let local_fs = Arc::new(LocalFileSystem::new()) as Arc<dyn ObjectStore>;
+        let local_fs_path = object_store::path::Path::from_filesystem_path(&taxi_vortex).unwrap();
         b.to_async(Runtime::new().unwrap()).iter(|| async {
             black_box(
-                take_vortex_object_store(&local_fs, &local_fs_path, &INDICES)
+                take_vortex_object_store(local_fs.clone(), local_fs_path.clone(), &INDICES)
                     .await
                     .unwrap(),
             )
@@ -65,7 +65,7 @@ fn random_access_vortex(c: &mut Criterion) {
 
         b.to_async(Runtime::new().unwrap()).iter(|| async {
             black_box(
-                take_vortex_object_store(&r2_fs, &r2_path, &INDICES)
+                take_vortex_object_store(r2_fs.clone(), r2_path.clone(), &INDICES)
                     .await
                     .unwrap(),
             )
```
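
The only behavioral change here is ownership: `take_vortex_object_store` now takes the store and path by value, so the bench closure clones the cheap `Arc`/path handles per iteration. A sketch of the calling pattern, assuming the crate exports the helper as `bench_vortex::reader::take_vortex_object_store` and with a hypothetical `"taxi.vortex"` path standing in for the taxi dataset:

```rust
use std::sync::Arc;

use bench_vortex::reader::take_vortex_object_store;
use criterion::{black_box, Criterion};
use object_store::local::LocalFileSystem;
use object_store::ObjectStore;
use tokio::runtime::Runtime;

// Cloning an `Arc<dyn ObjectStore>` and an `object_store` path is cheap,
// so each async iteration clones its own handles instead of borrowing.
fn bench_local_fs(c: &mut Criterion) {
    c.bench_function("vortex-local-fs", |b| {
        let fs = Arc::new(LocalFileSystem::new()) as Arc<dyn ObjectStore>;
        let path = object_store::path::Path::from("taxi.vortex");
        b.to_async(Runtime::new().unwrap()).iter(|| async {
            black_box(
                take_vortex_object_store(fs.clone(), path.clone(), &[42, 4096])
                    .await
                    .unwrap(),
            )
        });
    });
}
```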

bench-vortex/src/lib.rs

Lines changed: 3 additions & 34 deletions
```diff
@@ -11,23 +11,11 @@ use itertools::Itertools;
 use log::LevelFilter;
 use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
 use simplelog::{ColorChoice, Config, TermLogger, TerminalMode};
-use vortex::aliases::hash_set::HashSet;
 use vortex::array::ChunkedArray;
 use vortex::arrow::FromArrowType;
 use vortex::compress::CompressionStrategy;
 use vortex::dtype::DType;
 use vortex::fastlanes::DeltaEncoding;
-use vortex::sampling_compressor::compressors::alp::ALPCompressor;
-use vortex::sampling_compressor::compressors::alp_rd::ALPRDCompressor;
-use vortex::sampling_compressor::compressors::bitpacked::BITPACK_WITH_PATCHES;
-use vortex::sampling_compressor::compressors::date_time_parts::DateTimePartsCompressor;
-use vortex::sampling_compressor::compressors::dict::DictCompressor;
-use vortex::sampling_compressor::compressors::fsst::FSSTCompressor;
-use vortex::sampling_compressor::compressors::r#for::FoRCompressor;
-use vortex::sampling_compressor::compressors::roaring_bool::RoaringBoolCompressor;
-use vortex::sampling_compressor::compressors::runend::DEFAULT_RUN_END_COMPRESSOR;
-use vortex::sampling_compressor::compressors::sparse::SparseCompressor;
-use vortex::sampling_compressor::compressors::CompressorRef;
 use vortex::sampling_compressor::SamplingCompressor;
 use vortex::{Array, Context, IntoArray};
 
@@ -51,22 +39,6 @@ pub static CTX: LazyLock<Arc<Context>> = LazyLock::new(|| {
     )
 });
 
-pub static COMPRESSORS: LazyLock<HashSet<CompressorRef<'static>>> = LazyLock::new(|| {
-    [
-        &ALPCompressor as CompressorRef<'static>,
-        &ALPRDCompressor,
-        &DictCompressor,
-        &BITPACK_WITH_PATCHES,
-        &FoRCompressor,
-        &FSSTCompressor,
-        &DateTimePartsCompressor,
-        &DEFAULT_RUN_END_COMPRESSOR,
-        &RoaringBoolCompressor,
-        &SparseCompressor,
-    ]
-    .into()
-});
-
 /// Creates a file if it doesn't already exist.
 /// NB: Does NOT modify the given path to ensure that it resides in the data directory.
 pub fn idempotent<T, E, P: IdempotentPath + ?Sized>(
@@ -172,10 +144,7 @@ pub fn fetch_taxi_data() -> Array {
 }
 
 pub fn compress_taxi_data() -> Array {
-    let uncompressed = fetch_taxi_data();
-    let compressor: &dyn CompressionStrategy = &SamplingCompressor::new(COMPRESSORS.clone());
-
-    compressor.compress(&uncompressed).unwrap()
+    CompressionStrategy::compress(&SamplingCompressor::default(), &fetch_taxi_data()).unwrap()
 }
 
 pub struct CompressionRunStats {
@@ -235,7 +204,7 @@ mod test {
     use vortex::{Array, IntoCanonical};
 
     use crate::taxi_data::taxi_data_parquet;
-    use crate::{compress_taxi_data, setup_logger, COMPRESSORS};
+    use crate::{compress_taxi_data, setup_logger};
 
     #[ignore]
     #[test]
@@ -268,7 +237,7 @@ mod test {
         let file = File::open(taxi_data_parquet()).unwrap();
         let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
         let reader = builder.with_limit(1).build().unwrap();
-        let compressor: &dyn CompressionStrategy = &SamplingCompressor::new(COMPRESSORS.clone());
+        let compressor: &dyn CompressionStrategy = &SamplingCompressor::default();
 
         for record_batch in reader.map(|batch_result| batch_result.unwrap()) {
             let struct_arrow: ArrowStructArray = record_batch.into();
```
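
The hand-maintained `COMPRESSORS` set is gone; callers lean on the compressor's defaults instead. A minimal sketch of the replacement call shape, assuming (as this diff implies) that `SamplingCompressor::default()` covers the compressors the deleted static used to list:

```rust
use vortex::compress::CompressionStrategy;
use vortex::error::VortexResult;
use vortex::sampling_compressor::SamplingCompressor;
use vortex::Array;

// Compress any array with the default sampling compressor; the explicit
// compressor list (ALP, FSST, dict, ...) is now owned by `default()`.
fn compress_default(array: &Array) -> VortexResult<Array> {
    let compressor: &dyn CompressionStrategy = &SamplingCompressor::default();
    compressor.compress(array)
}
```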

bench-vortex/src/public_bi_data.rs

Lines changed: 1 addition & 1 deletion
```diff
@@ -13,7 +13,7 @@ use tokio::fs::File;
 use vortex::aliases::hash_map::HashMap;
 use vortex::array::ChunkedArray;
 use vortex::error::VortexResult;
-use vortex::{Array, ArrayDType, ArrayTrait, IntoArray};
+use vortex::{Array, ArrayDType, IntoArray};
 
 use crate::data_downloads::{decompress_bz2, download_data, BenchmarkDataset, FileType};
 use crate::public_bi_data::PBIDataset::*;
```

bench-vortex/src/reader.rs

Lines changed: 47 additions & 98 deletions
```diff
@@ -11,7 +11,6 @@ use arrow_array::{
 };
 use arrow_select::concat::concat_batches;
 use arrow_select::take::take_record_batch;
-use bytes::{Bytes, BytesMut};
 use futures::stream;
 use itertools::Itertools;
 use log::info;
@@ -23,22 +22,17 @@ use parquet::file::metadata::RowGroupMetaData;
 use serde::{Deserialize, Serialize};
 use stream::StreamExt;
 use vortex::aliases::hash_map::HashMap;
-use vortex::array::{ChunkedArray, PrimitiveArray};
+use vortex::array::ChunkedArray;
 use vortex::arrow::FromArrowType;
-use vortex::buffer::Buffer;
 use vortex::compress::CompressionStrategy;
 use vortex::dtype::DType;
-use vortex::error::{vortex_err, VortexResult};
-use vortex::sampling_compressor::SamplingCompressor;
-use vortex::serde::chunked_reader::ChunkedArrayReader;
-use vortex::serde::io::{ObjectStoreExt, VortexReadAt, VortexWrite};
-use vortex::serde::stream_reader::StreamArrayReader;
-use vortex::serde::stream_writer::StreamArrayWriter;
-use vortex::serde::DTypeReader;
-use vortex::stream::ArrayStreamExt;
-use vortex::{Array, ArrayDType, IntoArray, IntoCanonical};
-
-use crate::{COMPRESSORS, CTX};
+use vortex::error::VortexResult;
+use vortex::sampling_compressor::{SamplingCompressor, ALL_ENCODINGS_CONTEXT};
+use vortex::serde::io::{ObjectStoreReadAt, VortexReadAt, VortexWrite};
+use vortex::serde::layouts::{
+    LayoutBatchStreamBuilder, LayoutContext, LayoutDeserializer, LayoutWriter,
+};
+use vortex::{Array, IntoArray, IntoCanonical};
 
 pub const BATCH_SIZE: usize = 65_536;
 
@@ -51,15 +45,18 @@ pub struct VortexFooter {
 
 pub async fn open_vortex(path: &Path) -> VortexResult<Array> {
     let file = tokio::fs::File::open(path).await.unwrap();
-    let reader = StreamArrayReader::try_new(file, CTX.clone())
-        .await?
-        .load_dtype()
-        .await?;
-    reader
-        .into_array_stream()
-        .collect_chunked()
-        .await
-        .map(IntoArray::into_array)
+
+    LayoutBatchStreamBuilder::new(
+        file,
+        LayoutDeserializer::new(
+            ALL_ENCODINGS_CONTEXT.clone(),
+            LayoutContext::default().into(),
+        ),
+    )
+    .build()
+    .await?
+    .read_all()
+    .await
 }
 
 pub async fn rewrite_parquet_as_vortex<W: VortexWrite>(
@@ -68,24 +65,11 @@ pub async fn rewrite_parquet_as_vortex<W: VortexWrite>(
 ) -> VortexResult<()> {
     let chunked = compress_parquet_to_vortex(parquet_path.as_path())?;
 
-    let written = StreamArrayWriter::new(write)
-        .write_array_stream(chunked.array_stream())
+    LayoutWriter::new(write)
+        .write_array_columns(chunked)
+        .await?
+        .finalize()
         .await?;
-
-    let layout = written.array_layouts()[0].clone();
-    let mut w = written.into_inner();
-    let mut s = flexbuffers::FlexbufferSerializer::new();
-    VortexFooter {
-        byte_offsets: layout.chunks.byte_offsets,
-        row_offsets: layout.chunks.row_offsets,
-        dtype_range: layout.dtype.begin..layout.dtype.end,
-    }
-    .serialize(&mut s)?;
-    let footer_bytes = Buffer::from(Bytes::from(s.take_buffer()));
-    let footer_len = footer_bytes.len() as u64;
-    w.write_all(footer_bytes).await?;
-    w.write_all(footer_len.to_le_bytes()).await?;
-
     Ok(())
 }
 
@@ -102,17 +86,9 @@ pub fn read_parquet_to_vortex<P: AsRef<Path>>(parquet_path: P) -> VortexResult<C
     ChunkedArray::try_new(chunks, dtype)
 }
 
-pub fn compress_parquet_to_vortex(parquet_path: &Path) -> VortexResult<ChunkedArray> {
+pub fn compress_parquet_to_vortex(parquet_path: &Path) -> VortexResult<Array> {
     let chunked = read_parquet_to_vortex(parquet_path)?;
-    let compressor: &dyn CompressionStrategy = &SamplingCompressor::new(COMPRESSORS.clone());
-    let dtype = chunked.dtype().clone();
-    ChunkedArray::try_new(
-        chunked
-            .chunks()
-            .map(|x| compressor.compress(&x))
-            .collect::<VortexResult<Vec<_>>>()?,
-        dtype,
-    )
+    CompressionStrategy::compress(&SamplingCompressor::default(), &chunked.into_array())
 }
 
 pub fn write_csv_as_parquet(csv_path: PathBuf, output_path: &Path) -> VortexResult<()> {
@@ -134,64 +110,37 @@ pub fn write_csv_as_parquet(csv_path: PathBuf, output_path: &Path) -> VortexResu
     Ok(())
 }
 
-pub async fn read_vortex_footer_format<R: VortexReadAt>(
-    reader: R,
-    len: u64,
-) -> VortexResult<ChunkedArrayReader<R>> {
-    let mut buf = BytesMut::with_capacity(8);
-    unsafe { buf.set_len(8) }
-    buf = reader.read_at_into(len - 8, buf).await?;
-    let footer_len = u64::from_le_bytes(buf.as_ref().try_into().unwrap()) as usize;
-
-    buf.reserve(footer_len - buf.len());
-    unsafe { buf.set_len(footer_len) }
-    buf = reader
-        .read_at_into(len - footer_len as u64 - 8, buf)
-        .await?;
-
-    let footer: VortexFooter = VortexFooter::deserialize(
-        flexbuffers::Reader::get_root(buf.as_ref()).map_err(|e| vortex_err!("{}", e))?,
-    )?;
-
-    let header_len = (footer.dtype_range.end - footer.dtype_range.start) as usize;
-    buf.reserve(header_len - buf.len());
-    unsafe { buf.set_len(header_len) }
-    buf = reader.read_at_into(footer.dtype_range.start, buf).await?;
-    let dtype = DTypeReader::new(buf).await?.read_dtype().await?;
-
-    ChunkedArrayReader::try_new(
+async fn take_vortex<T: VortexReadAt + Unpin + 'static>(
+    reader: T,
+    indices: &[u64],
+) -> VortexResult<Array> {
+    LayoutBatchStreamBuilder::new(
         reader,
-        CTX.clone(),
-        dtype.into(),
-        PrimitiveArray::from(footer.byte_offsets).into_array(),
-        PrimitiveArray::from(footer.row_offsets).into_array(),
+        LayoutDeserializer::new(
+            ALL_ENCODINGS_CONTEXT.clone(),
+            LayoutContext::default().into(),
+        ),
    )
+    .with_indices(Array::from(indices.to_vec()))
+    .build()
+    .await?
+    .read_all()
+    .await
+    // For equivalence.... we decompress to make sure we're not cheating too much.
+    .and_then(IntoCanonical::into_canonical)
+    .map(Array::from)
 }
 
 pub async fn take_vortex_object_store(
-    fs: &Arc<dyn ObjectStore>,
-    path: &object_store::path::Path,
+    fs: Arc<dyn ObjectStore>,
+    path: object_store::path::Path,
     indices: &[u64],
 ) -> VortexResult<Array> {
-    let head = fs.head(path).await?;
-    let indices_array = indices.to_vec().into_array();
-    let taken = read_vortex_footer_format(fs.vortex_reader(path), head.size as u64)
-        .await?
-        .take_rows(&indices_array)
-        .await?;
-    // For equivalence.... we flatten to make sure we're not cheating too much.
-    Ok(taken.into_canonical()?.into())
+    take_vortex(ObjectStoreReadAt::new(fs.clone(), path), indices).await
 }
 
 pub async fn take_vortex_tokio(path: &Path, indices: &[u64]) -> VortexResult<Array> {
-    let len = File::open(path)?.metadata()?.len();
-    let indices_array = indices.to_vec().into_array();
-    let taken = read_vortex_footer_format(tokio::fs::File::open(path).await?, len)
-        .await?
-        .take_rows(&indices_array)
-        .await?;
-    // For equivalence.... we flatten to make sure we're not cheating too much.
-    Ok(taken.into_canonical()?.into())
+    take_vortex(tokio::fs::File::open(path).await?, indices).await
 }
 
 pub async fn take_parquet_object_store(
```
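
On the write side, `LayoutWriter` replaces the hand-rolled `VortexFooter` serialization. A minimal sketch of the new persistence path, using the same calls as `rewrite_parquet_as_vortex` above:

```rust
use vortex::error::VortexResult;
use vortex::serde::io::VortexWrite;
use vortex::serde::layouts::LayoutWriter;
use vortex::Array;

// Write an array's columns and let the writer emit the footer/layout
// metadata that the old code serialized by hand with flexbuffers.
async fn write_vortex<W: VortexWrite>(write: W, array: Array) -> VortexResult<()> {
    LayoutWriter::new(write)
        .write_array_columns(array)
        .await?
        .finalize()
        .await?;
    Ok(())
}
```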

pyvortex/src/dataset.rs

Lines changed: 3 additions & 3 deletions
```diff
@@ -12,7 +12,7 @@ use vortex::arrow::infer_schema;
 use vortex::dtype::field::Field;
 use vortex::dtype::DType;
 use vortex::error::VortexResult;
-use vortex::sampling_compressor::ALL_COMPRESSORS_CONTEXT;
+use vortex::sampling_compressor::ALL_ENCODINGS_CONTEXT;
 use vortex::serde::io::{ObjectStoreReadAt, VortexReadAt};
 use vortex::serde::layouts::{
     LayoutBatchStream, LayoutBatchStreamBuilder, LayoutContext, LayoutDescriptorReader,
@@ -33,7 +33,7 @@ pub async fn layout_stream_from_reader<T: VortexReadAt + Unpin>(
     let mut builder = LayoutBatchStreamBuilder::new(
         reader,
         LayoutDeserializer::new(
-            ALL_COMPRESSORS_CONTEXT.clone(),
+            ALL_ENCODINGS_CONTEXT.clone(),
             LayoutContext::default().into(),
         ),
     )
@@ -64,7 +64,7 @@ pub async fn read_array_from_reader<T: VortexReadAt + Unpin + 'static>(
 
 pub async fn read_dtype_from_reader<T: VortexReadAt + Unpin>(reader: T) -> VortexResult<DType> {
     LayoutDescriptorReader::new(LayoutDeserializer::new(
-        ALL_COMPRESSORS_CONTEXT.clone(),
+        ALL_ENCODINGS_CONTEXT.clone(),
         LayoutContext::default().into(),
     ))
     .read_footer(&reader, reader.size().await)
```

vortex-sampling-compressor/src/lib.rs

Lines changed: 1 addition & 1 deletion
```diff
@@ -75,7 +75,7 @@ pub static FASTEST_COMPRESSORS: LazyLock<[CompressorRef<'static>; 7]> = LazyLock
     ]
 });
 
-pub static ALL_COMPRESSORS_CONTEXT: LazyLock<Arc<Context>> = LazyLock::new(|| {
+pub static ALL_ENCODINGS_CONTEXT: LazyLock<Arc<Context>> = LazyLock::new(|| {
     Arc::new(Context::default().with_encodings([
         &ALPEncoding as EncodingRef,
         &ByteBoolEncoding,
```
