Skip to content

Commit e567cb9

Browse files
Fix serde of window lead/lag defaults (#20608)
## Which issue does this PR close? - Closes #20607. ## Rationale for this change Don't lose values in serde ## What changes are included in this PR? Preservation of window function arguments, particularly default value ## Are these changes tested? A RTT is included ## Are there any user-facing changes? Users with distributed query engines such as Ballista will have more queries work than before **note**: AI was used to create this PR
1 parent 451c79f commit e567cb9

File tree

3 files changed

+59
-1
lines changed

3 files changed

+59
-1
lines changed

datafusion/physical-plan/src/windows/mod.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,18 @@ impl WindowUDFExpr {
226226
pub fn fun(&self) -> &Arc<WindowUDF> {
227227
&self.fun
228228
}
229+
230+
/// Returns all arguments passed to this window function.
231+
///
232+
/// Unlike [`StandardWindowFunctionExpr::expressions`], which returns
233+
/// only the expressions that need batch evaluation (and may filter out
234+
/// literal offset/default args like those for `lead`/`lag`), this
235+
/// method returns the complete, unfiltered argument list. This is
236+
/// needed for serialization so that all arguments survive a
237+
/// protobuf round-trip.
238+
pub fn args(&self) -> &[Arc<dyn PhysicalExpr>] {
239+
&self.args
240+
}
229241
}
230242

231243
impl StandardWindowFunctionExpr for WindowUDFExpr {

datafusion/proto/src/physical_plan/to_proto.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ pub fn serialize_physical_window_expr(
109109
proto_converter: &dyn PhysicalProtoConverterExtension,
110110
) -> Result<protobuf::PhysicalWindowExprNode> {
111111
let expr = window_expr.as_any();
112-
let args = window_expr.expressions().to_vec();
112+
let mut args = window_expr.expressions().to_vec();
113113
let window_frame = window_expr.get_window_frame();
114114

115115
let (window_function, fun_definition, ignore_nulls, distinct) =
@@ -145,6 +145,7 @@ pub fn serialize_physical_window_expr(
145145
{
146146
let mut buf = Vec::new();
147147
codec.try_encode_udwf(expr.fun(), &mut buf)?;
148+
args = expr.args().to_vec();
148149
(
149150
physical_window_expr_node::WindowFunction::UserDefinedWindowFunction(
150151
expr.fun().name().to_string(),

datafusion/proto/tests/cases/roundtrip_physical_plan.rs

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3055,3 +3055,48 @@ fn test_session_id_rotation_with_execution_plans() -> Result<()> {
30553055

30563056
Ok(())
30573057
}
3058+
3059+
/// Tests that `lead` window function with offset and default value args
3060+
/// survives a protobuf round-trip. This is a regression test for a bug
3061+
/// where `expressions()` (used during serialization) returns only the
3062+
/// column expression for lead/lag, silently dropping the offset and
3063+
/// default value literal args.
3064+
#[test]
3065+
fn roundtrip_lead_with_default_value() -> Result<()> {
3066+
use datafusion::functions_window::lead_lag::lead_udwf;
3067+
3068+
let field_a = Field::new("a", DataType::Int64, false);
3069+
let field_b = Field::new("b", DataType::Int64, false);
3070+
let schema = Arc::new(Schema::new(vec![field_a, field_b]));
3071+
3072+
// lead(a, 2, 42) — column a, offset 2, default value 42
3073+
let lead_window = create_udwf_window_expr(
3074+
&lead_udwf(),
3075+
&[col("a", &schema)?, lit(2i64), lit(42i64)],
3076+
schema.as_ref(),
3077+
"test lead with default".to_string(),
3078+
false,
3079+
)?;
3080+
3081+
let udwf_expr = Arc::new(StandardWindowExpr::new(
3082+
lead_window,
3083+
&[col("b", &schema)?],
3084+
&[PhysicalSortExpr {
3085+
expr: col("a", &schema)?,
3086+
options: SortOptions {
3087+
descending: false,
3088+
nulls_first: false,
3089+
},
3090+
}],
3091+
Arc::new(WindowFrame::new(None)),
3092+
));
3093+
3094+
let input = Arc::new(EmptyExec::new(schema.clone()));
3095+
3096+
roundtrip_test(Arc::new(BoundedWindowAggExec::try_new(
3097+
vec![udwf_expr],
3098+
input,
3099+
InputOrderMode::Sorted,
3100+
true,
3101+
)?))
3102+
}

0 commit comments

Comments
 (0)