From 2e70075904665f4aaf0ceda690d23e23cdfaac23 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Sat, 3 Jan 2026 15:58:33 +0100 Subject: [PATCH 01/18] Speedup accumulators --- .../groups_accumulator/accumulate.rs | 49 +++++++++++++++---- .../aggregate/groups_accumulator/prim_op.rs | 3 +- datafusion/functions-aggregate/src/average.rs | 9 ++-- datafusion/functions-aggregate/src/count.rs | 4 +- 4 files changed, 50 insertions(+), 15 deletions(-) diff --git a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs index 29b8752048c3e..e40620ac0879b 100644 --- a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs +++ b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs @@ -59,6 +59,9 @@ pub struct NullState { /// If `seen_values[i]` is false, have not seen any values that /// pass the filter yet for group `i` seen_values: BooleanBufferBuilder, + /// If true, all groups seen so far have seen at least one non-null value + /// and no filters have been applied. + all_seen: bool, } impl Default for NullState { @@ -71,6 +74,7 @@ impl NullState { pub fn new() -> Self { Self { seen_values: BooleanBufferBuilder::new(0), + all_seen: true, } } @@ -107,14 +111,18 @@ impl NullState { T: ArrowPrimitiveType + Send, F: FnMut(usize, T::Native) + Send, { - // ensure the seen_values is big enough (start everything at - // "not seen" valid) - let seen_values = - initialize_builder(&mut self.seen_values, total_num_groups, false); - accumulate(group_indices, values, opt_filter, |group_index, value| { - seen_values.set_bit(group_index, true); - value_fn(group_index, value); - }); + if self.all_seen && opt_filter.is_none() && values.null_count() == 0 { + initialize_builder(&mut self.seen_values, total_num_groups, true); + accumulate(group_indices, values, None, value_fn); + } else { + self.all_seen = false; + let seen_values = + initialize_builder(&mut self.seen_values, total_num_groups, false); + accumulate(group_indices, values, opt_filter, |group_index, value| { + seen_values.set_bit(group_index, true); + value_fn(group_index, value); + }); + } } /// Invokes `value_fn(group_index, value)` for each non null, non @@ -140,6 +148,16 @@ impl NullState { let data = values.values(); assert_eq!(data.len(), group_indices.len()); + if self.all_seen && opt_filter.is_none() && values.null_count() == 0 { + initialize_builder(&mut self.seen_values, total_num_groups, true); + group_indices + .iter() + .zip(data.iter()) + .for_each(|(&group_index, new_value)| value_fn(group_index, new_value)); + return; + } + + self.all_seen = false; // ensure the seen_values is big enough (start everything at // "not seen" valid) let seen_values = @@ -215,7 +233,10 @@ impl NullState { let nulls: BooleanBuffer = self.seen_values.finish(); let nulls = match emit_to { - EmitTo::All => nulls, + EmitTo::All => { + self.all_seen = true; + nulls + } EmitTo::First(n) => { // split off the first N values in seen_values let first_n_null: BooleanBuffer = nulls.slice(0, n); @@ -662,10 +683,18 @@ mod test { let num_groups: usize = rng.random_range(2..1000); let max_group = num_groups - 1; - let group_indices: Vec = (0..num_values) + let mut group_indices: Vec = (0..num_values) .map(|_| rng.random_range(0..max_group)) .collect(); + // ensure all groups are seen at least once to match DataFusion behavior + // where total_num_groups is the number of groups seen so far + for (i, group_index) in group_indices.iter_mut().enumerate() { + if i < num_groups { + *group_index = i; + } + } + let values: Vec = (0..num_values).map(|_| rng.random()).collect(); // 10% chance of false diff --git a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/prim_op.rs b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/prim_op.rs index 656b95d140dde..34c4766c86802 100644 --- a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/prim_op.rs +++ b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/prim_op.rs @@ -106,7 +106,8 @@ where opt_filter, total_num_groups, |group_index, new_value| { - let value = &mut self.values[group_index]; + // SAFETY: group_index is guaranteed to be in bounds + let value = unsafe { self.values.get_unchecked_mut(group_index) }; (self.prim_fn)(value, new_value); }, ); diff --git a/datafusion/functions-aggregate/src/average.rs b/datafusion/functions-aggregate/src/average.rs index 46a8dbf9540b6..3505e4bb7c6a9 100644 --- a/datafusion/functions-aggregate/src/average.rs +++ b/datafusion/functions-aggregate/src/average.rs @@ -821,7 +821,8 @@ where opt_filter, total_num_groups, |group_index, new_value| { - let sum = &mut self.sums[group_index]; + // SAFETY: group_index is guaranteed to be in bounds + let sum = unsafe { self.sums.get_unchecked_mut(group_index) }; *sum = sum.add_wrapping(new_value); self.counts[group_index] += 1; @@ -904,7 +905,9 @@ where opt_filter, total_num_groups, |group_index, partial_count| { - self.counts[group_index] += partial_count; + // SAFETY: group_index is guaranteed to be in bounds + let count = unsafe { self.counts.get_unchecked_mut(group_index) }; + *count += partial_count; }, ); @@ -916,7 +919,7 @@ where opt_filter, total_num_groups, |group_index, new_value: ::Native| { - let sum = &mut self.sums[group_index]; + let sum = unsafe { self.sums.get_unchecked_mut(group_index) }; *sum = sum.add_wrapping(new_value); }, ); diff --git a/datafusion/functions-aggregate/src/count.rs b/datafusion/functions-aggregate/src/count.rs index a7c819acafea8..10cc2ad33f563 100644 --- a/datafusion/functions-aggregate/src/count.rs +++ b/datafusion/functions-aggregate/src/count.rs @@ -598,7 +598,9 @@ impl GroupsAccumulator for CountGroupsAccumulator { values.logical_nulls().as_ref(), opt_filter, |group_index| { - self.counts[group_index] += 1; + // SAFETY: group_index is guaranteed to be in bounds + let count = unsafe { self.counts.get_unchecked_mut(group_index) }; + *count += 1; }, ); From 5f9deeaf7998d50029e3ffe06845de90c07f074e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Sat, 3 Jan 2026 19:25:52 +0100 Subject: [PATCH 02/18] Speedup accumulators --- .../src/aggregate/groups_accumulator/accumulate.rs | 10 +--------- datafusion/functions-aggregate/src/average.rs | 1 + 2 files changed, 2 insertions(+), 9 deletions(-) diff --git a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs index e40620ac0879b..08be69d62ca5c 100644 --- a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs +++ b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs @@ -683,18 +683,10 @@ mod test { let num_groups: usize = rng.random_range(2..1000); let max_group = num_groups - 1; - let mut group_indices: Vec = (0..num_values) + let group_indices: Vec = (0..num_values) .map(|_| rng.random_range(0..max_group)) .collect(); - // ensure all groups are seen at least once to match DataFusion behavior - // where total_num_groups is the number of groups seen so far - for (i, group_index) in group_indices.iter_mut().enumerate() { - if i < num_groups { - *group_index = i; - } - } - let values: Vec = (0..num_values).map(|_| rng.random()).collect(); // 10% chance of false diff --git a/datafusion/functions-aggregate/src/average.rs b/datafusion/functions-aggregate/src/average.rs index 3505e4bb7c6a9..2efa305b72703 100644 --- a/datafusion/functions-aggregate/src/average.rs +++ b/datafusion/functions-aggregate/src/average.rs @@ -919,6 +919,7 @@ where opt_filter, total_num_groups, |group_index, new_value: ::Native| { + // SAFETY: group_index is guaranteed to be in bounds let sum = unsafe { self.sums.get_unchecked_mut(group_index) }; *sum = sum.add_wrapping(new_value); }, From fb249d641fa1845a47f89be5499f65853c64edbc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Sat, 3 Jan 2026 20:01:19 +0100 Subject: [PATCH 03/18] Speedup accumulators --- .../groups_accumulator/accumulate.rs | 48 +++++++++++++++---- .../aggregate/groups_accumulator/bool_op.rs | 2 +- .../aggregate/groups_accumulator/prim_op.rs | 2 +- 3 files changed, 41 insertions(+), 11 deletions(-) diff --git a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs index 08be69d62ca5c..9f97ca74ce38b 100644 --- a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs +++ b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs @@ -62,6 +62,7 @@ pub struct NullState { /// If true, all groups seen so far have seen at least one non-null value /// and no filters have been applied. all_seen: bool, + seen_values_size: usize, } impl Default for NullState { @@ -75,6 +76,7 @@ impl NullState { Self { seen_values: BooleanBufferBuilder::new(0), all_seen: true, + seen_values_size: 0, } } @@ -112,12 +114,21 @@ impl NullState { F: FnMut(usize, T::Native) + Send, { if self.all_seen && opt_filter.is_none() && values.null_count() == 0 { - initialize_builder(&mut self.seen_values, total_num_groups, true); accumulate(group_indices, values, None, value_fn); + self.seen_values_size = total_num_groups; } else { + let prev_seen: bool = self.all_seen && self.seen_values_size > 0; + self.all_seen = false; let seen_values = initialize_builder(&mut self.seen_values, total_num_groups, false); + + if prev_seen { + // restore previous seen values + for i in 0..self.seen_values_size { + seen_values.set_bit(i, true); + } + } accumulate(group_indices, values, opt_filter, |group_index, value| { seen_values.set_bit(group_index, true); value_fn(group_index, value); @@ -149,19 +160,27 @@ impl NullState { assert_eq!(data.len(), group_indices.len()); if self.all_seen && opt_filter.is_none() && values.null_count() == 0 { - initialize_builder(&mut self.seen_values, total_num_groups, true); group_indices .iter() .zip(data.iter()) .for_each(|(&group_index, new_value)| value_fn(group_index, new_value)); + self.seen_values_size = total_num_groups; + return; } + let prev_seen: bool = self.all_seen && self.seen_values_size > 0; self.all_seen = false; // ensure the seen_values is big enough (start everything at // "not seen" valid) let seen_values = initialize_builder(&mut self.seen_values, total_num_groups, false); + if prev_seen { + // restore previous seen values + for i in 0..self.seen_values_size { + seen_values.set_bit(i, true); + } + } // These could be made more performant by iterating in chunks of 64 bits at a time match (values.null_count() > 0, opt_filter) { @@ -229,7 +248,7 @@ impl NullState { /// for the `emit_to` rows. /// /// resets the internal state appropriately - pub fn build(&mut self, emit_to: EmitTo) -> NullBuffer { + pub fn build(&mut self, emit_to: EmitTo) -> Option { let nulls: BooleanBuffer = self.seen_values.finish(); let nulls = match emit_to { @@ -246,7 +265,11 @@ impl NullState { first_n_null } }; - NullBuffer::new(nulls) + if self.all_seen { + None + } else { + Some(NullBuffer::new(nulls)) + } } } @@ -856,14 +879,19 @@ mod test { "\n\naccumulated_values:{accumulated_values:#?}\n\nexpected_values:{expected_values:#?}" ); let seen_values = null_state.seen_values.finish_cloned(); - mock.validate_seen_values(&seen_values); + if null_state.all_seen { + assert_eq!(null_state.seen_values_size, total_num_groups); + } else { + mock.validate_seen_values(&seen_values); + } // Validate the final buffer (one value per group) let expected_null_buffer = mock.expected_null_buffer(total_num_groups); let null_buffer = null_state.build(EmitTo::All); - - assert_eq!(null_buffer, expected_null_buffer); + if let Some(nulls) = &null_buffer { + assert_eq!(*nulls, expected_null_buffer); + } } // Calls `accumulate_indices` @@ -980,11 +1008,13 @@ mod test { mock.validate_seen_values(&seen_values); // Validate the final buffer (one value per group) - let expected_null_buffer = mock.expected_null_buffer(total_num_groups); + let expected_null_buffer = Some(mock.expected_null_buffer(total_num_groups)); let null_buffer = null_state.build(EmitTo::All); - assert_eq!(null_buffer, expected_null_buffer); + if !null_state.all_seen { + assert_eq!(null_buffer, expected_null_buffer); + } } } diff --git a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/bool_op.rs b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/bool_op.rs index 149312e5a9c0f..f716b48f0cccc 100644 --- a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/bool_op.rs +++ b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/bool_op.rs @@ -120,7 +120,7 @@ where }; let nulls = self.null_state.build(emit_to); - let values = BooleanArray::new(values, Some(nulls)); + let values = BooleanArray::new(values, nulls); Ok(Arc::new(values)) } diff --git a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/prim_op.rs b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/prim_op.rs index 34c4766c86802..acf875b686139 100644 --- a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/prim_op.rs +++ b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/prim_op.rs @@ -118,7 +118,7 @@ where fn evaluate(&mut self, emit_to: EmitTo) -> Result { let values = emit_to.take_needed(&mut self.values); let nulls = self.null_state.build(emit_to); - let values = PrimitiveArray::::new(values.into(), Some(nulls)) // no copy + let values = PrimitiveArray::::new(values.into(), nulls) // no copy .with_data_type(self.data_type.clone()); Ok(Arc::new(values)) } From 07777940d951be2861ccfab315d40757b0e3c4d7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Sat, 3 Jan 2026 20:12:47 +0100 Subject: [PATCH 04/18] Speedup accumulators --- datafusion-examples/examples/udf/advanced_udaf.rs | 11 +++++++---- datafusion/functions-aggregate/src/average.rs | 9 +++++---- 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/datafusion-examples/examples/udf/advanced_udaf.rs b/datafusion-examples/examples/udf/advanced_udaf.rs index fbb9e652486ce..03ba6f05bee19 100644 --- a/datafusion-examples/examples/udf/advanced_udaf.rs +++ b/datafusion-examples/examples/udf/advanced_udaf.rs @@ -314,12 +314,16 @@ impl GroupsAccumulator for GeometricMeanGroupsAccumulator { let prods = emit_to.take_needed(&mut self.prods); let nulls = self.null_state.build(emit_to); - assert_eq!(nulls.len(), prods.len()); + if let Some(nulls) = &nulls { + assert_eq!(nulls.len(), counts.len()); + } assert_eq!(counts.len(), prods.len()); // don't evaluate geometric mean with null inputs to avoid errors on null values - let array: PrimitiveArray = if nulls.null_count() > 0 { + let array: PrimitiveArray = if let Some(nulls) = &nulls + && nulls.null_count() > 0 + { let mut builder = PrimitiveBuilder::::with_capacity(nulls.len()); let iter = prods.into_iter().zip(counts).zip(nulls.iter()); @@ -337,7 +341,7 @@ impl GroupsAccumulator for GeometricMeanGroupsAccumulator { .zip(counts) .map(|(prod, count)| prod.powf(1.0 / count as f64)) .collect::>(); - PrimitiveArray::new(geo_mean.into(), Some(nulls)) // no copy + PrimitiveArray::new(geo_mean.into(), nulls) // no copy .with_data_type(self.return_data_type.clone()) }; @@ -347,7 +351,6 @@ impl GroupsAccumulator for GeometricMeanGroupsAccumulator { // return arrays for counts and prods fn state(&mut self, emit_to: EmitTo) -> Result> { let nulls = self.null_state.build(emit_to); - let nulls = Some(nulls); let counts = emit_to.take_needed(&mut self.counts); let counts = UInt32Array::new(counts.into(), nulls.clone()); // zero copy diff --git a/datafusion/functions-aggregate/src/average.rs b/datafusion/functions-aggregate/src/average.rs index 2efa305b72703..f40b4c6e7acc9 100644 --- a/datafusion/functions-aggregate/src/average.rs +++ b/datafusion/functions-aggregate/src/average.rs @@ -837,12 +837,14 @@ where let sums = emit_to.take_needed(&mut self.sums); let nulls = self.null_state.build(emit_to); - assert_eq!(nulls.len(), sums.len()); + if let Some(nulls) = &nulls { + assert_eq!(nulls.len(), sums.len()); + } assert_eq!(counts.len(), sums.len()); // don't evaluate averages with null inputs to avoid errors on null values - let array: PrimitiveArray = if nulls.null_count() > 0 { + let array: PrimitiveArray = if let Some(nulls) = &nulls && nulls.null_count() > 0 { let mut builder = PrimitiveBuilder::::with_capacity(nulls.len()) .with_data_type(self.return_data_type.clone()); let iter = sums.into_iter().zip(counts).zip(nulls.iter()); @@ -861,7 +863,7 @@ where .zip(counts.into_iter()) .map(|(sum, count)| (self.avg_fn)(sum, count)) .collect::>>()?; - PrimitiveArray::new(averages.into(), Some(nulls)) // no copy + PrimitiveArray::new(averages.into(), nulls) // no copy .with_data_type(self.return_data_type.clone()) }; @@ -871,7 +873,6 @@ where // return arrays for sums and counts fn state(&mut self, emit_to: EmitTo) -> Result> { let nulls = self.null_state.build(emit_to); - let nulls = Some(nulls); let counts = emit_to.take_needed(&mut self.counts); let counts = UInt64Array::new(counts.into(), nulls.clone()); // zero copy From bdeda6afaca5b6cbe04b2a0c0762df8cb128c3d9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Sat, 3 Jan 2026 20:13:08 +0100 Subject: [PATCH 05/18] Speedup accumulators --- datafusion/functions-aggregate/src/average.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/datafusion/functions-aggregate/src/average.rs b/datafusion/functions-aggregate/src/average.rs index f40b4c6e7acc9..543116db1ddb6 100644 --- a/datafusion/functions-aggregate/src/average.rs +++ b/datafusion/functions-aggregate/src/average.rs @@ -838,13 +838,15 @@ where let nulls = self.null_state.build(emit_to); if let Some(nulls) = &nulls { - assert_eq!(nulls.len(), sums.len()); + assert_eq!(nulls.len(), sums.len()); } assert_eq!(counts.len(), sums.len()); // don't evaluate averages with null inputs to avoid errors on null values - let array: PrimitiveArray = if let Some(nulls) = &nulls && nulls.null_count() > 0 { + let array: PrimitiveArray = if let Some(nulls) = &nulls + && nulls.null_count() > 0 + { let mut builder = PrimitiveBuilder::::with_capacity(nulls.len()) .with_data_type(self.return_data_type.clone()); let iter = sums.into_iter().zip(counts).zip(nulls.iter()); From 5332b17b703910b1231db865531000542b7f8559 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Sat, 3 Jan 2026 21:24:55 +0100 Subject: [PATCH 06/18] Fix --- .../src/aggregate/groups_accumulator/accumulate.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs index 9f97ca74ce38b..4d34bb0d1c2d3 100644 --- a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs +++ b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs @@ -253,7 +253,6 @@ impl NullState { let nulls = match emit_to { EmitTo::All => { - self.all_seen = true; nulls } EmitTo::First(n) => { From 05414e850043f94722d1478553f998abba53f9fc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Sat, 3 Jan 2026 21:33:12 +0100 Subject: [PATCH 07/18] Speedup accumulators --- .../groups_accumulator/accumulate.rs | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs index 4d34bb0d1c2d3..88a2cc02a2fa6 100644 --- a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs +++ b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs @@ -250,12 +250,21 @@ impl NullState { /// resets the internal state appropriately pub fn build(&mut self, emit_to: EmitTo) -> Option { let nulls: BooleanBuffer = self.seen_values.finish(); - let nulls = match emit_to { EmitTo::All => { - nulls + self.seen_values_size = 0; + if self.all_seen { + // all groups have seen at least one non null value + return None; + } else { + nulls + } } EmitTo::First(n) => { + if self.all_seen { + self.seen_values_size -= n; + return None; + } // split off the first N values in seen_values let first_n_null: BooleanBuffer = nulls.slice(0, n); // reset the existing seen buffer @@ -264,11 +273,7 @@ impl NullState { first_n_null } }; - if self.all_seen { - None - } else { - Some(NullBuffer::new(nulls)) - } + Some(NullBuffer::new(nulls)) } } From 2e39af21f47a6a1f1ea043559c569d06087d047d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Wed, 7 Jan 2026 16:33:55 +0100 Subject: [PATCH 08/18] Use enum --- .../groups_accumulator/accumulate.rs | 229 ++++++++++-------- 1 file changed, 134 insertions(+), 95 deletions(-) diff --git a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs index 88a2cc02a2fa6..02ec243cf4164 100644 --- a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs +++ b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs @@ -24,6 +24,59 @@ use arrow::buffer::{BooleanBuffer, NullBuffer}; use arrow::datatypes::ArrowPrimitiveType; use datafusion_expr_common::groups_accumulator::EmitTo; + +/// +/// If the input has nulls, then the accumulator must potentially +/// handle each input null value specially (e.g. for `SUM` to mark the +/// corresponding sum as null) +/// +/// If there are filters present, `NullState` tracks if it has seen +/// *any* value for that group (as some values may be filtered +/// out). Without a filter, the accumulator is only passed groups that +/// had at least one value to accumulate so they do not need to track +/// if they have seen values for a particular group. +/// +/// [`GroupsAccumulator`]: datafusion_expr_common::groups_accumulator::GroupsAccumulator +#[derive(Debug)] +pub enum SeenValues { + /// All groups seen so far have seen at least one non-null value + All { num_values: usize }, + // some groups have not yet seen a non-null value + Some { values: BooleanBufferBuilder }, +} + +impl SeenValues { + /// Return a mutable reference to the `BooleanBufferBuilder` in `SeenValues::Some`. + /// + /// If `self` is `SeenValues::All`, it is transitioned to `SeenValues::Some` + /// by creating a new `BooleanBufferBuilder` where the first `num_values` are true. + /// + /// The builder is then ensured to have at least `total_num_groups` length, + /// with any new entries initialized to false. + fn get_builder(&mut self, total_num_groups: usize) -> &mut BooleanBufferBuilder { + match self { + SeenValues::All { num_values } => { + let mut builder = BooleanBufferBuilder::new(total_num_groups); + builder.append_n(*num_values, true); + if total_num_groups > *num_values { + builder.append_n(total_num_groups - *num_values, false); + } + *self = SeenValues::Some { values: builder }; + match self { + SeenValues::Some { values } => values, + _ => unreachable!(), + } + } + SeenValues::Some { values } => { + if values.len() < total_num_groups { + values.append_n(total_num_groups - values.len(), false); + } + values + } + } + } +} + /// Track the accumulator null state per row: if any values for that /// group were null and if any values have been seen at all for that group. /// @@ -53,16 +106,12 @@ use datafusion_expr_common::groups_accumulator::EmitTo; pub struct NullState { /// Have we seen any non-filtered input values for `group_index`? /// - /// If `seen_values[i]` is true, have seen at least one non null + /// If `seen_values` is `SeenValues::Some(buffer)` and buffer[i] is true, have seen at least one non null /// value for group `i` /// - /// If `seen_values[i]` is false, have not seen any values that + /// If `seen_values` is `SeenValues::Some(buffer)` and buffer[i] is false, have not seen any values that /// pass the filter yet for group `i` - seen_values: BooleanBufferBuilder, - /// If true, all groups seen so far have seen at least one non-null value - /// and no filters have been applied. - all_seen: bool, - seen_values_size: usize, + seen_values: SeenValues, } impl Default for NullState { @@ -74,16 +123,16 @@ impl Default for NullState { impl NullState { pub fn new() -> Self { Self { - seen_values: BooleanBufferBuilder::new(0), - all_seen: true, - seen_values_size: 0, + seen_values: SeenValues::All { num_values: 0 }, } } /// return the size of all buffers allocated by this null state, not including self pub fn size(&self) -> usize { - // capacity is in bits, so convert to bytes - self.seen_values.capacity() / 8 + match &self.seen_values { + SeenValues::All { .. } => 0, + SeenValues::Some { values } => values.capacity() / 8, + } } /// Invokes `value_fn(group_index, value)` for each non null, non @@ -113,27 +162,19 @@ impl NullState { T: ArrowPrimitiveType + Send, F: FnMut(usize, T::Native) + Send, { - if self.all_seen && opt_filter.is_none() && values.null_count() == 0 { - accumulate(group_indices, values, None, value_fn); - self.seen_values_size = total_num_groups; - } else { - let prev_seen: bool = self.all_seen && self.seen_values_size > 0; - - self.all_seen = false; - let seen_values = - initialize_builder(&mut self.seen_values, total_num_groups, false); - - if prev_seen { - // restore previous seen values - for i in 0..self.seen_values_size { - seen_values.set_bit(i, true); - } + if let SeenValues::All { num_values } = &mut self.seen_values { + if opt_filter.is_none() && values.null_count() == 0 { + accumulate(group_indices, values, None, value_fn); + *num_values = total_num_groups; + return; } - accumulate(group_indices, values, opt_filter, |group_index, value| { - seen_values.set_bit(group_index, true); - value_fn(group_index, value); - }); } + + let seen_values = self.seen_values.get_builder(total_num_groups); + accumulate(group_indices, values, opt_filter, |group_index, value| { + seen_values.set_bit(group_index, true); + value_fn(group_index, value); + }); } /// Invokes `value_fn(group_index, value)` for each non null, non @@ -159,29 +200,20 @@ impl NullState { let data = values.values(); assert_eq!(data.len(), group_indices.len()); - if self.all_seen && opt_filter.is_none() && values.null_count() == 0 { - group_indices - .iter() - .zip(data.iter()) - .for_each(|(&group_index, new_value)| value_fn(group_index, new_value)); - self.seen_values_size = total_num_groups; + if let SeenValues::All { num_values } = &mut self.seen_values { + if opt_filter.is_none() && values.null_count() == 0 { + group_indices + .iter() + .zip(data.iter()) + .for_each(|(&group_index, new_value)| value_fn(group_index, new_value)); + *num_values = total_num_groups; - return; - } - let prev_seen: bool = self.all_seen && self.seen_values_size > 0; - - self.all_seen = false; - // ensure the seen_values is big enough (start everything at - // "not seen" valid) - let seen_values = - initialize_builder(&mut self.seen_values, total_num_groups, false); - if prev_seen { - // restore previous seen values - for i in 0..self.seen_values_size { - seen_values.set_bit(i, true); + return; } } + let seen_values = self.seen_values.get_builder(total_num_groups); + // These could be made more performant by iterating in chunks of 64 bits at a time match (values.null_count() > 0, opt_filter) { // no nulls, no filter, @@ -249,31 +281,42 @@ impl NullState { /// /// resets the internal state appropriately pub fn build(&mut self, emit_to: EmitTo) -> Option { - let nulls: BooleanBuffer = self.seen_values.finish(); - let nulls = match emit_to { + match emit_to { EmitTo::All => { - self.seen_values_size = 0; - if self.all_seen { - // all groups have seen at least one non null value - return None; - } else { - nulls + let old_seen = std::mem::replace( + &mut self.seen_values, + SeenValues::All { num_values: 0 }, + ); + match old_seen { + SeenValues::All { .. } => None, + SeenValues::Some { mut values } => { + Some(NullBuffer::new(values.finish())) + } } } - EmitTo::First(n) => { - if self.all_seen { - self.seen_values_size -= n; - return None; + EmitTo::First(n) => match &mut self.seen_values { + SeenValues::All { num_values } => { + *num_values = num_values.saturating_sub(n); + None } - // split off the first N values in seen_values - let first_n_null: BooleanBuffer = nulls.slice(0, n); - // reset the existing seen buffer - self.seen_values - .append_buffer(&nulls.slice(n, nulls.len() - n)); - first_n_null - } - }; - Some(NullBuffer::new(nulls)) + SeenValues::Some { .. } => { + let mut old_values = match std::mem::replace( + &mut self.seen_values, + SeenValues::All { num_values: 0 }, + ) { + SeenValues::Some { values } => values, + _ => unreachable!(), + }; + let nulls = old_values.finish(); + let first_n_null = nulls.slice(0, n); + let remainder = nulls.slice(n, nulls.len() - n); + let mut new_builder = BooleanBufferBuilder::new(remainder.len()); + new_builder.append_buffer(&remainder); + self.seen_values = SeenValues::Some { values: new_builder }; + Some(NullBuffer::new(first_n_null)) + } + }, + } } } @@ -621,22 +664,6 @@ pub fn accumulate_indices( } } -/// Ensures that `builder` contains a `BooleanBufferBuilder with at -/// least `total_num_groups`. -/// -/// All new entries are initialized to `default_value` -fn initialize_builder( - builder: &mut BooleanBufferBuilder, - total_num_groups: usize, - default_value: bool, -) -> &mut BooleanBufferBuilder { - if builder.len() < total_num_groups { - let new_groups = total_num_groups - builder.len(); - builder.append_n(new_groups, default_value); - } - builder -} - #[cfg(test)] mod test { use super::*; @@ -882,11 +909,15 @@ mod test { accumulated_values, expected_values, "\n\naccumulated_values:{accumulated_values:#?}\n\nexpected_values:{expected_values:#?}" ); - let seen_values = null_state.seen_values.finish_cloned(); - if null_state.all_seen { - assert_eq!(null_state.seen_values_size, total_num_groups); - } else { - mock.validate_seen_values(&seen_values); + + match &null_state.seen_values { + SeenValues::All { num_values } => { + assert_eq!(*num_values, total_num_groups); + } + SeenValues::Some { values } => { + let seen_values = values.finish_cloned(); + mock.validate_seen_values(&seen_values); + } } // Validate the final buffer (one value per group) @@ -1008,15 +1039,23 @@ mod test { "\n\naccumulated_values:{accumulated_values:#?}\n\nexpected_values:{expected_values:#?}" ); - let seen_values = null_state.seen_values.finish_cloned(); - mock.validate_seen_values(&seen_values); + match &null_state.seen_values { + SeenValues::All { num_values } => { + assert_eq!(*num_values, total_num_groups); + } + SeenValues::Some { values } => { + let seen_values = values.finish_cloned(); + mock.validate_seen_values(&seen_values); + } + } // Validate the final buffer (one value per group) let expected_null_buffer = Some(mock.expected_null_buffer(total_num_groups)); + let is_all_seen = matches!(null_state.seen_values, SeenValues::All { .. }); let null_buffer = null_state.build(EmitTo::All); - if !null_state.all_seen { + if !is_all_seen { assert_eq!(null_buffer, expected_null_buffer); } } From b00e8d88ff07154028bc94f0eaa382de028a9480 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Wed, 7 Jan 2026 16:34:38 +0100 Subject: [PATCH 09/18] fmt --- .../groups_accumulator/accumulate.rs | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs index 02ec243cf4164..6cdaafe49a593 100644 --- a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs +++ b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs @@ -40,9 +40,13 @@ use datafusion_expr_common::groups_accumulator::EmitTo; #[derive(Debug)] pub enum SeenValues { /// All groups seen so far have seen at least one non-null value - All { num_values: usize }, + All { + num_values: usize, + }, // some groups have not yet seen a non-null value - Some { values: BooleanBufferBuilder }, + Some { + values: BooleanBufferBuilder, + }, } impl SeenValues { @@ -202,10 +206,9 @@ impl NullState { if let SeenValues::All { num_values } = &mut self.seen_values { if opt_filter.is_none() && values.null_count() == 0 { - group_indices - .iter() - .zip(data.iter()) - .for_each(|(&group_index, new_value)| value_fn(group_index, new_value)); + group_indices.iter().zip(data.iter()).for_each( + |(&group_index, new_value)| value_fn(group_index, new_value), + ); *num_values = total_num_groups; return; @@ -312,7 +315,9 @@ impl NullState { let remainder = nulls.slice(n, nulls.len() - n); let mut new_builder = BooleanBufferBuilder::new(remainder.len()); new_builder.append_buffer(&remainder); - self.seen_values = SeenValues::Some { values: new_builder }; + self.seen_values = SeenValues::Some { + values: new_builder, + }; Some(NullBuffer::new(first_n_null)) } }, From 68a498d75218d7e46504b162dc6bf6baaaf469ef Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Wed, 7 Jan 2026 19:20:34 +0100 Subject: [PATCH 10/18] fmt --- .../groups_accumulator/accumulate.rs | 38 +++++++++++-------- 1 file changed, 22 insertions(+), 16 deletions(-) diff --git a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs index 6cdaafe49a593..b18c848324c59 100644 --- a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs +++ b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs @@ -20,7 +20,7 @@ //! [`GroupsAccumulator`]: datafusion_expr_common::groups_accumulator::GroupsAccumulator use arrow::array::{Array, BooleanArray, BooleanBufferBuilder, PrimitiveArray}; -use arrow::buffer::{BooleanBuffer, NullBuffer}; +use arrow::buffer::NullBuffer; use arrow::datatypes::ArrowPrimitiveType; use datafusion_expr_common::groups_accumulator::EmitTo; @@ -166,12 +166,13 @@ impl NullState { T: ArrowPrimitiveType + Send, F: FnMut(usize, T::Native) + Send, { - if let SeenValues::All { num_values } = &mut self.seen_values { - if opt_filter.is_none() && values.null_count() == 0 { - accumulate(group_indices, values, None, value_fn); - *num_values = total_num_groups; - return; - } + if let SeenValues::All { num_values } = &mut self.seen_values + && opt_filter.is_none() + && values.null_count() == 0 + { + accumulate(group_indices, values, None, value_fn); + *num_values = total_num_groups; + return; } let seen_values = self.seen_values.get_builder(total_num_groups); @@ -204,15 +205,17 @@ impl NullState { let data = values.values(); assert_eq!(data.len(), group_indices.len()); - if let SeenValues::All { num_values } = &mut self.seen_values { - if opt_filter.is_none() && values.null_count() == 0 { - group_indices.iter().zip(data.iter()).for_each( - |(&group_index, new_value)| value_fn(group_index, new_value), - ); - *num_values = total_num_groups; + if let SeenValues::All { num_values } = &mut self.seen_values + && opt_filter.is_none() + && values.null_count() == 0 + { + group_indices + .iter() + .zip(data.iter()) + .for_each(|(&group_index, new_value)| value_fn(group_index, new_value)); + *num_values = total_num_groups; - return; - } + return; } let seen_values = self.seen_values.get_builder(total_num_groups); @@ -673,7 +676,10 @@ pub fn accumulate_indices( mod test { use super::*; - use arrow::array::{Int32Array, UInt32Array}; + use arrow::{ + array::{Int32Array, UInt32Array}, + buffer::BooleanBuffer, + }; use rand::{Rng, rngs::ThreadRng}; use std::collections::HashSet; From da2d7de9ff4ec2301f046cf341f2d1bbbc68a6e8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Wed, 7 Jan 2026 19:22:22 +0100 Subject: [PATCH 11/18] fmt --- .../src/aggregate/groups_accumulator/accumulate.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs index b18c848324c59..327155c35b9fe 100644 --- a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs +++ b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs @@ -43,7 +43,7 @@ pub enum SeenValues { All { num_values: usize, }, - // some groups have not yet seen a non-null value + // Some groups have not yet seen a non-null value Some { values: BooleanBufferBuilder, }, From 2c6100241cb7481724075f7b9bed0796a4466cba Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Wed, 7 Jan 2026 19:23:36 +0100 Subject: [PATCH 12/18] fmt --- .../src/aggregate/groups_accumulator/accumulate.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs index 327155c35b9fe..8b6def2aed821 100644 --- a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs +++ b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs @@ -25,7 +25,6 @@ use arrow::datatypes::ArrowPrimitiveType; use datafusion_expr_common::groups_accumulator::EmitTo; -/// /// If the input has nulls, then the accumulator must potentially /// handle each input null value specially (e.g. for `SUM` to mark the /// corresponding sum as null) @@ -35,8 +34,6 @@ use datafusion_expr_common::groups_accumulator::EmitTo; /// out). Without a filter, the accumulator is only passed groups that /// had at least one value to accumulate so they do not need to track /// if they have seen values for a particular group. -/// -/// [`GroupsAccumulator`]: datafusion_expr_common::groups_accumulator::GroupsAccumulator #[derive(Debug)] pub enum SeenValues { /// All groups seen so far have seen at least one non-null value From f646fe826afb8734e98fe6adea0a4ad506ec1c1a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Wed, 7 Jan 2026 19:24:13 +0100 Subject: [PATCH 13/18] comments --- .../src/aggregate/groups_accumulator/accumulate.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs index 8b6def2aed821..876042a59bacb 100644 --- a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs +++ b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs @@ -112,6 +112,8 @@ pub struct NullState { /// /// If `seen_values` is `SeenValues::Some(buffer)` and buffer[i] is false, have not seen any values that /// pass the filter yet for group `i` + /// + /// If `seen_values` is `SeenValues::All`, all groups have seen at least one non null value seen_values: SeenValues, } From 47c46e786ae1af18a5ab643cd481bb4d74f5e291 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Wed, 7 Jan 2026 20:43:14 +0100 Subject: [PATCH 14/18] lint --- .../src/aggregate/groups_accumulator/accumulate.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs index 876042a59bacb..d362ca296ba0d 100644 --- a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs +++ b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs @@ -107,12 +107,12 @@ impl SeenValues { pub struct NullState { /// Have we seen any non-filtered input values for `group_index`? /// - /// If `seen_values` is `SeenValues::Some(buffer)` and buffer[i] is true, have seen at least one non null + /// If `seen_values` is `SeenValues::Some(buffer)` and buffer\[i\] is true, have seen at least one non null /// value for group `i` /// - /// If `seen_values` is `SeenValues::Some(buffer)` and buffer[i] is false, have not seen any values that + /// If `seen_values` is `SeenValues::Some(buffer)` and buffer\[i\] is false, have not seen any values that /// pass the filter yet for group `i` - /// + /// /// If `seen_values` is `SeenValues::All`, all groups have seen at least one non null value seen_values: SeenValues, } From 4692d8d3be5be4158e02bff6f608d4fb452fe03c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Fri, 9 Jan 2026 08:58:27 +0100 Subject: [PATCH 15/18] Update datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs Co-authored-by: Andrew Lamb --- .../src/aggregate/groups_accumulator/accumulate.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs index d362ca296ba0d..e95bd14920580 100644 --- a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs +++ b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs @@ -204,6 +204,7 @@ impl NullState { let data = values.values(); assert_eq!(data.len(), group_indices.len()); + // skip null handling if no nulls in input or accumulator if let SeenValues::All { num_values } = &mut self.seen_values && opt_filter.is_none() && values.null_count() == 0 From 5e0562ccd1707a5ddaf477a67580447d2b4bd441 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Fri, 9 Jan 2026 08:58:38 +0100 Subject: [PATCH 16/18] Update datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs Co-authored-by: Andrew Lamb --- .../src/aggregate/groups_accumulator/accumulate.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs index e95bd14920580..34361fa9ebead 100644 --- a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs +++ b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs @@ -165,6 +165,7 @@ impl NullState { T: ArrowPrimitiveType + Send, F: FnMut(usize, T::Native) + Send, { + // skip null handling if no nulls in input or accumulator if let SeenValues::All { num_values } = &mut self.seen_values && opt_filter.is_none() && values.null_count() == 0 From d96c80a25f386daff8c6d4f3d25b1a8310f9bbbf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Fri, 9 Jan 2026 10:33:26 +0100 Subject: [PATCH 17/18] Simplify --- .../src/aggregate/groups_accumulator/accumulate.rs | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs index d362ca296ba0d..9989ab8fb089d 100644 --- a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs +++ b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs @@ -46,6 +46,12 @@ pub enum SeenValues { }, } +impl Default for SeenValues { + fn default() -> Self { + SeenValues::All { num_values: 0 } + } +} + impl SeenValues { /// Return a mutable reference to the `BooleanBufferBuilder` in `SeenValues::Some`. /// @@ -288,9 +294,8 @@ impl NullState { pub fn build(&mut self, emit_to: EmitTo) -> Option { match emit_to { EmitTo::All => { - let old_seen = std::mem::replace( + let old_seen = std::mem::take( &mut self.seen_values, - SeenValues::All { num_values: 0 }, ); match old_seen { SeenValues::All { .. } => None, @@ -305,9 +310,8 @@ impl NullState { None } SeenValues::Some { .. } => { - let mut old_values = match std::mem::replace( + let mut old_values = match std::mem::take( &mut self.seen_values, - SeenValues::All { num_values: 0 }, ) { SeenValues::Some { values } => values, _ => unreachable!(), From 24e8f525313c89925b68d94c350ad4bbd1b51b1f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Fri, 9 Jan 2026 10:33:38 +0100 Subject: [PATCH 18/18] Simplify --- .../src/aggregate/groups_accumulator/accumulate.rs | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs index 9989ab8fb089d..fcf213652efae 100644 --- a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs +++ b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs @@ -294,9 +294,7 @@ impl NullState { pub fn build(&mut self, emit_to: EmitTo) -> Option { match emit_to { EmitTo::All => { - let old_seen = std::mem::take( - &mut self.seen_values, - ); + let old_seen = std::mem::take(&mut self.seen_values); match old_seen { SeenValues::All { .. } => None, SeenValues::Some { mut values } => { @@ -310,9 +308,7 @@ impl NullState { None } SeenValues::Some { .. } => { - let mut old_values = match std::mem::take( - &mut self.seen_values, - ) { + let mut old_values = match std::mem::take(&mut self.seen_values) { SeenValues::Some { values } => values, _ => unreachable!(), };