Skip to content

Commit 0b97bd0

Browse files
committed
ensure validate after merge batch
1 parent 7648eb7 commit 0b97bd0

File tree

1 file changed

+15
-9
lines changed
  • datafusion/functions-aggregate/src

1 file changed

+15
-9
lines changed

datafusion/functions-aggregate/src/sum.rs

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -599,15 +599,8 @@ impl<T: DecimalType + std::fmt::Debug> TrySumDecimalAccumulator<T> {
599599
inner: TrySumAccumulator::new(data_type),
600600
}
601601
}
602-
}
603-
604-
impl<T: DecimalType + std::fmt::Debug> Accumulator for TrySumDecimalAccumulator<T> {
605-
fn state(&mut self) -> Result<Vec<ScalarValue>> {
606-
self.inner.state()
607-
}
608602

609-
fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
610-
self.inner.update_batch(values)?;
603+
fn validate_decimal(&mut self) {
611604
// Check decimal precision overflow
612605
let precision = match self.inner.data_type {
613606
DataType::Decimal32(precision, _)
@@ -621,11 +614,24 @@ impl<T: DecimalType + std::fmt::Debug> Accumulator for TrySumDecimalAccumulator<
621614
{
622615
self.inner.state = TrySumState::Overflow;
623616
}
617+
}
618+
}
619+
620+
impl<T: DecimalType + std::fmt::Debug> Accumulator for TrySumDecimalAccumulator<T> {
621+
fn state(&mut self) -> Result<Vec<ScalarValue>> {
622+
self.inner.state()
623+
}
624+
625+
fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
626+
self.inner.update_batch(values)?;
627+
self.validate_decimal();
624628
Ok(())
625629
}
626630

627631
fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
628-
self.inner.merge_batch(states)
632+
self.inner.merge_batch(states)?;
633+
self.validate_decimal();
634+
Ok(())
629635
}
630636

631637
fn evaluate(&mut self) -> Result<ScalarValue> {

0 commit comments

Comments
 (0)