Skip to content

Commit 50d20dd

Browse files
adriangbclaude
andauthored
preserve Field metadata in first_value/last_value (#19335)
## Which issue does this PR close? <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123. --> Closes #19336 ## Rationale for this change <!-- Why are you proposing this change? If this is already explained clearly in the issue then this section is not needed. Explaining clearly why changes are proposed helps reviewers understand your changes and offer better suggestions for fixes. --> The `first_value` and `last_value` aggregate functions were not preserving Field metadata from their input arguments. This caused metadata to be lost when using these functions, which affects downstream consumers that rely on metadata (e.g., for DISTINCT ON queries which use `first_value` internally). ## What changes are included in this PR? <!-- There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR. --> - Implement `return_field()` for `FirstValue` to preserve input field metadata - Implement `return_field()` for `LastValue` to preserve input field metadata - Add `get_metadata` UDF for testing metadata preservation in sqllogictest - Add regression tests for `first_value`, `last_value`, `DISTINCT ON`, `DISTINCT`, and grouped columns ## Are these changes tested? <!-- We typically require tests for all PRs in order to: 1. Prevent the code from being accidentally broken by subsequent changes 2. Serve as another way to document the expected behavior of the code If tests are not included in your PR, please explain why (for example, are they covered by existing tests)? --> Yes, new sqllogictest tests are added in `metadata.slt` that verify metadata is preserved through various aggregate operations. ## Are there any user-facing changes? <!-- If there are user-facing changes then we may require documentation to be updated before approving the PR. --> Yes, Field metadata is now correctly preserved when using `first_value()` and `last_value()` aggregate functions. This is a bug fix that improves metadata propagation. --- 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <[email protected]>
1 parent 1f26716 commit 50d20dd

File tree

3 files changed

+131
-5
lines changed

3 files changed

+131
-5
lines changed

datafusion/functions-aggregate/src/first_last.rs

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ use datafusion_common::cast::as_boolean_array;
4141
use datafusion_common::utils::{compare_rows, extract_row_at_idx_to_buf, get_row_at_idx};
4242
use datafusion_common::{
4343
DataFusionError, Result, ScalarValue, arrow_datafusion_err, internal_err,
44+
not_impl_err,
4445
};
4546
use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs};
4647
use datafusion_expr::utils::{AggregateOrderSensitivity, format_state_name};
@@ -133,8 +134,20 @@ impl AggregateUDFImpl for FirstValue {
133134
&self.signature
134135
}
135136

136-
fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
137-
Ok(arg_types[0].clone())
137+
fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
138+
not_impl_err!("Not called because the return_field_from_args is implemented")
139+
}
140+
141+
fn return_field(&self, arg_fields: &[FieldRef]) -> Result<FieldRef> {
142+
// Preserve metadata from the first argument field
143+
Ok(Arc::new(
144+
Field::new(
145+
self.name(),
146+
arg_fields[0].data_type().clone(),
147+
true, // always nullable, there may be no rows
148+
)
149+
.with_metadata(arg_fields[0].metadata().clone()),
150+
))
138151
}
139152

140153
fn accumulator(&self, acc_args: AccumulatorArgs) -> Result<Box<dyn Accumulator>> {
@@ -1071,8 +1084,20 @@ impl AggregateUDFImpl for LastValue {
10711084
&self.signature
10721085
}
10731086

1074-
fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
1075-
Ok(arg_types[0].clone())
1087+
fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
1088+
not_impl_err!("Not called because the return_field_from_args is implemented")
1089+
}
1090+
1091+
fn return_field(&self, arg_fields: &[FieldRef]) -> Result<FieldRef> {
1092+
// Preserve metadata from the first argument field
1093+
Ok(Arc::new(
1094+
Field::new(
1095+
self.name(),
1096+
arg_fields[0].data_type().clone(),
1097+
true, // always nullable, there may be no rows
1098+
)
1099+
.with_metadata(arg_fields[0].metadata().clone()),
1100+
))
10761101
}
10771102

10781103
fn accumulator(&self, acc_args: AccumulatorArgs) -> Result<Box<dyn Accumulator>> {

datafusion/sqllogictest/src/test_context.rs

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ use arrow::record_batch::RecordBatch;
3232
use datafusion::catalog::{
3333
CatalogProvider, MemoryCatalogProvider, MemorySchemaProvider, Session,
3434
};
35-
use datafusion::common::{not_impl_err, DataFusionError, Result};
35+
use datafusion::common::{exec_err, not_impl_err, DataFusionError, Result, ScalarValue};
3636
use datafusion::functions::math::abs;
3737
use datafusion::logical_expr::async_udf::{AsyncScalarUDF, AsyncScalarUDFImpl};
3838
use datafusion::logical_expr::{
@@ -398,6 +398,58 @@ pub async fn register_metadata_tables(ctx: &SessionContext) {
398398
.unwrap();
399399

400400
ctx.register_batch("table_with_metadata", batch).unwrap();
401+
402+
// Register the get_metadata UDF for testing metadata preservation
403+
ctx.register_udf(ScalarUDF::from(GetMetadataUdf::new()));
404+
}
405+
406+
/// UDF to extract metadata from a field for testing purposes
407+
/// Usage: get_metadata(expr, 'key') -> returns the metadata value or NULL
408+
#[derive(Debug, PartialEq, Eq, Hash)]
409+
struct GetMetadataUdf {
410+
signature: Signature,
411+
}
412+
413+
impl GetMetadataUdf {
414+
fn new() -> Self {
415+
Self {
416+
signature: Signature::any(2, Volatility::Immutable),
417+
}
418+
}
419+
}
420+
421+
impl ScalarUDFImpl for GetMetadataUdf {
422+
fn as_any(&self) -> &dyn Any {
423+
self
424+
}
425+
426+
fn name(&self) -> &str {
427+
"get_metadata"
428+
}
429+
430+
fn signature(&self) -> &Signature {
431+
&self.signature
432+
}
433+
434+
fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
435+
Ok(DataType::Utf8)
436+
}
437+
438+
fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
439+
// Get the metadata key from the second argument (must be a string literal)
440+
let key = match &args.args[1] {
441+
ColumnarValue::Scalar(ScalarValue::Utf8(Some(k))) => k.clone(),
442+
_ => {
443+
return exec_err!("get_metadata second argument must be a string literal")
444+
}
445+
};
446+
447+
// Get metadata from the first argument's field
448+
let metadata_value = args.arg_fields[0].metadata().get(&key).cloned();
449+
450+
// Return as a scalar (same value for all rows)
451+
Ok(ColumnarValue::Scalar(ScalarValue::Utf8(metadata_value)))
452+
}
401453
}
402454

403455
/// Create a UDF function named "example". See the `sample_udf.rs` example

datafusion/sqllogictest/test_files/metadata.slt

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,56 @@ order by 1 asc nulls last;
235235
3 1
236236
NULL 1
237237

238+
# Regression test: first_value should preserve metadata
239+
query IT
240+
select first_value(id order by id asc nulls last), get_metadata(first_value(id order by id asc nulls last), 'metadata_key')
241+
from table_with_metadata;
242+
----
243+
1 the id field
244+
245+
# Regression test: last_value should preserve metadata
246+
query IT
247+
select last_value(id order by id asc nulls first), get_metadata(last_value(id order by id asc nulls first), 'metadata_key')
248+
from table_with_metadata;
249+
----
250+
3 the id field
238251

252+
# Regression test: DISTINCT ON should preserve metadata (uses first_value internally)
253+
query ITTT
254+
select distinct on (id) id, get_metadata(id, 'metadata_key'), name, get_metadata(name, 'metadata_key')
255+
from table_with_metadata order by id asc nulls last;
256+
----
257+
1 the id field NULL the name field
258+
3 the id field baz the name field
259+
NULL the id field bar the name field
260+
261+
# Regression test: DISTINCT should preserve metadata
262+
query ITTT
263+
with res AS (
264+
select distinct id, name from table_with_metadata
265+
)
266+
select id, get_metadata(id, 'metadata_key'), name, get_metadata(name, 'metadata_key')
267+
from res
268+
order by id asc nulls last;
269+
----
270+
1 the id field NULL the name field
271+
3 the id field baz the name field
272+
NULL the id field bar the name field
273+
274+
# Regression test: grouped columns should preserve metadata
275+
query ITTT
276+
with res AS (
277+
select name, count(*), id
278+
from table_with_metadata
279+
group by id, name
280+
)
281+
select id, get_metadata(id, 'metadata_key'), name, get_metadata(name, 'metadata_key')
282+
from res
283+
order by id asc nulls last, name asc nulls last
284+
----
285+
1 the id field NULL the name field
286+
3 the id field baz the name field
287+
NULL the id field bar the name field
239288

240289
statement ok
241290
drop table table_with_metadata;

0 commit comments

Comments
 (0)