Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
214 changes: 119 additions & 95 deletions Cargo.lock

Large diffs are not rendered by default.

56 changes: 29 additions & 27 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,18 +59,19 @@ anyhow = "1.0.95"
arbitrary = "1.3.2"
arcref = "0.2.0"
arrayref = "0.3.7"
arrow-arith = "55.2.0"
arrow-array = "55.2.0"
arrow-buffer = "55.2.0"
arrow-cast = "55.2.0"
arrow-data = "55.2.0"
arrow-ipc = "55.2.0"
arrow-ord = "55.2.0"
arrow-schema = "55.2.0"
arrow-select = "55.2.0"
arrow-string = "55.2.0"
arrow-arith = "56"
arrow-array = "56"
arrow-buffer = "56"
arrow-cast = "56"
arrow-data = "56"
arrow-ipc = "56"
arrow-ord = "56"
arrow-schema = "56"
arrow-select = "56"
arrow-string = "56"
async-compat = "0.2.5"
async-stream = "0.3.6"
async-trait = "0.1.88"
async-trait = "0.1.89"
bindgen = "0.72.0"
bit-vec = "0.8.0"
bitvec = "1.0.1"
Expand All @@ -86,15 +87,17 @@ crossbeam-deque = "0.8.6"
crossbeam-queue = "0.3.12"
crossterm = "0.29"
dashmap = "6.1.0"
datafusion = { git = "https://github.com/polygon-io/arrow-datafusion", rev = "faca92d", default-features = false }
datafusion-catalog = { git = "https://github.com/polygon-io/arrow-datafusion", rev = "faca92d" }
datafusion-common = { git = "https://github.com/polygon-io/arrow-datafusion", rev = "faca92d" }
datafusion-common-runtime = { git = "https://github.com/polygon-io/arrow-datafusion", rev = "faca92d" }
datafusion-datasource = { git = "https://github.com/polygon-io/arrow-datafusion", rev = "faca92d", default-features = false }
datafusion-execution = { git = "https://github.com/polygon-io/arrow-datafusion", rev = "faca92d" }
datafusion-expr = { git = "https://github.com/polygon-io/arrow-datafusion", rev = "faca92d" }
datafusion-physical-expr = { git = "https://github.com/polygon-io/arrow-datafusion", rev = "faca92d" }
datafusion-physical-plan = { git = "https://github.com/polygon-io/arrow-datafusion", rev = "faca92d" }
datafusion = { git = "https://github.com/polygon-io/arrow-datafusion", rev = "5a85b7d", default-features = false }
datafusion-catalog = { git = "https://github.com/polygon-io/arrow-datafusion", rev = "5a85b7d" }
datafusion-common = { git = "https://github.com/polygon-io/arrow-datafusion", rev = "5a85b7d" }
datafusion-common-runtime = { git = "https://github.com/polygon-io/arrow-datafusion", rev = "5a85b7d" }
datafusion-datasource = { git = "https://github.com/polygon-io/arrow-datafusion", rev = "5a85b7d", default-features = false }
datafusion-execution = { git = "https://github.com/polygon-io/arrow-datafusion", rev = "5a85b7d" }
datafusion-expr = { git = "https://github.com/polygon-io/arrow-datafusion", rev = "5a85b7d" }
datafusion-physical-expr = { git = "https://github.com/polygon-io/arrow-datafusion", rev = "5a85b7d" }
datafusion-physical-expr-adapter = { git = "https://github.com/polygon-io/arrow-datafusion", rev = "5a85b7d" }
datafusion-physical-expr-common = { git = "https://github.com/polygon-io/arrow-datafusion", rev = "5a85b7d" }
datafusion-physical-plan = { git = "https://github.com/polygon-io/arrow-datafusion", rev = "5a85b7d" }
dirs = "6.0.0"
divan = { package = "codspeed-divan-compat", version = "3.0" }
dyn-hash = "0.2.0"
Expand Down Expand Up @@ -137,7 +140,7 @@ opentelemetry = "0.30.0"
opentelemetry-otlp = "0.30.0"
opentelemetry_sdk = "0.30.0"
parking_lot = { version = "0.12.3", features = ["nightly"] }
parquet = "55.2.0"
parquet = "56"
paste = "1.0.15"
pco = "0.4.4"
pin-project = "1.1.5"
Expand Down Expand Up @@ -183,17 +186,16 @@ target-lexicon = "0.13"
tempfile = "3"
termtree = { version = "0.5" }
thiserror = "2.0.3"
tokio = { version = "1.46" }
tokio = { version = "1.47" }
tokio-stream = "0.1.17"
tokio-util = { version = "0.7.16" }
# replace these with releases
tpchgen = { git = "https://github.com/clflushopt/tpchgen-rs.git", rev = "d849ff430cd52250f6891ed4d5e3adad77bb2698" }
tpchgen-arrow = { git = "https://github.com/clflushopt/tpchgen-rs.git", rev = "d849ff430cd52250f6891ed4d5e3adad77bb2698" }
tpchgen = { version = "2" }
tpchgen-arrow = { version = "2" }
tracing = { version = "0.1.41" }
tracing-perfetto = "0.1.5"
tracing-subscriber = "0.3.20"
url = "2.5.4"
uuid = { version = "1.17", features = ["js"] }
url = "2.5.7"
uuid = { version = "1.18", features = ["js"] }
walkdir = "2.5.0"
wasm-bindgen-futures = "0.4.39"
witchcraft-metrics = "1.0.1"
Expand Down
1 change: 1 addition & 0 deletions bench-vortex/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ datafusion = { workspace = true, features = [
"parquet",
"datetime_expressions",
"nested_expressions",
"unicode_expressions",
] }
datafusion-common = { workspace = true }
datafusion-physical-plan = { workspace = true }
Expand Down
168 changes: 168 additions & 0 deletions vortex-array/src/arrow/compute/to_arrow/canonical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use arrow_array::types::{
};
use arrow_array::{
Array, ArrayRef as ArrowArrayRef, ArrowPrimitiveType, BooleanArray as ArrowBoolArray,
Decimal32Array as ArrowDecimal32Array, Decimal64Array as ArrowDecimal64Array,
Decimal128Array as ArrowDecimal128Array, Decimal256Array as ArrowDecimal256Array,
GenericByteArray, GenericByteViewArray, GenericListArray, NullArray as ArrowNullArray,
OffsetSizeTrait, PrimitiveArray as ArrowPrimitiveArray, StructArray as ArrowStructArray,
Expand Down Expand Up @@ -110,6 +111,34 @@ impl Kernel for ToArrowCanonical {
{
to_arrow_primitive::<Float64Type>(array)
}
(Canonical::Decimal(array), DataType::Decimal32(precision, scale)) => {
if array.decimal_dtype().precision() != *precision
|| array.decimal_dtype().scale() != *scale
{
vortex_bail!(
"ToArrowCanonical: target precision/scale {}/{} does not match array precision/scale {}/{}",
precision,
scale,
array.decimal_dtype().precision(),
array.decimal_dtype().scale()
);
}
to_arrow_decimal32(array)
}
(Canonical::Decimal(array), DataType::Decimal64(precision, scale)) => {
if array.decimal_dtype().precision() != *precision
|| array.decimal_dtype().scale() != *scale
{
vortex_bail!(
"ToArrowCanonical: target precision/scale {}/{} does not match array precision/scale {}/{}",
precision,
scale,
array.decimal_dtype().precision(),
array.decimal_dtype().scale()
);
}
to_arrow_decimal64(array)
}
(Canonical::Decimal(array), DataType::Decimal128(precision, scale)) => {
if array.decimal_dtype().precision() != *precision
|| array.decimal_dtype().scale() != *scale
Expand Down Expand Up @@ -217,6 +246,91 @@ fn to_arrow_primitive<T: ArrowPrimitiveType>(array: PrimitiveArray) -> VortexRes
)))
}

/// Converts a Vortex [`DecimalArray`] into an Arrow `Decimal32Array`.
///
/// The source array may store its digits in any supported physical width
/// (i8 through i256). Narrower widths are widened losslessly; wider widths
/// are checked-narrowed to `i32`, and the conversion fails with an error if
/// any value does not fit.
///
/// # Errors
///
/// Returns an error if a stored value cannot be represented as `i32`, if the
/// physical value type is unknown, or if Arrow rejects the precision/scale.
fn to_arrow_decimal32(array: DecimalArray) -> VortexResult<ArrowArrayRef> {
// The Vortex validity mask maps directly onto an Arrow null buffer.
let null_buffer = array.validity_mask().to_null_buffer();
let buffer: Buffer<i32> = match array.values_type() {
// i8/i16 widen losslessly into i32 via `as_()`.
DecimalValueType::I8 => {
Buffer::from_trusted_len_iter(array.buffer::<i8>().into_iter().map(|x| x.as_()))
}
DecimalValueType::I16 => {
Buffer::from_trusted_len_iter(array.buffer::<i16>().into_iter().map(|x| x.as_()))
}
// Already the target width: reuse the buffer without copying per element.
DecimalValueType::I32 => array.buffer::<i32>(),
// Wider widths must narrow; `to_i32` returns None on overflow, which we
// surface as an error rather than silently truncating.
DecimalValueType::I64 => array
.buffer::<i64>()
.into_iter()
.map(|x| {
x.to_i32()
.ok_or_else(|| vortex_err!("i64 to i32 narrowing cannot be done safely"))
})
.process_results(|iter| Buffer::from_trusted_len_iter(iter))?,
DecimalValueType::I128 => array
.buffer::<i128>()
.into_iter()
.map(|x| {
x.to_i32()
.ok_or_else(|| vortex_err!("i128 to i32 narrowing cannot be done safely"))
})
.process_results(|iter| Buffer::from_trusted_len_iter(iter))?,
DecimalValueType::I256 => array
.buffer::<vortex_scalar::i256>()
.into_iter()
.map(|x| {
x.to_i32()
.ok_or_else(|| vortex_err!("i256 to i32 narrowing cannot be done safely"))
})
.process_results(|iter| Buffer::from_trusted_len_iter(iter))?,
// NOTE(review): catch-all suggests DecimalValueType is non_exhaustive —
// confirm; if it is exhaustive, this arm is unreachable.
_ => vortex_bail!("unknown value type {:?}", array.values_type()),
};
Ok(Arc::new(
// Arrow validates that precision/scale are legal for Decimal32 here.
ArrowDecimal32Array::new(buffer.into_arrow_scalar_buffer(), null_buffer)
.with_precision_and_scale(
array.decimal_dtype().precision(),
array.decimal_dtype().scale(),
)?,
))
}

/// Converts a Vortex [`DecimalArray`] into an Arrow `Decimal64Array`.
///
/// The source array may store its digits in any supported physical width
/// (i8 through i256). Widths up to i32 widen losslessly; i128/i256 are
/// checked-narrowed to `i64`, and the conversion fails with an error if any
/// value does not fit.
///
/// # Errors
///
/// Returns an error if a stored value cannot be represented as `i64`, if the
/// physical value type is unknown, or if Arrow rejects the precision/scale.
fn to_arrow_decimal64(array: DecimalArray) -> VortexResult<ArrowArrayRef> {
// The Vortex validity mask maps directly onto an Arrow null buffer.
let null_buffer = array.validity_mask().to_null_buffer();
let buffer: Buffer<i64> = match array.values_type() {
// i8/i16/i32 widen losslessly into i64 via `as_()`.
DecimalValueType::I8 => {
Buffer::from_trusted_len_iter(array.buffer::<i8>().into_iter().map(|x| x.as_()))
}
DecimalValueType::I16 => {
Buffer::from_trusted_len_iter(array.buffer::<i16>().into_iter().map(|x| x.as_()))
}
DecimalValueType::I32 => {
Buffer::from_trusted_len_iter(array.buffer::<i32>().into_iter().map(|x| x.as_()))
}
// Already the target width: reuse the buffer without copying per element.
DecimalValueType::I64 => array.buffer::<i64>(),
// Wider widths must narrow; `to_i64` returns None on overflow, which we
// surface as an error rather than silently truncating.
DecimalValueType::I128 => array
.buffer::<i128>()
.into_iter()
.map(|x| {
x.to_i64()
.ok_or_else(|| vortex_err!("i128 to i64 narrowing cannot be done safely"))
})
.process_results(|iter| Buffer::from_trusted_len_iter(iter))?,
DecimalValueType::I256 => array
.buffer::<vortex_scalar::i256>()
.into_iter()
.map(|x| {
x.to_i64()
.ok_or_else(|| vortex_err!("i256 to i64 narrowing cannot be done safely"))
})
.process_results(|iter| Buffer::from_trusted_len_iter(iter))?,
// NOTE(review): catch-all suggests DecimalValueType is non_exhaustive —
// confirm; if it is exhaustive, this arm is unreachable.
_ => vortex_bail!("unknown value type {:?}", array.values_type()),
};
Ok(Arc::new(
// Arrow validates that precision/scale are legal for Decimal64 here.
ArrowDecimal64Array::new(buffer.into_arrow_scalar_buffer(), null_buffer)
.with_precision_and_scale(
array.decimal_dtype().precision(),
array.decimal_dtype().scale(),
)?,
))
}

fn to_arrow_decimal128(array: DecimalArray) -> VortexResult<ArrowArrayRef> {
let null_buffer = array.validity_mask().to_null_buffer();
let buffer: Buffer<i128> = match array.values_type() {
Expand Down Expand Up @@ -540,6 +654,60 @@ mod tests {
assert_eq!(arrow_decimal.value(2), 12);
}

#[rstest]
#[case(0i8)]
#[case(0i16)]
#[case(0i32)]
#[case(0i64)]
#[case(0i128)]
#[case(vortex_scalar::i256::ZERO)]
fn to_arrow_decimal32<T: NativeDecimalType>(#[case] _decimal_type: T) {
    use arrow_array::Decimal32Array;

    // Build a non-nullable Decimal(2, 1) array backed by the physical width
    // under test, then round-trip it into an Arrow Decimal32Array.
    let mut builder = DecimalBuilder::new::<T>(2, 1, false.into());
    for value in [10, 11, 12] {
        builder.append_value(value);
    }

    let arrow_array = builder
        .finish()
        .into_arrow(&DataType::Decimal32(2, 1))
        .unwrap();
    let decimal32 = arrow_array
        .as_any()
        .downcast_ref::<Decimal32Array>()
        .unwrap();

    // Every appended value must survive the conversion unchanged.
    for (index, expected) in [10, 11, 12].into_iter().enumerate() {
        assert_eq!(decimal32.value(index), expected);
    }
}

#[rstest]
#[case(0i8)]
#[case(0i16)]
#[case(0i32)]
#[case(0i64)]
#[case(0i128)]
#[case(vortex_scalar::i256::ZERO)]
fn to_arrow_decimal64<T: NativeDecimalType>(#[case] _decimal_type: T) {
    use arrow_array::Decimal64Array;

    // Build a non-nullable Decimal(2, 1) array backed by the physical width
    // under test, then round-trip it into an Arrow Decimal64Array.
    let mut builder = DecimalBuilder::new::<T>(2, 1, false.into());
    for value in [10, 11, 12] {
        builder.append_value(value);
    }

    let arrow_array = builder
        .finish()
        .into_arrow(&DataType::Decimal64(2, 1))
        .unwrap();
    let decimal64 = arrow_array
        .as_any()
        .downcast_ref::<Decimal64Array>()
        .unwrap();

    // Every appended value must survive the conversion unchanged.
    for (index, expected) in [10, 11, 12].into_iter().enumerate() {
        assert_eq!(decimal64.value(index), expected);
    }
}

#[rstest]
#[case(0i8)]
#[case(0i16)]
Expand Down
34 changes: 29 additions & 5 deletions vortex-array/src/arrow/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ use std::sync::Arc;

use arrow_array::cast::{AsArray, as_null_array};
use arrow_array::types::{
ByteArrayType, ByteViewType, Date32Type, Date64Type, Decimal128Type, Decimal256Type,
Float16Type, Float32Type, Float64Type, Int8Type, Int16Type, Int32Type, Int64Type,
Time32MillisecondType, Time32SecondType, Time64MicrosecondType, Time64NanosecondType,
TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType,
TimestampSecondType, UInt8Type, UInt16Type, UInt32Type, UInt64Type,
ByteArrayType, ByteViewType, Date32Type, Date64Type, Decimal32Type, Decimal64Type,
Decimal128Type, Decimal256Type, Float16Type, Float32Type, Float64Type, Int8Type, Int16Type,
Int32Type, Int64Type, Time32MillisecondType, Time32SecondType, Time64MicrosecondType,
Time64NanosecondType, TimestampMicrosecondType, TimestampMillisecondType,
TimestampNanosecondType, TimestampSecondType, UInt8Type, UInt16Type, UInt32Type, UInt64Type,
};
use arrow_array::{
Array as ArrowArray, ArrowPrimitiveType, BooleanArray as ArrowBooleanArray, GenericByteArray,
Expand Down Expand Up @@ -103,6 +103,24 @@ impl_from_arrow_primitive!(Float16Type);
impl_from_arrow_primitive!(Float32Type);
impl_from_arrow_primitive!(Float64Type);

/// Zero-copy conversion of an Arrow `Decimal32` array into a Vortex decimal
/// array, carrying over precision, scale, and nulls.
impl FromArrowArray<&ArrowPrimitiveArray<Decimal32Type>> for ArrayRef {
    fn from_arrow(array: &ArrowPrimitiveArray<Decimal32Type>, nullable: bool) -> Self {
        let validity = nulls(array.nulls(), nullable);
        let values = Buffer::from_arrow_scalar_buffer(array.values().clone());
        DecimalArray::new(
            values,
            DecimalDType::new(array.precision(), array.scale()),
            validity,
        )
        .into_array()
    }
}

/// Zero-copy conversion of an Arrow `Decimal64` array into a Vortex decimal
/// array, carrying over precision, scale, and nulls.
impl FromArrowArray<&ArrowPrimitiveArray<Decimal64Type>> for ArrayRef {
    fn from_arrow(array: &ArrowPrimitiveArray<Decimal64Type>, nullable: bool) -> Self {
        let validity = nulls(array.nulls(), nullable);
        let values = Buffer::from_arrow_scalar_buffer(array.values().clone());
        DecimalArray::new(
            values,
            DecimalDType::new(array.precision(), array.scale()),
            validity,
        )
        .into_array()
    }
}

impl FromArrowArray<&ArrowPrimitiveArray<Decimal128Type>> for ArrayRef {
fn from_arrow(array: &ArrowPrimitiveArray<Decimal128Type>, nullable: bool) -> Self {
let decimal_type = DecimalDType::new(array.precision(), array.scale());
Expand Down Expand Up @@ -413,6 +431,12 @@ impl FromArrowArray<&dyn ArrowArray> for ArrayRef {
}
ArrowTimeUnit::Second | ArrowTimeUnit::Millisecond => unreachable!(),
},
DataType::Decimal32(..) => {
Self::from_arrow(array.as_primitive::<Decimal32Type>(), nullable)
}
DataType::Decimal64(..) => {
Self::from_arrow(array.as_primitive::<Decimal64Type>(), nullable)
}
DataType::Decimal128(..) => {
Self::from_arrow(array.as_primitive::<Decimal128Type>(), nullable)
}
Expand Down
2 changes: 2 additions & 0 deletions vortex-datafusion/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ datafusion-datasource = { workspace = true, default-features = false }
datafusion-execution = { workspace = true }
datafusion-expr = { workspace = true }
datafusion-physical-expr = { workspace = true }
datafusion-physical-expr-adapter = { workspace = true }
datafusion-physical-expr-common = { workspace = true }
datafusion-physical-plan = { workspace = true }
futures = { workspace = true }
itertools = { workspace = true }
Expand Down
7 changes: 7 additions & 0 deletions vortex-datafusion/src/convert/exprs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::sync::Arc;
use arrow_schema::{DataType, Schema};
use datafusion_expr::Operator as DFOperator;
use datafusion_physical_expr::{PhysicalExpr, PhysicalExprRef};
use datafusion_physical_expr_common::physical_expr::is_dynamic_physical_expr;
use datafusion_physical_plan::expressions as df_expr;
use vortex::error::{VortexResult, vortex_bail, vortex_err};
use vortex::expr::{BinaryExpr, ExprRef, LikeExpr, Operator, and, get_item, lit, root};
Expand Down Expand Up @@ -122,6 +123,12 @@ impl TryFromDataFusion<DFOperator> for Operator {
}

pub(crate) fn can_be_pushed_down(expr: &PhysicalExprRef, schema: &Schema) -> bool {
// We currently do not support pushdown of dynamic expressions in DF.
// See issue: https://github.com/vortex-data/vortex/issues/4034
if is_dynamic_physical_expr(expr) {
return false;
}

let expr = expr.as_any();
if let Some(binary) = expr.downcast_ref::<df_expr::BinaryExpr>() {
can_binary_be_pushed_down(binary, schema)
Expand Down
Loading
Loading