Commit dfdeaf0

Cutover to Vortex Layouts (#1899)
Given this branch now builds, it's worth keeping it that way while we port functionality into the new layouts.

Co-authored-by: Joe Isaacs <[email protected]>
1 parent fc6deac commit dfdeaf0

23 files changed: +371 additions, -279 deletions

Cargo.lock

Lines changed: 1 addition & 0 deletions
Generated file; diff not rendered.

bench-vortex/src/clickbench.rs

Lines changed: 13 additions & 10 deletions
```diff
@@ -7,13 +7,16 @@ use datafusion::datasource::listing::{
     ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
 };
 use datafusion::prelude::{ParquetReadOptions, SessionContext};
-use futures::{stream, StreamExt, TryStreamExt};
+use futures::executor::block_on;
+use itertools::Itertools;
+use rayon::prelude::*;
 use tokio::fs::{create_dir_all, OpenOptions};
 use vortex::aliases::hash_map::HashMap;
 use vortex::array::{ChunkedArray, StructArray};
 use vortex::dtype::DType;
 use vortex::error::vortex_err;
-use vortex::file::{VortexFileWriter, VORTEX_FILE_EXTENSION};
+use vortex::file::v2::VortexWriteOptions;
+use vortex::file::VORTEX_FILE_EXTENSION;
 use vortex::sampling_compressor::SamplingCompressor;
 use vortex::variants::StructArrayTrait;
 use vortex::{ArrayDType, ArrayData, IntoArrayData, IntoArrayVariant};
@@ -149,7 +152,9 @@ pub async fn register_vortex_files(
     let vortex_dir = input_path.join("vortex");
     create_dir_all(&vortex_dir).await?;
 
-    stream::iter(0..100)
+    (0..100)
+        .collect_vec()
+        .par_iter()
         .map(|idx| {
             let parquet_file_path = input_path
                 .join("parquet")
@@ -158,7 +163,7 @@ pub async fn register_vortex_files(
             let session = session.clone();
             let schema = schema.clone();
 
-            tokio::spawn(async move {
+            block_on(async move {
                 let output_path = output_path.clone();
                 idempotent_async(&output_path, move |vtx_file| async move {
                     eprintln!("Processing file {idx}");
@@ -219,19 +224,17 @@
                         .open(&vtx_file)
                         .await?;
 
-                    let mut writer = VortexFileWriter::new(f);
-                    writer = writer.write_array_columns(data).await?;
-                    writer.finalize().await?;
+                    VortexWriteOptions::default()
+                        .write(f, data.into_array_stream())
+                        .await?;
 
                     anyhow::Ok(())
                 })
                 .await
                 .expect("Failed to write Vortex file")
             })
         })
-        .buffered(16)
-        .try_collect::<Vec<_>>()
-        .await?;
+        .collect::<Vec<_>>();
 
     let format = Arc::new(VortexFormat::new(CTX.clone()));
     let table_path = vortex_dir
```
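The conversion loop above swaps stream-level concurrency (`stream::iter(..).buffered(16)` over spawned tokio tasks) for data parallelism on the rayon pool, driving each async body to completion with `block_on`. A minimal, self-contained sketch of that pattern, with `convert` as a hypothetical stand-in for the per-file closure; it assumes only the `futures` and `rayon` crates:

```rust
use futures::executor::block_on;
use rayon::prelude::*;

// Hypothetical stand-in for the per-file conversion body above.
async fn convert(idx: usize) -> usize {
    idx * 2
}

fn main() {
    // Fan out across the rayon thread pool; each worker drives its own
    // future with block_on, so concurrency is bounded by the pool size
    // rather than by an explicit `.buffered(16)` window.
    let results: Vec<usize> = (0..100)
        .collect::<Vec<_>>()
        .par_iter()
        .map(|&idx| block_on(convert(idx)))
        .collect();
    assert_eq!(results.len(), 100);
    assert_eq!(results[3], 6);
}
```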

bench-vortex/src/reader.rs

Lines changed: 26 additions & 42 deletions
```diff
@@ -2,7 +2,7 @@ use std::fs::File;
 use std::ops::Range;
 use std::path::{Path, PathBuf};
 use std::process::Command;
-use std::sync::{Arc, LazyLock};
+use std::sync::Arc;
 
 use arrow_array::types::Int64Type;
 use arrow_array::{
@@ -24,18 +24,16 @@ use stream::StreamExt;
 use vortex::aliases::hash_map::HashMap;
 use vortex::array::ChunkedArray;
 use vortex::arrow::FromArrowType;
-use vortex::buffer::Buffer;
 use vortex::compress::CompressionStrategy;
 use vortex::dtype::DType;
 use vortex::error::VortexResult;
-use vortex::file::{LayoutContext, LayoutDeserializer, VortexFileWriter, VortexReadBuilder};
-use vortex::io::{IoDispatcher, ObjectStoreReadAt, TokioFile, VortexReadAt, VortexWrite};
+use vortex::file::v2::{VortexOpenOptions, VortexWriteOptions};
+use vortex::io::{ObjectStoreReadAt, TokioFile, VortexReadAt, VortexWrite};
 use vortex::sampling_compressor::{SamplingCompressor, ALL_ENCODINGS_CONTEXT};
+use vortex::scan::Scan;
+use vortex::stream::ArrayStreamExt;
 use vortex::{ArrayData, IntoArrayData, IntoCanonical};
 
-static DISPATCHER: LazyLock<Arc<IoDispatcher>> =
-    LazyLock::new(|| Arc::new(IoDispatcher::default()));
-
 pub const BATCH_SIZE: usize = 65_536;
 
 #[derive(Debug, Clone, Serialize, Deserialize)]
@@ -48,19 +46,12 @@ pub struct VortexFooter {
 pub async fn open_vortex(path: &Path) -> VortexResult<ArrayData> {
     let file = TokioFile::open(path).unwrap();
 
-    VortexReadBuilder::new(
-        file,
-        LayoutDeserializer::new(
-            ALL_ENCODINGS_CONTEXT.clone(),
-            LayoutContext::default().into(),
-        ),
-    )
-    .with_io_dispatcher(DISPATCHER.clone())
-    .build()
-    .await?
-    .into_stream()
-    .read_all()
-    .await
+    VortexOpenOptions::new(ALL_ENCODINGS_CONTEXT.clone())
+        .open(file)
+        .await?
+        .scan(Scan::all())?
+        .into_array_data()
+        .await
 }
 
 pub async fn rewrite_parquet_as_vortex<W: VortexWrite>(
@@ -69,11 +60,10 @@ pub async fn rewrite_parquet_as_vortex<W: VortexWrite>(
 ) -> VortexResult<()> {
     let chunked = compress_parquet_to_vortex(parquet_path.as_path())?;
 
-    VortexFileWriter::new(write)
-        .write_array_columns(chunked)
-        .await?
-        .finalize()
+    VortexWriteOptions::default()
+        .write(write, chunked.into_array_stream())
         .await?;
+
     Ok(())
 }
@@ -116,25 +106,19 @@ pub fn write_csv_as_parquet(csv_path: PathBuf, output_path: &Path) -> VortexResu
 
 async fn take_vortex<T: VortexReadAt + Unpin + 'static>(
     reader: T,
-    indices: &[u64],
+    _indices: &[u64],
 ) -> VortexResult<ArrayData> {
-    VortexReadBuilder::new(
-        reader,
-        LayoutDeserializer::new(
-            ALL_ENCODINGS_CONTEXT.clone(),
-            LayoutContext::default().into(),
-        ),
-    )
-    .with_io_dispatcher(DISPATCHER.clone())
-    .with_indices(Buffer::copy_from(indices).into_array())
-    .build()
-    .await?
-    .into_stream()
-    .read_all()
-    .await
-    // For equivalence.... we decompress to make sure we're not cheating too much.
-    .and_then(IntoCanonical::into_canonical)
-    .map(ArrayData::from)
+    VortexOpenOptions::new(ALL_ENCODINGS_CONTEXT.clone())
+        .open(reader)
+        .await?
+        // FIXME(ngates): support row indices
+        // .scan_rows(Scan::all(), indices.iter().copied())?
+        .scan(Scan::all())?
+        .into_array_data()
+        .await?
+        // For equivalence.... we decompress to make sure we're not cheating too much.
+        .into_canonical()
+        .map(ArrayData::from)
 }
 
 pub async fn take_vortex_object_store(
```
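Taken together, the hunks above define the new v2 round trip: `VortexWriteOptions` consumes an array stream in a single call, and `VortexOpenOptions` plus `Scan` replace the old `VortexReadBuilder`/`IoDispatcher` plumbing. Below is a sketch combining both halves, using only calls visible in this diff; the `anyhow` error type, the tokio file handle on the write side, and the availability of `into_array_stream()` on `ArrayData` are assumptions rather than confirmed API:

```rust
use std::path::Path;

use vortex::file::v2::{VortexOpenOptions, VortexWriteOptions};
use vortex::io::TokioFile;
use vortex::sampling_compressor::ALL_ENCODINGS_CONTEXT;
use vortex::scan::Scan;
use vortex::stream::ArrayStreamExt;
use vortex::ArrayData;

async fn round_trip(data: ArrayData, path: &Path) -> anyhow::Result<ArrayData> {
    // Write: a single options-driven call replaces the old
    // write_array_columns / finalize two-step.
    let f = tokio::fs::File::create(path).await?;
    VortexWriteOptions::default()
        .write(f, data.into_array_stream())
        .await?;

    // Read: open with an encodings context, scan all rows, collect.
    let array = VortexOpenOptions::new(ALL_ENCODINGS_CONTEXT.clone())
        .open(TokioFile::open(path)?)
        .await?
        .scan(Scan::all())?
        .into_array_data()
        .await?;
    Ok(array)
}
```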

bench-vortex/src/tpch/mod.rs

Lines changed: 5 additions & 4 deletions
```diff
@@ -16,7 +16,7 @@ use vortex::aliases::hash_map::HashMap;
 use vortex::array::{ChunkedArray, StructArray};
 use vortex::arrow::FromArrowArray;
 use vortex::dtype::DType;
-use vortex::file::{VortexFileWriter, VORTEX_FILE_EXTENSION};
+use vortex::file::VORTEX_FILE_EXTENSION;
 use vortex::sampling_compressor::SamplingCompressor;
 use vortex::variants::StructArrayTrait;
 use vortex::{ArrayDType, ArrayData, IntoArrayData, IntoArrayVariant};
@@ -31,6 +31,7 @@ mod execute;
 pub mod schema;
 
 pub use execute::*;
+use vortex::file::v2::VortexWriteOptions;
 
 pub const EXPECTED_ROW_COUNTS: [usize; 23] = [
     0, 4, 460, 11620, 5, 5, 1, 4, 2, 175, 37967, 1048, 2, 42, 1, 1, 18314, 1, 57, 1, 186, 411, 7,
@@ -275,9 +276,9 @@ async fn register_vortex_file(
                 .open(&vtx_file)
                 .await?;
 
-            let mut writer = VortexFileWriter::new(f);
-            writer = writer.write_array_columns(data).await?;
-            writer.finalize().await?;
+            VortexWriteOptions::default()
+                .write(f, data.into_array_stream())
+                .await?;
 
             anyhow::Ok(())
         })
```

vortex-array/src/array/chunked/mod.rs

Lines changed: 3 additions & 2 deletions
```diff
@@ -6,7 +6,7 @@ use std::fmt::{Debug, Display};
 
 use futures_util::stream;
 use serde::{Deserialize, Serialize};
-use vortex_buffer::Buffer;
+use vortex_buffer::BufferMut;
 use vortex_dtype::{DType, Nullability, PType};
 use vortex_error::{vortex_bail, vortex_panic, VortexExpect as _, VortexResult, VortexUnwrap};
 
@@ -51,7 +51,8 @@ impl ChunkedArray {
             }
         }
 
-        let chunk_offsets = Buffer::from_iter(
+        let mut chunk_offsets = BufferMut::<u64>::with_capacity(chunks.len() + 1);
+        chunk_offsets.extend(
             [0u64]
                 .into_iter()
                 .chain(chunks.iter().map(|c| c.len() as u64))
```
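The change above preallocates the offsets buffer instead of relying on `Buffer::from_iter`, whose chained iterator cannot report an exact `size_hint`. A small sketch of the same pattern; the running-sum `scan` step is an assumed completion, since the hunk is truncated before the accumulation:

```rust
use vortex_buffer::BufferMut;

fn chunk_offsets(chunk_lens: &[u64]) -> BufferMut<u64> {
    // One slot per chunk plus the leading zero, so extend() never reallocates.
    let mut offsets = BufferMut::<u64>::with_capacity(chunk_lens.len() + 1);
    offsets.extend(
        [0u64]
            .into_iter()
            .chain(chunk_lens.iter().copied())
            // Assumed accumulation: yields 0, len0, len0+len1, ...
            .scan(0u64, |sum, len| {
                *sum += len;
                Some(*sum)
            }),
    );
    offsets
}
```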

vortex-array/src/array/varbinview/compute/mod.rs

Lines changed: 0 additions & 1 deletion
```diff
@@ -57,7 +57,6 @@ impl TakeFn<VarBinViewArray> for VarBinViewEncoding {
     fn take(&self, array: &VarBinViewArray, indices: &ArrayData) -> VortexResult<ArrayData> {
         // Compute the new validity
         let validity = array.validity().take(indices)?;
-
         let indices = indices.clone().into_primitive()?;
 
         let views_buffer = match_each_integer_ptype!(indices.ptype(), |$I| {
```

vortex-buffer/src/buffer_mut.rs

Lines changed: 12 additions & 0 deletions
```diff
@@ -258,7 +258,19 @@
         T: Copy,
     {
         self.reserve(n);
+        unsafe { self.push_n_unchecked(item, n) }
+    }
 
+    /// Appends n scalars to the buffer.
+    ///
+    /// ## Safety
+    ///
+    /// The caller must ensure there is sufficient capacity in the array.
+    #[inline]
+    pub unsafe fn push_n_unchecked(&mut self, item: T, n: usize)
+    where
+        T: Copy,
+    {
         let mut dst: *mut T = self.bytes.spare_capacity_mut().as_mut_ptr().cast();
         // SAFETY: we checked the capacity in the reserve call
         unsafe {
```
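This splits the old method into a safe reserving wrapper plus an unchecked fast path for callers that have already guaranteed capacity. A usage sketch; the name `push_n` for the safe wrapper is inferred from the surrounding hunk rather than shown in it:

```rust
use vortex_buffer::BufferMut;

fn main() {
    let mut buf = BufferMut::<u32>::with_capacity(8);
    // SAFETY: with_capacity(8) guarantees room for the 8 elements pushed here,
    // so the capacity check in the safe wrapper can be skipped.
    unsafe { buf.push_n_unchecked(7, 8) };
    assert_eq!(buf.len(), 8);

    // The safe variant reserves for itself and is the default choice.
    buf.push_n(9, 4);
    assert_eq!(buf.len(), 12);
}
```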

vortex-datafusion/Cargo.toml

Lines changed: 1 addition & 0 deletions
```diff
@@ -42,6 +42,7 @@ vortex-error = { workspace = true, features = ["datafusion"] }
 vortex-expr = { workspace = true, features = ["datafusion"] }
 vortex-file = { workspace = true, features = ["object_store"] }
 vortex-io = { workspace = true, features = ["object_store", "tokio"] }
+vortex-scan = { workspace = true }
 
 [dev-dependencies]
 anyhow = { workspace = true }
```

vortex-datafusion/src/persistent/cache.rs

Lines changed: 13 additions & 9 deletions
```diff
@@ -4,13 +4,14 @@ use chrono::{DateTime, Utc};
 use moka::future::Cache;
 use object_store::path::Path;
 use object_store::{ObjectMeta, ObjectStore};
+use vortex_array::ContextRef;
 use vortex_error::{vortex_err, VortexError, VortexResult};
-use vortex_file::{read_initial_bytes, InitialRead};
+use vortex_file::v2::{FileLayout, VortexOpenOptions};
 use vortex_io::ObjectStoreReadAt;
 
 #[derive(Debug, Clone)]
-pub struct InitialReadCache {
-    inner: Cache<Key, InitialRead>,
+pub struct FileLayoutCache {
+    inner: Cache<Key, FileLayout>,
 }
 
 #[derive(Hash, Eq, PartialEq, Debug)]
@@ -28,28 +29,31 @@ impl From<&ObjectMeta> for Key {
     }
 }
 
-impl InitialReadCache {
+impl FileLayoutCache {
     pub fn new(size_mb: usize) -> Self {
         let inner = Cache::builder()
-            .weigher(|k: &Key, v: &InitialRead| (k.location.as_ref().len() + v.buf.len()) as u32)
             .max_capacity(size_mb as u64 * (2 << 20))
-            .eviction_listener(|k, _v, cause| {
+            .eviction_listener(|k: Arc<Key>, _v, cause| {
                 log::trace!("Removed {} due to {:?}", k.location, cause);
             })
             .build();
 
         Self { inner }
     }
+
     pub async fn try_get(
         &self,
         object: &ObjectMeta,
         store: Arc<dyn ObjectStore>,
-    ) -> VortexResult<InitialRead> {
+    ) -> VortexResult<FileLayout> {
         self.inner
             .try_get_with(Key::from(object), async {
                 let os_read_at = ObjectStoreReadAt::new(store.clone(), object.location.clone());
-                let initial_read = read_initial_bytes(&os_read_at, object.size as u64).await?;
-                VortexResult::Ok(initial_read)
+                let vxf = VortexOpenOptions::new(ContextRef::default())
+                    .with_file_size(object.size as u64)
+                    .open(os_read_at)
+                    .await?;
+                VortexResult::Ok(vxf.file_layout().clone())
             })
             .await
             .map_err(|e: Arc<VortexError>| match Arc::try_unwrap(e) {
```
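The cache now stores the parsed `FileLayout` rather than the raw initial-read bytes, and leans on moka's `try_get_with`, which coalesces concurrent lookups so each file's footer is parsed at most once per key. A reduced sketch of that pattern with hypothetical `Key`/`Layout` stand-ins, assuming `moka` with the `future` feature and a tokio runtime:

```rust
use moka::future::Cache;

#[derive(Clone, Hash, Eq, PartialEq)]
struct Key {
    location: String,
}

#[derive(Clone)]
struct Layout; // stand-in for vortex_file::v2::FileLayout

#[tokio::main]
async fn main() {
    let cache: Cache<Key, Layout> = Cache::builder()
        // Without a weigher, moka's max_capacity counts entries, not bytes.
        .max_capacity(10_000)
        .build();

    let key = Key { location: "data.vortex".into() };
    let layout = cache
        .try_get_with(key, async {
            // Real code: open the file and clone its FileLayout; the loader
            // runs at most once even if many tasks request the same key.
            Ok::<_, std::io::Error>(Layout)
        })
        .await;
    assert!(layout.is_ok());
}
```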

vortex-datafusion/src/persistent/execution.rs

Lines changed: 10 additions & 9 deletions
```diff
@@ -14,7 +14,7 @@ use itertools::Itertools;
 use vortex_array::ContextRef;
 use vortex_dtype::FieldName;
 
-use super::cache::InitialReadCache;
+use super::cache::FileLayoutCache;
 use crate::persistent::opener::VortexFileOpener;
 
 #[derive(Debug, Clone)]
@@ -25,7 +25,7 @@ pub struct VortexExec {
     plan_properties: PlanProperties,
     projected_statistics: Statistics,
     ctx: ContextRef,
-    initial_read_cache: InitialReadCache,
+    initial_read_cache: FileLayoutCache,
 }
 
 impl VortexExec {
@@ -34,7 +34,7 @@ impl VortexExec {
         metrics: ExecutionPlanMetricsSet,
         predicate: Option<Arc<dyn PhysicalExpr>>,
         ctx: ContextRef,
-        initial_read_cache: InitialReadCache,
+        initial_read_cache: FileLayoutCache,
     ) -> DFResult<Self> {
         let projected_schema = project_schema(
             &file_scan_config.file_schema,
@@ -122,17 +122,18 @@ impl ExecutionPlan for VortexExec {
             projection
                 .iter()
                 .map(|i| FieldName::from(arrow_schema.fields[*i].name().clone()))
-                .collect_vec()
+                .collect()
         });
 
-        let opener = VortexFileOpener {
-            ctx: self.ctx.clone(),
+        // TODO(joe): apply the predicate/filter mapping to vortex-expr once.
+        let opener = VortexFileOpener::new(
+            self.ctx.clone(),
             object_store,
             projection,
-            predicate: self.predicate.clone(),
-            initial_read_cache: self.initial_read_cache.clone(),
+            self.predicate.clone(),
             arrow_schema,
-        };
+            self.initial_read_cache.clone(),
+        )?;
         let stream = FileStream::new(&self.file_scan_config, partition, opener, &self.metrics)?;
 
         Ok(Box::pin(stream))
```
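Finally, `VortexFileOpener` moves from public-field struct construction to a fallible `new(...)?`, which gives the opener a place to validate or precompute state up front (per the TODO, mapping the DataFusion predicate to a vortex expression once rather than per file). A generic sketch of that refactor; the field and error types here are hypothetical, not the real `VortexFileOpener` layout:

```rust
// Fallible constructor: invariants are checked once at build time,
// which a plain struct literal cannot enforce.
struct Opener {
    projection: Vec<usize>,
}

impl Opener {
    fn new(projection: Vec<usize>) -> Result<Self, String> {
        if projection.is_empty() {
            return Err("projection must select at least one column".into());
        }
        Ok(Self { projection })
    }
}

fn main() {
    assert!(Opener::new(vec![0, 2]).is_ok());
    assert!(Opener::new(vec![]).is_err());
}
```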
