Skip to content

Commit 815b6d7

Browse files
committed
change PyExpr -> PySortExpr
1 parent 6353aa9 commit 815b6d7

File tree

5 files changed

+60
-58
lines changed

5 files changed

+60
-58
lines changed

src/context.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ use crate::catalog::{PyCatalog, PyTable};
3535
use crate::dataframe::PyDataFrame;
3636
use crate::dataset::Dataset;
3737
use crate::errors::{py_datafusion_err, DataFusionError};
38-
use crate::expr::PyExpr;
38+
use crate::expr::sort_expr::PySortExpr;
3939
use crate::physical_plan::PyExecutionPlan;
4040
use crate::record_batch::PyRecordBatchStream;
4141
use crate::sql::logical::PyLogicalPlan;
@@ -333,7 +333,7 @@ impl PySessionContext {
333333
table_partition_cols: Vec<(String, String)>,
334334
file_extension: &str,
335335
schema: Option<PyArrowType<Schema>>,
336-
file_sort_order: Option<Vec<Vec<PyExpr>>>,
336+
file_sort_order: Option<Vec<Vec<PySortExpr>>>,
337337
py: Python,
338338
) -> PyResult<()> {
339339
let options = ListingOptions::new(Arc::new(ParquetFormat::new()))
@@ -589,7 +589,7 @@ impl PySessionContext {
589589
file_extension: &str,
590590
skip_metadata: bool,
591591
schema: Option<PyArrowType<Schema>>,
592-
file_sort_order: Option<Vec<Vec<PyExpr>>>,
592+
file_sort_order: Option<Vec<Vec<PySortExpr>>>,
593593
py: Python,
594594
) -> PyResult<()> {
595595
let mut options = ParquetReadOptions::default()
@@ -890,7 +890,7 @@ impl PySessionContext {
890890
file_extension: &str,
891891
skip_metadata: bool,
892892
schema: Option<PyArrowType<Schema>>,
893-
file_sort_order: Option<Vec<Vec<PyExpr>>>,
893+
file_sort_order: Option<Vec<Vec<PySortExpr>>>,
894894
py: Python,
895895
) -> PyResult<PyDataFrame> {
896896
let mut options = ParquetReadOptions::default()

src/dataframe.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,12 +40,12 @@ use pyo3::types::{PyCapsule, PyTuple};
4040
use tokio::task::JoinHandle;
4141

4242
use crate::errors::py_datafusion_err;
43-
use crate::expr::to_sort_expressions;
43+
use crate::expr::sort_expr::to_sort_expressions;
4444
use crate::physical_plan::PyExecutionPlan;
4545
use crate::record_batch::PyRecordBatchStream;
4646
use crate::sql::logical::PyLogicalPlan;
4747
use crate::utils::{get_tokio_runtime, wait_for_future};
48-
use crate::{errors::DataFusionError, expr::PyExpr};
48+
use crate::{errors::DataFusionError, expr::{PyExpr, sort_expr::PySortExpr}};
4949

5050
/// A PyDataFrame is a representation of a logical plan and an API to compose statements.
5151
/// Use it to build a plan and `.collect()` to execute the plan and collect the result.
@@ -196,7 +196,7 @@ impl PyDataFrame {
196196
}
197197

198198
#[pyo3(signature = (*exprs))]
199-
fn sort(&self, exprs: Vec<PyExpr>) -> PyResult<Self> {
199+
fn sort(&self, exprs: Vec<PySortExpr>) -> PyResult<Self> {
200200
let exprs = to_sort_expressions(exprs);
201201
let df = self.df.as_ref().clone().sort(exprs)?;
202202
Ok(Self::new(df))

src/expr.rs

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,8 @@ pub mod unnest;
9494
pub mod unnest_expr;
9595
pub mod window;
9696

97+
use sort_expr::{PySortExpr, to_sort_expressions};
98+
9799
/// A PyExpr that can be used on a DataFrame
98100
#[pyclass(name = "Expr", module = "datafusion.expr", subclass)]
99101
#[derive(Debug, Clone)]
@@ -518,7 +520,7 @@ impl PyExpr {
518520

519521
// Expression Function Builder functions
520522

521-
pub fn order_by(&self, order_by: Vec<PyExpr>) -> PyExprFuncBuilder {
523+
pub fn order_by(&self, order_by: Vec<PySortExpr>) -> PyExprFuncBuilder {
522524
self.expr
523525
.clone()
524526
.order_by(to_sort_expressions(order_by))
@@ -562,20 +564,9 @@ impl From<ExprFuncBuilder> for PyExprFuncBuilder {
562564
}
563565
}
564566

565-
pub fn to_sort_expressions(order_by: Vec<PyExpr>) -> Vec<Expr> {
566-
order_by
567-
.iter()
568-
.map(|e| e.expr.clone())
569-
.map(|e| match e {
570-
Expr::Sort(_) => e,
571-
_ => e.sort(true, true),
572-
})
573-
.collect()
574-
}
575-
576567
#[pymethods]
577568
impl PyExprFuncBuilder {
578-
pub fn order_by(&self, order_by: Vec<PyExpr>) -> PyExprFuncBuilder {
569+
pub fn order_by(&self, order_by: Vec<PySortExpr>) -> PyExprFuncBuilder {
579570
self.builder
580571
.clone()
581572
.order_by(to_sort_expressions(order_by))

src/expr/sort_expr.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,10 +51,17 @@ impl Display for PySortExpr {
5151
}
5252
}
5353

54+
pub fn to_sort_expressions(order_by: Vec<PySortExpr>) -> Vec<SortExpr> {
55+
order_by
56+
.iter()
57+
.map(|e| e.sort.clone())
58+
.collect()
59+
}
60+
5461
#[pymethods]
5562
impl PySortExpr {
5663
fn expr(&self) -> PyResult<PyExpr> {
57-
Ok((*self.sort.expr).clone().into())
64+
Ok(self.sort.expr.clone().into())
5865
}
5966

6067
fn ascending(&self) -> PyResult<bool> {

src/functions.rs

Lines changed: 41 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,8 @@ use crate::common::data_type::NullTreatment;
2525
use crate::context::PySessionContext;
2626
use crate::errors::DataFusionError;
2727
use crate::expr::conditional_expr::PyCaseBuilder;
28-
use crate::expr::to_sort_expressions;
28+
use crate::expr::sort_expr::to_sort_expressions;
29+
use crate::expr::sort_expr::PySortExpr;
2930
use crate::expr::window::PyWindowFrame;
3031
use crate::expr::PyExpr;
3132
use datafusion::common::{Column, ScalarValue, TableReference};
@@ -35,15 +36,15 @@ use datafusion::functions_aggregate;
3536
use datafusion::logical_expr::expr::Alias;
3637
use datafusion::logical_expr::sqlparser::ast::NullTreatment as DFNullTreatment;
3738
use datafusion::logical_expr::{
38-
expr::{find_df_window_func, Sort, WindowFunction},
39+
expr::{find_df_window_func, WindowFunction},
3940
lit, Expr, WindowFunctionDefinition,
4041
};
4142

4243
fn add_builder_fns_to_aggregate(
4344
agg_fn: Expr,
4445
distinct: Option<bool>,
4546
filter: Option<PyExpr>,
46-
order_by: Option<Vec<PyExpr>>,
47+
order_by: Option<Vec<PySortExpr>>,
4748
null_treatment: Option<NullTreatment>,
4849
) -> PyResult<PyExpr> {
4950
// Since ExprFuncBuilder::new() is private, we can guarantee initializing
@@ -174,14 +175,16 @@ fn regexp_replace(
174175
}
175176
/// Creates a new Sort Expr
176177
#[pyfunction]
177-
fn order_by(expr: PyExpr, asc: bool, nulls_first: bool) -> PyResult<PyExpr> {
178-
Ok(PyExpr {
179-
expr: datafusion::logical_expr::Expr::Sort(Sort {
180-
expr: Box::new(expr.expr),
181-
asc,
182-
nulls_first,
183-
}),
184-
})
178+
fn order_by(expr: PyExpr, asc: bool, nulls_first: bool) -> PyResult<PySortExpr> {
179+
Ok(
180+
PySortExpr::from(
181+
datafusion::logical_expr::expr::Sort {
182+
expr: expr.expr,
183+
asc,
184+
nulls_first,
185+
}
186+
)
187+
)
185188
}
186189

187190
/// Creates a new Alias Expr
@@ -342,7 +345,7 @@ macro_rules! aggregate_function {
342345
$($arg: PyExpr),*,
343346
distinct: Option<bool>,
344347
filter: Option<PyExpr>,
345-
order_by: Option<Vec<PyExpr>>,
348+
order_by: Option<Vec<PySortExpr>>,
346349
null_treatment: Option<NullTreatment>
347350
) -> PyResult<PyExpr> {
348351
let agg_fn = functions_aggregate::expr_fn::$NAME($($arg.into()),*);
@@ -363,7 +366,7 @@ macro_rules! aggregate_function_vec_args {
363366
$($arg: PyExpr),*,
364367
distinct: Option<bool>,
365368
filter: Option<PyExpr>,
366-
order_by: Option<Vec<PyExpr>>,
369+
order_by: Option<Vec<PySortExpr>>,
367370
null_treatment: Option<NullTreatment>
368371
) -> PyResult<PyExpr> {
369372
let agg_fn = functions_aggregate::expr_fn::$NAME(vec![$($arg.into()),*]);
@@ -677,7 +680,7 @@ pub fn first_value(
677680
expr: PyExpr,
678681
distinct: Option<bool>,
679682
filter: Option<PyExpr>,
680-
order_by: Option<Vec<PyExpr>>,
683+
order_by: Option<Vec<PySortExpr>>,
681684
null_treatment: Option<NullTreatment>,
682685
) -> PyResult<PyExpr> {
683686
// If we initialize the UDAF with order_by directly, then it gets over-written by the builder
@@ -687,19 +690,20 @@ pub fn first_value(
687690
}
688691

689692
// nth_value requires a non-expr argument
690-
#[pyfunction]
691-
#[pyo3(signature = (expr, n, distinct=None, filter=None, order_by=None, null_treatment=None))]
692-
pub fn nth_value(
693-
expr: PyExpr,
694-
n: i64,
695-
distinct: Option<bool>,
696-
filter: Option<PyExpr>,
697-
order_by: Option<Vec<PyExpr>>,
698-
null_treatment: Option<NullTreatment>,
699-
) -> PyResult<PyExpr> {
700-
let agg_fn = datafusion::functions_aggregate::nth_value::nth_value(vec![expr.expr, lit(n)]);
701-
add_builder_fns_to_aggregate(agg_fn, distinct, filter, order_by, null_treatment)
702-
}
693+
// #[pyfunction]
694+
// #[pyo3(signature = (expr, n, distinct=None, filter=None, order_by=None, null_treatment=None))]
695+
// pub fn nth_value(
696+
// expr: PyExpr,
697+
// n: i64,
698+
// distinct: Option<bool>,
699+
// filter: Option<PyExpr>,
700+
// order_by: Option<Vec<PySortExpr>>,
701+
// null_treatment: Option<NullTreatment>,
702+
// ) -> PyResult<PyExpr> {
703+
// // @todo: Commenting this function out for now as it requires some reworking
704+
// let agg_fn = datafusion::functions_aggregate::nth_value::nth_value(vec![expr.expr, lit(n)]);
705+
// add_builder_fns_to_aggregate(agg_fn, distinct, filter, order_by, null_treatment)
706+
// }
703707

704708
// string_agg requires a non-expr argument
705709
#[pyfunction]
@@ -709,7 +713,7 @@ pub fn string_agg(
709713
delimiter: String,
710714
distinct: Option<bool>,
711715
filter: Option<PyExpr>,
712-
order_by: Option<Vec<PyExpr>>,
716+
order_by: Option<Vec<PySortExpr>>,
713717
null_treatment: Option<NullTreatment>,
714718
) -> PyResult<PyExpr> {
715719
let agg_fn = datafusion::functions_aggregate::string_agg::string_agg(expr.expr, lit(delimiter));
@@ -719,7 +723,7 @@ pub fn string_agg(
719723
fn add_builder_fns_to_window(
720724
window_fn: Expr,
721725
partition_by: Option<Vec<PyExpr>>,
722-
order_by: Option<Vec<PyExpr>>,
726+
order_by: Option<Vec<PySortExpr>>,
723727
) -> PyResult<PyExpr> {
724728
// Since ExprFuncBuilder::new() is private, set an empty partition and then
725729
// override later if appropriate.
@@ -749,7 +753,7 @@ pub fn lead(
749753
shift_offset: i64,
750754
default_value: Option<ScalarValue>,
751755
partition_by: Option<Vec<PyExpr>>,
752-
order_by: Option<Vec<PyExpr>>,
756+
order_by: Option<Vec<PySortExpr>>,
753757
) -> PyResult<PyExpr> {
754758
let window_fn = window_function::lead(arg.expr, Some(shift_offset), default_value);
755759

@@ -763,7 +767,7 @@ pub fn lag(
763767
shift_offset: i64,
764768
default_value: Option<ScalarValue>,
765769
partition_by: Option<Vec<PyExpr>>,
766-
order_by: Option<Vec<PyExpr>>,
770+
order_by: Option<Vec<PySortExpr>>,
767771
) -> PyResult<PyExpr> {
768772
let window_fn = window_function::lag(arg.expr, Some(shift_offset), default_value);
769773

@@ -772,7 +776,7 @@ pub fn lag(
772776

773777
#[pyfunction]
774778
#[pyo3(signature = (partition_by=None, order_by=None))]
775-
pub fn rank(partition_by: Option<Vec<PyExpr>>, order_by: Option<Vec<PyExpr>>) -> PyResult<PyExpr> {
779+
pub fn rank(partition_by: Option<Vec<PyExpr>>, order_by: Option<Vec<PySortExpr>>) -> PyResult<PyExpr> {
776780
let window_fn = window_function::rank();
777781

778782
add_builder_fns_to_window(window_fn, partition_by, order_by)
@@ -782,7 +786,7 @@ pub fn rank(partition_by: Option<Vec<PyExpr>>, order_by: Option<Vec<PyExpr>>) ->
782786
#[pyo3(signature = (partition_by=None, order_by=None))]
783787
pub fn dense_rank(
784788
partition_by: Option<Vec<PyExpr>>,
785-
order_by: Option<Vec<PyExpr>>,
789+
order_by: Option<Vec<PySortExpr>>,
786790
) -> PyResult<PyExpr> {
787791
let window_fn = window_function::dense_rank();
788792

@@ -793,7 +797,7 @@ pub fn dense_rank(
793797
#[pyo3(signature = (partition_by=None, order_by=None))]
794798
pub fn percent_rank(
795799
partition_by: Option<Vec<PyExpr>>,
796-
order_by: Option<Vec<PyExpr>>,
800+
order_by: Option<Vec<PySortExpr>>,
797801
) -> PyResult<PyExpr> {
798802
let window_fn = window_function::percent_rank();
799803

@@ -804,7 +808,7 @@ pub fn percent_rank(
804808
#[pyo3(signature = (partition_by=None, order_by=None))]
805809
pub fn cume_dist(
806810
partition_by: Option<Vec<PyExpr>>,
807-
order_by: Option<Vec<PyExpr>>,
811+
order_by: Option<Vec<PySortExpr>>,
808812
) -> PyResult<PyExpr> {
809813
let window_fn = window_function::cume_dist();
810814

@@ -816,7 +820,7 @@ pub fn cume_dist(
816820
pub fn ntile(
817821
arg: PyExpr,
818822
partition_by: Option<Vec<PyExpr>>,
819-
order_by: Option<Vec<PyExpr>>,
823+
order_by: Option<Vec<PySortExpr>>,
820824
) -> PyResult<PyExpr> {
821825
let window_fn = window_function::ntile(arg.into());
822826

@@ -965,7 +969,7 @@ pub(crate) fn init_module(m: &Bound<'_, PyModule>) -> PyResult<()> {
965969
m.add_wrapped(wrap_pyfunction!(regr_syy))?;
966970
m.add_wrapped(wrap_pyfunction!(first_value))?;
967971
m.add_wrapped(wrap_pyfunction!(last_value))?;
968-
m.add_wrapped(wrap_pyfunction!(nth_value))?;
972+
// m.add_wrapped(wrap_pyfunction!(nth_value))?;
969973
m.add_wrapped(wrap_pyfunction!(bit_and))?;
970974
m.add_wrapped(wrap_pyfunction!(bit_or))?;
971975
m.add_wrapped(wrap_pyfunction!(bit_xor))?;

0 commit comments

Comments
 (0)