diff --git a/mars/dataframe/groupby/aggregation.py b/mars/dataframe/groupby/aggregation.py index 7d593814c5..f9ee600ed7 100644 --- a/mars/dataframe/groupby/aggregation.py +++ b/mars/dataframe/groupby/aggregation.py @@ -25,7 +25,7 @@ from ... import opcodes as OperandDef from ...config import options from ...core.custom_log import redirect_custom_log -from ...core import ENTITY_TYPE, OutputType +from ...core import ENTITY_TYPE, OutputType, recursive_tile from ...core.context import get_context from ...core.operand import OperandStage from ...serialization.serializables import ( @@ -51,6 +51,7 @@ ReductionCompiler, ReductionSteps, ReductionAggStep, + CustomReduction, ) from ..reduction.aggregation import is_funcs_aggregate, normalize_reduction_funcs from ..utils import parse_index, build_concatenated_rows_frame, is_cudf @@ -63,6 +64,8 @@ _support_get_group_without_as_index = pd_release_version[:2] > (1, 0) +_FUNCS_PREFER_SHUFFLE = {"nunique"} + class SizeRecorder: def __init__(self): @@ -94,6 +97,7 @@ def get(self): "skew": lambda x, bias=False: x.skew(bias=bias), "kurt": lambda x, bias=False: x.kurt(bias=bias), "kurtosis": lambda x, bias=False: x.kurtosis(bias=bias), + "nunique": lambda x: x.nunique(), } _series_col_name = "col_name" @@ -161,6 +165,8 @@ class DataFrameGroupByAgg(DataFrameOperand, DataFrameOperandMixin): method = StringField("method") use_inf_as_na = BoolField("use_inf_as_na") + map_on_shuffle = AnyField("map_on_shuffle") + # for chunk combine_size = Int32Field("combine_size") chunk_store_limit = Int64Field("chunk_store_limit") @@ -419,10 +425,29 @@ def _tile_with_shuffle( in_df: TileableType, out_df: TileableType, func_infos: ReductionSteps, + agg_chunks: List[ChunkType] = None, ): - # First, perform groupby and aggregation on each chunk. - agg_chunks = cls._gen_map_chunks(op, in_df.chunks, out_df, func_infos) - return cls._perform_shuffle(op, agg_chunks, in_df, out_df, func_infos) + if op.map_on_shuffle is None: + op.map_on_shuffle = all( + agg_fun.custom_reduction is None for agg_fun in func_infos.agg_funcs + ) + + if not op.map_on_shuffle: + groupby_params = op.groupby_params.copy() + selection = groupby_params.pop("selection", None) + groupby = in_df.groupby(**groupby_params) + if selection: + groupby = groupby[selection] + result = groupby.transform( + op.raw_func, _call_agg=True, index=out_df.index_value + ) + return (yield from recursive_tile(result)) + else: + # First, perform groupby and aggregation on each chunk. 
+ agg_chunks = agg_chunks or cls._gen_map_chunks( + op, in_df.chunks, out_df, func_infos + ) + return cls._perform_shuffle(op, agg_chunks, in_df, out_df, func_infos) @classmethod def _perform_shuffle( @@ -622,8 +647,10 @@ def _tile_auto( else: # otherwise, use shuffle logger.debug("Choose shuffle method for groupby operand %s", op) - return cls._perform_shuffle( - op, chunks + left_chunks, in_df, out_df, func_infos + return ( + yield from cls._tile_with_shuffle( + op, in_df, out_df, func_infos, chunks + left_chunks + ) ) @classmethod @@ -636,12 +663,16 @@ def tile(cls, op: "DataFrameGroupByAgg"): func_infos = cls._compile_funcs(op, in_df) if op.method == "auto": - if len(in_df.chunks) <= op.combine_size: + if set(op.func) & _FUNCS_PREFER_SHUFFLE: + return ( + yield from cls._tile_with_shuffle(op, in_df, out_df, func_infos) + ) + elif len(in_df.chunks) <= op.combine_size: return cls._tile_with_tree(op, in_df, out_df, func_infos) else: return (yield from cls._tile_auto(op, in_df, out_df, func_infos)) if op.method == "shuffle": - return cls._tile_with_shuffle(op, in_df, out_df, func_infos) + return (yield from cls._tile_with_shuffle(op, in_df, out_df, func_infos)) elif op.method == "tree": return cls._tile_with_tree(op, in_df, out_df, func_infos) else: # pragma: no cover @@ -694,39 +725,66 @@ def _pack_inputs(agg_funcs: List[ReductionAggStep], in_data): return out_dict @staticmethod - def _do_custom_agg(op, custom_reduction, *input_objs): + def _do_custom_agg_single(op, custom_reduction: CustomReduction, input_obj): + if op.stage == OperandStage.map: + if custom_reduction.pre_with_agg: + apply_fun = custom_reduction.pre + else: + + def apply_fun(obj): + return custom_reduction.agg(custom_reduction.pre(obj)) + + elif op.stage == OperandStage.agg: + if custom_reduction.post_with_agg: + apply_fun = custom_reduction.post + else: + + def apply_fun(obj): + return custom_reduction.post(custom_reduction.agg(obj)) + + else: + apply_fun = custom_reduction.agg + + res = input_obj.apply(apply_fun) + return (res,) + + @staticmethod + def _do_custom_agg_multiple(op, custom_reduction: CustomReduction, *input_objs): xdf = cudf if op.gpu else pd results = [] out = op.outputs[0] for group_key in input_objs[0].groups.keys(): group_objs = [o.get_group(group_key) for o in input_objs] + agg_done = False if op.stage == OperandStage.map: - result = custom_reduction.pre(group_objs[0]) + res_tuple = custom_reduction.pre(group_objs[0]) agg_done = custom_reduction.pre_with_agg - if not isinstance(result, tuple): - result = (result,) + if not isinstance(res_tuple, tuple): + res_tuple = (res_tuple,) else: - result = group_objs + res_tuple = group_objs if not agg_done: - result = custom_reduction.agg(*result) - if not isinstance(result, tuple): - result = (result,) + res_tuple = custom_reduction.agg(*res_tuple) + if not isinstance(res_tuple, tuple): + res_tuple = (res_tuple,) if op.stage == OperandStage.agg: - result = custom_reduction.post(*result) - if not isinstance(result, tuple): - result = (result,) - - if out.ndim == 2: - result = tuple(r.to_frame().T for r in result) - if op.stage == OperandStage.agg: - result = tuple(r.astype(out.dtypes) for r in result) - else: - result = tuple(xdf.Series(r) for r in result) + res_tuple = custom_reduction.post(*res_tuple) + if not isinstance(res_tuple, tuple): + res_tuple = (res_tuple,) + + new_res_list = [] + for r in res_tuple: + if out.ndim == 2 and r.ndim == 1: + r = r.to_frame().T + elif out.ndim < 2: + if getattr(r, "ndim", 0) == 2: + r = r.iloc[0, :] + else: + r = 
xdf.Series(r) - for r in result: if len(input_objs[0].grouper.names) == 1: r.index = xdf.Index( [group_key], name=input_objs[0].grouper.names[0] @@ -735,7 +793,21 @@ def _do_custom_agg(op, custom_reduction, *input_objs): r.index = xdf.MultiIndex.from_tuples( [group_key], names=input_objs[0].grouper.names ) - results.append(result) + + if op.groupby_params.get("selection"): + # correct columns for groupby-selection-agg paradigms + selection = op.groupby_params["selection"] + r.columns = [selection] if input_objs[0].ndim == 1 else selection + + if out.ndim == 2 and op.stage == OperandStage.agg: + dtype_cols = set(out.dtypes.index) & set(r.columns) + conv_dtypes = { + k: v for k, v in out.dtypes.items() if k in dtype_cols + } + r = r.astype(conv_dtypes) + new_res_list.append(r) + + results.append(tuple(new_res_list)) if not results and op.stage == OperandStage.agg: empty_df = pd.DataFrame( [], columns=out.dtypes.index, index=out.index_value.to_pandas()[:0] @@ -745,6 +817,13 @@ def _do_custom_agg(op, custom_reduction, *input_objs): concat_result = tuple(xdf.concat(parts) for parts in zip(*results)) return concat_result + @classmethod + def _do_custom_agg(cls, op, custom_reduction, *input_objs, output_limit: int = 1): + if output_limit == 1: + return cls._do_custom_agg_single(op, custom_reduction, input_objs[0]) + else: + return cls._do_custom_agg_multiple(op, custom_reduction, *input_objs) + @staticmethod def _do_predefined_agg(input_obj, agg_func, single_func=False, **kwds): ndim = getattr(input_obj, "ndim", None) or input_obj.obj.ndim @@ -839,12 +918,16 @@ def _wrapped_func(col): _agg_func_name, custom_reduction, _output_key, - _output_limit, + output_limit, kwds, ) in op.agg_funcs: input_obj = ret_map_groupbys[input_key] if map_func_name == "custom_reduction": - agg_dfs.extend(cls._do_custom_agg(op, custom_reduction, input_obj)) + agg_dfs.extend( + cls._do_custom_agg( + op, custom_reduction, input_obj, output_limit=output_limit + ) + ) else: single_func = map_func_name == op.raw_func agg_dfs.append( @@ -885,12 +968,16 @@ def _execute_combine(cls, ctx, op: "DataFrameGroupByAgg"): agg_func_name, custom_reduction, output_key, - _output_limit, + output_limit, kwds, ) in op.agg_funcs: input_obj = in_data_dict[output_key] if agg_func_name == "custom_reduction": - combines.extend(cls._do_custom_agg(op, custom_reduction, *input_obj)) + combines.extend( + cls._do_custom_agg( + op, custom_reduction, *input_obj, output_limit=output_limit + ) + ) else: combines.append( cls._do_predefined_agg(input_obj, agg_func_name, **kwds) @@ -925,7 +1012,7 @@ def _execute_agg(cls, ctx, op: "DataFrameGroupByAgg"): agg_func_name, custom_reduction, output_key, - _output_limit, + output_limit, kwds, ) in op.agg_funcs: if agg_func_name == "custom_reduction": @@ -933,7 +1020,7 @@ def _execute_agg(cls, ctx, op: "DataFrameGroupByAgg"): cls._get_grouped(op, o, ctx) for o in in_data_dict[output_key] ) in_data_dict[output_key] = cls._do_custom_agg( - op, custom_reduction, *input_obj + op, custom_reduction, *input_obj, output_limit=output_limit )[0] else: input_obj = cls._get_grouped(op, in_data_dict[output_key], ctx) @@ -1017,7 +1104,15 @@ def execute(cls, ctx, op: "DataFrameGroupByAgg"): pd.reset_option("mode.use_inf_as_na") -def agg(groupby, func=None, method="auto", combine_size=None, *args, **kwargs): +def agg( + groupby, + func=None, + method="auto", + combine_size=None, + map_on_shuffle=None, + *args, + **kwargs, +): """ Aggregate using one or more operations on grouped data. 
@@ -1033,7 +1128,11 @@ def agg(groupby, func=None, method="auto", combine_size=None, *args, **kwargs): in distributed mode and use 'tree' in local mode. combine_size : int The number of chunks to combine when method is 'tree' - + map_on_shuffle : bool + Whether to perform aggregation on the map stage of the shuffle. + When not specified, the decision is made automatically: the map + stage skips aggregation when the functions contain custom + reductions. When specified, the given value is used directly. Returns ------- @@ -1080,5 +1179,6 @@ def agg(groupby, func=None, method="auto", combine_size=None, *args, **kwargs): combine_size=combine_size or options.combine_size, chunk_store_limit=options.chunk_store_limit, use_inf_as_na=use_inf_as_na, + map_on_shuffle=map_on_shuffle, ) return agg_op(groupby) diff --git a/mars/dataframe/groupby/tests/test_groupby_execution.py b/mars/dataframe/groupby/tests/test_groupby_execution.py index c1208e2194..2cace472b9 100644 --- a/mars/dataframe/groupby/tests/test_groupby_execution.py +++ b/mars/dataframe/groupby/tests/test_groupby_execution.py @@ -1241,13 +1241,16 @@ def test_groupby_nunique(setup): # test with as_index=False mdf = md.DataFrame(df1, chunk_size=13) if _agg_size_as_frame: + res = mdf.groupby("b", as_index=False)["a"].nunique().execute().fetch() + expected = df1.groupby("b", as_index=False)["a"].nunique() pd.testing.assert_frame_equal( - mdf.groupby("b", as_index=False)["a"] - .nunique() - .execute() - .fetch() - .sort_values(by="b", ignore_index=True), - df1.groupby("b", as_index=False)["a"] - .nunique() - .sort_values(by="b", ignore_index=True), + res.sort_values(by="b", ignore_index=True), + expected.sort_values(by="b", ignore_index=True), + ) + + res = mdf.groupby("b", as_index=False)[["a", "c"]].nunique().execute().fetch() + expected = df1.groupby("b", as_index=False)[["a", "c"]].nunique() + pd.testing.assert_frame_equal( + res.sort_values(by="b", ignore_index=True), + expected.sort_values(by="b", ignore_index=True), ) diff --git a/mars/dataframe/merge/concat.py b/mars/dataframe/merge/concat.py index 7bb3cab721..e82d986b61 100644 --- a/mars/dataframe/merge/concat.py +++ b/mars/dataframe/merge/concat.py @@ -324,7 +324,10 @@ def _auto_concat_dataframe_chunks(chunk, inputs): ) if chunk.op.axis is not None: - return xdf.concat(inputs, axis=op.axis) + try: + return xdf.concat(inputs, axis=op.axis) + except: + raise # auto generated concat when executing a DataFrame if len(inputs) == 1: diff --git a/mars/dataframe/reduction/aggregation.py b/mars/dataframe/reduction/aggregation.py index 6945ecec9a..4248dcdcd9 100644 --- a/mars/dataframe/reduction/aggregation.py +++ b/mars/dataframe/reduction/aggregation.py @@ -78,6 +78,7 @@ def where_function(cond, var1, var2): "skew": lambda x, skipna=True, bias=False: x.skew(skipna=skipna, bias=bias), "kurt": lambda x, skipna=True, bias=False: x.kurt(skipna=skipna, bias=bias), "kurtosis": lambda x, skipna=True, bias=False: x.kurtosis(skipna=skipna, bias=bias), + "nunique": lambda x: x.nunique(), } diff --git a/mars/dataframe/reduction/core.py b/mars/dataframe/reduction/core.py index 14d4050038..f668b71fda 100644 --- a/mars/dataframe/reduction/core.py +++ b/mars/dataframe/reduction/core.py @@ -655,6 +655,8 @@ class CustomReduction: # set to True when pre() already performs aggregation pre_with_agg = False + # set to True when post() already performs aggregation + post_with_agg = False def __init__(self, name=None, is_gpu=None): self.name = name or "" @@ -972,13 +974,15 @@ def _compile_function(self, func, 
func_name=None, ndim=1) -> ReductionSteps: else: map_func_name, agg_func_name = step_func_name, step_func_name + op_custom_reduction = getattr(t.op, "custom_reduction", None) + # build agg description agg_funcs.append( ReductionAggStep( agg_input_key, map_func_name, agg_func_name, - custom_reduction, + op_custom_reduction or custom_reduction, t.key, output_limit, t.op.get_reduction_args(axis=self._axis), diff --git a/mars/dataframe/reduction/nunique.py b/mars/dataframe/reduction/nunique.py index 07d6abf243..3ff1854e56 100644 --- a/mars/dataframe/reduction/nunique.py +++ b/mars/dataframe/reduction/nunique.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import numpy as np import pandas as pd try: @@ -24,93 +25,110 @@ from ...config import options from ...serialization.serializables import BoolField from ...utils import lazy_import -from ..arrays import ArrowListArray, ArrowListDtype +from ..arrays import ArrowListArray from .core import DataFrameReductionOperand, DataFrameReductionMixin, CustomReduction +cp = lazy_import("cupy", globals=globals(), rename="cp") cudf = lazy_import("cudf", globals=globals()) class NuniqueReduction(CustomReduction): pre_with_agg = True + post_with_agg = True def __init__( - self, name="unique", axis=0, dropna=True, use_arrow_dtype=False, is_gpu=False + self, name="nunique", axis=0, dropna=True, use_arrow_dtype=False, is_gpu=False ): super().__init__(name, is_gpu=is_gpu) self._axis = axis self._dropna = dropna self._use_arrow_dtype = use_arrow_dtype - @staticmethod - def _drop_duplicates_to_arrow(v, explode=False): + def _get_modules(self): + if not self.is_gpu(): + return np, pd + else: # pragma: no cover + return cp, cudf + + def _drop_duplicates(self, value, explode=False, agg=False): + xp, xdf = self._get_modules() + use_arrow_dtype = self._use_arrow_dtype and xp is not cp + if self._use_arrow_dtype and xp is not cp and hasattr(value, "to_numpy"): + value = value.to_numpy() + else: + value = value.values + if explode: - v = v.explode() - try: - return ArrowListArray([v.drop_duplicates().to_numpy()]) - except pa.ArrowInvalid: - # fallback due to diverse dtypes - return [v.drop_duplicates().to_list()] + if len(value) == 0: + if not use_arrow_dtype: + return [xp.array([], dtype=object)] + else: + return [ArrowListArray([])] + value = xp.concatenate(value) + + value = xdf.unique(value) + + if not agg: + if not use_arrow_dtype: + return [value] + else: + try: + return ArrowListArray([value]) + except pa.ArrowInvalid: + # fallback due to diverse dtypes + return [value] + else: + if self._dropna: + return xp.sum(xdf.notna(value)) + return len(value) def pre(self, in_data): # noqa: W0221 # pylint: disable=arguments-differ - xdf = cudf if self.is_gpu() else pd + xp, xdf = self._get_modules() + out_dtype = object if not self._use_arrow_dtype or xp is cp else None if isinstance(in_data, xdf.Series): - unique_values = in_data.drop_duplicates() - return xdf.Series(unique_values, name=in_data.name) + unique_values = self._drop_duplicates(in_data) + return xdf.Series(unique_values, name=in_data.name, dtype=out_dtype) else: if self._axis == 0: data = dict() for d, v in in_data.iteritems(): - if not self._use_arrow_dtype or xdf is cudf: - data[d] = [v.drop_duplicates().to_list()] - else: - data[d] = self._drop_duplicates_to_arrow(v) - df = xdf.DataFrame(data) + data[d] = self._drop_duplicates(v) + df = xdf.DataFrame(data, copy=False, dtype=out_dtype) else: df = xdf.DataFrame(columns=[0]) for d, v in 
in_data.iterrows(): - if not self._use_arrow_dtype or xdf is cudf: - df.loc[d] = [v.drop_duplicates().to_list()] - else: - df.loc[d] = self._drop_duplicates_to_arrow(v) + df.loc[d] = self._drop_duplicates(v) return df def agg(self, in_data): # noqa: W0221 # pylint: disable=arguments-differ - xdf = cudf if self.is_gpu() else pd + xp, xdf = self._get_modules() + out_dtype = object if not self._use_arrow_dtype or xp is cp else None if isinstance(in_data, xdf.Series): - unique_values = in_data.explode().drop_duplicates() - return xdf.Series(unique_values, name=in_data.name) + unique_values = self._drop_duplicates(in_data, explode=True) + return xdf.Series(unique_values, name=in_data.name, dtype=out_dtype) else: if self._axis == 0: data = dict() for d, v in in_data.iteritems(): - if not self._use_arrow_dtype or xdf is cudf: - data[d] = [v.explode().drop_duplicates().to_list()] - else: - v = pd.Series(v.to_numpy()) - data[d] = self._drop_duplicates_to_arrow(v, explode=True) - df = xdf.DataFrame(data) + data[d] = self._drop_duplicates(v, explode=True) + df = xdf.DataFrame(data, copy=False, dtype=out_dtype) else: df = xdf.DataFrame(columns=[0]) for d, v in in_data.iterrows(): - if not self._use_arrow_dtype or xdf is cudf: - df.loc[d] = [v.explode().drop_duplicates().to_list()] - else: - df.loc[d] = self._drop_duplicates_to_arrow(v, explode=True) + df.loc[d] = self._drop_duplicates(v, explode=True) return df def post(self, in_data): # noqa: W0221 # pylint: disable=arguments-differ - xdf = cudf if self.is_gpu() else pd + xp, xdf = self._get_modules() if isinstance(in_data, xdf.Series): - return in_data.explode().nunique(dropna=self._dropna) + return self._drop_duplicates(in_data, explode=True, agg=True) else: in_data_iter = ( in_data.iteritems() if self._axis == 0 else in_data.iterrows() ) data = dict() for d, v in in_data_iter: - if isinstance(v.dtype, ArrowListDtype): - v = xdf.Series(v.to_numpy()) - data[d] = v.explode().nunique(dropna=self._dropna) + data[d] = self._drop_duplicates(v, explode=True, agg=True) return xdf.Series(data) diff --git a/mars/dataframe/reduction/tests/test_reduction_execution.py b/mars/dataframe/reduction/tests/test_reduction_execution.py index 81d4a88e27..be6ca29815 100644 --- a/mars/dataframe/reduction/tests/test_reduction_execution.py +++ b/mars/dataframe/reduction/tests/test_reduction_execution.py @@ -671,6 +671,12 @@ def test_nunique(setup, check_ref_counts): expected = data1.nunique(axis=1) pd.testing.assert_series_equal(result, expected) + # test with agg func + df = md.DataFrame(data1, chunk_size=3) + result = df.agg("nunique").execute().fetch() + expected = data1.agg("nunique") + pd.testing.assert_series_equal(result, expected) + @pytest.mark.skipif(pa is None, reason="pyarrow not installed") def test_use_arrow_dtype_nunique(setup, check_ref_counts):
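
The changes above can be exercised end to end as in the sketch below. This snippet is not part of the patch: the frame contents, column names and chunk_size are invented, and it assumes a default local Mars session. The entry points it uses (groupby nunique, the map_on_shuffle keyword of agg, and DataFrame.agg("nunique")) are the ones added or extended by this diff and mirrored in the new tests.

# Illustrative usage sketch; data and column names are made up.
import numpy as np
import pandas as pd

import mars
import mars.dataframe as md

mars.new_session()  # local session for the sketch

raw = pd.DataFrame(
    {
        "a": np.random.randint(0, 10, 100),
        "b": np.random.randint(0, 5, 100),
        "c": np.random.randint(0, 10, 100),
    }
)
mdf = md.DataFrame(raw, chunk_size=13)

# groupby-nunique is now routed through the shuffle path
# (see _FUNCS_PREFER_SHUFFLE in groupby/aggregation.py)
r1 = mdf.groupby("b", as_index=False)["a"].nunique().execute().fetch()

# map_on_shuffle=False asks the shuffle's map stage to skip pre-aggregation;
# leaving it as None keeps the automatic decision described in the docstring
r2 = (
    mdf.groupby("b")
    .agg("nunique", method="shuffle", map_on_shuffle=False)
    .execute()
    .fetch()
)

# plain DataFrame.agg("nunique") is also supported via the reduction changes
r3 = mdf.agg("nunique").execute().fetch()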