diff --git a/arkouda/pandas/extension/_arkouda_array.py b/arkouda/pandas/extension/_arkouda_array.py
index 56eabafc226..6ea29a596c4 100644
--- a/arkouda/pandas/extension/_arkouda_array.py
+++ b/arkouda/pandas/extension/_arkouda_array.py
@@ -1,6 +1,8 @@
 from __future__ import annotations
 
-from typing import TYPE_CHECKING, Any, Sequence, TypeVar
+import inspect
+
+from typing import TYPE_CHECKING, Any, Callable, Sequence, TypeVar
 from typing import cast as type_cast
 
 import numpy as np
@@ -9,6 +11,7 @@
 from pandas.api.extensions import ExtensionArray
 
 from arkouda.numpy.dtypes import dtype as ak_dtype
+from arkouda.pandas.groupbyclass import GroupByReductionType
 
 from ._arkouda_extension_array import ArkoudaExtensionArray
 from ._dtypes import (
@@ -224,21 +227,100 @@ def equals(self, other):
             return False
         return self._data.equals(other._data)
 
-    def _reduce(self, name, skipna=True, **kwargs):
-        if name == "all":
-            return self._data.all()
-        elif name == "any":
-            return self._data.any()
-        elif name == "sum":
-            return self._data.sum()
-        elif name == "prod":
-            return self._data.prod()
-        elif name == "min":
-            return self._data.min()
-        elif name == "max":
-            return self._data.max()
-        else:
-            raise TypeError(f"'ArkoudaArray' with dtype arkouda does not support reduction '{name}'")
+    def _reduce(
+        self,
+        name: str | GroupByReductionType,
+        skipna: bool = True,
+        **kwargs: Any,
+    ) -> Any:
+        """
+        Reduce the underlying data.
+
+        Parameters
+        ----------
+        name : str | GroupByReductionType
+            Reduction name, e.g. "sum", "mean", "nunique", ...
+        skipna : bool
+            If supported by the underlying implementation, skip NaN/NA values.
+            Default is True.
+        **kwargs : Any
+            Extra args for compatibility (e.g. ddof for var/std).
+
+        Returns
+        -------
+        Any
+            The reduction result.
+
+        Raises
+        ------
+        TypeError
+            If ``name`` is not a supported reduction or the underlying data does not
+            implement the requested reduction.
+        """
+        # Normalize: accept Enum or str.
+        if hasattr(name, "value"):  # enum-like (e.g. GroupByReductionType)
+            name = name.value
+        # Some enum values are 1-tuples (e.g. UNIQUE = ("unique",)); unwrap them.
+        if isinstance(name, tuple) and len(name) == 1:
+            name = name[0]
+        if not isinstance(name, str):
+            raise TypeError(f"Reduction name must be a string or GroupByReductionType, got {type(name)}")
+
+        data = self._data
+
+        def _call_method(method_name: str, *args: Any, **kw: Any) -> Any:
+            # Fail with a clear TypeError when the backend lacks the method.
+            if not hasattr(data, method_name):
+                raise TypeError(
+                    f"'ArkoudaArray' with dtype {self.dtype} does not support reduction '{name}' "
+                    f"(missing method {method_name!r} on {type(data).__name__})"
+                )
+            meth = getattr(data, method_name)
+
+            # Best-effort: pass skipna/ddof/etc only if the method accepts them.
+            try:
+                sig = inspect.signature(meth)
+            except (TypeError, ValueError):
+                return meth(*args, **kw)
+
+            params = sig.parameters
+            # A method that itself takes **kwargs accepts every keyword; filtering
+            # by parameter name would wrongly drop them all (VAR_KEYWORD params
+            # are keyed by their own name, e.g. "kwargs", not by "skipna").
+            if any(p.kind is inspect.Parameter.VAR_KEYWORD for p in params.values()):
+                return meth(*args, **kw)
+            filtered: dict[str, Any] = {k: v for k, v in kw.items() if k in params}
+            return meth(*args, **filtered)
+
+        reductions: dict[str, Callable[[], Any]] = {
+            "all": lambda: _call_method("all", skipna=skipna, **kwargs),
+            "any": lambda: _call_method("any", skipna=skipna, **kwargs),
+            "sum": lambda: _call_method("sum", skipna=skipna, **kwargs),
+            "prod": lambda: _call_method("prod", skipna=skipna, **kwargs),
+            "min": lambda: _call_method("min", skipna=skipna, **kwargs),
+            "max": lambda: _call_method("max", skipna=skipna, **kwargs),
+            "mean": lambda: _call_method("mean", skipna=skipna, **kwargs),
+            "median": lambda: _call_method("median", skipna=skipna, **kwargs),
+            "var": lambda: _call_method("var", skipna=skipna, **kwargs),
+            "std": lambda: _call_method("std", skipna=skipna, **kwargs),
+            "argmin": lambda: _call_method("argmin", skipna=skipna, **kwargs),
+            "argmax": lambda: _call_method("argmax", skipna=skipna, **kwargs),
+            "count": lambda: _call_method("count", **kwargs),
+            "nunique": lambda: _call_method("nunique", **kwargs),
+            "or": lambda: _call_method("or", skipna=skipna, **kwargs),
+            "and": lambda: _call_method("and", skipna=skipna, **kwargs),
+            "xor": lambda: _call_method("xor", skipna=skipna, **kwargs),
+            "first": lambda: _call_method("first", skipna=skipna, **kwargs),
+            "mode": lambda: _call_method("mode", skipna=skipna, **kwargs),
+            "unique": lambda: _call_method("unique", **kwargs),
+        }
+
+        fn = reductions.get(name)
+        if fn is None:
+            raise TypeError(
+                f"'ArkoudaArray' with dtype {self.dtype} does not support reduction '{name}'. "
+                f"Supported: {sorted(reductions)}"
+            )
+
+        return fn()
 
     def __eq__(self, other):
         """
diff --git a/tests/pandas/extension/arkouda_array_extension.py b/tests/pandas/extension/arkouda_array_extension.py
index 0de5df06d12..208c90c6929 100644
--- a/tests/pandas/extension/arkouda_array_extension.py
+++ b/tests/pandas/extension/arkouda_array_extension.py
@@ -1,3 +1,5 @@
+import inspect
+
 import numpy as np
 import pandas as pd
 import pytest
@@ -233,18 +235,130 @@ def test_argsort(self):
         sorted_vals = arr._data[perm]
         assert ak.is_sorted(sorted_vals)
 
-    @pytest.mark.parametrize("reduction", ["all", "any", "sum", "prod", "min", "max"])
-    def test_reduce_ops(self, reduction):
-        ak_data = ak.arange(10)
-        arr = ArkoudaArray(ak_data)
-        result = arr._reduce(reduction)
-        assert isinstance(result, numeric_and_bool_scalars)
-
-    def test_reduce_invalid(self):
-        ak_data = ak.arange(10)
-        arr = ArkoudaArray(ak_data)
-        with pytest.raises(TypeError):
-            arr._reduce("mean")
+    @pytest.mark.parametrize(
+        "reduction",
+        [
+            "all",
+            "any",
+            "sum",
+            "prod",
+            "min",
+            "max",
+            "mean",
+            "var",
+            "std",
+            "median",
+            "mode",
+            "unique",
+            "count",
+            "first",
+            "nunique",
+        ],
+    )
+    def test_reduce_scalar_ops(self, reduction):
+        ak_data = ak.arange(10)
+        arr = ArkoudaArray(ak_data)
+
+        if not hasattr(ak_data, reduction):
+            pytest.xfail(f"{reduction} not implemented on pdarray backend yet")
+
+        result = arr._reduce(reduction)
+
+        assert isinstance(result, numeric_and_bool_scalars)
+
+    @pytest.mark.parametrize("reduction", ["argmin", "argmax"])
+    def test_reduce_arg_ops(self, reduction):
+        ak_data = ak.arange(10)
+        arr = ArkoudaArray(ak_data)
+
+        result = arr._reduce(reduction)
+
+        assert isinstance(result, numeric_and_bool_scalars)
+        assert result >= 0
+
+    @pytest.mark.parametrize("reduction", ["or", "and", "xor"])
+    def test_reduce_bitwise_ops(self, reduction):
+        ak_data = ak.array([True, False, True, False])
+        arr = ArkoudaArray(ak_data)
+
+        if not hasattr(ak_data, reduction):
+            pytest.xfail(f"{reduction} not implemented on pdarray backend yet")
+
+        result = arr._reduce(reduction)
+
+        # NOTE(review): np.bool_ is not a subclass of builtin bool, so a plain
+        # isinstance(result, bool) would fail on a numpy scalar return.
+        assert isinstance(result, numeric_and_bool_scalars)
+
+    @pytest.mark.parametrize("reduction", ["unique", "mode"])
+    def test_reduce_array_ops(self, reduction):
+        ak_data = ak.array([1, 2, 2, 3, 3, 3])
+        arr = ArkoudaArray(ak_data)
+
+        if not hasattr(ak_data, reduction):
+            pytest.xfail(f"{reduction} not implemented on pdarray backend yet")
+        result = arr._reduce(reduction)
+
+        # Tuple form works on all supported versions (X | Y in isinstance needs 3.10+).
+        assert isinstance(result, (ArkoudaArray, ak.pdarray))
+
+    @pytest.mark.parametrize(
+        "reduction",
+        ["sum", "mean", "min", "max", "var", "std"],
+    )
+    def test_reduce_skipna_kwarg(self, reduction):
+        ak_data = ak.array([1.0, 2.0, ak.nan, 4.0])
+        arr = ArkoudaArray(ak_data)
+
+        result = arr._reduce(reduction, skipna=True)
+        assert isinstance(result, numeric_and_bool_scalars)
+
+    @pytest.mark.parametrize("reduction", ["var", "std"])
+    def test_reduce_ddof_kwarg(self, reduction):
+        ak_data = ak.array([1.0, 2.0, 3.0, 4.0])
+        arr = ArkoudaArray(ak_data)
+
+        meth = getattr(ak_data, reduction, None)
+        if meth is None:
+            pytest.xfail(f"{reduction} not implemented on pdarray backend yet")
+
+        try:
+            accepts_ddof = "ddof" in inspect.signature(meth).parameters
+        except (TypeError, ValueError):
+            # If we can't introspect, just do a smoke test
+            accepts_ddof = False
+
+        if not accepts_ddof:
+            # ddof should be ignored by the filtering logic (no crash)
+            r = arr._reduce(reduction, ddof=1)
+            assert isinstance(r, numeric_and_bool_scalars)
+            return
+
+        r0 = arr._reduce(reduction, ddof=0)
+        r1 = arr._reduce(reduction, ddof=1)
+
+        # Compare to numpy for correctness
+        np_data = np.array([1.0, 2.0, 3.0, 4.0])
+        expected0 = getattr(np_data, reduction)(ddof=0)
+        expected1 = getattr(np_data, reduction)(ddof=1)
+
+        assert np.isclose(r0, expected0)
+        assert np.isclose(r1, expected1)
+
+    @pytest.mark.parametrize("reduction", ["sum", "mean", "min", "max"])
+    def test_reduce_ignores_unknown_kwargs(self, reduction):
+        ak_data = ak.arange(10)
+        arr = ArkoudaArray(ak_data)
+
+        r1 = arr._reduce(reduction)
+        r2 = arr._reduce(reduction, totally_not_a_real_kwarg=123)
+
+        assert r1 == r2
+
+    def test_reduce_invalid(self):
+        ak_data = ak.arange(10)
+        arr = ArkoudaArray(ak_data)
+        with pytest.raises(TypeError):
+            arr._reduce("test")
 
     def test_concat_same_type(self):
         a1 = ArkoudaArray(ak.array([1, 2]))