Skip to content

Commit 972f92b

Browse files
AdamGSzhuqi-lucas
authored andcommitted
Bump DataFusion to 50 and arrow to 56 (vortex-data#4577)
Just made sure there's no breaking changes coming our way. Includes support for the two new Arrow decimal types (32 and 64 bits), but without converting from Vortex dtype into them. Codspeed improvements are due to a new inline hint on arrow's `BitIterator::next` (found by @0ax1 🥳). Open issues: - [ ] apache/datafusion#17489 - mostly affects if/how much support we want to provide to decimals, or whether we need to provide some other solution for DataFusion. - [ ] Issue with followups - vortex-data#4668 --------- Signed-off-by: Adam Gutglick <[email protected]> (cherry picked from commit cc426c8)
1 parent e7e62f9 commit 972f92b

File tree

10 files changed

+264
-8086
lines changed

10 files changed

+264
-8086
lines changed

Cargo.lock

Lines changed: 0 additions & 8035 deletions
This file was deleted.

Cargo.toml

Lines changed: 28 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -59,18 +59,19 @@ anyhow = "1.0.95"
5959
arbitrary = "1.3.2"
6060
arcref = "0.2.0"
6161
arrayref = "0.3.7"
62-
arrow-arith = "55.2.0"
63-
arrow-array = "55.2.0"
64-
arrow-buffer = "55.2.0"
65-
arrow-cast = "55.2.0"
66-
arrow-data = "55.2.0"
67-
arrow-ipc = "55.2.0"
68-
arrow-ord = "55.2.0"
69-
arrow-schema = "55.2.0"
70-
arrow-select = "55.2.0"
71-
arrow-string = "55.2.0"
62+
arrow-arith = "56"
63+
arrow-array = "56"
64+
arrow-buffer = "56"
65+
arrow-cast = "56"
66+
arrow-data = "56"
67+
arrow-ipc = "56"
68+
arrow-ord = "56"
69+
arrow-schema = "56"
70+
arrow-select = "56"
71+
arrow-string = "56"
72+
async-compat = "0.2.5"
7273
async-stream = "0.3.6"
73-
async-trait = "0.1.88"
74+
async-trait = "0.1.89"
7475
bindgen = "0.72.0"
7576
bit-vec = "0.8.0"
7677
bitvec = "1.0.1"
@@ -86,15 +87,16 @@ crossbeam-deque = "0.8.6"
8687
crossbeam-queue = "0.3.12"
8788
crossterm = "0.29"
8889
dashmap = "6.1.0"
89-
datafusion = { git = "https://github.com/polygon-io/arrow-datafusion", rev = "faca92d", default-features = false }
90-
datafusion-catalog = { git = "https://github.com/polygon-io/arrow-datafusion", rev = "faca92d" }
91-
datafusion-common = { git = "https://github.com/polygon-io/arrow-datafusion", rev = "faca92d" }
92-
datafusion-common-runtime = { git = "https://github.com/polygon-io/arrow-datafusion", rev = "faca92d" }
93-
datafusion-datasource = { git = "https://github.com/polygon-io/arrow-datafusion", rev = "faca92d", default-features = false }
94-
datafusion-execution = { git = "https://github.com/polygon-io/arrow-datafusion", rev = "faca92d" }
95-
datafusion-expr = { git = "https://github.com/polygon-io/arrow-datafusion", rev = "faca92d" }
96-
datafusion-physical-expr = { git = "https://github.com/polygon-io/arrow-datafusion", rev = "faca92d" }
97-
datafusion-physical-plan = { git = "https://github.com/polygon-io/arrow-datafusion", rev = "faca92d" }
90+
datafusion = { git = "https://github.com/polygon-io/arrow-datafusion", rev = "5a85b7d", default-features = false }
91+
datafusion-catalog = { git = "https://github.com/polygon-io/arrow-datafusion", rev = "5a85b7d" }
92+
datafusion-common = { git = "https://github.com/polygon-io/arrow-datafusion", rev = "5a85b7d" }
93+
datafusion-common-runtime = { git = "https://github.com/polygon-io/arrow-datafusion", rev = "5a85b7d" }
94+
datafusion-datasource = { git = "https://github.com/polygon-io/arrow-datafusion", rev = "5a85b7d", default-features = false }
95+
datafusion-execution = { git = "https://github.com/polygon-io/arrow-datafusion", rev = "5a85b7d" }
96+
datafusion-expr = { git = "https://github.com/polygon-io/arrow-datafusion", rev = "5a85b7d" }
97+
datafusion-physical-expr = { git = "https://github.com/polygon-io/arrow-datafusion", rev = "5a85b7d" }
98+
datafusion-physical-plan = { git = "https://github.com/polygon-io/arrow-datafusion", rev = "5a85b7d" }
99+
datafusion-physical-expr-adapter = { git = "https://github.com/polygon-io/arrow-datafusion", rev = "5a85b7d" }
98100
dirs = "6.0.0"
99101
divan = { package = "codspeed-divan-compat", version = "3.0" }
100102
dyn-hash = "0.2.0"
@@ -137,7 +139,7 @@ opentelemetry = "0.30.0"
137139
opentelemetry-otlp = "0.30.0"
138140
opentelemetry_sdk = "0.30.0"
139141
parking_lot = { version = "0.12.3", features = ["nightly"] }
140-
parquet = "55.2.0"
142+
parquet = "56"
141143
paste = "1.0.15"
142144
pco = "0.4.4"
143145
pin-project = "1.1.5"
@@ -183,17 +185,16 @@ target-lexicon = "0.13"
183185
tempfile = "3"
184186
termtree = { version = "0.5" }
185187
thiserror = "2.0.3"
186-
tokio = { version = "1.46" }
188+
tokio = { version = "1.47" }
187189
tokio-stream = "0.1.17"
188190
tokio-util = { version = "0.7.16" }
189-
# replace these with releases
190-
tpchgen = { git = "https://github.com/clflushopt/tpchgen-rs.git", rev = "d849ff430cd52250f6891ed4d5e3adad77bb2698" }
191-
tpchgen-arrow = { git = "https://github.com/clflushopt/tpchgen-rs.git", rev = "d849ff430cd52250f6891ed4d5e3adad77bb2698" }
191+
tpchgen = { version = "2" }
192+
tpchgen-arrow = { version = "2" }
192193
tracing = { version = "0.1.41" }
193194
tracing-perfetto = "0.1.5"
194195
tracing-subscriber = "0.3.20"
195-
url = "2.5.4"
196-
uuid = { version = "1.17", features = ["js"] }
196+
url = "2.5.7"
197+
uuid = { version = "1.18", features = ["js"] }
197198
walkdir = "2.5.0"
198199
wasm-bindgen-futures = "0.4.39"
199200
witchcraft-metrics = "1.0.1"

bench-vortex/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ datafusion = { workspace = true, features = [
2929
"parquet",
3030
"datetime_expressions",
3131
"nested_expressions",
32+
"unicode_expressions",
3233
] }
3334
datafusion-common = { workspace = true }
3435
datafusion-physical-plan = { workspace = true }

vortex-array/src/arrow/compute/to_arrow/canonical.rs

Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use arrow_array::types::{
1010
};
1111
use arrow_array::{
1212
Array, ArrayRef as ArrowArrayRef, ArrowPrimitiveType, BooleanArray as ArrowBoolArray,
13+
Decimal32Array as ArrowDecimal32Array, Decimal64Array as ArrowDecimal64Array,
1314
Decimal128Array as ArrowDecimal128Array, Decimal256Array as ArrowDecimal256Array,
1415
GenericByteArray, GenericByteViewArray, GenericListArray, NullArray as ArrowNullArray,
1516
OffsetSizeTrait, PrimitiveArray as ArrowPrimitiveArray, StructArray as ArrowStructArray,
@@ -110,6 +111,34 @@ impl Kernel for ToArrowCanonical {
110111
{
111112
to_arrow_primitive::<Float64Type>(array)
112113
}
114+
(Canonical::Decimal(array), DataType::Decimal32(precision, scale)) => {
115+
if array.decimal_dtype().precision() != *precision
116+
|| array.decimal_dtype().scale() != *scale
117+
{
118+
vortex_bail!(
119+
"ToArrowCanonical: target precision/scale {}/{} does not match array precision/scale {}/{}",
120+
precision,
121+
scale,
122+
array.decimal_dtype().precision(),
123+
array.decimal_dtype().scale()
124+
);
125+
}
126+
to_arrow_decimal32(array)
127+
}
128+
(Canonical::Decimal(array), DataType::Decimal64(precision, scale)) => {
129+
if array.decimal_dtype().precision() != *precision
130+
|| array.decimal_dtype().scale() != *scale
131+
{
132+
vortex_bail!(
133+
"ToArrowCanonical: target precision/scale {}/{} does not match array precision/scale {}/{}",
134+
precision,
135+
scale,
136+
array.decimal_dtype().precision(),
137+
array.decimal_dtype().scale()
138+
);
139+
}
140+
to_arrow_decimal64(array)
141+
}
113142
(Canonical::Decimal(array), DataType::Decimal128(precision, scale)) => {
114143
if array.decimal_dtype().precision() != *precision
115144
|| array.decimal_dtype().scale() != *scale
@@ -217,6 +246,91 @@ fn to_arrow_primitive<T: ArrowPrimitiveType>(array: PrimitiveArray) -> VortexRes
217246
)))
218247
}
219248

249+
fn to_arrow_decimal32(array: DecimalArray) -> VortexResult<ArrowArrayRef> {
250+
let null_buffer = array.validity_mask().to_null_buffer();
251+
let buffer: Buffer<i32> = match array.values_type() {
252+
DecimalValueType::I8 => {
253+
Buffer::from_trusted_len_iter(array.buffer::<i8>().into_iter().map(|x| x.as_()))
254+
}
255+
DecimalValueType::I16 => {
256+
Buffer::from_trusted_len_iter(array.buffer::<i16>().into_iter().map(|x| x.as_()))
257+
}
258+
DecimalValueType::I32 => array.buffer::<i32>(),
259+
DecimalValueType::I64 => array
260+
.buffer::<i64>()
261+
.into_iter()
262+
.map(|x| {
263+
x.to_i32()
264+
.ok_or_else(|| vortex_err!("i64 to i32 narrowing cannot be done safely"))
265+
})
266+
.process_results(|iter| Buffer::from_trusted_len_iter(iter))?,
267+
DecimalValueType::I128 => array
268+
.buffer::<i128>()
269+
.into_iter()
270+
.map(|x| {
271+
x.to_i32()
272+
.ok_or_else(|| vortex_err!("i128 to i32 narrowing cannot be done safely"))
273+
})
274+
.process_results(|iter| Buffer::from_trusted_len_iter(iter))?,
275+
DecimalValueType::I256 => array
276+
.buffer::<vortex_scalar::i256>()
277+
.into_iter()
278+
.map(|x| {
279+
x.to_i32()
280+
.ok_or_else(|| vortex_err!("i256 to i32 narrowing cannot be done safely"))
281+
})
282+
.process_results(|iter| Buffer::from_trusted_len_iter(iter))?,
283+
_ => vortex_bail!("unknown value type {:?}", array.values_type()),
284+
};
285+
Ok(Arc::new(
286+
ArrowDecimal32Array::new(buffer.into_arrow_scalar_buffer(), null_buffer)
287+
.with_precision_and_scale(
288+
array.decimal_dtype().precision(),
289+
array.decimal_dtype().scale(),
290+
)?,
291+
))
292+
}
293+
294+
fn to_arrow_decimal64(array: DecimalArray) -> VortexResult<ArrowArrayRef> {
295+
let null_buffer = array.validity_mask().to_null_buffer();
296+
let buffer: Buffer<i64> = match array.values_type() {
297+
DecimalValueType::I8 => {
298+
Buffer::from_trusted_len_iter(array.buffer::<i8>().into_iter().map(|x| x.as_()))
299+
}
300+
DecimalValueType::I16 => {
301+
Buffer::from_trusted_len_iter(array.buffer::<i16>().into_iter().map(|x| x.as_()))
302+
}
303+
DecimalValueType::I32 => {
304+
Buffer::from_trusted_len_iter(array.buffer::<i32>().into_iter().map(|x| x.as_()))
305+
}
306+
DecimalValueType::I64 => array.buffer::<i64>(),
307+
DecimalValueType::I128 => array
308+
.buffer::<i128>()
309+
.into_iter()
310+
.map(|x| {
311+
x.to_i64()
312+
.ok_or_else(|| vortex_err!("i128 to i64 narrowing cannot be done safely"))
313+
})
314+
.process_results(|iter| Buffer::from_trusted_len_iter(iter))?,
315+
DecimalValueType::I256 => array
316+
.buffer::<vortex_scalar::i256>()
317+
.into_iter()
318+
.map(|x| {
319+
x.to_i64()
320+
.ok_or_else(|| vortex_err!("i256 to i64 narrowing cannot be done safely"))
321+
})
322+
.process_results(|iter| Buffer::from_trusted_len_iter(iter))?,
323+
_ => vortex_bail!("unknown value type {:?}", array.values_type()),
324+
};
325+
Ok(Arc::new(
326+
ArrowDecimal64Array::new(buffer.into_arrow_scalar_buffer(), null_buffer)
327+
.with_precision_and_scale(
328+
array.decimal_dtype().precision(),
329+
array.decimal_dtype().scale(),
330+
)?,
331+
))
332+
}
333+
220334
fn to_arrow_decimal128(array: DecimalArray) -> VortexResult<ArrowArrayRef> {
221335
let null_buffer = array.validity_mask().to_null_buffer();
222336
let buffer: Buffer<i128> = match array.values_type() {
@@ -540,6 +654,60 @@ mod tests {
540654
assert_eq!(arrow_decimal.value(2), 12);
541655
}
542656

657+
#[rstest]
658+
#[case(0i8)]
659+
#[case(0i16)]
660+
#[case(0i32)]
661+
#[case(0i64)]
662+
#[case(0i128)]
663+
#[case(vortex_scalar::i256::ZERO)]
664+
fn to_arrow_decimal32<T: NativeDecimalType>(#[case] _decimal_type: T) {
665+
use arrow_array::Decimal32Array;
666+
667+
let mut decimal = DecimalBuilder::new::<T>(2, 1, false.into());
668+
decimal.append_value(10);
669+
decimal.append_value(11);
670+
decimal.append_value(12);
671+
672+
let decimal = decimal.finish();
673+
674+
let arrow_array = decimal.into_arrow(&DataType::Decimal32(2, 1)).unwrap();
675+
let arrow_decimal = arrow_array
676+
.as_any()
677+
.downcast_ref::<Decimal32Array>()
678+
.unwrap();
679+
assert_eq!(arrow_decimal.value(0), 10);
680+
assert_eq!(arrow_decimal.value(1), 11);
681+
assert_eq!(arrow_decimal.value(2), 12);
682+
}
683+
684+
#[rstest]
685+
#[case(0i8)]
686+
#[case(0i16)]
687+
#[case(0i32)]
688+
#[case(0i64)]
689+
#[case(0i128)]
690+
#[case(vortex_scalar::i256::ZERO)]
691+
fn to_arrow_decimal64<T: NativeDecimalType>(#[case] _decimal_type: T) {
692+
use arrow_array::Decimal64Array;
693+
694+
let mut decimal = DecimalBuilder::new::<T>(2, 1, false.into());
695+
decimal.append_value(10);
696+
decimal.append_value(11);
697+
decimal.append_value(12);
698+
699+
let decimal = decimal.finish();
700+
701+
let arrow_array = decimal.into_arrow(&DataType::Decimal64(2, 1)).unwrap();
702+
let arrow_decimal = arrow_array
703+
.as_any()
704+
.downcast_ref::<Decimal64Array>()
705+
.unwrap();
706+
assert_eq!(arrow_decimal.value(0), 10);
707+
assert_eq!(arrow_decimal.value(1), 11);
708+
assert_eq!(arrow_decimal.value(2), 12);
709+
}
710+
543711
#[rstest]
544712
#[case(0i8)]
545713
#[case(0i16)]

vortex-array/src/arrow/convert.rs

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,11 @@ use std::sync::Arc;
55

66
use arrow_array::cast::{AsArray, as_null_array};
77
use arrow_array::types::{
8-
ByteArrayType, ByteViewType, Date32Type, Date64Type, Decimal128Type, Decimal256Type,
9-
Float16Type, Float32Type, Float64Type, Int8Type, Int16Type, Int32Type, Int64Type,
10-
Time32MillisecondType, Time32SecondType, Time64MicrosecondType, Time64NanosecondType,
11-
TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType,
12-
TimestampSecondType, UInt8Type, UInt16Type, UInt32Type, UInt64Type,
8+
ByteArrayType, ByteViewType, Date32Type, Date64Type, Decimal32Type, Decimal64Type,
9+
Decimal128Type, Decimal256Type, Float16Type, Float32Type, Float64Type, Int8Type, Int16Type,
10+
Int32Type, Int64Type, Time32MillisecondType, Time32SecondType, Time64MicrosecondType,
11+
Time64NanosecondType, TimestampMicrosecondType, TimestampMillisecondType,
12+
TimestampNanosecondType, TimestampSecondType, UInt8Type, UInt16Type, UInt32Type, UInt64Type,
1313
};
1414
use arrow_array::{
1515
Array as ArrowArray, ArrowPrimitiveType, BooleanArray as ArrowBooleanArray, GenericByteArray,
@@ -103,6 +103,24 @@ impl_from_arrow_primitive!(Float16Type);
103103
impl_from_arrow_primitive!(Float32Type);
104104
impl_from_arrow_primitive!(Float64Type);
105105

106+
impl FromArrowArray<&ArrowPrimitiveArray<Decimal32Type>> for ArrayRef {
107+
fn from_arrow(array: &ArrowPrimitiveArray<Decimal32Type>, nullable: bool) -> Self {
108+
let decimal_type = DecimalDType::new(array.precision(), array.scale());
109+
let buffer = Buffer::from_arrow_scalar_buffer(array.values().clone());
110+
let validity = nulls(array.nulls(), nullable);
111+
DecimalArray::new(buffer, decimal_type, validity).into_array()
112+
}
113+
}
114+
115+
impl FromArrowArray<&ArrowPrimitiveArray<Decimal64Type>> for ArrayRef {
116+
fn from_arrow(array: &ArrowPrimitiveArray<Decimal64Type>, nullable: bool) -> Self {
117+
let decimal_type = DecimalDType::new(array.precision(), array.scale());
118+
let buffer = Buffer::from_arrow_scalar_buffer(array.values().clone());
119+
let validity = nulls(array.nulls(), nullable);
120+
DecimalArray::new(buffer, decimal_type, validity).into_array()
121+
}
122+
}
123+
106124
impl FromArrowArray<&ArrowPrimitiveArray<Decimal128Type>> for ArrayRef {
107125
fn from_arrow(array: &ArrowPrimitiveArray<Decimal128Type>, nullable: bool) -> Self {
108126
let decimal_type = DecimalDType::new(array.precision(), array.scale());
@@ -413,6 +431,12 @@ impl FromArrowArray<&dyn ArrowArray> for ArrayRef {
413431
}
414432
ArrowTimeUnit::Second | ArrowTimeUnit::Millisecond => unreachable!(),
415433
},
434+
DataType::Decimal32(..) => {
435+
Self::from_arrow(array.as_primitive::<Decimal32Type>(), nullable)
436+
}
437+
DataType::Decimal64(..) => {
438+
Self::from_arrow(array.as_primitive::<Decimal64Type>(), nullable)
439+
}
416440
DataType::Decimal128(..) => {
417441
Self::from_arrow(array.as_primitive::<Decimal128Type>(), nullable)
418442
}

vortex-datafusion/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ datafusion-datasource = { workspace = true, default-features = false }
2424
datafusion-execution = { workspace = true }
2525
datafusion-expr = { workspace = true }
2626
datafusion-physical-expr = { workspace = true }
27+
datafusion-physical-expr-adapter = { workspace = true }
28+
datafusion-physical-expr-common = { workspace = true }
2729
datafusion-physical-plan = { workspace = true }
2830
futures = { workspace = true }
2931
itertools = { workspace = true }

vortex-datafusion/src/convert/exprs.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use std::sync::Arc;
66
use arrow_schema::{DataType, Schema};
77
use datafusion_expr::Operator as DFOperator;
88
use datafusion_physical_expr::{PhysicalExpr, PhysicalExprRef};
9+
use datafusion_physical_expr_common::physical_expr::is_dynamic_physical_expr;
910
use datafusion_physical_plan::expressions as df_expr;
1011
use vortex::error::{VortexResult, vortex_bail, vortex_err};
1112
use vortex::expr::{BinaryExpr, ExprRef, LikeExpr, Operator, and, get_item, lit, root};
@@ -122,6 +123,12 @@ impl TryFromDataFusion<DFOperator> for Operator {
122123
}
123124

124125
pub(crate) fn can_be_pushed_down(expr: &PhysicalExprRef, schema: &Schema) -> bool {
126+
// We currently do not support pushdown of dynamic expressions in DF.
127+
// See issue: https://github.com/vortex-data/vortex/issues/4034
128+
if is_dynamic_physical_expr(expr) {
129+
return false;
130+
}
131+
125132
let expr = expr.as_any();
126133
if let Some(binary) = expr.downcast_ref::<df_expr::BinaryExpr>() {
127134
can_binary_be_pushed_down(binary, schema)

0 commit comments

Comments
 (0)