Skip to content

Commit c8678e5

Browse files
committed
refactor: Extract AccumulatorArgs construction into a separate method in AggregateFunctionExpr
1 parent c386ba0 commit c8678e5

File tree

1 file changed

+15
-41
lines changed

1 file changed

+15
-41
lines changed

datafusion/physical-expr/src/aggregate.rs

Lines changed: 15 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -403,24 +403,26 @@ impl AggregateFunctionExpr {
403403
Cow::Borrowed(&self.schema)
404404
}
405405
}
406-
407-
/// the accumulator used to accumulate values from the expressions.
408-
/// the accumulator expects the same number of arguments as `expressions` and must
409-
/// return states with the same description as `state_fields`
410-
// TODO: factor AccumulatorArgs construction into a private helper to avoid duplication
411-
pub fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
412-
let schema = self.args_schema();
413-
let acc_args = AccumulatorArgs {
406+
/// Construct AccumulatorArgs for this aggregate using a given schema slice.
407+
fn make_acc_args(&self, schema: &Schema) -> AccumulatorArgs<'_> {
408+
AccumulatorArgs {
414409
return_field: Arc::clone(&self.return_field),
415-
schema: schema.as_ref(),
410+
schema,
416411
ignore_nulls: self.ignore_nulls,
417412
order_bys: self.order_bys.as_ref(),
418413
is_distinct: self.is_distinct,
419414
name: &self.name,
420415
is_reversed: self.is_reversed,
421416
exprs: &self.args,
422-
};
417+
}
418+
}
423419

420+
/// the accumulator used to accumulate values from the expressions.
421+
/// the accumulator expects the same number of arguments as `expressions` and must
422+
/// return states with the same description as `state_fields`
423+
pub fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
424+
let schema = self.args_schema();
425+
let acc_args = self.make_acc_args(schema.as_ref());
424426
self.fun.accumulator(acc_args)
425427
}
426428

@@ -495,17 +497,7 @@ impl AggregateFunctionExpr {
495497
/// Creates accumulator implementation that supports retract
496498
pub fn create_sliding_accumulator(&self) -> Result<Box<dyn Accumulator>> {
497499
let schema = self.args_schema();
498-
let args = AccumulatorArgs {
499-
return_field: Arc::clone(&self.return_field),
500-
schema: schema.as_ref(),
501-
ignore_nulls: self.ignore_nulls,
502-
order_bys: self.order_bys.as_ref(),
503-
is_distinct: self.is_distinct,
504-
name: &self.name,
505-
is_reversed: self.is_reversed,
506-
exprs: &self.args,
507-
};
508-
500+
let args = self.make_acc_args(schema.as_ref());
509501
let accumulator = self.fun.create_sliding_accumulator(args)?;
510502

511503
// Accumulators that have window frame startings different
@@ -565,16 +557,7 @@ impl AggregateFunctionExpr {
565557
/// `[Self::create_groups_accumulator`] will be called.
566558
pub fn groups_accumulator_supported(&self) -> bool {
567559
let schema = self.args_schema();
568-
let args = AccumulatorArgs {
569-
return_field: Arc::clone(&self.return_field),
570-
schema: schema.as_ref(),
571-
ignore_nulls: self.ignore_nulls,
572-
order_bys: self.order_bys.as_ref(),
573-
is_distinct: self.is_distinct,
574-
name: &self.name,
575-
is_reversed: self.is_reversed,
576-
exprs: &self.args,
577-
};
560+
let args = self.make_acc_args(schema.as_ref());
578561
self.fun.groups_accumulator_supported(args)
579562
}
580563

@@ -585,16 +568,7 @@ impl AggregateFunctionExpr {
585568
/// implemented in addition to [`Accumulator`].
586569
pub fn create_groups_accumulator(&self) -> Result<Box<dyn GroupsAccumulator>> {
587570
let schema = self.args_schema();
588-
let args = AccumulatorArgs {
589-
return_field: Arc::clone(&self.return_field),
590-
schema: schema.as_ref(),
591-
ignore_nulls: self.ignore_nulls,
592-
order_bys: self.order_bys.as_ref(),
593-
is_distinct: self.is_distinct,
594-
name: &self.name,
595-
is_reversed: self.is_reversed,
596-
exprs: &self.args,
597-
};
571+
let args = self.make_acc_args(schema.as_ref());
598572
self.fun.create_groups_accumulator(args)
599573
}
600574

0 commit comments

Comments
 (0)