diff --git a/src/nested_dask/core.py b/src/nested_dask/core.py index bce5bc1..d09a3d9 100644 --- a/src/nested_dask/core.py +++ b/src/nested_dask/core.py @@ -11,6 +11,7 @@ import pandas as pd import pyarrow as pa from dask.dataframe.dask_expr._collection import new_collection +from dask.dataframe.dask_expr._expr import no_default as dsk_no_default from nested_pandas.series.dtype import NestedDtype from nested_pandas.series.packer import pack, pack_flat, pack_lists from pandas._libs import lib @@ -731,7 +732,7 @@ def sort_values( meta=self._meta, ) - def reduce(self, func, *args, meta=None, **kwargs) -> NestedFrame: + def reduce(self, func, *args, meta=dsk_no_default, infer_nesting=True, **kwargs) -> NestedFrame: """ Takes a function and applies it to each top-level row of the NestedFrame. @@ -751,7 +752,15 @@ def reduce(self, func, *args, meta=None, **kwargs) -> NestedFrame: Positional arguments to pass to the function, the first *args should be the names of the columns to apply the function to. meta : dataframe or series-like, optional - The dask meta of the output. + The dask meta of the output. If not provided, dask will try to + infer the metadata. This may lead to unexpected results, so + providing meta is recommended. + infer_nesting : bool, default True + If True, the function will pack output columns into nested + structures based on column names adhering to a nested naming + scheme. E.g. "nested.b" and "nested.c" will be packed into a column + called "nested" with columns "b" and "c". If False, all outputs + will be returned as base columns. kwargs : keyword arguments, optional Keyword arguments to pass to the function. @@ -773,6 +782,26 @@ def reduce(self, func, *args, meta=None, **kwargs) -> NestedFrame: >>> '''reduce will return a NestedFrame with two columns''' >>> return {"sum_col1": sum(col1), "sum_col2": sum(col2)} + When using nesting inference (infer_nesting=True), the output may + contain nested columns. In such cases, the meta should be provided with + the appropriate dtype for these columns. For example, the following + function, which produces a nested column "lc": + + >>> def complex_output(flux): + >>> return {"max_flux": np.max(flux), + >>> "lc.flux_quantiles": np.quantile(flux, [0.1, 0.2, 0.3, 0.4, 0.5]), + >>> "lc.labels": [0.1, 0.2, 0.3, 0.4, 0.5]} + + Would require the following meta: + + >>> # create a NestedDtype for the nested column "lc" + >>> from nested_pandas.series.dtype import NestedDtype + >>> lc_dtype = NestedDtype(pa.struct([pa.field("flux_quantiles", pa.list_(pa.float64())), + >>> pa.field("labels", pa.list_(pa.float64()))])) + >>> # use the lc_dtype in meta creation + >>> result_meta = npd.NestedFrame({'max_flux':pd.Series([], dtype='float'), + >>> 'lc':pd.Series([], dtype=lc_dtype)}) + """ # Handle meta shorthands to produce nestedframe output @@ -787,7 +816,9 @@ def reduce(self, func, *args, meta=None, **kwargs) -> NestedFrame: # apply nested_pandas reduce via map_partitions # wrap the partition in a npd.NestedFrame call for: # https://github.com/lincc-frameworks/nested-dask/issues/21 - return self.map_partitions(lambda x: npd.NestedFrame(x).reduce(func, *args, **kwargs), meta=meta) + return self.map_partitions( + lambda x: npd.NestedFrame(x).reduce(func, *args, infer_nesting=infer_nesting, **kwargs), meta=meta + ) def to_parquet(self, path, by_layer=True, **kwargs) -> None: """Creates parquet file(s) with the data of a NestedFrame, either diff --git a/tests/nested_dask/test_nestedframe.py b/tests/nested_dask/test_nestedframe.py index 5c25284..2d64a5d 100644 --- a/tests/nested_dask/test_nestedframe.py +++ b/tests/nested_dask/test_nestedframe.py @@ -363,6 +363,43 @@ def mean_arr(arr): # type: ignore assert isinstance(reduced.compute(), npd.NestedFrame) +def test_reduce_output_inference(): + """test the extension of the reduce result nesting inference""" + + ndd = generate_data(20, 20, npartitions=2, seed=1) + + def complex_output(flux): + return { + "max_flux": np.max(flux), + "lc.flux_quantiles": np.quantile(flux, [0.1, 0.2, 0.3, 0.4, 0.5]), + "lc.labels": [0.1, 0.2, 0.3, 0.4, 0.5], + "meta.colors": ["green", "red", "blue"], + } + + # this sucks + result_meta = npd.NestedFrame( + { + "max_flux": pd.Series([], dtype="float"), + "lc": pd.Series( + [], + dtype=NestedDtype( + pa.struct( + [ + pa.field("flux_quantiles", pa.list_(pa.float64())), + pa.field("labels", pa.list_(pa.float64())), + ] + ) + ), + ), + "meta": pd.Series([], dtype=NestedDtype(pa.struct([pa.field("colors", pa.list_(pa.string()))]))), + } + ) + result = ndd.reduce(complex_output, "nested.flux", infer_nesting=True, meta=result_meta) + + assert list(result.dtypes) == list(result.compute().dtypes) + assert list(result.columns) == list(result.compute().columns) + + def test_to_parquet_combined(test_dataset, tmp_path): """test to_parquet when saving all layers to a single directory"""