diff --git a/src/blosc2/proxy.py b/src/blosc2/proxy.py index 213d8808..1c5890c2 100644 --- a/src/blosc2/proxy.py +++ b/src/blosc2/proxy.py @@ -676,3 +676,58 @@ def wrapper(*args, **func_kwargs): return decorator else: return decorator(func) + + +class PandasUdfEngine: + @staticmethod + def _ensure_numpy_data(data): + if not isinstance(data, np.ndarray): + try: + data = data.values + except AttributeError as err: + raise ValueError( + "blosc2.jit received an object of type {data.__name__}, which is not supported. " + "Try casting your Series or DataFrame to a NumPy dtype." + ) from err + return data + + @classmethod + def map(cls, data, func, args, kwargs, decorator, skip_na): + """ + JIT a NumPy array element-wise. In the case of Blosc2, functions are + expected to be vectorized NumPy operations, so the function is called + with the NumPy array as the function parameter, instead of calling the + function once for each element. + """ + raise NotImplementedError("The Blosc2 engine does not support map. Use apply instead.") + + @classmethod + def apply(cls, data, func, args, kwargs, decorator, axis): + """ + JIT a NumPy array by column or row. In the case of Blosc2, functions are + expected to be vectorized NumPy operations, so the function is called + with the NumPy array as the function parameter, instead of calling the + function once for each column or row. + """ + data = cls._ensure_numpy_data(data) + func = decorator(func) + if data.ndim == 1 or axis is None: + # pandas Series.apply or pipe + return func(data, *args, **kwargs) + elif axis in (0, "index"): + # pandas apply(axis=0) column-wise + result = [] + for row_idx in range(data.shape[1]): + result.append(func(data[:, row_idx], *args, **kwargs)) + return np.vstack(result).transpose() + elif axis in (1, "columns"): + # pandas apply(axis=1) row-wise + result = [] + for col_idx in range(data.shape[0]): + result.append(func(data[col_idx, :], *args, **kwargs)) + return np.vstack(result) + else: + raise NotImplementedError(f"Unknown axis '{axis}'. Use one of 0, 1 or None.") + + +jit.__pandas_udf__ = PandasUdfEngine diff --git a/tests/test_pandas_udf_engine.py b/tests/test_pandas_udf_engine.py new file mode 100644 index 00000000..326a7123 --- /dev/null +++ b/tests/test_pandas_udf_engine.py @@ -0,0 +1,120 @@ +####################################################################### +# Copyright (c) 2019-present, Blosc Development Team +# All rights reserved. +# +# This source code is licensed under a BSD-style license (found in the +# LICENSE file in the root directory of this source tree) +####################################################################### + +import numpy as np +import pytest + +import blosc2 + + +class TestPandasUDF: + def test_map(self): + def add_one(x): + return x + 1 + + data = np.array([1, 2]) + + with pytest.raises(NotImplementedError): + blosc2.jit.__pandas_udf__.map( + data, + add_one, + args=(), + kwargs={}, + decorator=blosc2.jit, + skip_na=False, + ) + + def test_apply_1d(self): + def add_one(x): + return x + 1 + + data = np.array([1, 2]) + + result = blosc2.jit.__pandas_udf__.apply( + data, + add_one, + args=(), + kwargs={}, + decorator=blosc2.jit, + axis=0, + ) + assert result.shape == (2,) + assert result[0] == 2 + assert result[1] == 3 + + def test_apply_1d_with_args(self): + def add_numbers(x, num1, num2): + return x + num1 + num2 + + data = np.array([1, 2]) + + result = blosc2.jit.__pandas_udf__.apply( + data, + add_numbers, + args=(10,), + kwargs={"num2": 100}, + decorator=blosc2.jit, + axis=0, + ) + assert result.shape == (2,) + assert result[0] == 111 + assert result[1] == 112 + + def test_apply_2d(self): + def add_one(x): + assert x.shape == (2, 3) + return x + 1 + + data = np.array([[1, 2, 3], [4, 5, 6]]) + + result = blosc2.jit.__pandas_udf__.apply( + data, + add_one, + args=(), + kwargs={}, + decorator=blosc2.jit, + axis=None, + ) + expected = np.array([[2, 3, 4], [5, 6, 7]]) + assert np.array_equal(result, expected) + + def test_apply_2d_by_column(self): + def add_one(x): + assert x.shape == (2,) + return x + 1 + + data = np.array([[1, 2, 3], [4, 5, 6]]) + + result = blosc2.jit.__pandas_udf__.apply( + data, + add_one, + args=(), + kwargs={}, + decorator=blosc2.jit, + axis=0, + ) + expected = np.array([[2, 3, 4], [5, 6, 7]]) + assert np.array_equal(result, expected) + + def test_apply_2d_by_row(self): + def add_one(x): + assert x.shape == (3,) + return x + 1 + + data = np.array([[1, 2, 3], [4, 5, 6]]) + + result = blosc2.jit.__pandas_udf__.apply( + data, + add_one, + args=(), + kwargs={}, + decorator=blosc2.jit, + axis=1, + ) + expected = np.array([[2, 3, 4], [5, 6, 7]]) + assert np.array_equal(result, expected)