From 07df083f37047cf8b10b8cad2bb2b6116911129b Mon Sep 17 00:00:00 2001 From: wjsi Date: Sun, 1 Oct 2023 14:36:56 +0800 Subject: [PATCH 1/2] Add support for mode function --- docs/source/reference/dataframe/frame.rst | 1 + docs/source/reference/dataframe/series.rst | 1 + mars/dataframe/reduction/__init__.py | 2 + mars/dataframe/reduction/aggregation.py | 118 ++++++--- mars/dataframe/reduction/core.py | 102 +++++-- mars/dataframe/reduction/mode.py | 250 ++++++++++++++++++ .../tests/test_reduction_execution.py | 78 ++++++ mars/opcodes.py | 1 + 8 files changed, 488 insertions(+), 65 deletions(-) create mode 100644 mars/dataframe/reduction/mode.py diff --git a/docs/source/reference/dataframe/frame.rst b/docs/source/reference/dataframe/frame.rst index 1a76363ab8..daa593be29 100644 --- a/docs/source/reference/dataframe/frame.rst +++ b/docs/source/reference/dataframe/frame.rst @@ -127,6 +127,7 @@ Computations / descriptive stats DataFrame.max DataFrame.mean DataFrame.min + DataFrame.mode DataFrame.nunique DataFrame.pct_change DataFrame.prod diff --git a/docs/source/reference/dataframe/series.rst b/docs/source/reference/dataframe/series.rst index 3b939b665b..326053157b 100644 --- a/docs/source/reference/dataframe/series.rst +++ b/docs/source/reference/dataframe/series.rst @@ -125,6 +125,7 @@ Computations / descriptive stats Series.mean Series.median Series.min + Series.mode Series.pct_change Series.prod Series.product diff --git a/mars/dataframe/reduction/__init__.py b/mars/dataframe/reduction/__init__.py index 3f5bf4b99d..ddadd0a100 100644 --- a/mars/dataframe/reduction/__init__.py +++ b/mars/dataframe/reduction/__init__.py @@ -62,6 +62,7 @@ def _install(): from .skew import skew_dataframe, skew_series from .kurtosis import kurt_dataframe, kurt_series from .reduction_size import size_dataframe, size_series + from .mode import mode_dataframe, mode_series funcs = [ ("sum", sum_series, sum_dataframe), @@ -88,6 +89,7 @@ def _install(): ("kurtosis", kurt_series, kurt_dataframe), ("unique", unique, None), ("_reduction_size", size_dataframe, size_series), + ("mode", mode_series, mode_dataframe), ] for func_name, series_func, df_func in funcs: if df_func is not None: # pragma: no branch diff --git a/mars/dataframe/reduction/aggregation.py b/mars/dataframe/reduction/aggregation.py index e5c33d6ca7..34390f4457 100644 --- a/mars/dataframe/reduction/aggregation.py +++ b/mars/dataframe/reduction/aggregation.py @@ -79,6 +79,7 @@ def where_function(cond, var1, var2): "kurt": lambda x, skipna=True, bias=False: x.kurt(skipna=skipna, bias=bias), "kurtosis": lambda x, skipna=True, bias=False: x.kurtosis(skipna=skipna, bias=bias), "nunique": lambda x: x.nunique(), + "mode": lambda x, skipna=True: x.mode(skipna=skipna), } @@ -126,53 +127,76 @@ def _calc_result_shape(self, df): if isinstance(result_df, pd.DataFrame): self.output_types = [OutputType.dataframe] - return result_df.dtypes, result_df.index + return result_df.dtypes, result_df.index, 2 elif isinstance(result_df, pd.Series): self.output_types = [OutputType.series] - return pd.Series([result_df.dtype], index=[result_df.name]), result_df.index + return ( + pd.Series([result_df.dtype], index=[result_df.name]), + result_df.index, + 1, + ) else: self.output_types = [OutputType.scalar] - return np.array(result_df).dtype, None + return np.array(result_df).dtype, None, 0 def __call__(self, df, output_type=None, dtypes=None, index=None): self._output_types = df.op.output_types normalize_reduction_funcs(self, ndim=df.ndim) if output_type is None or dtypes is None: with 
enter_mode(kernel=False, build=False): - dtypes, index = self._calc_result_shape(df) + dtypes, index, out_ndim = self._calc_result_shape(df) else: self.output_types = [output_type] + if output_type == OutputType.dataframe: + out_ndim = 2 + elif output_type == OutputType.series: + out_ndim = 1 + else: + out_ndim = 0 + reduced_len = ( + 1 if df.ndim != out_ndim or isinstance(self.raw_func, list) else np.nan + ) if self.output_types[0] == OutputType.dataframe: if self.axis == 0: - new_shape = (len(index), len(dtypes)) - new_index = parse_index(index, store_data=True) + new_shape = (len(index) * reduced_len, len(dtypes)) + new_index_value = parse_index(index, store_data=True) + new_dtypes = dtypes + new_col_name = parse_index(dtypes.index, store_data=True) else: - new_shape = (df.shape[0], len(dtypes)) - new_index = df.index_value + new_shape = (df.shape[0], len(dtypes) * reduced_len) + new_index_value = df.index_value + new_dtypes = None if np.isnan(reduced_len) else dtypes + new_col_name = parse_index( + dtypes.index, store_data=not np.isnan(reduced_len) + ) return self.new_dataframe( [df], shape=new_shape, - dtypes=dtypes, - index_value=new_index, - columns_value=parse_index(dtypes.index, store_data=True), + dtypes=new_dtypes, + index_value=new_index_value, + columns_value=new_col_name, ) elif self.output_types[0] == OutputType.series: if df.ndim == 1: - new_shape = (len(index),) - new_index = parse_index(index, store_data=True) + new_shape = (len(index) * reduced_len,) + new_index_value = parse_index( + index, store_data=not np.isnan(reduced_len) + ) elif self.axis == 0: - new_shape = (len(index),) - new_index = parse_index(index, store_data=True) + new_shape = (len(index) * reduced_len,) + new_index_value = parse_index( + index, store_data=not np.isnan(reduced_len) + ) else: new_shape = (df.shape[0],) - new_index = df.index_value + new_index_value = df.index_value return self.new_series( [df], shape=new_shape, dtype=dtypes[0], name=dtypes.index[0], - index_value=new_index, + index_value=new_index_value, ) elif self.output_types[0] == OutputType.tensor: return self.new_tileable([df], dtype=dtypes, shape=(np.nan,)) @@ -208,6 +232,9 @@ def _gen_map_chunks( agg_chunks = np.empty(agg_chunks_shape, dtype=object) dtypes_cache = dict() + reduced_len = ( + 1 if in_df.ndim != out_df.ndim or isinstance(op.raw_func, list) else np.nan + ) for chunk in in_df.chunks: input_index = chunk.index[1 - axis] if len(chunk.index) > 1 else 0 if input_index not in input_index_to_output: @@ -234,9 +261,9 @@ def _gen_map_chunks( if map_op.output_types[0] == OutputType.dataframe: if axis == 0: - shape = (1, out_df.shape[-1]) + shape = (reduced_len, chunk.shape[-1]) if out_df.ndim == 2: - columns_value = out_df.columns_value + columns_value = chunk.columns_value index_value = out_df.index_value else: columns_value = out_df.index_value @@ -259,11 +286,11 @@ def _gen_map_chunks( index_value=index_value, ) else: - shape = (out_df.shape[0], 1) + shape = (chunk.shape[0], reduced_len) columns_value = parse_index( pd.Index([0]), out_df.key, store_data=True ) - index_value = out_df.index_value + index_value = chunk.index_value agg_chunk = map_op.new_chunk( [chunk], @@ -273,7 +300,9 @@ def _gen_map_chunks( index_value=index_value, ) else: - agg_chunk = map_op.new_chunk([chunk], shape=(1,), index=new_index) + agg_chunk = map_op.new_chunk( + [chunk], shape=(reduced_len,), index=new_index + ) agg_chunks[agg_chunk.index] = agg_chunk return agg_chunks @@ -409,6 +438,9 @@ def _tile_tree(cls, op: "DataFrameAggregate"): chunks = 
cls._gen_map_chunks( op, in_df, out_df, axis_func_infos, input_index_to_output ) + reduced_len = ( + 1 if in_df.ndim != out_df.ndim or isinstance(op.raw_func, list) else np.nan + ) while chunks.shape[axis] > combine_size: if axis == 0: new_chunks_shape = ( @@ -429,16 +461,16 @@ def _tile_tree(cls, op: "DataFrameAggregate"): chks = chunks[i : i + combine_size, idx1] chunk_index = (idx0, idx1) if chks[0].ndim == 1: - concat_shape = (len(chks),) - agg_shape = (1,) + concat_shape = (len(chks) * reduced_len,) + agg_shape = (reduced_len,) else: - concat_shape = (len(chks), chks[0].shape[1]) - agg_shape = (chks[0].shape[1], 1) + concat_shape = (len(chks) * reduced_len, chks[0].shape[1]) + agg_shape = (chks[0].shape[1], reduced_len) else: chks = chunks[idx1, i : i + combine_size] chunk_index = (idx1, idx0) - concat_shape = (chks[0].shape[0], len(chks)) - agg_shape = (chks[0].shape[0], 1) + concat_shape = (chks[0].shape[0], len(chks) * reduced_len) + agg_shape = (chks[0].shape[0], reduced_len) chks = chks.reshape((chks.shape[0],)).tolist() if len(chks) == 1: @@ -485,12 +517,12 @@ def _tile_tree(cls, op: "DataFrameAggregate"): if axis == 0: chks = chunks[:, idx] if chks[0].ndim == 1: - concat_shape = (len(chks),) + concat_shape = (len(chks) * reduced_len,) else: - concat_shape = (len(chks), chks[0].shape[1]) + concat_shape = (len(chks) * reduced_len, chks[0].shape[1]) else: chks = chunks[idx, :] - concat_shape = (chks[0].shape[0], len(chks)) + concat_shape = (chks[0].shape[0], len(chks) * reduced_len) chks = chks.reshape((chks.shape[0],)).tolist() chk = concat_op.new_chunk( chks, @@ -519,7 +551,7 @@ def _tile_tree(cls, op: "DataFrameAggregate"): shape_len = len(col_index) kw.update( dict( - shape=(out_df.shape[0], shape_len), + shape=(out_df.shape[0] * reduced_len, shape_len), columns_value=columns_value, index=(0, idx), dtypes=out_df.dtypes[columns_value.to_pandas()], @@ -531,7 +563,10 @@ def _tile_tree(cls, op: "DataFrameAggregate"): dict( index=(idx, 0), index_value=src_col_chunk.index_value, - shape=(src_col_chunk.shape[0], out_df.shape[1]), + shape=( + src_col_chunk.shape[0], + out_df.shape[1] * reduced_len, + ), dtypes=out_df.dtypes, ) ) @@ -843,25 +878,26 @@ def _execute_agg(cls, ctx, op: "DataFrameAggregate"): concat_df = concat_df.iloc[:, 0] else: concat_df = concat_df.iloc[:, 0] - concat_df.name = op.outputs[0].name + concat_df.name = out.name - concat_df = concat_df.astype(op.outputs[0].dtype, copy=False) + concat_df = concat_df.astype(out.dtype, copy=False) elif op.output_types[0] == OutputType.scalar: concat_df = concat_df.iloc[0] try: - concat_df = concat_df.astype(op.outputs[0].dtype) + concat_df = concat_df.astype(out.dtype) except AttributeError: # concat_df may be a string and has no `astype` method pass elif op.output_types[0] == OutputType.tensor: concat_df = xp.array(concat_df).astype(dtype=out.dtype) else: - if axis == 0: - concat_df = concat_df.reindex(op.outputs[0].index_value.to_pandas()) - else: - concat_df = concat_df[op.outputs[0].columns_value.to_pandas()] + if not np.isnan(out.shape[op.axis]): + if axis == 0: + concat_df = concat_df.reindex(out.index_value.to_pandas()) + else: + concat_df = concat_df[out.columns_value.to_pandas()] - concat_df = concat_df.astype(op.outputs[0].dtypes, copy=False) + concat_df = concat_df.astype(out.dtypes, copy=False) ctx[op.outputs[0].key] = concat_df @classmethod diff --git a/mars/dataframe/reduction/core.py b/mars/dataframe/reduction/core.py index 69f01a7a2d..502effb0a3 100644 --- a/mars/dataframe/reduction/core.py +++ 
b/mars/dataframe/reduction/core.py @@ -214,7 +214,7 @@ def _default_agg_fun(value, func_name=None, **kw): @functools.lru_cache(100) -def _get_series_reduction_dtype( +def _get_reduced_series( dtype, func_name, axis=None, @@ -227,17 +227,19 @@ def _get_series_reduction_dtype( reduced = test_series.count() elif func_name == "nunique": reduced = test_series.nunique() + elif func_name == "mode": + reduced = test_series.mode(dropna=skipna) elif func_name in ("all", "any"): reduced = getattr(test_series, func_name)(axis=axis, bool_only=bool_only) elif func_name == "size": reduced = test_series.size elif func_name == "str_concat": - reduced = pd.Series([test_series.str.cat()]) + reduced = test_series.str.cat() else: reduced = getattr(test_series, func_name)( axis=axis, skipna=skipna, numeric_only=numeric_only ) - return pd.Series(reduced).dtype + return reduced @functools.lru_cache(100) @@ -286,6 +288,9 @@ def tile(cls, op): elif out_df.ndim == 1: output_type = OutputType.tensor dtypes, index = out_df.dtype, None + elif isinstance(out_df, DATAFRAME_TYPE): # for mode function only + output_type = OutputType.dataframe + dtypes, index = out_df.dtypes, out_df.index_value.to_pandas() else: output_type = OutputType.scalar dtypes, index = out_df.dtype, None @@ -321,6 +326,7 @@ def _call_dataframe(self, df): if level is not None and axis == 1: raise NotImplementedError("Not support specify level for axis==1") + is_output_df = False if func_name == "size": reduced = pd.Series( np.zeros(df.shape[1 - axis]), @@ -333,6 +339,16 @@ def _call_dataframe(self, df): reduced = getattr(self, "custom_reduction").__call_agg__(empty_df) reduced_cols = list(reduced.index) reduced_dtype = reduced.dtype + elif func_name == "mode": + empty_df = build_df(df, ensure_string=True) + reduced = empty_df.mode(axis=axis, numeric_only=numeric_only, dropna=skipna) + is_output_df = True + if self._axis == 0: + reduced_cols = list(reduced.dtypes) + reduced_dtype = reduced.dtypes + else: + reduced_cols = [] + reduced_dtype = None else: reduced_cols, dtypes = [], [] for col, src_dt in df.dtypes.items(): @@ -374,22 +390,44 @@ def _call_dataframe(self, df): else: reduced_dtype = np.find_common_type(dtypes, []) - if level is not None: - return self._call_groupby_level(df[reduced_cols], level) - - if axis == 0: - reduced_shape = (len(reduced_cols),) - reduced_index_value = parse_index(pd.Index(reduced_cols), store_data=True) + if is_output_df: + if axis == 0: + reduced_shape = (np.nan, len(reduced_cols)) + reduced_index_value = parse_index(pd.RangeIndex(0), df.key) + reduced_columns_value = parse_index( + reduced_dtype.index, store_data=True + ) + else: + reduced_shape = (df.shape[0], np.nan) + reduced_index_value = df.index_value + reduced_columns_value = None + + return self.new_dataframe( + [df], + shape=reduced_shape, + dtypes=reduced_dtype, + columns_value=reduced_columns_value, + index_value=reduced_index_value, + ) else: - reduced_shape = (df.shape[0],) - reduced_index_value = parse_index(pd.RangeIndex(-1)) + if level is not None: + return self._call_groupby_level(df[reduced_cols], level) - return self.new_series( - [df], - shape=reduced_shape, - dtype=reduced_dtype, - index_value=reduced_index_value, - ) + if axis == 0: + reduced_shape = (len(reduced_cols),) + reduced_index_value = parse_index( + pd.Index(reduced_cols), store_data=True + ) + else: + reduced_shape = (df.shape[0],) + reduced_index_value = parse_index(pd.RangeIndex(-1)) + + return self.new_series( + [df], + shape=reduced_shape, + dtype=reduced_dtype, + 
index_value=reduced_index_value, + ) def _call_series(self, series): level = getattr(self, "level", None) @@ -405,12 +443,9 @@ def _call_series(self, series): if func_name == "custom_reduction": empty_series = build_series(series, ensure_string=True) - result_scalar = getattr(self, "custom_reduction").__call_agg__(empty_series) - if hasattr(result_scalar, "to_pandas"): # pragma: no cover - result_scalar = result_scalar.to_pandas() - result_dtype = pd.Series(result_scalar).dtype + mock_result = getattr(self, "custom_reduction").__call_agg__(empty_series) else: - result_dtype = _get_series_reduction_dtype( + mock_result = _get_reduced_series( series.dtype, func_name, axis=axis, @@ -418,7 +453,22 @@ def _call_series(self, series): numeric_only=numeric_only, skipna=skipna, ) - return self.new_scalar([series], dtype=result_dtype) + + if hasattr(mock_result, "to_pandas"): # pragma: no cover + mock_result = mock_result.to_pandas() + result_dtype = pd.Series(mock_result).dtype + if isinstance(mock_result, pd.Series): + self._output_types = [OutputType.series] + kw = { + "dtype": result_dtype, + "index_value": parse_index(mock_result.index, store_data=False), + "shape": (np.nan,), + "name": mock_result.name, + } + else: + self._output_types = [OutputType.scalar] + kw = {"dtype": result_dtype, "shape": ()} + return self.new_tileable([series], **kw) def __call__(self, a): if is_kernel_mode() and not getattr(self, "is_atomic", False): @@ -785,6 +835,7 @@ class ReductionSteps(NamedTuple): mod="%", ) _func_compile_cache = dict() # type: Dict[str, ReductionSteps] +_equal_dim_funcs = {"mode"} class ReductionCompiler: @@ -932,7 +983,10 @@ def _compile_function(self, func, func_name=None, ndim=1) -> ReductionSteps: raise ValueError( f"Custom function should return a Mars object, not {type(func_ret)}" ) - if func_ret.ndim >= ndim: + if ( + func_ret.ndim == ndim + and getattr(func, "name", None) not in _equal_dim_funcs + ) or func_ret.ndim > ndim: raise ValueError("Function not a reduction") agg_graph = func_ret.build_graph() diff --git a/mars/dataframe/reduction/mode.py b/mars/dataframe/reduction/mode.py new file mode 100644 index 0000000000..9e4494fcd7 --- /dev/null +++ b/mars/dataframe/reduction/mode.py @@ -0,0 +1,250 @@ +# Copyright 1999-2021 Alibaba Group Holding Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pandas as pd + +try: + import pyarrow as pa +except ImportError: # pragma: no cover + pa = None + +from ... 
import opcodes +from ...core import OutputType +from ...config import options +from ...serialization.serializables import BoolField +from ...utils import lazy_import +from ..arrays import ArrowListDtype +from .core import DataFrameReductionOperand, DataFrameReductionMixin, CustomReduction + +cudf = lazy_import("cudf") + + +class ModeReduction(CustomReduction): + pre_with_agg = True + + def __init__( + self, name="mode", axis=0, numeric_only=False, dropna=True, is_gpu=False + ): + super().__init__(name, is_gpu=is_gpu) + self._axis = axis + self._numeric_only = numeric_only + self._dropna = dropna + + def pre(self, in_data): # noqa: W0221 # pylint: disable=arguments-differ + xdf = cudf if self.is_gpu() else pd + if isinstance(in_data, xdf.Series): + return in_data.value_counts(dropna=self._dropna) + else: + if self._axis == 0: + data = dict() + for d, v in in_data.iteritems(): + data[d] = [v.value_counts(dropna=self._dropna).to_dict()] + df = xdf.DataFrame(data) + else: + df = xdf.DataFrame(columns=[0]) + for d, v in in_data.iterrows(): + df.loc[d] = [v.value_counts(dropna=self._dropna).to_dict()] + return df + + def agg(self, in_data): # noqa: W0221 # pylint: disable=arguments-differ + xdf = cudf if self.is_gpu() else pd + if isinstance(in_data, xdf.Series): + return in_data.groupby(in_data.index, dropna=self._dropna).sum() + else: + if self._axis == 0: + data = dict() + for d, v in in_data.iteritems(): + data[d] = [v.apply(pd.Series).sum().to_dict()] + df = xdf.DataFrame(data) + else: + df = xdf.DataFrame(columns=[0]) + for d, v in in_data.iterrows(): + df.loc[d] = [v.apply(pd.Series).sum().to_dict()] + return df + + def post(self, in_data): # noqa: W0221 # pylint: disable=arguments-differ + xdf = cudf if self.is_gpu() else pd + + def _handle_series(s): + summed = s.groupby(s.index, dropna=self._dropna).sum() + if summed.ndim == 2: + summed = summed.iloc[:, 0] + return pd.Series(summed[summed == summed.max()].index) + + if isinstance(in_data, xdf.Series): + return _handle_series(in_data) + else: + in_data_iter = ( + in_data.iteritems() if self._axis == 0 else in_data.iterrows() + ) + s_list = [] + for d, v in in_data_iter: + if isinstance(v.dtype, ArrowListDtype): + v = xdf.Series(v.to_numpy()) + s = _handle_series(v.apply(pd.Series).T) + s.name = d + s_list.append(s) + res = pd.concat(s_list, axis=1) + if self._axis == 0: + return res + return res.T + + +class DataFrameMode(DataFrameReductionOperand, DataFrameReductionMixin): + _op_type_ = opcodes.MODE + _func_name = "mode" + + numeric_only = BoolField("numeric_only", default=None) + use_arrow_dtype = BoolField("use_arrow_dtype", default=None) + + @classmethod + def get_reduction_callable(cls, op: "DataFrameMode"): + return ModeReduction( + name=cls._func_name, + axis=op.axis, + numeric_only=op.numeric_only, + dropna=op.skipna, + is_gpu=op.is_gpu(), + ) + + @property + def dropna(self) -> bool: + return self.skipna + + @classmethod + def tile(cls, op): + ts = yield from super().tile(op) + return ts + + def __call__(self, *args, **kwargs): + t = super().__call__(*args, **kwargs) + return t + + +def mode_dataframe(df, axis=0, numeric_only=False, dropna=True, combine_size=None): + """ + Get the mode(s) of each element along the selected axis. + + The mode of a set of values is the value that appears most often. + It can be multiple values. 
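+
+    In a chunked Mars DataFrame the computation is distributed: value
+    counts are collected per chunk, combined, and the most frequent
+    values are then selected, so the result matches what pandas
+    computes on the full data.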
+ + Parameters + ---------- + axis : {0 or 'index', 1 or 'columns'}, default 0 + The axis to iterate over while searching for the mode: + + * 0 or 'index' : get mode of each column + * 1 or 'columns' : get mode of each row. + + numeric_only : bool, default False + If True, only apply to numeric columns. + dropna : bool, default True + Don't consider counts of NaN/NaT. + + Returns + ------- + DataFrame + The modes of each column or row. + + See Also + -------- + Series.mode : Return the highest frequency value in a Series. + Series.value_counts : Return the counts of values in a Series. + + Examples + -------- + >>> import mars.tensor as mt + >>> import mars.dataframe as md + >>> df = md.DataFrame([('bird', 2, 2), + ... ('mammal', 4, mt.nan), + ... ('arthropod', 8, 0), + ... ('bird', 2, mt.nan)], + ... index=('falcon', 'horse', 'spider', 'ostrich'), + ... columns=('species', 'legs', 'wings')) + >>> df.execute() + species legs wings + falcon bird 2 2.0 + horse mammal 4 NaN + spider arthropod 8 0.0 + ostrich bird 2 NaN + + By default, missing values are not considered, and the mode of wings + are both 0 and 2. Because the resulting DataFrame has two rows, + the second row of ``species`` and ``legs`` contains ``NaN``. + + >>> df.mode().execute() + species legs wings + 0 bird 2.0 0.0 + 1 NaN NaN 2.0 + + Setting ``dropna=False`` ``NaN`` values are considered and they can be + the mode (like for wings). + + >>> df.mode(dropna=False).execute() + species legs wings + 0 bird 2 NaN + + Setting ``numeric_only=True``, only the mode of numeric columns is + computed, and columns of other types are ignored. + + >>> df.mode(numeric_only=True).execute() + legs wings + 0 2.0 0.0 + 1 NaN 2.0 + + To compute the mode over columns and not rows, use the axis parameter: + + >>> df.mode(axis='columns', numeric_only=True).execute() + 0 1 + falcon 2.0 NaN + horse 4.0 NaN + spider 0.0 8.0 + ostrich 2.0 NaN + """ + op = DataFrameMode( + axis=axis, + numeric_only=numeric_only, + dropna=dropna, + combine_size=combine_size, + output_types=[OutputType.series], + use_arrow_dtype=options.dataframe.use_arrow_dtype, + ) + return op(df) + + +def mode_series(series, dropna=True, combine_size=None): + """ + Return the mode(s) of the Series. + + The mode is the value that appears most often. There can be multiple modes. + + Always returns Series even if only one value is returned. + + Parameters + ---------- + dropna : bool, default True + Don't consider counts of NaN/NaT. + + Returns + ------- + Series + Modes of the Series in sorted order. 
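+
+    See Also
+    --------
+    DataFrame.mode : Return the mode(s) of each column or row.
+    Series.value_counts : Return the counts of values in a Series.
+
+    Examples
+    --------
+    >>> import mars.dataframe as md
+    >>> s = md.Series([2, 4, 2, 2, 4, None])
+    >>> s.mode().execute()
+    0    2.0
+    dtype: float64
+
+    More than one mode:
+
+    >>> s = md.Series([2, 4, 8, 2, 4, None])
+    >>> s.mode().execute()
+    0    2.0
+    1    4.0
+    dtype: float64
+
+    With ``dropna=False``, missing values are counted and can be the mode:
+
+    >>> s = md.Series([2, 4, None, None, 4, None])
+    >>> s.mode(dropna=False).execute()
+    0   NaN
+    dtype: float64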
+ """ + op = DataFrameMode( + dropna=dropna, + combine_size=combine_size, + output_types=[OutputType.scalar], + use_arrow_dtype=options.dataframe.use_arrow_dtype, + ) + return op(series) diff --git a/mars/dataframe/reduction/tests/test_reduction_execution.py b/mars/dataframe/reduction/tests/test_reduction_execution.py index 4ba305c86c..7e7d2c4459 100644 --- a/mars/dataframe/reduction/tests/test_reduction_execution.py +++ b/mars/dataframe/reduction/tests/test_reduction_execution.py @@ -681,6 +681,84 @@ def test_nunique(setup, check_ref_counts): pd.testing.assert_series_equal(result, expected) +def test_mode(setup, check_ref_counts): + config_kw = { + "extra_config": { + "check_shape": False, + "check_index_value": False, + } + } + data1 = pd.Series(np.random.randint(0, 5, size=(20,))) + + series = md.Series(data1) + result = series.mode().execute().fetch() + expected = data1.mode() + pd.testing.assert_series_equal(result, expected) + + series = md.Series(data1, chunk_size=6) + result = series.mode().execute().fetch() + expected = data1.mode() + pd.testing.assert_series_equal(result, expected) + + # test dropna + data2 = data1.copy() + data2[[2, 9, 18]] = np.nan + + series = md.Series(data2) + result = series.mode().execute().fetch() + expected = data2.mode() + pd.testing.assert_series_equal(result, expected) + + series = md.Series(data2, chunk_size=3) + result = series.mode(dropna=False).execute(**config_kw).fetch(**config_kw) + expected = data2.mode(dropna=False) + pd.testing.assert_series_equal(result, expected) + + # test dataframe + data1 = pd.DataFrame( + np.random.randint(0, 6, size=(20, 20)), + columns=["c" + str(i) for i in range(20)], + ) + df = md.DataFrame(data1) + result = df.mode().execute().fetch() + expected = data1.mode() + pd.testing.assert_frame_equal(result, expected) + + df = md.DataFrame(data1, chunk_size=6) + result = df.mode().execute(**config_kw).fetch(**config_kw) + expected = data1.mode() + pd.testing.assert_frame_equal(result, expected) + + df = md.DataFrame(data1) + result = df.mode(axis=1).execute().fetch() + expected = data1.mode(axis=1) + pd.testing.assert_frame_equal(result, expected) + + df = md.DataFrame(data1, chunk_size=3) + result = df.mode(axis=1).execute(**config_kw).fetch(**config_kw) + expected = data1.mode(axis=1) + pd.testing.assert_frame_equal(result, expected) + + # test dropna + data2 = data1.copy() + data2.iloc[[2, 9, 18], [2, 9, 18]] = np.nan + + df = md.DataFrame(data2) + result = df.mode().execute().fetch() + expected = data2.mode() + pd.testing.assert_frame_equal(result, expected) + + df = md.DataFrame(data2, chunk_size=3) + result = df.mode(dropna=False).execute(**config_kw).fetch(**config_kw) + expected = data2.mode(dropna=False) + pd.testing.assert_frame_equal(result, expected) + + df = md.DataFrame(data1, chunk_size=3) + result = df.mode(axis=1).execute(**config_kw).fetch(**config_kw) + expected = data1.mode(axis=1) + pd.testing.assert_frame_equal(result, expected) + + @pytest.mark.skipif(pa is None, reason="pyarrow not installed") def test_use_arrow_dtype_nunique(setup, check_ref_counts): with option_context({"dataframe.use_arrow_dtype": True, "combine_size": 2}): diff --git a/mars/opcodes.py b/mars/opcodes.py index 6036585977..575a0bd918 100644 --- a/mars/opcodes.py +++ b/mars/opcodes.py @@ -270,6 +270,7 @@ SEM = 352 STR_CONCAT = 353 MAD = 354 +MODE = 355 # tensor operand RESHAPE = 401 From 49b7ca2ef8a23a70afe5558e17476b97ee981bcb Mon Sep 17 00:00:00 2001 From: wjsi Date: Tue, 3 Oct 2023 17:20:10 +0800 Subject: [PATCH 2/2] Fix case 
--- mars/dataframe/base/bloom_filter.py | 2 +- mars/dataframe/reduction/aggregation.py | 41 +++++++++++++------ mars/dataframe/reduction/mode.py | 23 ++++------- mars/dataframe/reduction/nunique.py | 8 ++-- .../tests/test_reduction_execution.py | 35 ++++++++++++---- mars/deploy/oscar/session.py | 2 + mars/deploy/oscar/tests/session.py | 2 + mars/tests/test_session.py | 6 +-- 8 files changed, 76 insertions(+), 43 deletions(-) diff --git a/mars/dataframe/base/bloom_filter.py b/mars/dataframe/base/bloom_filter.py index 65b9a14591..ac3da3a750 100644 --- a/mars/dataframe/base/bloom_filter.py +++ b/mars/dataframe/base/bloom_filter.py @@ -175,7 +175,7 @@ def _build_dataframe_filter(cls, in_data: pd.DataFrame, op: "DataFrameBloomFilte def _convert_to_hashable_dtypes(cls, dtypes: pd.Series): dtypes = dict( (name, dtype) if np.issubdtype(dtype, int) else (name, str) - for name, dtype in dtypes.iteritems() + for name, dtype in dtypes.items() ) return dtypes diff --git a/mars/dataframe/reduction/aggregation.py b/mars/dataframe/reduction/aggregation.py index 34390f4457..32fbc94f73 100644 --- a/mars/dataframe/reduction/aggregation.py +++ b/mars/dataframe/reduction/aggregation.py @@ -139,6 +139,15 @@ def _calc_result_shape(self, df): self.output_types = [OutputType.scalar] return np.array(result_df).dtype, None, 0 + def _get_reduced_dim_unit(self, in_ndim, out_ndim): + """ + If rows can be reduced into multiple columns, return nan, + otherwise returns 1 + """ + if not isinstance(self.raw_func, str) and isinstance(self.raw_func, Iterable): + return 1 + return 1 if in_ndim != out_ndim else np.nan + def __call__(self, df, output_type=None, dtypes=None, index=None): self._output_types = df.op.output_types normalize_reduction_funcs(self, ndim=df.ndim) @@ -154,9 +163,7 @@ def __call__(self, df, output_type=None, dtypes=None, index=None): else: out_ndim = 0 - reduced_len = ( - 1 if df.ndim != out_ndim or isinstance(self.raw_func, list) else np.nan - ) + reduced_len = self._get_reduced_dim_unit(df.ndim, out_ndim) if self.output_types[0] == OutputType.dataframe: if self.axis == 0: new_shape = (len(index) * reduced_len, len(dtypes)) @@ -213,7 +220,7 @@ def _safe_append(d, key, val): @classmethod def _gen_map_chunks( cls, - op, + op: "DataFrameAggregate", in_df, out_df, func_infos: List[ReductionSteps], @@ -232,9 +239,7 @@ def _gen_map_chunks( agg_chunks = np.empty(agg_chunks_shape, dtype=object) dtypes_cache = dict() - reduced_len = ( - 1 if in_df.ndim != out_df.ndim or isinstance(op.raw_func, list) else np.nan - ) + reduced_len = op._get_reduced_dim_unit(in_df.ndim, out_df.ndim) for chunk in in_df.chunks: input_index = chunk.index[1 - axis] if len(chunk.index) > 1 else 0 if input_index not in input_index_to_output: @@ -438,9 +443,7 @@ def _tile_tree(cls, op: "DataFrameAggregate"): chunks = cls._gen_map_chunks( op, in_df, out_df, axis_func_infos, input_index_to_output ) - reduced_len = ( - 1 if in_df.ndim != out_df.ndim or isinstance(op.raw_func, list) else np.nan - ) + reduced_len = op._get_reduced_dim_unit(in_df.ndim, out_df.ndim) while chunks.shape[axis] > combine_size: if axis == 0: new_chunks_shape = ( @@ -715,6 +718,8 @@ def _do_predefined_agg(cls, op: "DataFrameAggregate", input_obj, func_name, kwds raise NotImplementedError("numeric_only not implemented under cudf") if isinstance(input_obj, pd.Index): kwds.pop("skipna", None) + if getattr(input_obj, "ndim", 0) > 1: + kwds["axis"] = op.axis return getattr(input_obj, func_name)(**kwds) @classmethod @@ -865,7 +870,19 @@ def _execute_agg(cls, ctx, op: 
"DataFrameAggregate"): ser_index = None if agg_series_ndim < out.ndim: ser_index = [func_name] - aggs.append(cls._wrap_df(op, agg_series, index=ser_index)) + if ( + isinstance(agg_series, np.ndarray) + and getattr(func_inputs[0], "ndim", 0) >= 1 + and hasattr(func_inputs[0], "index") + ): + agg_series = cls._wrap_df(op, agg_series, index=ser_index) + if op.axis == 0: + agg_series.columns = func_inputs[0].index + else: + agg_series.index = func_inputs[0].index + else: + agg_series = cls._wrap_df(op, agg_series, index=ser_index) + aggs.append(agg_series) # concatenate to produce final result concat_df = xdf.concat(aggs, axis=axis) @@ -931,7 +948,7 @@ def execute(cls, ctx, op: "DataFrameAggregate"): ): result = op.func[0](in_data) else: - result = in_data.agg(op.raw_func, axis=op.axis) + result = in_data.agg(op.raw_func, axis=op.axis, **op.raw_func_kw) if op.outputs[0].ndim == 1: result = result.astype(op.outputs[0].dtype, copy=False) diff --git a/mars/dataframe/reduction/mode.py b/mars/dataframe/reduction/mode.py index 9e4494fcd7..623e424b97 100644 --- a/mars/dataframe/reduction/mode.py +++ b/mars/dataframe/reduction/mode.py @@ -14,11 +14,6 @@ import pandas as pd -try: - import pyarrow as pa -except ImportError: # pragma: no cover - pa = None - from ... import opcodes from ...core import OutputType from ...config import options @@ -41,6 +36,12 @@ def __init__( self._numeric_only = numeric_only self._dropna = dropna + @staticmethod + def _explode_dict_series(s: pd.Series) -> pd.DataFrame: + exploded = s.apply(pd.Series) + # if exploded.columns.hasnans: + return exploded + def pre(self, in_data): # noqa: W0221 # pylint: disable=arguments-differ xdf = cudf if self.is_gpu() else pd if isinstance(in_data, xdf.Series): @@ -48,7 +49,7 @@ def pre(self, in_data): # noqa: W0221 # pylint: disable=arguments-differ else: if self._axis == 0: data = dict() - for d, v in in_data.iteritems(): + for d, v in in_data.items(): data[d] = [v.value_counts(dropna=self._dropna).to_dict()] df = xdf.DataFrame(data) else: @@ -64,7 +65,7 @@ def agg(self, in_data): # noqa: W0221 # pylint: disable=arguments-differ else: if self._axis == 0: data = dict() - for d, v in in_data.iteritems(): + for d, v in in_data.items(): data[d] = [v.apply(pd.Series).sum().to_dict()] df = xdf.DataFrame(data) else: @@ -85,9 +86,7 @@ def _handle_series(s): if isinstance(in_data, xdf.Series): return _handle_series(in_data) else: - in_data_iter = ( - in_data.iteritems() if self._axis == 0 else in_data.iterrows() - ) + in_data_iter = in_data.items() if self._axis == 0 else in_data.iterrows() s_list = [] for d, v in in_data_iter: if isinstance(v.dtype, ArrowListDtype): @@ -127,10 +126,6 @@ def tile(cls, op): ts = yield from super().tile(op) return ts - def __call__(self, *args, **kwargs): - t = super().__call__(*args, **kwargs) - return t - def mode_dataframe(df, axis=0, numeric_only=False, dropna=True, combine_size=None): """ diff --git a/mars/dataframe/reduction/nunique.py b/mars/dataframe/reduction/nunique.py index 3bcafd3db4..cac883bbec 100644 --- a/mars/dataframe/reduction/nunique.py +++ b/mars/dataframe/reduction/nunique.py @@ -59,7 +59,7 @@ def pre(self, in_data): # noqa: W0221 # pylint: disable=arguments-differ else: if self._axis == 0: data = dict() - for d, v in in_data.iteritems(): + for d, v in in_data.items(): if not self._use_arrow_dtype or xdf is cudf: data[d] = [v.drop_duplicates().to_list()] else: @@ -82,7 +82,7 @@ def agg(self, in_data): # noqa: W0221 # pylint: disable=arguments-differ else: if self._axis == 0: data = dict() - for 
d, v in in_data.iteritems(): + for d, v in in_data.items(): if not self._use_arrow_dtype or xdf is cudf: data[d] = [v.explode().drop_duplicates().to_list()] else: @@ -103,9 +103,7 @@ def post(self, in_data): # noqa: W0221 # pylint: disable=arguments-differ if isinstance(in_data, xdf.Series): return in_data.explode().nunique(dropna=self._dropna) else: - in_data_iter = ( - in_data.iteritems() if self._axis == 0 else in_data.iterrows() - ) + in_data_iter = in_data.items() if self._axis == 0 else in_data.iterrows() data = dict() for d, v in in_data_iter: if isinstance(v.dtype, ArrowListDtype): diff --git a/mars/dataframe/reduction/tests/test_reduction_execution.py b/mars/dataframe/reduction/tests/test_reduction_execution.py index 7e7d2c4459..2416d57970 100644 --- a/mars/dataframe/reduction/tests/test_reduction_execution.py +++ b/mars/dataframe/reduction/tests/test_reduction_execution.py @@ -36,6 +36,7 @@ cp = lazy_import("cupy", rename="cp") _agg_size_as_series = pd_release_version >= (1, 3) _support_kw_agg = pd_release_version >= (1, 1) +_drop_level_reduction = pd_release_version >= (2, 0) @pytest.fixture @@ -119,6 +120,9 @@ def compute(data, **kwargs): np.testing.assert_equal(r.execute().fetch(), compute(data)) +@pytest.mark.skipif( + _drop_level_reduction, reason="Level reduction not supported for pandas>=2.0" +) @pytest.mark.parametrize("func_name,func_opts", reduction_functions) def test_series_level_reduction(setup, func_name, func_opts: FunctionOptions): def compute(data, **kwargs): @@ -162,6 +166,9 @@ def compute(data, **kwargs): ) +@pytest.mark.skipif( + _drop_level_reduction, reason="Level reduction not supported for pandas>=2.0" +) @pytest.mark.parametrize("func_name,func_opts", reduction_functions) def test_dataframe_reduction( setup, check_ref_counts, func_name, func_opts: FunctionOptions @@ -255,6 +262,9 @@ def compute(data, **kwargs): ) +@pytest.mark.skipif( + _drop_level_reduction, reason="Level reduction not supported for pandas>=2.0" +) @pytest.mark.parametrize("func_name,func_opts", reduction_functions) def test_dataframe_level_reduction( setup, check_ref_counts, func_name, func_opts: FunctionOptions @@ -403,6 +413,9 @@ def compute(data, **kwargs): assert r.execute().fetch() is True +@pytest.mark.skipif( + _drop_level_reduction, reason="Level reduction not supported for pandas>=2.0" +) @pytest.mark.parametrize("func_name", bool_reduction_functions) def test_series_bool_level_reduction(setup, check_ref_counts, func_name): def compute(data, **kwargs): @@ -510,6 +523,9 @@ def compute(data, **kwargs): ) +@pytest.mark.skipif( + _drop_level_reduction, reason="Level reduction not supported for pandas>=2.0" +) @pytest.mark.parametrize("func_name", bool_reduction_functions) def test_dataframe_bool_level_reduction(setup, check_ref_counts, func_name): def compute(data, **kwargs): @@ -685,18 +701,20 @@ def test_mode(setup, check_ref_counts): config_kw = { "extra_config": { "check_shape": False, + "check_dtypes": False, + "check_columns_value": False, "check_index_value": False, } } data1 = pd.Series(np.random.randint(0, 5, size=(20,))) series = md.Series(data1) - result = series.mode().execute().fetch() + result = series.mode().execute(**config_kw).fetch(**config_kw) expected = data1.mode() pd.testing.assert_series_equal(result, expected) series = md.Series(data1, chunk_size=6) - result = series.mode().execute().fetch() + result = series.mode().execute(**config_kw).fetch(**config_kw) expected = data1.mode() pd.testing.assert_series_equal(result, expected) @@ -705,7 +723,7 @@ def 
test_mode(setup, check_ref_counts): data2[[2, 9, 18]] = np.nan series = md.Series(data2) - result = series.mode().execute().fetch() + result = series.mode().execute(**config_kw).fetch(**config_kw) expected = data2.mode() pd.testing.assert_series_equal(result, expected) @@ -720,7 +738,7 @@ def test_mode(setup, check_ref_counts): columns=["c" + str(i) for i in range(20)], ) df = md.DataFrame(data1) - result = df.mode().execute().fetch() + result = df.mode().execute(**config_kw).fetch(**config_kw) expected = data1.mode() pd.testing.assert_frame_equal(result, expected) @@ -730,7 +748,7 @@ def test_mode(setup, check_ref_counts): pd.testing.assert_frame_equal(result, expected) df = md.DataFrame(data1) - result = df.mode(axis=1).execute().fetch() + result = df.mode(axis=1).execute(**config_kw).fetch(**config_kw) expected = data1.mode(axis=1) pd.testing.assert_frame_equal(result, expected) @@ -744,7 +762,7 @@ def test_mode(setup, check_ref_counts): data2.iloc[[2, 9, 18], [2, 9, 18]] = np.nan df = md.DataFrame(data2) - result = df.mode().execute().fetch() + result = df.mode().execute(**config_kw).fetch(**config_kw) expected = data2.mode() pd.testing.assert_frame_equal(result, expected) @@ -1008,7 +1026,10 @@ def test_dataframe_aggregate(setup, check_ref_counts): mean_9=NamedAgg(9, "mean"), ) result = df.agg(**agg_kw) - pd.testing.assert_frame_equal(result.execute().fetch(), data.agg(**agg_kw)) + pd.testing.assert_frame_equal( + result.execute().fetch(extra_config={"check_shape": False}), + data.agg(**agg_kw), + ) def test_series_aggregate(setup, check_ref_counts): diff --git a/mars/deploy/oscar/session.py b/mars/deploy/oscar/session.py index 218bdce7ad..b5b0abfa8e 100644 --- a/mars/deploy/oscar/session.py +++ b/mars/deploy/oscar/session.py @@ -13,6 +13,7 @@ # limitations under the License. import asyncio +import copy import itertools import logging import json @@ -355,6 +356,7 @@ async def _run_in_background( async def execute(self, *tileables, **kwargs) -> ExecutionInfo: if self._closed: raise RuntimeError("Session closed already") + kwargs = copy.deepcopy(kwargs) fuse_enabled: bool = kwargs.pop("fuse_enabled", None) extra_config: dict = kwargs.pop("extra_config", None) warn_duplicated_execution: bool = kwargs.pop("warn_duplicated_execution", False) diff --git a/mars/deploy/oscar/tests/session.py b/mars/deploy/oscar/tests/session.py index a050d0003f..e46280f2a4 100644 --- a/mars/deploy/oscar/tests/session.py +++ b/mars/deploy/oscar/tests/session.py @@ -13,6 +13,7 @@ # limitations under the License. 
import asyncio +import copy import inspect import os import uuid @@ -62,6 +63,7 @@ def _process_result(self, tileable, result): return super()._process_result(tileable, result) async def fetch(self, *tileables, **kwargs): + kwargs = copy.deepcopy(kwargs) extra_config = kwargs.pop("extra_config", dict()) if kwargs: unexpected_keys = ", ".join(list(kwargs.keys())) diff --git a/mars/tests/test_session.py b/mars/tests/test_session.py index e4792aaea0..9fe0fa759e 100644 --- a/mars/tests/test_session.py +++ b/mars/tests/test_session.py @@ -301,7 +301,7 @@ def test_iter(setup): raw_data = pd.DataFrame(np.random.randint(1000, size=(20, 10))) df = md.DataFrame(raw_data, chunk_size=5) - for col, series in df.iteritems(): + for col, series in df.items(): pd.testing.assert_series_equal(series.execute().fetch(), raw_data[col]) for i, batch in enumerate(df.iterbatch(batch_size=15)): @@ -331,9 +331,7 @@ def test_iter(setup): pd.testing.assert_series_equal(batch, raw_data.iloc[i * 15 : (i + 1) * 15]) i = 0 - for result_item, expect_item in zip( - s.iteritems(batch_size=15), raw_data.iteritems() - ): + for result_item, expect_item in zip(s.items(batch_size=15), raw_data.items()): assert result_item[0] == expect_item[0] assert result_item[1] == expect_item[1] i += 1
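
A minimal end-to-end check of this series (a sketch mirroring the new
``test_mode`` cases; it assumes ``execute()`` creates a default local
session on demand, and the variable names are illustrative):

    import numpy as np
    import pandas as pd
    import mars.dataframe as md

    raw = pd.DataFrame(
        np.random.randint(0, 6, size=(20, 20)),
        columns=["c" + str(i) for i in range(20)],
    )
    df = md.DataFrame(raw, chunk_size=6)

    # chunked mode() runs ModeReduction.pre/agg/post across chunks
    # and should match pandas on the same data
    pd.testing.assert_frame_equal(df.mode().execute().fetch(), raw.mode())

    # NaN handling with dropna=False, as exercised in the new tests
    raw2 = raw.copy()
    raw2.iloc[[2, 9, 18], [2, 9, 18]] = np.nan
    df2 = md.DataFrame(raw2, chunk_size=6)
    pd.testing.assert_frame_equal(
        df2.mode(dropna=False).execute().fetch(), raw2.mode(dropna=False)
    )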