Skip to content
1 change: 1 addition & 0 deletions doc/source/whatsnew/v3.0.0.rst
Original file line number Diff line number Diff line change
Expand Up @@ -810,6 +810,7 @@ Groupby/resample/rolling
- Bug in :meth:`.DataFrameGroupBy.median` where nat values gave an incorrect result. (:issue:`57926`)
- Bug in :meth:`.DataFrameGroupBy.quantile` when ``interpolation="nearest"`` is inconsistent with :meth:`DataFrame.quantile` (:issue:`47942`)
- Bug in :meth:`.Resampler.interpolate` on a :class:`DataFrame` with non-uniform sampling and/or indices not aligning with the resulting resampled index would result in wrong interpolation (:issue:`21351`)
- Bug in :meth:`.Series.rolling` when used with a :class:`.BaseIndexer` subclass and computing min/max (:issue:`46726`)
- Bug in :meth:`DataFrame.ewm` and :meth:`Series.ewm` when passed ``times`` and aggregation functions other than mean (:issue:`51695`)
- Bug in :meth:`DataFrame.resample` and :meth:`Series.resample` were not keeping the index name when the index had :class:`ArrowDtype` timestamp dtype (:issue:`61222`)
- Bug in :meth:`DataFrame.resample` changing index type to :class:`MultiIndex` when the dataframe is empty and using an upsample method (:issue:`55572`)
Expand Down
198 changes: 115 additions & 83 deletions pandas/_libs/window/aggregations.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ from libc.math cimport (
sqrt,
)
from libcpp.deque cimport deque
from libcpp.stack cimport stack
from libcpp.unordered_map cimport unordered_map

from pandas._libs.algos cimport TiebreakEnumType
Expand Down Expand Up @@ -988,39 +989,29 @@ def roll_median_c(const float64_t[:] values, ndarray[int64_t] start,

# ----------------------------------------------------------------------

# Moving maximum / minimum code taken from Bottleneck
# Licence at LICENSES/BOTTLENECK_LICENCE


cdef float64_t init_mm(float64_t ai, Py_ssize_t *nobs, bint is_max) noexcept nogil:

if ai == ai:
nobs[0] = nobs[0] + 1
elif is_max:
ai = MINfloat64
else:
ai = MAXfloat64

return ai


cdef void remove_mm(float64_t aold, Py_ssize_t *nobs) noexcept nogil:
""" remove a value from the mm calc """
if aold == aold:
nobs[0] = nobs[0] - 1


cdef float64_t calc_mm(int64_t minp, Py_ssize_t nobs,
float64_t value) noexcept nogil:
cdef:
float64_t result
cdef int64_t bisect_left(
deque[int64_t]& a,
int64_t x,
int64_t lo=0,
int64_t hi=-1
) nogil:
"""Same as https://docs.python.org/3/library/bisect.html."""

cdef int64_t mid
if hi == -1:
hi = a.size()
while lo < hi:
mid = (lo + hi) // 2
if a.at(mid) < x:
lo = mid + 1
else:
hi = mid
return lo

if nobs >= minp:
result = value
else:
result = NaN
from libc.math cimport isnan

return result
# Prior version of moving maximum / minimum code taken from Bottleneck
# Licence at LICENSES/BOTTLENECK_LICENCE


def roll_max(ndarray[float64_t] values, ndarray[int64_t] start,
Expand Down Expand Up @@ -1068,69 +1059,110 @@ def roll_min(ndarray[float64_t] values, ndarray[int64_t] start,
return _roll_min_max(values, start, end, minp, is_max=0)


cdef _roll_min_max(ndarray[float64_t] values,
ndarray[int64_t] starti,
ndarray[int64_t] endi,
int64_t minp,
bint is_max):
def _roll_min_max(
ndarray[float64_t] values,
ndarray[int64_t] start,
ndarray[int64_t] end,
int64_t minp,
bint is_max
):
cdef:
float64_t ai
int64_t curr_win_size, start
Py_ssize_t i, k, nobs = 0, N = len(starti)
deque Q[int64_t] # min/max always the front
deque W[int64_t] # track the whole window for nobs compute
Py_ssize_t i, i_next, k, valid_start, last_end, last_start, N = len(start)
# Indices of bounded extrema in `values`. `candidates[i]` is always increasing.
# `values[candidates[i]]` is decreasing for max and increasing for min.
deque candidates[int64_t]
# Indices of largest windows that "cover" preceding windows.
stack dominators[int64_t]
ndarray[float64_t, ndim=1] output

Py_ssize_t this_start, this_end, stash_start
int64_t q_idx

output = np.empty(N, dtype=np.float64)
Q = deque[int64_t]()
W = deque[int64_t]()
candidates = deque[int64_t]()
dominators = stack[int64_t]()

# This function was "ported" / translated from sliding_min_max()
# in /pandas/core/_numba/kernels/min_max_.py.
# (See there for credits and some comments.)
# Code translation assumptions/rules:
# - min_periods --> minp
# - deque[0] --> front()
# - deque[-1] --> back()
# - stack[-1] --> top()
# - bool(stack/deque) --> !empty()
# - deque.append() --> push_back()
# - stack.append() --> push()
# - deque.popleft --> pop_front()
# - deque.pop() --> pop_back()

with nogil:
if minp < 1:
minp = 1

if N>2:
i_next = N - 1
for i in range(N - 2, -1, -1):
if start[i_next] < start[i] \
and (
dominators.empty()
or start[dominators.top()] > start[i_next]
):
dominators.push(i_next)
i_next = i

# NaN tracking to guarantee minp
valid_start = -minp

last_end = 0
last_start = -1

# This is using a modified version of the C++ code in this
# SO post: https://stackoverflow.com/a/12239580
# The original impl didn't deal with variable window sizes
# So the code was optimized for that

# first window's size
curr_win_size = endi[0] - starti[0]
# GH 32865
# Anchor output index to values index to provide custom
# BaseIndexer support
for i in range(N):
this_start = start[i]
this_end = end[i]

curr_win_size = endi[i] - starti[i]
if i == 0:
start = starti[i]
else:
start = endi[i - 1]

for k in range(start, endi[i]):
ai = init_mm(values[k], &nobs, is_max)
# Discard previous entries if we find new min or max
if is_max:
while not Q.empty() and ((ai >= values[Q.back()]) or
values[Q.back()] != values[Q.back()]):
Q.pop_back()
else:
while not Q.empty() and ((ai <= values[Q.back()]) or
values[Q.back()] != values[Q.back()]):
Q.pop_back()
Q.push_back(k)
W.push_back(k)

# Discard entries outside and left of current window
while not Q.empty() and Q.front() <= starti[i] - 1:
Q.pop_front()
while not W.empty() and W.front() <= starti[i] - 1:
remove_mm(values[W.front()], &nobs)
W.pop_front()

# Save output based on index in input value array
if not Q.empty() and curr_win_size > 0:
output[i] = calc_mm(minp, nobs, values[Q.front()])
if (not dominators.empty() and dominators.top() == i):
dominators.pop()

if not (this_end > last_end
or (this_end == last_end and this_start >= last_start)):
raise ValueError(
"Start/End ordering requirement is violated at index {}".format(i))

if dominators.empty():
stash_start = this_start
else:
stash_start = min(this_start, start[dominators.top()])

while not candidates.empty() and candidates.front() < stash_start:
candidates.pop_front()

for k in range(last_end, this_end):
if not isnan(values[k]):
valid_start += 1
while valid_start >= 0 and isnan(values[valid_start]):
valid_start += 1

if is_max:
while (not candidates.empty()
and values[k] >= values[candidates.back()]):
candidates.pop_back()
else:
while (not candidates.empty()
and values[k] <= values[candidates.back()]):
candidates.pop_back()
candidates.push_back(k)

if candidates.empty() or this_start > valid_start:
output[i] = NaN
elif candidates.front() >= this_start:
# ^^ This is here to avoid costly bisection for fixed window sizes.
output[i] = values[candidates.front()]
else:
q_idx = bisect_left(candidates, this_start, lo=1)
output[i] = values[candidates[q_idx]]
last_end = this_end
last_start = this_start

return output

Expand Down
Loading
Loading