The Python UDAF example cannot print the result #1190

@l1t1

Description

Describe the bug
When I run the example, it raises the errors shown below. Both assert result_df.to_pydict()["a"] == [1, 3] and print(result_df) fail.

C:\d>python udaf.py
Traceback (most recent call last):
  File "C:\d\udaf.py", line 68, in <module>
    assert result_df.to_pydict()["a"] == [1, 3]
           ~~~~~~~~~~~~~~~~~~~^^
  File "C:\Users\lt\AppData\Local\Programs\Python\Python313\Lib\site-packages\datafusion\dataframe.py", line 1060, in to_pydict
    return self.df.to_pydict()
           ~~~~~~~~~~~~~~~~~^^
Exception: DataFusion error: Execution("ArrowInvalid: Could not convert <pyarrow.lib.DoubleArray object at 0x0000022AFC1A6620>\n[\n  9\n] with type pyarrow.lib.DoubleArray: did not recognize Python value type when inferring an Arrow data type")

C:\d>python udaf.py
Traceback (most recent call last):
  File "C:\d\udaf.py", line 70, in <module>
    print(result_df)
    ~~~~~^^^^^^^^^^^
  File "C:\Users\lt\AppData\Local\Programs\Python\Python313\Lib\site-packages\datafusion\dataframe.py", line 324, in __repr__
    return self.df.__repr__()
           ~~~~~~~~~~~~~~~~^^
Exception: DataFusion error: Execution("ArrowInvalid: Could not convert <pyarrow.lib.DoubleArray object at 0x0000021F129D6620>\n[\n  9\n] with type pyarrow.lib.DoubleArray: did not recognize Python value type when inferring an Arrow data type")

To Reproduce
Save the following as udaf.py (copied from the source code tree) and run it:

import pyarrow as pa
from datafusion import Accumulator, SessionContext, udaf


# Define a user-defined aggregation function (UDAF)
class MyAccumulator(Accumulator):
    """
    Interface of a user-defined accumulation.
    """

    def __init__(self) -> None:
        self._sum = pa.scalar(0.0)

    def update(self, values: pa.Array) -> None:
        # not nice since pyarrow scalars can't be summed yet. This breaks on `None`
        self._sum = pa.scalar(self._sum.as_py() + pa.compute.sum(values).as_py())

    def merge(self, states: pa.Array) -> None:
        # not nice since pyarrow scalars can't be summed yet. This breaks on `None`
        self._sum = pa.scalar(self._sum.as_py() + pa.compute.sum(states).as_py())

    def state(self) -> pa.Array:
        return pa.array([self._sum.as_py()])

    def evaluate(self) -> pa.Scalar:
        return self._sum


my_udaf = udaf(
    MyAccumulator,
    pa.float64(),
    pa.float64(),
    [pa.float64()],
    "stable",
    # This will be the name of the UDAF in SQL
    # If not specified, it defaults to the accumulator class name
    name="my_accumulator",
)

# Create a context
ctx = SessionContext()

# Create a datafusion DataFrame from a Python dictionary
source_df = ctx.from_pydict({"a": [1, 1, 3], "b": [4, 5, 6]}, name="t")
# Dataframe:
# +---+---+
# | a | b |
# +---+---+
# | 1 | 4 |
# | 1 | 5 |
# | 3 | 6 |
# +---+---+

# Register the UDAF for use in SQL
ctx.register_udaf(my_udaf)

# Query the DataFrame using SQL
result_df = ctx.sql(
    "select a, my_accumulator(b) as b_aggregated from t group by a order by a"
)
# Dataframe:
# +---+--------------+
# | a | b_aggregated |
# +---+--------------+
# | 1 | 9            |
# | 3 | 6            |
# +---+--------------+
#assert result_df.to_pydict()["a"] == [1, 3]
#assert result_df.to_pydict()["b_aggregated"] == [9, 6]
print(result_df)

Expected behavior

The script should print:

# +---+--------------+
# | a | b_aggregated |
# +---+--------------+
# | 1 | 9            |
# | 3 | 6            |
# +---+--------------+

Additional context

Labels: bug