Skip to content

Commit 9bbeef4

Browse files
committed
fix bounded but not fixed roundtrip
1 parent f17253b commit 9bbeef4

File tree

2 files changed

+41
-23
lines changed

2 files changed

+41
-23
lines changed

datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs

Lines changed: 30 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1163,7 +1163,9 @@ mod tests {
11631163
use crate::expressions::PhysicalSortExpr;
11641164
use crate::projection::ProjectionExec;
11651165
use crate::streaming::{PartitionStream, StreamingTableExec};
1166-
use crate::windows::{create_window_expr, BoundedWindowAggExec, InputOrderMode};
1166+
use crate::windows::{
1167+
create_udwf_window_expr, create_window_expr, BoundedWindowAggExec, InputOrderMode,
1168+
};
11671169
use crate::{execute_stream, get_plan_string, ExecutionPlan};
11681170

11691171
use arrow_array::builder::{Int64Builder, UInt64Builder};
@@ -1180,13 +1182,14 @@ mod tests {
11801182
WindowFrame, WindowFrameBound, WindowFrameUnits, WindowFunctionDefinition,
11811183
};
11821184
use datafusion_functions_aggregate::count::count_udaf;
1183-
use datafusion_functions_window::nth_value::first_value_udwf;
11841185
use datafusion_functions_window::nth_value::last_value_udwf;
11851186
use datafusion_functions_window::nth_value::nth_value_udwf;
1186-
use datafusion_physical_expr::expressions::{col, lit, Column};
1187+
use datafusion_physical_expr::expressions::{col, Column, Literal};
11871188
use datafusion_physical_expr::{LexOrdering, PhysicalExpr};
11881189

1190+
use crate::common::collect;
11891191
use crate::memory::MemoryExec;
1192+
use datafusion_physical_expr::window::BuiltInWindowExpr;
11901193
use futures::future::Shared;
11911194
use futures::{pin_mut, ready, FutureExt, Stream, StreamExt};
11921195
use itertools::Itertools;
@@ -1524,30 +1527,39 @@ mod tests {
15241527
)
15251528
.map(|e| Arc::new(e) as Arc<dyn ExecutionPlan>)?;
15261529
let col_a = col("a", &schema)?;
1527-
let nth_value_func1 = NthValue::nth(
1528-
"nth_value(-1)",
1529-
Arc::clone(&col_a),
1530-
DataType::Int32,
1531-
1,
1530+
let nth_value_func1 = create_udwf_window_expr(
1531+
&nth_value_udwf(),
1532+
&[
1533+
Arc::clone(&col_a),
1534+
Arc::new(Literal::new(ScalarValue::Int32(Some(1)))),
1535+
],
1536+
&schema,
1537+
"nth_value(-1)".to_string(),
15321538
false,
15331539
)?
15341540
.reverse_expr()
15351541
.unwrap();
1536-
let nth_value_func2 = NthValue::nth(
1537-
"nth_value(-2)",
1538-
Arc::clone(&col_a),
1539-
DataType::Int32,
1540-
2,
1542+
let nth_value_func2 = create_udwf_window_expr(
1543+
&nth_value_udwf(),
1544+
&[
1545+
Arc::clone(&col_a),
1546+
Arc::new(Literal::new(ScalarValue::Int32(Some(2)))),
1547+
],
1548+
&schema,
1549+
"nth_value(-2)".to_string(),
15411550
false,
15421551
)?
15431552
.reverse_expr()
15441553
.unwrap();
1545-
let last_value_func = Arc::new(NthValue::last(
1546-
"last",
1547-
Arc::clone(&col_a),
1548-
DataType::Int32,
1554+
1555+
let last_value_func = create_udwf_window_expr(
1556+
&last_value_udwf(),
1557+
&[Arc::clone(&col_a)],
1558+
&schema,
1559+
"last".to_string(),
15491560
false,
1550-
)) as _;
1561+
)?;
1562+
15511563
let window_exprs = vec![
15521564
// LAST_VALUE(a)
15531565
Arc::new(BuiltInWindowExpr::new(

datafusion/proto/tests/cases/roundtrip_physical_plan.rs

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use std::vec;
2424
use arrow::array::RecordBatch;
2525
use arrow::csv::WriterBuilder;
2626
use arrow::datatypes::{Fields, TimeUnit};
27-
use datafusion::physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr};
27+
use datafusion::physical_expr::aggregate::AggregateExprBuilder;
2828
use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
2929
use datafusion_expr::dml::InsertOp;
3030
use datafusion_functions_aggregate::approx_percentile_cont::approx_percentile_cont_udaf;
@@ -47,7 +47,7 @@ use datafusion::datasource::physical_plan::{
4747
};
4848
use datafusion::execution::FunctionRegistry;
4949
use datafusion::functions_aggregate::sum::sum_udaf;
50-
use datafusion::functions_window::nth_value::nth_value_udwf;
50+
use datafusion::functions_window::nth_value::first_value_udwf;
5151
use datafusion::logical_expr::{create_udf, JoinType, Operator, Volatility};
5252
use datafusion::physical_expr::expressions::Literal;
5353
use datafusion::physical_expr::window::{BuiltInWindowExpr, SlidingAggregateWindowExpr};
@@ -281,10 +281,16 @@ fn roundtrip_window() -> Result<()> {
281281
WindowFrameBound::CurrentRow,
282282
);
283283

284-
let nth_value_window =
285-
create_udwf_window_expr(&nth_value_udwf(), &[col("a", &schema)?], schema.as_ref(), "FIRST_VALUE(a) PARTITION BY [b] ORDER BY [a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW".to_string(), false)?;
284+
let first_value_window = create_udwf_window_expr(
285+
&first_value_udwf(),
286+
&[col("a", &schema)?],
287+
schema.as_ref(),
288+
"FIRST_VALUE(a) PARTITION BY [b] ORDER BY [a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW".to_string(),
289+
false,
290+
)?;
291+
// "FIRST_VALUE(a) PARTITION BY [b] ORDER BY [a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW",
286292
let builtin_window_expr = Arc::new(BuiltInWindowExpr::new(
287-
nth_value_window,
293+
first_value_window,
288294
&[col("b", &schema)?],
289295
&LexOrdering {
290296
inner: vec![PhysicalSortExpr {

0 commit comments

Comments
 (0)