Skip to content

Commit 6e9d779

Browse files
authored
feat: add into_arrow to IntoCanonicalVTable (#1604)
Historically, we've gated the ability to convert Vortex arrays to Arrow arrays behind the `Canonical` type, which picks one "blessed" Arrow encoding for each of our DTypes. Since the introduction of VarBinView in #1082, there are now two Vortex string encodings that can each be converted directly to Arrow. What's more, FSSTArray internally uses a `VarBin` array to encode the FSST-compressed strings. Its CompareFn implementation delegates to a comparison against those values; because they are a `VarBin`, the comparison takes the default `compare` codepath, which calls `into_canonical()?.into_arrow()?` and then uses the Arrow codec. This is now slow, because `VarBin.into_canonical()` iterates over all the strings to build a canonical `VarBinView`. That requires a full decompress, which makes the pushdown pointless. This PR augments the existing `IntoCanonicalVTable`, allowing encodings to implement their own `into_arrow()` method. The default continues to call `into_canonical().into_arrow()`, but we implement a fast version for VarBin.
1 parent 6a11488 commit 6e9d779

File tree

11 files changed

+47
-32
lines changed

11 files changed

+47
-32
lines changed

bench-vortex/benches/compress.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ fn vortex_decompress_read(runtime: &Runtime, buf: Buffer) -> VortexResult<Vec<Ar
146146
let mut batches = vec![];
147147
let mut stream = builder.build().await?;
148148
while let Some(batch) = stream.next().await {
149-
batches.push(batch?.into_canonical()?.into_arrow()?);
149+
batches.push(batch?.into_arrow()?);
150150
}
151151
Ok(batches)
152152
}

bench-vortex/src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -352,7 +352,7 @@ mod test {
352352
let struct_arrow: ArrowStructArray = record_batch.into();
353353
let arrow_array: ArrowArrayRef = Arc::new(struct_arrow);
354354
let vortex_array = ArrayData::from_arrow(arrow_array.clone(), false);
355-
let vortex_as_arrow = vortex_array.into_canonical().unwrap().into_arrow().unwrap();
355+
let vortex_as_arrow = vortex_array.into_arrow().unwrap();
356356
assert_eq!(vortex_as_arrow.deref(), arrow_array.deref());
357357
}
358358
}
@@ -373,7 +373,7 @@ mod test {
373373
let vortex_array = ArrayData::from_arrow(arrow_array.clone(), false);
374374

375375
let compressed = compressor.compress(&vortex_array).unwrap();
376-
let compressed_as_arrow = compressed.into_canonical().unwrap().into_arrow().unwrap();
376+
let compressed_as_arrow = compressed.into_arrow().unwrap();
377377
assert_eq!(compressed_as_arrow.deref(), arrow_array.deref());
378378
}
379379
}

pyvortex/src/array.rs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -122,10 +122,7 @@ impl PyArray {
122122
if let Ok(chunked_array) = ChunkedArray::try_from(vortex.clone()) {
123123
let chunks: Vec<ArrayRef> = chunked_array
124124
.chunks()
125-
.map(|chunk| -> PyResult<ArrayRef> {
126-
let canonical = chunk.into_canonical()?;
127-
Ok(canonical.into_arrow()?)
128-
})
125+
.map(|chunk| -> PyResult<ArrayRef> { Ok(chunk.into_arrow()?) })
129126
.collect::<PyResult<Vec<ArrayRef>>>()?;
130127
if chunks.is_empty() {
131128
return Err(PyValueError::new_err("No chunks in array"));
@@ -145,8 +142,7 @@ impl PyArray {
145142
} else {
146143
Ok(vortex
147144
.clone()
148-
.into_canonical()
149-
.and_then(|arr| arr.into_arrow())?
145+
.into_arrow()?
150146
.into_data()
151147
.to_pyarrow(py)?
152148
.into_bound(py))

vortex-array/src/array/varbin/flatten.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use arrow_array::ArrayRef;
12
use arrow_schema::DataType;
23
use vortex_dtype::DType;
34
use vortex_error::VortexResult;
@@ -21,6 +22,12 @@ impl IntoCanonical for VarBinArray {
2122

2223
VarBinViewArray::try_from(ArrayData::from_arrow(array, nullable)).map(Canonical::VarBinView)
2324
}
25+
26+
fn into_arrow(self) -> VortexResult<ArrayRef> {
27+
// Specialized implementation of `into_arrow` for VarBin since it has a direct
28+
// Arrow representation.
29+
varbin_to_arrow(&self)
30+
}
2431
}
2532

2633
#[cfg(test)]

vortex-array/src/arrow/datum.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,12 @@ impl TryFrom<ArrayData> for Datum {
2121
.unwrap_or_default()
2222
{
2323
Ok(Self {
24-
array: slice(array, 0, 1)?.into_canonical()?.into_arrow()?,
24+
array: slice(array, 0, 1)?.into_arrow()?,
2525
is_scalar: true,
2626
})
2727
} else {
2828
Ok(Self {
29-
array: array.into_canonical()?.into_arrow()?,
29+
array: array.into_arrow()?,
3030
is_scalar: false,
3131
})
3232
}

vortex-array/src/canonical.rs

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ impl Canonical {
8484
// Convert storage array directly into arrow, losing type information
8585
// that will let us round-trip.
8686
// TODO(aduffy): https://github.com/spiraldb/vortex/issues/1167
87-
a.storage().into_canonical()?.into_arrow()?
87+
a.storage().into_arrow()?
8888
}
8989
}
9090
})
@@ -234,7 +234,7 @@ fn list_to_arrow(list: ListArray) -> VortexResult<ArrayRef> {
234234
list.validity().nullability().into(),
235235
));
236236

237-
let values = list.elements().into_canonical()?.into_arrow()?;
237+
let values = list.elements().into_arrow()?;
238238
let nulls = list.logical_validity().to_null_buffer()?;
239239

240240
Ok(match offsets.ptype() {
@@ -330,7 +330,7 @@ fn temporal_to_arrow(temporal_array: TemporalArray) -> VortexResult<ArrayRef> {
330330
})
331331
}
332332

333-
/// Support trait for transmuting an array into its [vortex_dtype::DType]'s canonical encoding.
333+
/// Support trait for transmuting an array into the canonical encoding for its [vortex_dtype::DType].
334334
///
335335
/// This conversion ensures that the array's encoding matches one of the builtin canonical
336336
/// encodings, each of which has a corresponding [Canonical] variant.
@@ -340,12 +340,21 @@ fn temporal_to_arrow(temporal_array: TemporalArray) -> VortexResult<ArrayRef> {
340340
/// The DType of the array will be unchanged by canonicalization.
341341
pub trait IntoCanonical {
342342
fn into_canonical(self) -> VortexResult<Canonical>;
343+
344+
fn into_arrow(self) -> VortexResult<ArrayRef>
345+
where
346+
Self: Sized,
347+
{
348+
self.into_canonical()?.into_arrow()
349+
}
343350
}
344351

345352
/// Encoding VTable for canonicalizing an array.
346353
#[allow(clippy::wrong_self_convention)]
347354
pub trait IntoCanonicalVTable {
348355
fn into_canonical(&self, array: ArrayData) -> VortexResult<Canonical>;
356+
357+
fn into_arrow(&self, array: ArrayData) -> VortexResult<ArrayRef>;
349358
}
350359

351360
/// Implement the [IntoCanonicalVTable] for all encodings with arrays implementing [IntoCanonical].
@@ -359,6 +368,10 @@ where
359368
canonical.inherit_statistics(data.statistics());
360369
Ok(canonical)
361370
}
371+
372+
fn into_arrow(&self, array: ArrayData) -> VortexResult<ArrayRef> {
373+
E::Array::try_from(array)?.into_arrow()
374+
}
362375
}
363376

364377
/// Trait for types that can be converted from an owned type into an owned array variant.
@@ -525,8 +538,6 @@ mod test {
525538
.unwrap();
526539

527540
let arrow_struct = nested_struct_array
528-
.into_canonical()
529-
.unwrap()
530541
.into_arrow()
531542
.unwrap()
532543
.as_any()
@@ -597,12 +608,7 @@ mod test {
597608

598609
assert_eq!(
599610
&arrow_struct,
600-
vortex_struct
601-
.into_canonical()
602-
.unwrap()
603-
.into_arrow()
604-
.unwrap()
605-
.as_struct()
611+
vortex_struct.into_arrow().unwrap().as_struct()
606612
);
607613
}
608614
}

vortex-array/src/compute/filter.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ pub fn filter(array: &ArrayData, mask: FilterMask) -> VortexResult<ArrayData> {
102102
array.encoding().id(),
103103
);
104104

105-
let array_ref = array.clone().into_canonical()?.into_arrow()?;
105+
let array_ref = array.clone().into_arrow()?;
106106
let mask_array = BooleanArray::new(mask.to_boolean_buffer()?, None);
107107
let filtered = arrow_select::filter::filter(array_ref.as_ref(), &mask_array)?;
108108

vortex-array/src/encoding/opaque.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use std::any::Any;
22
use std::fmt::{Debug, Display, Formatter};
33
use std::sync::Arc;
44

5+
use arrow_array::ArrayRef;
56
use vortex_error::{vortex_bail, vortex_panic, VortexResult};
67

78
use crate::compute::ComputeVTable;
@@ -47,6 +48,13 @@ impl IntoCanonicalVTable for OpaqueEncoding {
4748
self.0
4849
)
4950
}
51+
52+
fn into_arrow(&self, _array: ArrayData) -> VortexResult<ArrayRef> {
53+
vortex_bail!(
54+
"OpaqueEncoding: into_arrow cannot be called for opaque array ({})",
55+
self.0
56+
)
57+
}
5058
}
5159

5260
impl ComputeVTable for OpaqueEncoding {}

vortex-datafusion/src/memory/plans.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,6 @@ impl Stream for RowIndicesStream {
160160
.conjunction_expr
161161
.evaluate(vortex_struct.as_ref())
162162
.map_err(|e| DataFusionError::External(e.into()))?
163-
.into_canonical()?
164163
.into_arrow()?;
165164

166165
// Convert the `selection` BooleanArray into a UInt64Array of indices.
@@ -349,9 +348,8 @@ where
349348
// We should find a way to avoid decoding the filter columns and only decode the other
350349
// columns, then stitch the StructArray back together from those.
351350
let projected_for_output = chunk.project(this.output_projection)?;
352-
let decoded = take(projected_for_output, &row_indices, TakeOptions::default())?
353-
.into_canonical()?
354-
.into_arrow()?;
351+
let decoded =
352+
take(projected_for_output, &row_indices, TakeOptions::default())?.into_arrow()?;
355353

356354
// Send back a single record batch of the decoded data.
357355
let output_batch = RecordBatch::from(decoded.as_struct());

vortex-datafusion/src/persistent/statistics.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,15 @@ pub fn array_to_col_statistics(array: &StructArray) -> VortexResult<ColumnStatis
1414
let mut stats = ColumnStatistics::new_unknown();
1515

1616
if let Some(null_count_array) = array.field_by_name(Stat::NullCount.name()) {
17-
let array = null_count_array.into_canonical()?.into_arrow()?;
17+
let array = null_count_array.into_arrow()?;
1818
let array = array.as_primitive::<UInt64Type>();
1919

2020
let null_count = array.iter().map(|v| v.unwrap_or_default()).sum::<u64>();
2121
stats.null_count = Precision::Exact(null_count as usize);
2222
}
2323

2424
if let Some(max_value_array) = array.field_by_name(Stat::Max.name()) {
25-
let array = max_value_array.into_canonical()?.into_arrow()?;
25+
let array = max_value_array.into_arrow()?;
2626
let mut acc = MaxAccumulator::try_new(array.data_type())?;
2727
acc.update_batch(&[array])?;
2828

@@ -31,7 +31,7 @@ pub fn array_to_col_statistics(array: &StructArray) -> VortexResult<ColumnStatis
3131
}
3232

3333
if let Some(min_value_array) = array.field_by_name(Stat::Min.name()) {
34-
let array = min_value_array.into_canonical()?.into_arrow()?;
34+
let array = min_value_array.into_arrow()?;
3535
let mut acc = MinAccumulator::try_new(array.data_type())?;
3636
acc.update_batch(&[array])?;
3737

@@ -46,7 +46,7 @@ pub fn uncompressed_col_size(array: &StructArray) -> VortexResult<Option<u64>> {
4646
match array.field_by_name(Stat::UncompressedSizeInBytes.name()) {
4747
None => Ok(None),
4848
Some(array) => {
49-
let array = array.into_canonical()?.into_arrow()?;
49+
let array = array.into_arrow()?;
5050
let array = array.as_primitive::<UInt64Type>();
5151

5252
let uncompressed_size = array.iter().map(|v| v.unwrap_or_default()).sum::<u64>();

0 commit comments

Comments
 (0)