Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 93 additions & 16 deletions arkouda/pandas/extension/_arkouda_array.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from __future__ import annotations

from typing import TYPE_CHECKING, Any, Sequence, TypeVar
import inspect

from typing import TYPE_CHECKING, Any, Callable, Sequence, TypeVar
from typing import cast as type_cast

import numpy as np
Expand All @@ -9,6 +11,7 @@
from pandas.api.extensions import ExtensionArray

from arkouda.numpy.dtypes import dtype as ak_dtype
from arkouda.pandas.groupbyclass import GroupByReductionType

from ._arkouda_extension_array import ArkoudaExtensionArray
from ._dtypes import (
Expand Down Expand Up @@ -224,21 +227,95 @@ def equals(self, other):
return False
return self._data.equals(other._data)

def _reduce(self, name, skipna=True, **kwargs):
if name == "all":
return self._data.all()
elif name == "any":
return self._data.any()
elif name == "sum":
return self._data.sum()
elif name == "prod":
return self._data.prod()
elif name == "min":
return self._data.min()
elif name == "max":
return self._data.max()
else:
raise TypeError(f"'ArkoudaArray' with dtype arkouda does not support reduction '{name}'")
def _reduce(
self,
name: str | GroupByReductionType,
skipna: bool = True,
**kwargs: Any,
) -> Any:
"""
Reduce the underlying data.

Parameters
----------
name : str | GroupByReductionType
Reduction name, e.g. "sum", "mean", "nunique", ...
skipna : bool
If supported by the underlying implementation, skip NaN/NA values.
Default is True.
**kwargs : Any
Extra args for compatibility (e.g. ddof for var/std).

Returns
-------
Any
The reduction result.

Raises
------
TypeError
If ``name`` is not a supported reduction or the underlying data does not
implement the requested reduction.
"""
# Normalize: accept Enum or str
if hasattr(name, "value"): # enum-like
name = name.value
if isinstance(name, tuple) and len(name) == 1: # guards against UNIQUE="unique",
name = name[0]
if not isinstance(name, str):
raise TypeError(f"Reduction name must be a string or GroupByReductionType, got {type(name)}")

data = self._data

def _call_method(method_name: str, *args: Any, **kw: Any) -> Any:
if not hasattr(data, method_name):
raise TypeError(
f"'ArkoudaArray' with dtype {self.dtype} does not support reduction '{name}' "
f"(missing method {method_name!r} on {type(data).__name__})"
)
meth = getattr(data, method_name)

# Best-effort: pass skipna/ddof/etc only if the method accepts them.
try:
sig = inspect.signature(meth)
except (TypeError, ValueError):
return meth(*args, **kw)

params = sig.parameters
filtered: dict[str, Any] = {k: v for k, v in kw.items() if k in params}
return meth(*args, **filtered)

reductions: dict[str, Callable[[], Any]] = {
"all": lambda: _call_method("all", skipna=skipna, **kwargs),
"any": lambda: _call_method("any", skipna=skipna, **kwargs),
"sum": lambda: _call_method("sum", skipna=skipna, **kwargs),
"prod": lambda: _call_method("prod", skipna=skipna, **kwargs),
"min": lambda: _call_method("min", skipna=skipna, **kwargs),
"max": lambda: _call_method("max", skipna=skipna, **kwargs),
"mean": lambda: _call_method("mean", skipna=skipna, **kwargs),
"median": lambda: _call_method("median", skipna=skipna, **kwargs),
"var": lambda: _call_method("var", skipna=skipna, **kwargs),
"std": lambda: _call_method("std", skipna=skipna, **kwargs),
"argmin": lambda: _call_method("argmin", skipna=skipna, **kwargs),
"argmax": lambda: _call_method("argmax", skipna=skipna, **kwargs),
"count": lambda: _call_method("count", **kwargs),
"nunique": lambda: _call_method("nunique", **kwargs),
"or": lambda: _call_method("or", skipna=skipna, **kwargs),
"and": lambda: _call_method("and", skipna=skipna, **kwargs),
"xor": lambda: _call_method("xor", skipna=skipna, **kwargs),
"first": lambda: _call_method("first", skipna=skipna, **kwargs),
"mode": lambda: _call_method("mode", skipna=skipna, **kwargs),
"unique": lambda: _call_method("unique", **kwargs),
}

fn = reductions.get(name)
if fn is None:
raise TypeError(
f"'ArkoudaArray' with dtype {self.dtype} does not support reduction '{name}'. "
f"Supported: {sorted(reductions)}"
)

return fn()

def __eq__(self, other):
"""
Expand Down
120 changes: 117 additions & 3 deletions tests/pandas/extension/arkouda_array_extension.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import inspect

import numpy as np
import pandas as pd
import pytest
Expand Down Expand Up @@ -233,18 +235,130 @@ def test_argsort(self):
sorted_vals = arr._data[perm]
assert ak.is_sorted(sorted_vals)

@pytest.mark.parametrize("reduction", ["all", "any", "sum", "prod", "min", "max"])
def test_reduce_ops(self, reduction):
@pytest.mark.parametrize(
    "reduction",
    [
        "all",
        "any",
        "sum",
        "prod",
        "min",
        "max",
        "mean",
        "var",
        "std",
        "median",
        "count",
        "first",
        "nunique",
    ],
)
def test_reduce_scalar_ops(self, reduction):
    """Scalar-valued reductions return a numeric/bool scalar.

    NOTE: "mode" and "unique" are array-valued and are covered by
    test_reduce_array_ops instead, so they are deliberately absent here.
    """
    ak_data = ak.arange(10)
    arr = ArkoudaArray(ak_data)

    # Not every reduction exists on every backend yet.
    if not hasattr(ak_data, reduction):
        pytest.xfail(f"{reduction} not implemented on pdarray backend yet")

    result = arr._reduce(reduction)

    assert isinstance(result, numeric_and_bool_scalars)

@pytest.mark.parametrize("reduction", ["argmin", "argmax"])
def test_reduce_arg_ops(self, reduction):
    """argmin/argmax reduce to a non-negative index scalar."""
    values = ak.arange(10)
    extension_arr = ArkoudaArray(values)

    idx = extension_arr._reduce(reduction)

    assert isinstance(idx, numeric_and_bool_scalars)
    assert idx >= 0

@pytest.mark.parametrize("reduction", ["or", "and", "xor"])
def test_reduce_bitwise_ops(self, reduction):
    """Bitwise/boolean reductions on a bool array yield a boolean scalar."""
    ak_data = ak.array([True, False, True, False])
    arr = ArkoudaArray(ak_data)

    # These named reductions may not exist as pdarray methods yet.
    if not hasattr(ak_data, reduction):
        pytest.xfail(f"{reduction} not implemented on pdarray backend yet")

    result = arr._reduce(reduction)

    # Arkouda/NumPy reductions may return np.bool_, which is NOT a subclass
    # of Python's bool — accept either.
    assert isinstance(result, (bool, np.bool_))

@pytest.mark.parametrize("reduction", ["unique", "mode"])
def test_reduce_array_ops(self, reduction):
    """Array-valued reductions come back as an Arkouda-backed array."""
    backing = ak.array([1, 2, 2, 3, 3, 3])
    extension_arr = ArkoudaArray(backing)

    if not hasattr(backing, reduction):
        pytest.xfail(f"{reduction} not implemented on pdarray backend yet")

    reduced = extension_arr._reduce(reduction)

    assert isinstance(reduced, ArkoudaArray | ak.pdarray)

@pytest.mark.parametrize(
    "reduction",
    ["sum", "mean", "min", "max", "var", "std"],
)
def test_reduce_skipna_kwarg(self, reduction):
    """Passing skipna=True must not break numeric reductions over NaN data."""
    backing = ak.array([1.0, 2.0, ak.nan, 4.0])
    extension_arr = ArkoudaArray(backing)

    outcome = extension_arr._reduce(reduction, skipna=True)

    assert isinstance(outcome, numeric_and_bool_scalars)

@pytest.mark.parametrize("reduction", ["var", "std"])
def test_reduce_ddof_kwarg(self, reduction):
    """ddof is forwarded when the backend supports it, ignored otherwise."""
    backing = ak.array([1.0, 2.0, 3.0, 4.0])
    extension_arr = ArkoudaArray(backing)

    bound = getattr(backing, reduction, None)
    if bound is None:
        pytest.xfail(f"{reduction} not implemented on pdarray backend yet")

    try:
        accepts_ddof = "ddof" in inspect.signature(bound).parameters
    except (TypeError, ValueError):
        # Signature not introspectable — fall back to a smoke test.
        accepts_ddof = False

    if not accepts_ddof:
        # _reduce's kwarg filtering should drop ddof without crashing.
        smoke = extension_arr._reduce(reduction, ddof=1)
        assert isinstance(smoke, numeric_and_bool_scalars)
        return

    population = extension_arr._reduce(reduction, ddof=0)
    sample = extension_arr._reduce(reduction, ddof=1)

    # NumPy is the reference implementation for both ddof variants.
    reference = np.array([1.0, 2.0, 3.0, 4.0])
    assert np.isclose(population, getattr(reference, reduction)(ddof=0))
    assert np.isclose(sample, getattr(reference, reduction)(ddof=1))

@pytest.mark.parametrize("reduction", ["sum", "mean", "min", "max"])
def test_reduce_ignores_unknown_kwargs(self, reduction):
    """Unknown keyword arguments must neither raise nor change the result."""
    extension_arr = ArkoudaArray(ak.arange(10))

    plain = extension_arr._reduce(reduction)
    with_junk = extension_arr._reduce(reduction, totally_not_a_real_kwarg=123)

    assert plain == with_junk

def test_reduce_invalid(self):
    """An unrecognized reduction name raises TypeError."""
    extension_arr = ArkoudaArray(ak.arange(10))

    with pytest.raises(TypeError):
        extension_arr._reduce("test")

def test_concat_same_type(self):
a1 = ArkoudaArray(ak.array([1, 2]))
Expand Down