diff --git a/datafusion/expr-common/src/accumulator.rs b/datafusion/expr-common/src/accumulator.rs index fc4e90114beea..72b0b24e1a40f 100644 --- a/datafusion/expr-common/src/accumulator.rs +++ b/datafusion/expr-common/src/accumulator.rs @@ -58,17 +58,37 @@ pub trait Accumulator: Send + Sync + Debug { /// running sum. fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()>; - /// Returns the final aggregate value, consuming the internal state. + /// Returns the final aggregate value. /// /// For example, the `SUM` accumulator maintains a running sum, /// and `evaluate` will produce that running sum as its output. /// - /// This function should not be called twice, otherwise it will - /// result in potentially non-deterministic behavior. - /// /// This function gets `&mut self` to allow for the accumulator to build /// arrow-compatible internal state that can be returned without copying - /// when possible (for example distinct strings) + /// when possible (for example distinct strings). + /// + /// # Window Frame Queries + /// + /// When used in a window context without [`Self::supports_retract_batch`], + /// `evaluate()` may be called multiple times on the same accumulator instance + /// (once per row in the partition). In this case, implementations **must not** + /// consume or modify internal state. Use references or clones to preserve state: + /// + /// ```ignore + /// // GOOD: Preserves state for subsequent calls + /// fn evaluate(&mut self) -> Result { + /// calculate_result(&self.values) // Use reference + /// } + /// + /// // BAD: Consumes state, breaks window queries + /// fn evaluate(&mut self) -> Result { + /// calculate_result(std::mem::take(&mut self.values)) + /// } + /// ``` + /// + /// For efficient sliding window calculations, consider implementing + /// [`Self::retract_batch`] which allows DataFusion to incrementally + /// update state rather than calling `evaluate()` repeatedly. 
fn evaluate(&mut self) -> Result; /// Returns the allocated size required for this accumulator, in diff --git a/datafusion/expr/src/udaf.rs b/datafusion/expr/src/udaf.rs index a69176e1173a5..599f481a5e986 100644 --- a/datafusion/expr/src/udaf.rs +++ b/datafusion/expr/src/udaf.rs @@ -208,6 +208,7 @@ impl AggregateUDF { self.inner.window_function_display_name(params) } + #[allow(deprecated)] pub fn is_nullable(&self) -> bool { self.inner.is_nullable() } @@ -528,10 +529,32 @@ pub trait AggregateUDFImpl: Debug + DynEq + DynHash + Send + Sync { /// Whether the aggregate function is nullable. /// + /// **DEPRECATED**: This method is deprecated and will be removed in a future version. + /// Nullability should instead be specified in [`Self::return_field`] which can provide + /// more context-aware nullability based on input field properties. + /// /// Nullable means that the function could return `null` for any inputs. /// For example, aggregate functions like `COUNT` always return a non null value /// but others like `MIN` will return `NULL` if there is nullable input. 
/// Note that if the function is declared as *not* nullable, make sure the [`AggregateUDFImpl::default_value`] is `non-null` + /// + /// # Migration Guide + /// + /// If you need to override nullability, implement [`Self::return_field`] instead: + /// + /// ```ignore + /// fn return_field(&self, arg_fields: &[FieldRef]) -> Result { + /// let arg_types: Vec<_> = arg_fields.iter().map(|f| f.data_type()).cloned().collect(); + /// let data_type = self.return_type(&arg_types)?; + /// // Specify nullability based on your function's logic + /// let nullable = arg_fields.iter().any(|f| f.is_nullable()); + /// Ok(Arc::new(Field::new(self.name(), data_type, nullable))) + /// } + /// ``` + #[deprecated( + since = "52.0.0", + note = "Use `return_field` to specify nullability instead of `is_nullable`" + )] fn is_nullable(&self) -> bool { true } @@ -1091,6 +1114,13 @@ pub fn udaf_default_window_function_display_name( } /// Encapsulates default implementation of [`AggregateUDFImpl::return_field`]. +/// +/// This function computes nullability based on input field nullability: +/// - The result is nullable if ANY input field is nullable +/// - The result is non-nullable only if ALL input fields are non-nullable +/// +/// This replaces the previous behavior of always deferring to `is_nullable()`, +/// providing more accurate nullability inference for aggregate functions. 
pub fn udaf_default_return_field( func: &F, arg_fields: &[FieldRef], @@ -1098,10 +1128,13 @@ pub fn udaf_default_return_field( let arg_types: Vec<_> = arg_fields.iter().map(|f| f.data_type()).cloned().collect(); let data_type = func.return_type(&arg_types)?; + // Determine nullability: result is nullable if any input is nullable + let is_nullable = arg_fields.iter().any(|f| f.is_nullable()); + Ok(Arc::new(Field::new( func.name(), data_type, - func.is_nullable(), + is_nullable, ))) } @@ -1247,6 +1280,7 @@ impl AggregateUDFImpl for AliasedAggregateUDFImpl { self.inner.return_field(arg_fields) } + #[allow(deprecated)] fn is_nullable(&self) -> bool { self.inner.is_nullable() } @@ -1343,7 +1377,7 @@ mod test { &self.signature } fn return_type(&self, _args: &[DataType]) -> Result { - unimplemented!() + Ok(DataType::Float64) } fn accumulator( &self, @@ -1383,7 +1417,7 @@ mod test { &self.signature } fn return_type(&self, _args: &[DataType]) -> Result { - unimplemented!() + Ok(DataType::Float64) } fn accumulator( &self, @@ -1424,4 +1458,71 @@ mod test { value.hash(hasher); hasher.finish() } + + #[test] + fn test_return_field_nullability_from_nullable_input() { + // Test that return_field derives nullability from input field nullability + use arrow::datatypes::Field; + use std::sync::Arc; + + let udf = AggregateUDF::from(AMeanUdf::new()); + + // Create a nullable input field + let nullable_field = Arc::new(Field::new("col", DataType::Float64, true)); + let return_field = udf.return_field(&[nullable_field]).unwrap(); + + // When input is nullable, output should be nullable + assert!(return_field.is_nullable()); + } + + #[test] + fn test_return_field_nullability_from_non_nullable_input() { + // Test that return_field respects non-nullable input fields + use arrow::datatypes::Field; + use std::sync::Arc; + + let udf = AggregateUDF::from(AMeanUdf::new()); + + // Create a non-nullable input field + let non_nullable_field = Arc::new(Field::new("col", DataType::Float64, false)); + 
let return_field = udf.return_field(&[non_nullable_field]).unwrap(); + + // When input is non-nullable, output should also be non-nullable + assert!(!return_field.is_nullable()); + } + + #[test] + fn test_return_field_nullability_with_mixed_inputs() { + // Test that return_field is nullable if ANY input is nullable + use arrow::datatypes::Field; + use std::sync::Arc; + + let a = AggregateUDF::from(AMeanUdf::new()); + + // With multiple inputs (typical for aggregates in more complex scenarios) + let nullable_field = Arc::new(Field::new("col1", DataType::Float64, true)); + let non_nullable_field = Arc::new(Field::new("col2", DataType::Float64, false)); + + let return_field = a.return_field(&[non_nullable_field, nullable_field]).unwrap(); + + // If ANY input is nullable, result should be nullable + assert!(return_field.is_nullable()); + } + + #[test] + fn test_return_field_preserves_return_type() { + // Test that return_field correctly preserves the return type + use arrow::datatypes::Field; + use std::sync::Arc; + + let udf = AggregateUDF::from(AMeanUdf::new()); + + let nullable_field = Arc::new(Field::new("col", DataType::Float64, true)); + let return_field = udf.return_field(&[nullable_field]).unwrap(); + + // Verify data type is preserved + assert_eq!(*return_field.data_type(), DataType::Float64); + // Verify name matches function name + assert_eq!(return_field.name(), "a"); + } } diff --git a/datafusion/functions-aggregate/src/percentile_cont.rs b/datafusion/functions-aggregate/src/percentile_cont.rs index a4e8332626b00..a9bd4744d23ec 100644 --- a/datafusion/functions-aggregate/src/percentile_cont.rs +++ b/datafusion/functions-aggregate/src/percentile_cont.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. 
+use std::collections::HashMap; use std::fmt::Debug; use std::mem::{size_of, size_of_val}; use std::sync::Arc; @@ -52,7 +53,7 @@ use datafusion_expr::{ }; use datafusion_functions_aggregate_common::aggregate::groups_accumulator::accumulate::accumulate; use datafusion_functions_aggregate_common::aggregate::groups_accumulator::nulls::filtered_null_mask; -use datafusion_functions_aggregate_common::utils::GenericDistinctBuffer; +use datafusion_functions_aggregate_common::utils::{GenericDistinctBuffer, Hashable}; use datafusion_macros::user_doc; use crate::utils::validate_percentile_expr; @@ -427,14 +428,51 @@ impl Accumulator for PercentileContAccumulator { } fn evaluate(&mut self) -> Result { - let d = std::mem::take(&mut self.all_values); - let value = calculate_percentile::(d, self.percentile); + let value = calculate_percentile::(&mut self.all_values, self.percentile); ScalarValue::new_primitive::(value, &T::DATA_TYPE) } fn size(&self) -> usize { size_of_val(self) + self.all_values.capacity() * size_of::() } + + fn retract_batch(&mut self, values: &[ArrayRef]) -> Result<()> { + if values.is_empty() { + return Ok(()); + } + let mut to_remove: HashMap = HashMap::new(); + for i in 0..values[0].len() { + let v = ScalarValue::try_from_array(&values[0], i)?; + if !v.is_null() { + *to_remove.entry(v).or_default() += 1; + } + } + + let mut i = 0; + while i < self.all_values.len() { + let k = + ScalarValue::new_primitive::(Some(self.all_values[i]), &T::DATA_TYPE)?; + if let Some(count) = to_remove.get_mut(&k) + && *count > 0 + { + self.all_values.swap_remove(i); + *count -= 1; + if *count == 0 { + to_remove.remove(&k); + if to_remove.is_empty() { + break; + } + } + } else { + i += 1; + } + } + Ok(()) + } + + fn supports_retract_batch(&self) -> bool { + true + } } /// The percentile_cont groups accumulator accumulates the raw input values @@ -549,13 +587,13 @@ impl GroupsAccumulator fn evaluate(&mut self, emit_to: EmitTo) -> Result { // Emit values - let emit_group_values = 
emit_to.take_needed(&mut self.group_values); + let mut emit_group_values = emit_to.take_needed(&mut self.group_values); // Calculate percentile for each group let mut evaluate_result_builder = PrimitiveBuilder::::with_capacity(emit_group_values.len()); - for values in emit_group_values { - let value = calculate_percentile::(values, self.percentile); + for values in &mut emit_group_values { + let value = calculate_percentile::(values.as_mut_slice(), self.percentile); evaluate_result_builder.append_option(value); } @@ -652,17 +690,31 @@ impl Accumulator for DistinctPercentileContAccumula } fn evaluate(&mut self) -> Result { - let d = std::mem::take(&mut self.distinct_values.values) - .into_iter() - .map(|v| v.0) - .collect::>(); - let value = calculate_percentile::(d, self.percentile); + let mut values: Vec = + self.distinct_values.values.iter().map(|v| v.0).collect(); + let value = calculate_percentile::(&mut values, self.percentile); ScalarValue::new_primitive::(value, &T::DATA_TYPE) } fn size(&self) -> usize { size_of_val(self) + self.distinct_values.size() } + + fn retract_batch(&mut self, values: &[ArrayRef]) -> Result<()> { + if values.is_empty() { + return Ok(()); + } + + let arr = values[0].as_primitive::(); + for value in arr.iter().flatten() { + self.distinct_values.values.remove(&Hashable(value)); + } + Ok(()) + } + + fn supports_retract_batch(&self) -> bool { + true + } } /// Calculate the percentile value for a given set of values. @@ -672,8 +724,12 @@ impl Accumulator for DistinctPercentileContAccumula /// For percentile p and n values: /// - If p * (n-1) is an integer, return the value at that position /// - Otherwise, interpolate between the two closest values +/// +/// Note: This function takes a mutable slice and sorts it in place, but does not +/// consume the data. This is important for window frame queries where evaluate() +/// may be called multiple times on the same accumulator state. 
fn calculate_percentile( - mut values: Vec, + values: &mut [T::Native], percentile: f64, ) -> Option { let cmp = |x: &T::Native, y: &T::Native| x.compare(*y); diff --git a/datafusion/functions-aggregate/src/string_agg.rs b/datafusion/functions-aggregate/src/string_agg.rs index 77e9f60afd3cf..1c10818c091db 100644 --- a/datafusion/functions-aggregate/src/string_agg.rs +++ b/datafusion/functions-aggregate/src/string_agg.rs @@ -384,14 +384,13 @@ impl Accumulator for SimpleStringAggAccumulator { } fn evaluate(&mut self) -> Result { - let result = if self.has_value { - ScalarValue::LargeUtf8(Some(std::mem::take(&mut self.accumulated_string))) + if self.has_value { + Ok(ScalarValue::LargeUtf8(Some( + self.accumulated_string.clone(), + ))) } else { - ScalarValue::LargeUtf8(None) - }; - - self.has_value = false; - Ok(result) + Ok(ScalarValue::LargeUtf8(None)) + } } fn size(&self) -> usize { diff --git a/datafusion/sqllogictest/test_files/aggregate.slt b/datafusion/sqllogictest/test_files/aggregate.slt index f6ce68917e03b..a72e4f36ae084 100644 --- a/datafusion/sqllogictest/test_files/aggregate.slt +++ b/datafusion/sqllogictest/test_files/aggregate.slt @@ -1126,6 +1126,102 @@ ORDER BY tags, timestamp; 4 tag2 90 75 80 95 5 tag2 100 80 80 100 +########### +# Issue #19612: Test that percentile_cont produces correct results +# in window frame queries. Previously percentile_cont consumed its internal state +# during evaluate(), causing incorrect results when called multiple times. 
+########### + +# Test percentile_cont sliding window (same as median) +query ITRR +SELECT + timestamp, + tags, + value, + percentile_cont(value, 0.5) OVER ( + PARTITION BY tags + ORDER BY timestamp + ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING + ) AS value_percentile_50 +FROM median_window_test +ORDER BY tags, timestamp; +---- +1 tag1 10 15 +2 tag1 20 20 +3 tag1 30 30 +4 tag1 40 40 +5 tag1 50 45 +1 tag2 60 65 +2 tag2 70 70 +3 tag2 80 80 +4 tag2 90 90 +5 tag2 100 95 + +# Test percentile_cont non-sliding window +query ITRRRR +SELECT + timestamp, + tags, + value, + percentile_cont(value, 0.5) OVER ( + PARTITION BY tags + ORDER BY timestamp + ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW + ) AS value_percentile_unbounded_preceding, + percentile_cont(value, 0.5) OVER ( + PARTITION BY tags + ORDER BY timestamp + ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING + ) AS value_percentile_unbounded_both, + percentile_cont(value, 0.5) OVER ( + PARTITION BY tags + ORDER BY timestamp + ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING + ) AS value_percentile_unbounded_following +FROM median_window_test +ORDER BY tags, timestamp; +---- +1 tag1 10 10 30 30 +2 tag1 20 15 30 35 +3 tag1 30 20 30 40 +4 tag1 40 25 30 45 +5 tag1 50 30 30 50 +1 tag2 60 60 80 80 +2 tag2 70 65 80 85 +3 tag2 80 70 80 90 +4 tag2 90 75 80 95 +5 tag2 100 80 80 100 + +# Test percentile_cont with different percentile values +query ITRRR +SELECT + timestamp, + tags, + value, + percentile_cont(value, 0.25) OVER ( + PARTITION BY tags + ORDER BY timestamp + ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW + ) AS p25, + percentile_cont(value, 0.75) OVER ( + PARTITION BY tags + ORDER BY timestamp + ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW + ) AS p75 +FROM median_window_test +ORDER BY tags, timestamp; +---- +1 tag1 10 10 10 +2 tag1 20 12.5 17.5 +3 tag1 30 15 25 +4 tag1 40 17.5 32.5 +5 tag1 50 20 40 +1 tag2 60 60 60 +2 tag2 70 62.5 67.5 +3 tag2 80 65 75 +4 tag2 90 67.5 82.5 +5 tag2 100 70 90 + statement 
ok DROP TABLE median_window_test; @@ -8246,3 +8342,44 @@ query R select percentile_cont(null, 0.5); ---- NULL + +# Test string_agg window frame behavior (fix for issue #19612) +statement ok +CREATE TABLE string_agg_window_test ( + id INT, + grp VARCHAR, + val VARCHAR +); + +statement ok +INSERT INTO string_agg_window_test (id, grp, val) VALUES +(1, 'A', 'a'), +(2, 'A', 'b'), +(3, 'A', 'c'), +(1, 'B', 'x'), +(2, 'B', 'y'), +(3, 'B', 'z'); + +# Test string_agg with ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW +# The function should maintain state correctly across multiple evaluate() calls +query ITT +SELECT + id, + grp, + string_agg(val, ',') OVER ( + PARTITION BY grp + ORDER BY id + ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW + ) AS cumulative_string +FROM string_agg_window_test +ORDER BY grp, id; +---- +1 A a +2 A a,b +3 A a,b,c +1 B x +2 B x,y +3 B x,y,z + +statement ok +DROP TABLE string_agg_window_test; diff --git a/docs/source/library-user-guide/functions/adding-udfs.md b/docs/source/library-user-guide/functions/adding-udfs.md index 5d033ae3f9e97..422d498757d77 100644 --- a/docs/source/library-user-guide/functions/adding-udfs.md +++ b/docs/source/library-user-guide/functions/adding-udfs.md @@ -1350,6 +1350,12 @@ async fn main() -> Result<()> { [`create_udaf`]: https://docs.rs/datafusion/latest/datafusion/logical_expr/fn.create_udaf.html [`advanced_udaf.rs`]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/udf/advanced_udaf.rs +### Nullability of Aggregate Functions + +By default, aggregate functions return nullable output if any of their input fields are nullable. This behavior is automatically computed by DataFusion and works correctly for most functions like `MIN`, `MAX`, `SUM`, and `AVG`. + +For more advanced control over nullability or to understand how it works, see the [Aggregate UDF Nullability Guide](udf-nullability.md). 
+ ## Adding a Table UDF A User-Defined Table Function (UDTF) is a function that takes parameters and returns a `TableProvider`. diff --git a/docs/source/library-user-guide/functions/udf-nullability.md b/docs/source/library-user-guide/functions/udf-nullability.md new file mode 100644 index 0000000000000..5a87070129ed2 --- /dev/null +++ b/docs/source/library-user-guide/functions/udf-nullability.md @@ -0,0 +1,268 @@ + + +# Aggregate UDF Nullability + +## Overview + +DataFusion distinguishes between the nullability of aggregate function outputs and their input fields. This document explains how nullability is computed for aggregate User Defined Functions (UDAFs) and how to correctly specify it in your custom functions. + +## The Change: From `is_nullable()` to `return_field()` + +### What Changed? + +In earlier versions of DataFusion, aggregate function nullability was controlled by the `is_nullable()` method on `AggregateUDFImpl`. This method returned a simple boolean indicating whether the function could ever return `NULL`, regardless of input characteristics. + +**This approach has been deprecated** in favor of computing nullability more accurately via the `return_field()` method, which has access to the actual field metadata of the inputs. This allows for more precise nullability inference based on whether input fields are nullable. + +### Why the Change? + +1. **Input-aware nullability**: The new approach allows the function's nullability to depend on the nullability of its inputs. For example, `MIN(column)` should only be nullable if `column` is nullable. + +2. **Consistency with scalar UDFs**: Scalar UDFs already follow this pattern using `return_field_from_args()`, and aggregate UDFs now align with this design. + +3. **More accurate schema inference**: Query optimizers and executors can now make better decisions about whether intermediate or final results can be null. 
+ +## How Nullability Works Now + +By default, the `return_field()` method in `AggregateUDFImpl` computes the output field using this logic: + +```text +output_is_nullable = ANY input field is nullable +``` + +In other words: +- **If ALL input fields are non-nullable**, the output is **non-nullable** +- **If ANY input field is nullable**, the output is **nullable** + +This default behavior works well for most aggregate functions like `MIN`, `MAX`, `SUM`, and `AVG`. + +## Implementing Custom Aggregate UDFs + +### Default Behavior (Recommended) + +For most aggregate functions, you don't need to override `return_field()`. The default implementation will correctly infer nullability from inputs: + +```rust +use std::sync::Arc; +use std::any::Any; +use arrow::datatypes::{DataType, Field, FieldRef}; +use datafusion_common::Result; +use datafusion_expr::{AggregateUDFImpl, Signature, Volatility}; +use datafusion_functions_aggregate_common::accumulator::AccumulatorArgs; + +#[derive(Debug)] +struct MyAggregateFunction { + signature: Signature, +} + +impl MyAggregateFunction { + fn new() -> Self { + Self { + signature: Signature::uniform( + 1, + vec![DataType::Float64], + Volatility::Immutable, + ), + } + } +} + +impl AggregateUDFImpl for MyAggregateFunction { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "my_agg" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, _arg_types: &[DataType]) -> Result { + Ok(DataType::Float64) + } + + // Don't override return_field() - let the default handle nullability + // The default will make the output nullable if any input is nullable + + fn accumulator(&self, acc_args: AccumulatorArgs) -> Result> { + // Implementation... + # unimplemented!() + } + + // ... 
other required methods +} +``` + +### Custom Nullability (Advanced) + +If your function has special nullability semantics, you can override `return_field()`: + +```rust +use std::sync::Arc; +use arrow::datatypes::Field; + +impl AggregateUDFImpl for MyAggregateFunction { + // ... other methods ... + + fn return_field(&self, arg_fields: &[FieldRef]) -> Result { + let arg_types: Vec<_> = arg_fields + .iter() + .map(|f| f.data_type()) + .cloned() + .collect(); + let data_type = self.return_type(&arg_types)?; + + // Example: COUNT always returns non-nullable i64, regardless of input + let is_nullable = false; // COUNT never returns NULL + + Ok(Arc::new(Field::new(self.name(), data_type, is_nullable))) + } +} +``` + +## Migration Guide + +If you have existing code that uses `is_nullable()`, here's how to migrate: + +### Before (Deprecated) + +```rust +impl AggregateUDFImpl for MyFunction { + fn is_nullable(&self) -> bool { + // Only returns true or false, independent of inputs + true + } +} +``` + +### After (Recommended) + +**Option 1: Use the default (simplest)** + +```rust +impl AggregateUDFImpl for MyFunction { + // Remove is_nullable() entirely + // The default return_field() will compute nullability from inputs +} +``` + +**Option 2: Override return_field() for custom logic** + +```rust +impl AggregateUDFImpl for MyFunction { + fn return_field(&self, arg_fields: &[FieldRef]) -> Result { + let arg_types: Vec<_> = arg_fields + .iter() + .map(|f| f.data_type()) + .cloned() + .collect(); + let data_type = self.return_type(&arg_types)?; + + // Your custom nullability logic here + let is_nullable = arg_fields.iter().any(|f| f.is_nullable()); + + Ok(Arc::new(Field::new(self.name(), data_type, is_nullable))) + } +} +``` + +## Examples + +### Example 1: MIN Function + +`MIN` returns the smallest value from a group. 
It should be nullable if and only if its input is nullable: + +```rust +impl AggregateUDFImpl for MinFunction { + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, arg_types: &[DataType]) -> Result { + Ok(arg_types[0].clone()) + } + + // No need to override return_field() - the default handles it correctly: + // - If input is nullable, output is nullable + // - If input is non-nullable, output is non-nullable + + fn accumulator(&self, acc_args: AccumulatorArgs) -> Result> { + // Implementation... + } +} +``` + +### Example 2: COUNT Function + +`COUNT` always returns a non-nullable integer, regardless of input nullability: + +```rust +impl AggregateUDFImpl for CountFunction { + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, _arg_types: &[DataType]) -> Result { + Ok(DataType::Int64) + } + + // Override return_field to always return non-nullable + fn return_field(&self, _arg_fields: &[FieldRef]) -> Result { + Ok(Arc::new(Field::new(self.name(), DataType::Int64, false))) + } + + fn accumulator(&self, acc_args: AccumulatorArgs) -> Result> { + // Implementation... + } +} +``` + +## Deprecation Timeline + +- **Deprecated in v52.0.0**: The `is_nullable()` method in `AggregateUDFImpl` is now deprecated +- **Will be removed in a future version**: `is_nullable()` will be removed entirely + +During the deprecation period, both `is_nullable()` and the new `return_field()` approach work, but new code should use `return_field()`. + +## Troubleshooting + +### Issue: "Function `X` uses deprecated `is_nullable()`" + +**Solution**: Remove the `is_nullable()` implementation and let the default `return_field()` handle nullability, or override `return_field()` directly. + +### Issue: "Output field nullability is incorrect" + +**Check**: +1. Are your input fields correctly marked as nullable/non-nullable? +2. Does your function need custom nullability logic? If so, override `return_field()`. 
+ +### Issue: "Tests fail with null value where non-null expected" + +**Check**: +1. Verify that your function's accumulator actually returns a non-null default value when the input is empty and your function declares non-nullable output +2. Override `return_field()` to adjust the nullability if needed + +## See Also + +- [Adding User Defined Functions](adding-udfs.md) - General guide to implementing UDFs +- Scalar UDF Nullability - Similar concepts for scalar UDFs (which already use `return_field_from_args()`)