Skip to content

Commit 6fb807d

Browse files
committed
Fix binning by datetime.
1 parent 36a3769 commit 6fb807d

File tree

2 files changed

+30
-11
lines changed

2 files changed

+30
-11
lines changed

flox/core.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1566,7 +1566,7 @@ def groupby_reduce(
15661566
if kwargs["fill_value"] is None:
15671567
kwargs["fill_value"] = agg.fill_value[agg.name]
15681568

1569-
partial_agg = partial(dask_groupby_agg, agg=agg, split_out=split_out, **kwargs)
1569+
partial_agg = partial(dask_groupby_agg, split_out=split_out, **kwargs)
15701570

15711571
if method in ["split-reduce", "cohorts"]:
15721572
cohorts = find_group_cohorts(
@@ -1585,15 +1585,14 @@ def groupby_reduce(
15851585
array_subset = np.take(array_subset, idxr, axis=ax)
15861586
numblocks = np.prod([len(array_subset.chunks[ax]) for ax in axis])
15871587

1588-
# First deep copy becasue we might be doping blockwise,
1589-
# which sets agg.finalize=None, then map-reduce (GH102)
1590-
agg = copy.deepcopy(agg)
1591-
15921588
# get final result for these groups
15931589
r, *g = partial_agg(
15941590
array_subset,
15951591
by[np.ix_(*indexer)],
15961592
expected_groups=pd.Index(cohort),
1593+
# First deep copy because we might be doing blockwise,
1594+
# which sets agg.finalize=None, then map-reduce (GH102)
1595+
agg=copy.deepcopy(agg),
15971596
# reindex to expected_groups at the blockwise step.
15981597
# this approach avoids replacing non-cohort members with
15991598
# np.nan or some other sentinel value, and preserves dtypes
@@ -1619,6 +1618,7 @@ def groupby_reduce(
16191618
array,
16201619
by,
16211620
expected_groups=None if method == "blockwise" else expected_groups,
1621+
agg=agg,
16221622
reindex=reindex,
16231623
method=method,
16241624
sort=sort,

tests/test_core.py

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -856,16 +856,35 @@ def test_map_reduce_blockwise_mixed():
856856

857857

858858
@requires_dask
859-
@pytest.mark.parametrize("method", ["blockwise", "split-reduce", "map-reduce", "cohorts"])
859+
@pytest.mark.parametrize("method", ["split-reduce", "blockwise", "map-reduce", "cohorts"])
860860
def test_group_by_datetime(engine, method):
861-
t = pd.date_range("2000-01-01", "2000-12-31", freq="D").to_series()
862-
data = t.dt.dayofyear
863-
actual, _ = groupby_reduce(
864-
dask.array.from_array(data.values, chunks=365),
865-
t,
861+
kwargs = dict(
866862
func="mean",
867863
method=method,
868864
engine=engine,
869865
)
866+
t = pd.date_range("2000-01-01", "2000-12-31", freq="D").to_series()
867+
data = t.dt.dayofyear
868+
daskarray = dask.array.from_array(data.values, chunks=30)
869+
870+
actual, _ = groupby_reduce(daskarray, t, **kwargs)
870871
expected = data.to_numpy().astype(float)
871872
assert_equal(expected, actual)
873+
874+
if method == "blockwise":
875+
return None
876+
877+
edges = pd.date_range("1999-12-31", "2000-12-31", freq="M").to_series().to_numpy()
878+
actual, _ = groupby_reduce(daskarray, t.to_numpy(), isbin=True, expected_groups=edges, **kwargs)
879+
expected = data.resample("M").mean().to_numpy()
880+
assert_equal(expected, actual)
881+
882+
actual, _ = groupby_reduce(
883+
np.broadcast_to(daskarray, (2, 3, daskarray.shape[-1])),
884+
t.to_numpy(),
885+
isbin=True,
886+
expected_groups=edges,
887+
**kwargs,
888+
)
889+
expected = np.broadcast_to(expected, (2, 3, expected.shape[-1]))
890+
assert_equal(expected, actual)

0 commit comments

Comments
 (0)