
Commit 24a59f2

Upgrade to DF50 release candidate
1 parent daa1e1b commit 24a59f2

9 files changed: +416 -437 lines changed

Cargo.lock

Lines changed: 331 additions & 405 deletions
Some generated files are not rendered by default.

Cargo.toml

Lines changed: 15 additions & 9 deletions
@@ -34,15 +34,15 @@ protoc = [ "datafusion-substrait/protoc" ]
 substrait = ["dep:datafusion-substrait"]
 
 [dependencies]
-tokio = { version = "1.45", features = ["macros", "rt", "rt-multi-thread", "sync"] }
-pyo3 = { version = "0.24", features = ["extension-module", "abi3", "abi3-py39"] }
-pyo3-async-runtimes = { version = "0.24", features = ["tokio-runtime"]}
+tokio = { version = "1.47", features = ["macros", "rt", "rt-multi-thread", "sync"] }
+pyo3 = { version = "0.25", features = ["extension-module", "abi3", "abi3-py39"] }
+pyo3-async-runtimes = { version = "0.25", features = ["tokio-runtime"]}
 pyo3-log = "0.12.4"
-arrow = { version = "55.1.0", features = ["pyarrow"] }
-datafusion = { version = "49.0.2", features = ["avro", "unicode_expressions"] }
-datafusion-substrait = { version = "49.0.2", optional = true }
-datafusion-proto = { version = "49.0.2" }
-datafusion-ffi = { version = "49.0.2" }
+arrow = { version = "56", features = ["pyarrow"] }
+datafusion = { version = "50", features = ["avro", "unicode_expressions"] }
+datafusion-substrait = { version = "50", optional = true }
+datafusion-proto = { version = "50" }
+datafusion-ffi = { version = "50" }
 prost = "0.13.1" # keep in line with `datafusion-substrait`
 uuid = { version = "1.18", features = ["v4"] }
 mimalloc = { version = "0.1", optional = true, default-features = false, features = ["local_dynamic_tls"] }
@@ -54,7 +54,7 @@ log = "0.4.27"
 
 [build-dependencies]
 prost-types = "0.13.1" # keep in line with `datafusion-substrait`
-pyo3-build-config = "0.24"
+pyo3-build-config = "0.25"
 
 [lib]
 name = "datafusion_python"
@@ -63,3 +63,9 @@ crate-type = ["cdylib", "rlib"]
 [profile.release]
 lto = true
 codegen-units = 1
+
+[patch.crates-io]
+datafusion = { git = "https://github.com/apache/datafusion.git", branch = "branch-50" }
+datafusion-substrait = { git = "https://github.com/apache/datafusion.git", branch = "branch-50" }
+datafusion-proto = { git = "https://github.com/apache/datafusion.git", branch = "branch-50" }
+datafusion-ffi = { git = "https://github.com/apache/datafusion.git", branch = "branch-50" }

python/datafusion/functions.py

Lines changed: 18 additions & 6 deletions
@@ -31,6 +31,7 @@
     WindowFrame,
     expr_list_to_raw_expr_list,
     sort_list_to_raw_sort_list,
+    sort_or_default,
 )
 
 if TYPE_CHECKING:
@@ -1659,7 +1660,7 @@ def approx_median(expression: Expr, filter: Optional[Expr] = None) -> Expr:
 
 
 def approx_percentile_cont(
-    expression: Expr,
+    sort_expression: Expr | SortExpr,
     percentile: float,
     num_centroids: Optional[int] = None,
     filter: Optional[Expr] = None,
@@ -1680,21 +1681,26 @@ def approx_percentile_cont(
     the options ``order_by``, ``null_treatment``, and ``distinct``.
 
     Args:
-        expression: Values for which to find the approximate percentile
+        sort_expression: Values for which to find the approximate percentile
         percentile: This must be between 0.0 and 1.0, inclusive
         num_centroids: Max bin size for the t-digest algorithm
        filter: If provided, only compute against rows for which the filter is True
     """
+    sort_expr_raw = sort_or_default(sort_expression)
     filter_raw = filter.expr if filter is not None else None
     return Expr(
         f.approx_percentile_cont(
-            expression.expr, percentile, num_centroids=num_centroids, filter=filter_raw
+            sort_expr_raw, percentile, num_centroids=num_centroids, filter=filter_raw
         )
     )
 
 
 def approx_percentile_cont_with_weight(
-    expression: Expr, weight: Expr, percentile: float, filter: Optional[Expr] = None
+    sort_expression: Expr | SortExpr,
+    weight: Expr,
+    percentile: float,
+    num_centroids: Optional[int] = None,
+    filter: Optional[Expr] = None,
 ) -> Expr:
     """Returns the value of the weighted approximate percentile.
 
@@ -1705,16 +1711,22 @@ def approx_percentile_cont_with_weight(
     the options ``order_by``, ``null_treatment``, and ``distinct``.
 
     Args:
-        expression: Values for which to find the approximate percentile
+        sort_expression: Values for which to find the approximate percentile
         weight: Relative weight for each of the values in ``expression``
         percentile: This must be between 0.0 and 1.0, inclusive
+        num_centroids: Max bin size for the t-digest algorithm
         filter: If provided, only compute against rows for which the filter is True
 
     """
+    sort_expr_raw = sort_or_default(sort_expression)
     filter_raw = filter.expr if filter is not None else None
     return Expr(
         f.approx_percentile_cont_with_weight(
-            expression.expr, weight.expr, percentile, filter=filter_raw
+            sort_expr_raw,
+            weight.expr,
+            percentile,
+            num_centroids=num_centroids,
+            filter=filter_raw,
         )
     )
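The percentile helpers now accept either a plain Expr or a SortExpr; a bare column falls back to a default ordering via sort_or_default(). A minimal end-to-end sketch of the new call shape (the SessionContext, column names, and data below are illustrative, not part of this commit):

from datafusion import SessionContext, column, lit, functions as f

ctx = SessionContext()
df = ctx.from_pydict({"a": [1, 2, 3], "b": [4, 5, 6]})

# A SortExpr picks the ordering explicitly; a plain column expression still works.
df.aggregate(
    [],
    [
        f.approx_percentile_cont(
            column("b").sort(ascending=True, nulls_first=False), 0.5, num_centroids=2
        ).alias("p50_b"),
        f.approx_percentile_cont_with_weight(column("b"), lit(0.6), 0.5).alias("w_p50_b"),
    ],
).show()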

python/tests/test_aggregation.py

Lines changed: 16 additions & 0 deletions
@@ -130,11 +130,27 @@ def test_aggregation_stats(df, agg_expr, calc_expected):
         (f.median(column("b"), filter=column("a") != 2), pa.array([5]), False),
         (f.approx_median(column("b"), filter=column("a") != 2), pa.array([5]), False),
         (f.approx_percentile_cont(column("b"), 0.5), pa.array([4]), False),
+        (
+            f.approx_percentile_cont(
+                column("b").sort(ascending=True, nulls_first=False),
+                0.5,
+                num_centroids=2,
+            ),
+            pa.array([4]),
+            False,
+        ),
         (
             f.approx_percentile_cont_with_weight(column("b"), lit(0.6), 0.5),
             pa.array([6], type=pa.float64()),
             False,
         ),
+        (
+            f.approx_percentile_cont_with_weight(
+                column("b").sort(ascending=False, nulls_first=False), lit(0.6), 0.5
+            ),
+            pa.array([6], type=pa.float64()),
+            False,
+        ),
         (
             f.approx_percentile_cont_with_weight(
                 column("b"), lit(0.6), 0.5, filter=column("a") != lit(3)

src/common/data_type.rs

Lines changed: 12 additions & 0 deletions
@@ -224,6 +224,16 @@ impl DataTypeMap {
             DataType::Dictionary(_, _) => Err(py_datafusion_err(DataFusionError::NotImplemented(
                 format!("{arrow_type:?}"),
             ))),
+            DataType::Decimal32(precision, scale) => Ok(DataTypeMap::new(
+                DataType::Decimal32(*precision, *scale),
+                PythonType::Float,
+                SqlType::DECIMAL,
+            )),
+            DataType::Decimal64(precision, scale) => Ok(DataTypeMap::new(
+                DataType::Decimal64(*precision, *scale),
+                PythonType::Float,
+                SqlType::DECIMAL,
+            )),
             DataType::Decimal128(precision, scale) => Ok(DataTypeMap::new(
                 DataType::Decimal128(*precision, *scale),
                 PythonType::Float,
@@ -612,6 +622,8 @@ impl DataTypeMap {
             DataType::Struct(_) => "Struct",
             DataType::Union(_, _) => "Union",
             DataType::Dictionary(_, _) => "Dictionary",
+            DataType::Decimal32(_, _) => "Decimal32",
+            DataType::Decimal64(_, _) => "Decimal64",
             DataType::Decimal128(_, _) => "Decimal128",
             DataType::Decimal256(_, _) => "Decimal256",
             DataType::Map(_, _) => "Map",
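Arrow 56 introduces 32- and 64-bit decimal types, and the mapping above lets them flow through the type machinery the same way Decimal128/256 already do. A rough sketch of a table carrying such columns, assuming a pyarrow build new enough to expose pa.decimal32 and pa.decimal64 (older releases ship only decimal128/256):

import pyarrow as pa
from datafusion import SessionContext

# pa.decimal32 / pa.decimal64 are assumed to exist in the installed pyarrow.
schema = pa.schema([("price", pa.decimal32(9, 2)), ("qty", pa.decimal64(18, 4))])
batch = pa.RecordBatch.from_pydict({"price": [], "qty": []}, schema=schema)

ctx = SessionContext()
df = ctx.create_dataframe([[batch]])
print(df.schema())  # the small-decimal columns are now recognized Arrow types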

src/dataframe.rs

Lines changed: 0 additions & 1 deletion
@@ -276,7 +276,6 @@ impl PyParquetColumnOptions {
                 statistics_enabled,
                 bloom_filter_fpp,
                 bloom_filter_ndv,
-                ..Default::default()
             },
         }
     }

src/expr/sort_expr.rs

Lines changed: 1 addition & 1 deletion
@@ -23,7 +23,7 @@ use std::fmt::{self, Display, Formatter};
 #[pyclass(name = "SortExpr", module = "datafusion.expr", subclass)]
 #[derive(Clone)]
 pub struct PySortExpr {
-    sort: SortExpr,
+    pub(crate) sort: SortExpr,
 }
 
 impl From<PySortExpr> for SortExpr {

src/functions.rs

Lines changed: 19 additions & 13 deletions
@@ -319,21 +319,25 @@ fn find_window_fn(
 }
 
 /// Creates a new Window function expression
+#[allow(clippy::too_many_arguments)]
 #[pyfunction]
-#[pyo3(signature = (name, args, partition_by=None, order_by=None, window_frame=None, ctx=None))]
+#[pyo3(signature = (name, args, partition_by=None, order_by=None, window_frame=None, filter=None, distinct=false, ctx=None))]
 fn window(
     name: &str,
     args: Vec<PyExpr>,
     partition_by: Option<Vec<PyExpr>>,
     order_by: Option<Vec<PySortExpr>>,
     window_frame: Option<PyWindowFrame>,
+    filter: Option<PyExpr>,
+    distinct: bool,
     ctx: Option<PySessionContext>,
 ) -> PyResult<PyExpr> {
     let fun = find_window_fn(name, ctx)?;
 
     let window_frame = window_frame
         .map(|w| w.into())
         .unwrap_or(WindowFrame::new(order_by.as_ref().map(|v| !v.is_empty())));
+    let filter = filter.map(|f| f.expr.into());
 
     Ok(PyExpr {
         expr: datafusion::logical_expr::Expr::WindowFunction(Box::new(WindowFunction {
@@ -351,6 +355,8 @@ fn window(
                 .map(|x| x.into())
                 .collect::<Vec<_>>(),
             window_frame,
+            filter,
+            distinct,
             null_treatment: None,
         },
     })),
@@ -649,36 +655,36 @@ aggregate_function!(approx_median);
 // aggregate_function!(grouping);
 
 #[pyfunction]
-#[pyo3(signature = (expression, percentile, num_centroids=None, filter=None))]
+#[pyo3(signature = (sort_expression, percentile, num_centroids=None, filter=None))]
 pub fn approx_percentile_cont(
-    expression: PyExpr,
+    sort_expression: PySortExpr,
     percentile: f64,
     num_centroids: Option<i64>, // enforces optional arguments at the end, currently
     filter: Option<PyExpr>,
 ) -> PyDataFusionResult<PyExpr> {
-    let args = if let Some(num_centroids) = num_centroids {
-        vec![expression.expr, lit(percentile), lit(num_centroids)]
-    } else {
-        vec![expression.expr, lit(percentile)]
-    };
-    let udaf = functions_aggregate::approx_percentile_cont::approx_percentile_cont_udaf();
-    let agg_fn = udaf.call(args);
+    let agg_fn = functions_aggregate::expr_fn::approx_percentile_cont(
+        sort_expression.sort,
+        lit(percentile),
+        num_centroids.map(lit),
+    );
 
     add_builder_fns_to_aggregate(agg_fn, None, filter, None, None)
 }
 
 #[pyfunction]
-#[pyo3(signature = (expression, weight, percentile, filter=None))]
+#[pyo3(signature = (sort_expression, weight, percentile, num_centroids=None, filter=None))]
 pub fn approx_percentile_cont_with_weight(
-    expression: PyExpr,
+    sort_expression: PySortExpr,
     weight: PyExpr,
     percentile: f64,
+    num_centroids: Option<i64>,
     filter: Option<PyExpr>,
 ) -> PyDataFusionResult<PyExpr> {
     let agg_fn = functions_aggregate::expr_fn::approx_percentile_cont_with_weight(
-        expression.expr,
+        sort_expression.sort,
         weight.expr,
         lit(percentile),
+        num_centroids.map(lit),
     );
 
     add_builder_fns_to_aggregate(agg_fn, None, filter, None, None)
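DataFusion 50 threads filter and distinct through window function expressions, so the raw window binding grows the two extra arguments. The call below sketches the existing Python-level surface of the generic window helper; this commit does not show the Python wrapper forwarding the new parameters, so they are noted only as a comment:

from datafusion import SessionContext, column, functions as f

ctx = SessionContext()
df = ctx.from_pydict({"a": [1, 1, 2], "b": [30, 10, 20]})

# Existing call shape; the underlying Rust binding now additionally accepts
# filter= and distinct= per the signature change above.
rank_expr = f.window("rank", [], order_by=[column("b").sort(ascending=True)])
df.select(column("a"), column("b"), rank_expr.alias("rank_by_b")).show()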

src/udwf.rs

Lines changed: 4 additions & 2 deletions
@@ -33,6 +33,7 @@ use crate::utils::{parse_volatility, validate_pycapsule};
 use datafusion::arrow::datatypes::DataType;
 use datafusion::arrow::pyarrow::{FromPyArrow, PyArrowType, ToPyArrow};
 use datafusion::error::{DataFusionError, Result};
+use datafusion::logical_expr::ptr_eq::PtrEq;
 use datafusion::logical_expr::{
     PartitionEvaluator, PartitionEvaluatorFactory, Signature, Volatility, WindowUDF, WindowUDFImpl,
 };
@@ -271,11 +272,12 @@ impl PyWindowUDF {
     }
 }
 
+#[derive(Hash, Eq, PartialEq)]
 pub struct MultiColumnWindowUDF {
     name: String,
     signature: Signature,
     return_type: DataType,
-    partition_evaluator_factory: PartitionEvaluatorFactory,
+    partition_evaluator_factory: PtrEq<PartitionEvaluatorFactory>,
 }
 
 impl std::fmt::Debug for MultiColumnWindowUDF {
@@ -303,7 +305,7 @@ impl MultiColumnWindowUDF {
             name,
             signature,
             return_type,
-            partition_evaluator_factory,
+            partition_evaluator_factory: partition_evaluator_factory.into(),
         }
     }
 }
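The PtrEq wrapper and the Hash/Eq derive are internal bookkeeping for DF50, which expects window UDF implementations to be hashable and comparable; Python-defined window UDFs are unaffected. A minimal sketch of one, assuming the documented udwf/WindowEvaluator API and import paths:

import pyarrow as pa
import pyarrow.compute as pc
from datafusion import SessionContext, column, udwf
from datafusion.udf import WindowEvaluator

class CumulativeSum(WindowEvaluator):
    """Running total over the whole partition."""

    def evaluate_all(self, values: list[pa.Array], num_rows: int) -> pa.Array:
        return pc.cumulative_sum(values[0])

running_total = udwf(CumulativeSum(), pa.int64(), pa.int64(), volatility="immutable")

ctx = SessionContext()
df = ctx.from_pydict({"b": [10, 20, 30]})
df.select(column("b"), running_total(column("b")).alias("running_b")).show()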
