
Commit 193d991

Vortex layout strategy (#2217)
A quickly hacked-together layout strategy that includes size- and row-based chunking and the sampling compressor.
1 parent 4fb5514 commit 193d991
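The clickbench change in this commit drops the old collect-then-compress path (gather record batches, rebuild a StructArray column by column, then run the SamplingCompressor by hand) and instead streams record batches straight into the Vortex file writer, which now owns chunking and compression through the layout strategy. A minimal sketch of that write path, assembled only from calls that appear in the diff below; the helper name, its DataFrame/File parameters, and the use of anyhow for errors are illustrative assumptions, not part of the commit:

use datafusion::dataframe::DataFrame;
use futures::StreamExt;
use vortex::arrow::FromArrowType;
use vortex::dtype::DType;
use vortex::error::VortexError;
use vortex::file::VortexWriteOptions;
use vortex::stream::ArrayStreamAdapter;
use vortex::Array;

// Hypothetical helper: stream a DataFusion DataFrame into a Vortex file,
// leaving chunking and compression to the writer's layout strategy.
async fn write_dataframe_to_vortex(df: DataFrame, f: tokio::fs::File) -> anyhow::Result<()> {
    // Execute the query as a stream of Arrow RecordBatches rather than collecting them.
    let record_batches = df.execute_stream().await?;

    // Adapt the RecordBatch stream into a Vortex ArrayStream: the Arrow schema becomes
    // a Vortex DType and each batch is converted into a Vortex Array.
    let array_stream = ArrayStreamAdapter::new(
        DType::from_arrow(record_batches.schema()),
        record_batches.map(|batch| batch.map_err(VortexError::from).and_then(Array::try_from)),
    );

    // Write the stream to disk; no explicit SamplingCompressor call is needed here.
    VortexWriteOptions::default().write(f, array_stream).await?;
    Ok(())
}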

33 files changed: +365 -500 lines changed


Cargo.lock

Lines changed: 3 additions & 1 deletion
Some generated files are not rendered by default.

bench-vortex/Cargo.toml

Lines changed: 0 additions & 1 deletion
@@ -63,7 +63,6 @@ tokio = { workspace = true, features = ["full"] }
 uuid = { workspace = true, features = ["v4"] }
 vortex = { workspace = true, features = ["object_store", "parquet"] }
 vortex-datafusion = { workspace = true }
-vortex-mask = { workspace = true }
 xshell = { workspace = true }
 
 [dev-dependencies]

bench-vortex/benches/sel_vec.rs

Lines changed: 1 addition & 1 deletion
@@ -7,6 +7,7 @@ use vortex::array::PrimitiveArray;
 use vortex::compute::filter;
 use vortex::dtype::{DType, Nullability, PType};
 use vortex::encodings::alp::{ALPArray, ALPEncoding};
+use vortex::mask::Mask;
 use vortex::sampling_compressor::compressors::alp::ALPCompressor;
 use vortex::sampling_compressor::compressors::bitpacked::{
     BitPackedCompressor, BITPACK_NO_PATCHES, BITPACK_WITH_PATCHES,
@@ -16,7 +17,6 @@ use vortex::sampling_compressor::compressors::EncodingCompressor;
 use vortex::sampling_compressor::SamplingCompressor;
 use vortex::variants::PrimitiveArrayTrait;
 use vortex::{Array, Encoding, IntoArray, IntoCanonical};
-use vortex_mask::Mask;
 
 // criterion benchmark setup:
 fn bench_sel_vec(c: &mut Criterion) {

bench-vortex/src/bin/clickbench.rs

Lines changed: 3 additions & 12 deletions
@@ -102,16 +102,9 @@ fn main() {
     });
 
     let formats = if args.only_vortex {
-        vec![Format::OnDiskVortex {
-            enable_compression: true,
-        }]
+        vec![Format::OnDiskVortex]
     } else {
-        vec![
-            Format::Parquet,
-            Format::OnDiskVortex {
-                enable_compression: true,
-            },
-        ]
+        vec![Format::Parquet, Format::OnDiskVortex]
     };
 
     let queries = match args.queries.clone() {
@@ -140,9 +133,7 @@ fn main() {
                 .await
                 .unwrap()
             }),
-            Format::OnDiskVortex {
-                enable_compression: true,
-            } => {
+            Format::OnDiskVortex => {
                 runtime.block_on(async {
                     clickbench::register_vortex_files(
                         context.clone(),
bench-vortex/src/bin/tpch_benchmark.rs

Lines changed: 2 additions & 13 deletions
@@ -87,20 +87,9 @@ async fn bench_main(
 
     // The formats to run against (vs the baseline)
     let formats = if only_vortex {
-        vec![
-            Format::Arrow,
-            Format::OnDiskVortex {
-                enable_compression: true,
-            },
-        ]
+        vec![Format::Arrow, Format::OnDiskVortex]
     } else {
-        vec![
-            Format::Arrow,
-            Format::Parquet,
-            Format::OnDiskVortex {
-                enable_compression: true,
-            },
-        ]
+        vec![Format::Arrow, Format::Parquet, Format::OnDiskVortex]
     };
 
     // Load datasets

bench-vortex/src/clickbench.rs

Lines changed: 14 additions & 51 deletions
@@ -9,14 +9,12 @@ use datafusion::datasource::listing::{
 use datafusion::prelude::{ParquetReadOptions, SessionContext};
 use futures::{stream, StreamExt, TryStreamExt};
 use tokio::fs::{create_dir_all, OpenOptions};
-use vortex::aliases::hash_map::HashMap;
-use vortex::array::{ChunkedArray, StructArray};
+use vortex::arrow::FromArrowType;
 use vortex::dtype::DType;
-use vortex::error::vortex_err;
+use vortex::error::{vortex_err, VortexError};
 use vortex::file::{VortexWriteOptions, VORTEX_FILE_EXTENSION};
-use vortex::sampling_compressor::SamplingCompressor;
-use vortex::variants::StructArrayTrait;
-use vortex::{Array, IntoArray, IntoArrayVariant};
+use vortex::stream::ArrayStreamAdapter;
+use vortex::Array;
 use vortex_datafusion::persistent::VortexFormat;
 
 use crate::{idempotent_async, CTX};
@@ -156,7 +154,6 @@ pub async fn register_vortex_files(
             .join(format!("hits_{idx}.parquet"));
         let output_path = vortex_dir.join(format!("hits_{idx}.{VORTEX_FILE_EXTENSION}"));
         let session = session.clone();
-        let schema = schema.clone();
 
         tokio::spawn(async move {
            let output_path = output_path.clone();
@@ -168,49 +165,17 @@ pub async fn register_vortex_files(
                        ParquetReadOptions::default(),
                    )
                    .await?
-                   .collect()
+                   .execute_stream()
                    .await?;
 
-               // Create a ChunkedArray from the set of chunks.
-               let sts = record_batches
-                   .into_iter()
-                   .map(Array::try_from)
-                   .map(|a| a.unwrap().into_struct().unwrap())
-                   .collect::<Vec<_>>();
-
-               let mut arrays_map: HashMap<Arc<str>, Vec<Array>> = HashMap::default();
-               let mut types_map: HashMap<Arc<str>, DType> = HashMap::default();
-
-               for st in sts.into_iter() {
-                   let struct_dtype = st.dtype().as_struct().unwrap();
-                   let names = struct_dtype.names().iter();
-                   let types = struct_dtype.fields();
-
-                   for (field_name, field_type) in names.zip(types) {
-                       let val = arrays_map.entry(field_name.clone()).or_default();
-                       val.push(st.maybe_null_field_by_name(field_name.as_ref()).unwrap());
-
-                       types_map.insert(field_name.clone(), field_type.clone());
-                   }
-               }
-
-               let fields = schema
-                   .fields()
-                   .iter()
-                   .map(|field| {
-                       let name: Arc<str> = field.name().as_str().into();
-                       let dtype = types_map[&name].clone();
-                       let chunks = arrays_map.remove(&name).unwrap();
-                       let chunked_child = ChunkedArray::try_new(chunks, dtype).unwrap();
-
-                       (name, chunked_child.into_array())
-                   })
-                   .collect::<Vec<_>>();
-
-               let data = StructArray::from_fields(&fields)?.into_array();
-
-               let compressor = SamplingCompressor::default();
-               let data = compressor.compress(&data, None)?.into_array();
+               // Convert the arrow schema to a Vortex DType
+               let array_stream = ArrayStreamAdapter::new(
+                   // TODO(ngates): or should we use the provided schema?
+                   DType::from_arrow(record_batches.schema()),
+                   record_batches.map(|batch| {
+                       batch.map_err(VortexError::from).and_then(Array::try_from)
+                   }),
+               );
 
                let f = OpenOptions::new()
                    .write(true)
@@ -219,9 +184,7 @@ pub async fn register_vortex_files(
                    .open(&vtx_file)
                    .await?;
 
-               VortexWriteOptions::default()
-                   .write(f, data.into_array_stream())
-                   .await?;
+               VortexWriteOptions::default().write(f, array_stream).await?;
 
                anyhow::Ok(())
            })

bench-vortex/src/data_downloads.rs

Lines changed: 0 additions & 1 deletion
@@ -94,7 +94,6 @@ pub fn decompress_bz2(input_path: PathBuf, output_path: PathBuf) -> PathBuf {
 pub trait BenchmarkDataset {
     fn as_uncompressed(&self);
     fn to_vortex_array(&self) -> VortexResult<Array>;
-    fn compress_to_vortex(&self) -> VortexResult<()>;
     fn write_as_parquet(&self);
     fn write_as_vortex(&self) -> impl Future<Output = ()>;
     fn list_files(&self, file_type: FileType) -> Vec<PathBuf>;

bench-vortex/src/lib.rs

Lines changed: 4 additions & 75 deletions
@@ -48,10 +48,6 @@ pub mod taxi_data;
 pub mod tpch;
 pub mod vortex_utils;
 
-// Sizes match default compressor configuration
-const TARGET_BLOCK_BYTESIZE: usize = 16 * (1 << 20);
-const TARGET_BLOCK_SIZE: usize = 64 * (1 << 10);
-
 #[macro_export]
 macro_rules! feature_flagged_allocator {
     () => {
@@ -81,7 +77,7 @@ pub enum Format {
     Arrow,
     Parquet,
     InMemoryVortex,
-    OnDiskVortex { enable_compression: bool },
+    OnDiskVortex,
 }
 
 impl std::fmt::Display for Format {
@@ -93,8 +89,8 @@ impl std::fmt::Display for Format {
             Format::InMemoryVortex => {
                 write!(f, "in_memory_vortex")
             }
-            Format::OnDiskVortex { enable_compression } => {
-                write!(f, "on_disk_vortex(compressed={enable_compression})")
+            Format::OnDiskVortex => {
+                write!(f, "on_disk_vortex(compressed=true)")
             }
         }
     }
@@ -107,12 +103,7 @@ impl Format {
             Format::Arrow => "arrow".to_string(),
             Format::Parquet => "parquet".to_string(),
             Format::InMemoryVortex => "vortex-in-memory".to_string(),
-            Format::OnDiskVortex { enable_compression } => if *enable_compression {
-                "vortex-file-compressed"
-            } else {
-                "vortex-file-uncompressed"
-            }
-            .to_string(),
+            Format::OnDiskVortex => "vortex-file-compressed".to_string(),
         }
     }
 }
@@ -404,65 +395,3 @@ pub fn generate_struct_of_list_of_ints_array(
         DType::Struct(struct_dtype.clone(), Nullability::NonNullable),
     )
 }
-
-#[cfg(test)]
-mod test {
-    use std::fs::File;
-    use std::ops::Deref;
-    use std::sync::Arc;
-
-    use arrow_array::{ArrayRef as ArrowArrayRef, StructArray as ArrowStructArray};
-    use log::LevelFilter;
-    use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
-    use vortex::arrow::{FromArrowArray, IntoArrowArray};
-    use vortex::compress::CompressionStrategy;
-    use vortex::sampling_compressor::SamplingCompressor;
-    use vortex::Array;
-
-    use crate::taxi_data::taxi_data_parquet;
-    use crate::{compress_taxi_data, setup_logger};
-
-    #[ignore]
-    #[test]
-    fn compression_ratio() {
-        setup_logger(LevelFilter::Debug);
-        _ = compress_taxi_data();
-    }
-
-    #[ignore]
-    #[test]
-    fn round_trip_arrow() {
-        let file = File::open(taxi_data_parquet()).unwrap();
-        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
-        let reader = builder.with_limit(1).build().unwrap();
-
-        for record_batch in reader.map(|batch_result| batch_result.unwrap()) {
-            let struct_arrow: ArrowStructArray = record_batch.into();
-            let arrow_array: ArrowArrayRef = Arc::new(struct_arrow);
-            let vortex_array = Array::from_arrow(arrow_array.clone(), false);
-            let vortex_as_arrow = vortex_array.into_arrow_preferred().unwrap();
-            assert_eq!(vortex_as_arrow.deref(), arrow_array.deref());
-        }
-    }
-
-    // Ignoring since Struct arrays don't currently support equality.
-    // https://github.com/apache/arrow-rs/issues/5199
-    #[ignore]
-    #[test]
-    fn round_trip_arrow_compressed() {
-        let file = File::open(taxi_data_parquet()).unwrap();
-        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
-        let reader = builder.with_limit(1).build().unwrap();
-        let compressor: &dyn CompressionStrategy = &SamplingCompressor::default();
-
-        for record_batch in reader.map(|batch_result| batch_result.unwrap()) {
-            let struct_arrow: ArrowStructArray = record_batch.into();
-            let arrow_array: ArrowArrayRef = Arc::new(struct_arrow);
-            let vortex_array = Array::from_arrow(arrow_array.clone(), false);
-
-            let compressed = compressor.compress(&vortex_array).unwrap();
-            let compressed_as_arrow = compressed.into_arrow_preferred().unwrap();
-            assert_eq!(compressed_as_arrow.deref(), arrow_array.deref());
-        }
-    }
-}

bench-vortex/src/public_bi_data.rs

Lines changed: 7 additions & 29 deletions
@@ -12,14 +12,13 @@ use reqwest::Url;
 use tokio::fs::File;
 use vortex::aliases::hash_map::HashMap;
 use vortex::array::ChunkedArray;
-use vortex::error::VortexResult;
+use vortex::error::{VortexExpect, VortexResult};
 use vortex::{Array, IntoArray};
 
 use crate::data_downloads::{decompress_bz2, download_data, BenchmarkDataset, FileType};
 use crate::public_bi_data::PBIDataset::*;
 use crate::reader::{
-    compress_parquet_to_vortex, open_vortex, read_parquet_to_vortex, rewrite_parquet_as_vortex,
-    write_csv_as_parquet,
+    open_vortex, read_parquet_to_vortex, rewrite_parquet_as_vortex, write_csv_as_parquet,
 };
 use crate::{idempotent, IdempotentPath};
@@ -561,34 +560,13 @@ impl BenchmarkDataset for BenchmarkDatasets {
         let arrays = self
             .list_files(FileType::Parquet)
             .iter()
-            .map(|f| read_parquet_to_vortex(f.as_path()))
+            .flat_map(|f| {
+                read_parquet_to_vortex(f.as_path()).vortex_expect("Failed to read parquet")
+            })
             .collect::<VortexResult<Vec<_>>>()?;
-        assert!(!arrays.is_empty());
-        let dtype = arrays[0].dtype().clone();
-        ChunkedArray::try_new(
-            arrays.iter().flat_map(|x| x.chunks()).collect::<Vec<_>>(),
-            dtype,
-        )
-        .map(|x| x.into_array())
-    }
-
-    fn compress_to_vortex(&self) -> VortexResult<()> {
-        self.write_as_parquet();
-        for f in self.list_files(FileType::Parquet) {
-            info!(
-                "Compressing and writing {} to vortex",
-                f.to_str().unwrap_or("None")
-            );
-            let from_vortex = compress_parquet_to_vortex(f.as_path())?;
-            let vx_size = from_vortex.nbytes();
 
-            info!(
-                "Vortex size: {}, {}B",
-                format_size(vx_size as u64, DECIMAL),
-                vx_size
-            );
-        }
-        Ok(())
+        assert!(!arrays.is_empty());
+        Ok(ChunkedArray::from_iter(arrays).into_array())
     }
 
     fn write_as_parquet(&self) {
