Commit be88837

Fix Python UDAF Accumulator Interface example to Properly Handle State and Updates with List[Array] Types (#1192)
* Add test for user-defined aggregation function (UDAF) with DataFusion
  - Implement MyAccumulator class following Accumulator interface
  - Register UDAF named "my_accumulator" in SessionContext
  - Create test DataFrame and run SQL query using UDAF with GROUP BY
  - Verify results match expected aggregated values
  - Ensure correct integration and functionality of UDAF in Python bindings
* fix: update UDAF implementation to use correct pyarrow compute functions
* fix: correct comment capitalization and remove unused import in UDAF script
* fix: update UDAF accumulator methods to handle state and summation correctly
* Remove test script
1 parent 94687cd commit be88837

1 file changed: +4 -4 lines changed

examples/sql-using-python-udaf.py

Lines changed: 4 additions & 4 deletions
@@ -28,16 +28,16 @@ class MyAccumulator(Accumulator):
     def __init__(self) -> None:
         self._sum = pa.scalar(0.0)
 
-    def update(self, values: pa.Array) -> None:
+    def update(self, values: list[pa.Array]) -> None:
         # not nice since pyarrow scalars can't be summed yet. This breaks on `None`
         self._sum = pa.scalar(self._sum.as_py() + pa.compute.sum(values).as_py())
 
     def merge(self, states: pa.Array) -> None:
         # not nice since pyarrow scalars can't be summed yet. This breaks on `None`
-        self._sum = pa.scalar(self._sum.as_py() + pa.compute.sum(states).as_py())
+        self._sum = pa.scalar(self._sum.as_py() + pa.compute.sum(states[0]).as_py())
 
-    def state(self) -> pa.Array:
-        return pa.array([self._sum.as_py()])
+    def state(self) -> list[pa.Array]:
+        return [self._sum]
 
     def evaluate(self) -> pa.Scalar:
         return self._sum