Skip to content
118 changes: 114 additions & 4 deletions pandas/core/groupby/generic.py
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,61 @@ def aggregate(self, func=None, *args, engine=None, engine_kwargs=None, **kwargs)

agg = aggregate

def _agg_for_resample(
    self, func=None, *args, engine=None, engine_kwargs=None, **kwargs
):
    """
    Aggregation entry point used by the resample machinery.

    Mirrors ``SeriesGroupBy.aggregate`` dispatch: a string is routed to
    the method of that name, an iterable of funcs goes through
    ``_aggregate_multiple_funcs``, and any other callable falls through
    to the generic python/numba aggregation paths.

    Parameters
    ----------
    func : str, callable, iterable of those, or None
        ``None`` signals named aggregation via ``**kwargs``
        (validated by ``validate_func_kwargs`` below).
    *args, **kwargs
        Forwarded to the aggregation function.
    engine : str or None
        Optional execution engine (e.g. ``"numba"``); only propagated
        when numba use is actually requested.
    engine_kwargs : dict or None
        Extra keyword arguments for the engine.

    Returns
    -------
    Aggregated result; concrete type depends on the dispatched path
    (scalar, Series, or DataFrame).
    """
    # func is None <=> caller used named-aggregation kwargs only.
    relabeling = func is None
    columns = None
    if relabeling:
        columns, func = validate_func_kwargs(kwargs)
        kwargs = {}

    if isinstance(func, str):
        if maybe_use_numba(engine) and engine is not None:
            # Not all agg functions support numba, only propagate numba kwargs
            # if user asks for numba, and engine is not None
            # (if engine is None, the called function will handle the case where
            # numba is requested via the global option)
            kwargs["engine"] = engine
        if engine_kwargs is not None:
            kwargs["engine_kwargs"] = engine_kwargs
        # Dispatch a string aggregation (e.g. "sum") to the method of that name.
        return getattr(self, func)(*args, **kwargs)

    elif isinstance(func, abc.Iterable):
        # Catch instances of lists / tuples
        # but not the class list / tuple itself.
        func = maybe_mangle_lambdas(func)
        kwargs["engine"] = engine
        kwargs["engine_kwargs"] = engine_kwargs
        ret = self._aggregate_multiple_funcs(func, *args, **kwargs)
        if relabeling:
            # columns is not narrowed by mypy from relabeling flag
            assert columns is not None  # for mypy
            ret.columns = columns
        if not self.as_index:
            ret = ret.reset_index()
        return ret

    else:
        if maybe_use_numba(engine):
            return self._aggregate_with_numba(
                func, *args, engine_kwargs=engine_kwargs, **kwargs
            )

        if self.ngroups == 0:
            # e.g. test_evaluate_with_empty_groups without any groups to
            # iterate over, we have no output on which to do dtype
            # inference. We default to using the existing dtype.
            # xref GH#51445
            obj = self._obj_with_exclusions
            return self.obj._constructor(
                [],
                name=self.obj.name,
                index=self._grouper.result_index,
                dtype=obj.dtype,
            )
        return self._python_agg_general(func, *args, **kwargs)

def _python_agg_general(self, func, *args, **kwargs):
f = lambda x: func(x, *args, **kwargs)

Expand Down Expand Up @@ -1934,6 +1989,61 @@ def aggregate(self, func=None, *args, engine=None, engine_kwargs=None, **kwargs)
relabeling, func, columns, order = reconstruct_func(func, **kwargs)
func = maybe_mangle_lambdas(func)

if maybe_use_numba(engine):
# Not all agg functions support numba, only propagate numba kwargs
# if user asks for numba
kwargs["engine"] = engine
kwargs["engine_kwargs"] = engine_kwargs

op = GroupByApply(self, func, args=args, kwargs=kwargs)
result = op.agg()
if not is_dict_like(func) and result is not None:
# GH #52849
if not self.as_index and is_list_like(func):
return result.reset_index()
else:
return result
elif relabeling:
# this should be the only (non-raising) case with relabeling
# used reordered index of columns
result = cast(DataFrame, result)
result = result.iloc[:, order]
result = cast(DataFrame, result)
# error: Incompatible types in assignment (expression has type
# "Optional[List[str]]", variable has type
# "Union[Union[Union[ExtensionArray, ndarray[Any, Any]],
# Index, Series], Sequence[Any]]")
result.columns = columns # type: ignore[assignment]

if result is None:
# Remove the kwargs we inserted
# (already stored in engine, engine_kwargs arguments)
if "engine" in kwargs:
del kwargs["engine"]
del kwargs["engine_kwargs"]
# at this point func is not a str, list-like, dict-like,
# or a known callable(e.g. sum)
if maybe_use_numba(engine):
return self._aggregate_with_numba(
func, *args, engine_kwargs=engine_kwargs, **kwargs
)
# grouper specific aggregations
result = self._python_agg_general(func, *args, **kwargs)

if not self.as_index:
result = self._insert_inaxis_grouper(result)
result.index = default_index(len(result))

return result

agg = aggregate

def _agg_for_resample(
self, func=None, *args, engine=None, engine_kwargs=None, **kwargs
):
relabeling, func, columns, order = reconstruct_func(func, **kwargs)
func = maybe_mangle_lambdas(func)

if maybe_use_numba(engine):
# Not all agg functions support numba, only propagate numba kwargs
# if user asks for numba
Expand Down Expand Up @@ -2010,8 +2120,6 @@ def aggregate(self, func=None, *args, engine=None, engine_kwargs=None, **kwargs)

return result

agg = aggregate

def _python_agg_general(self, func, *args, **kwargs):
f = lambda x: func(x, *args, **kwargs)

Expand All @@ -2022,9 +2130,11 @@ def _python_agg_general(self, func, *args, **kwargs):

obj = self._obj_with_exclusions

if not len(obj.columns):
if self._grouper._is_resample and not len(obj.columns):
# e.g. test_margins_no_values_no_cols
return self._python_apply_general(f, self._selected_obj)
return obj._constructor(
index=self._grouper.result_index, columns=obj.columns
)

output: dict[int, ArrayLike] = {}
for idx, (name, ser) in enumerate(obj.items()):
Expand Down
13 changes: 8 additions & 5 deletions pandas/core/groupby/ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -601,6 +601,7 @@ def __init__(
self._groupings = groupings
self._sort = sort
self.dropna = dropna
self._is_resample = False

@property
def groupings(self) -> list[grouper.Grouping]:
Expand Down Expand Up @@ -976,12 +977,14 @@ def _aggregate_series_pure_python(

for i, group in enumerate(splitter):
res = func(group)
res = extract_result(res)

if not initialized:
# We only do this validation on the first iteration
check_result_array(res, group.dtype)
initialized = True
if self._is_resample:
res = extract_result(res)

if not initialized:
# We only do this validation on the first iteration
check_result_array(res, group.dtype)
initialized = True

result[i] = res

Expand Down
61 changes: 60 additions & 1 deletion pandas/core/resample.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@ def _get_binner(self):
binner, bins, binlabels = self._get_binner_for_time()
assert len(bins) == len(binlabels)
bin_grouper = BinGrouper(bins, binlabels, indexer=self._indexer)
bin_grouper._is_resample = True
return binner, bin_grouper

@overload
Expand Down Expand Up @@ -365,7 +366,21 @@ def aggregate(self, func=None, *args, **kwargs):
return result

agg = aggregate
apply = aggregate

@final
@doc(
    _shared_docs["aggregate"],
    see_also=_agg_see_also_doc,
    examples=_agg_examples_doc,
    klass="DataFrame",
    axis="",
)
def apply(self, func=None, *args, **kwargs):
    # First let ResamplerWindowApply try to resolve func (strings,
    # lists, dicts, named aggregation); it returns None when func is a
    # plain callable it does not handle.
    result = ResamplerWindowApply(self, func, args=args, kwargs=kwargs).agg()
    if result is not None:
        return result
    # Fall back to evaluating func group-wise over the resample bins.
    return self._groupby_and_apply(func, *args, **kwargs)

@final
def transform(self, arg, *args, **kwargs):
Expand Down Expand Up @@ -488,6 +503,49 @@ def _groupby_and_aggregate(self, how, *args, **kwargs):

return self._wrap_result(result)

def _groupby_and_apply(self, how, *args, **kwargs):
    """
    Re-evaluate the obj with a groupby aggregation.
    """
    # Excludes the `on` column when provided
    obj = self._obj_with_exclusions

    grouped = get_groupby(
        obj, by=None, grouper=self._grouper, group_keys=self.group_keys
    )

    try:
        if not callable(how):
            result = grouped._agg_for_resample(how, *args, **kwargs)
        else:
            # TODO: test_resample_apply_with_additional_args fails if we go
            # through the non-lambda path, not clear that it should.
            wrapped = lambda x: how(x, *args, **kwargs)
            result = grouped._agg_for_resample(wrapped)
    except (AttributeError, KeyError):
        # we have a non-reducing function; try to evaluate
        # alternatively we want to evaluate only a column of the input

        # test_apply_to_one_column_of_df the function being applied references
        # a DataFrame column, but aggregate_item_by_item operates column-wise
        # on Series, raising AttributeError or KeyError
        # (depending on whether the column lookup uses getattr/__getitem__)
        result = grouped.apply(how, *args, **kwargs)
    except ValueError as err:
        if "Must produce aggregated value" not in str(err):
            raise
        # raised in _aggregate_named: how is a non-reducing function,
        # so evaluate it group-wise instead
        # see test_apply_without_aggregation, test_apply_with_mutated_index
        result = grouped.apply(how, *args, **kwargs)

    return self._wrap_result(result)

@final
def _get_resampler_for_grouping(
self,
Expand Down Expand Up @@ -1763,6 +1821,7 @@ def func(x):
# All evaluation paths in this class share the single ``_apply``
# implementation: up/downsampling and both groupby-based entry points.
_upsample = _apply
_downsample = _apply
_groupby_and_aggregate = _apply
_groupby_and_apply = _apply

@final
def _gotitem(self, key, ndim, subset=None):
Expand Down
2 changes: 1 addition & 1 deletion pandas/core/reshape/pivot.py
Original file line number Diff line number Diff line change
Expand Up @@ -644,7 +644,7 @@ def _generate_marginal_results_without_values(
dropna: bool = True,
):
margin_keys: list | Index
if len(cols) > 0:
if len(table.columns) > 0:
# need to "interleave" the margins
margin_keys = []

Expand Down
Loading
Loading