Task fusion fails #331

Description

@hvasbath

Dear all,

Thank you for developing and maintaining dask_geopandas.
I have a minimal working example below (please copy and paste) that uses synthetic data.

import geopandas as gpd
import dask_geopandas as dgpd
import dask.dataframe as dd
import pandas as pd
import numpy as np
from shapely.geometry import Point, box
from pyproj import CRS

def generate_and_aggregate_geodata(n_points=5000, grid_spacing=100):
    """
    Generate synthetic point data and perform spatial aggregation on a 100x100m grid.
    Returns a GeoDataFrame with mean values and counts per grid cell.
    """

    # Use a projected CRS for accurate distance (e.g., UTM zone for Berlin)
    crs_proj = CRS.from_epsg(32633)  # UTM zone 33N (meters)
    crs_latlon = CRS.from_epsg(4326)

    # 1. Create random point data in lat/lon and project it
    np.random.seed(42)
    lon_min, lon_max = 13.0, 13.1
    lat_min, lat_max = 52.4, 52.5

    lons = np.random.uniform(lon_min, lon_max, n_points)
    lats = np.random.uniform(lat_min, lat_max, n_points)
    values = np.random.normal(loc=20, scale=5, size=n_points)  # e.g., temperature

    df = pd.DataFrame({
        "value": values,
        "geometry": [Point(lon, lat) for lon, lat in zip(lons, lats)]
    })
    gdf = gpd.GeoDataFrame(df, geometry="geometry", crs=crs_latlon).to_crs(crs_proj)
    dgdf = dgpd.from_geopandas(gdf, npartitions=4)

    # 2. Create 100x100m grid over same bounding box
    bounds = dgdf.total_bounds  # minx, miny, maxx, maxy
    minx, miny, maxx, maxy = bounds

    x_coords = np.arange(minx, maxx, grid_spacing)
    y_coords = np.arange(miny, maxy, grid_spacing)

    grid_polygons = []
    for x in x_coords:
        for y in y_coords:
            cell = box(x, y, x + grid_spacing, y + grid_spacing)
            grid_polygons.append(cell)

    grid_df = gpd.GeoDataFrame(geometry=grid_polygons, crs=crs_proj)
    grid_df["grid_id"] = grid_df.index
    dgrid = dgpd.from_geopandas(grid_df, npartitions=2)  # the grid is small, so a couple of partitions is plenty
    dgrid = dgrid.set_index("grid_id")

    # 3. Spatial join and align to assign each point to a grid cell, keeping empty cells
    joined = dgdf.sjoin(dgrid, predicate="intersects")
    joined_idx = joined.set_index("grid_id")

    aligned, _ = joined_idx.align(dgrid, join="outer") 

    # 4. Group by grid cell and aggregate
    aggregated = aligned.groupby("grid_id").agg({"value": "mean"}).rename(columns={"value": "mean_value"})
    counts = aligned.groupby("grid_id").size()
    nan_mask = aggregated.mean_value.notnull()

    masked_counts = (counts * nan_mask).rename("count")
    #masked_counts.compute()          # just works fine
    #masked_counts.to_dask_array().compute()    # fails
    
    # 5. Join aggregates back to grid to preserve empty cells
    result = dgrid.merge(aggregated, on="grid_id", how="left")
    result = result.merge(masked_counts.to_frame(), on="grid_id", how="left")
    return result

# Example usage:
if __name__ == "__main__":
    result_gdf = generate_and_aggregate_geodata()
    print(result_gdf.compute())  # just works
    result_gdf.to_csv("aggregated_data.csv")  # fails

This fails with:

  File "/homefs/home/hannes_ldap/Software/leif/examples/sandbox.py", line 78, in <module>
    result_gdf.to_csv("aggregated_data.csv")  # fails
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/homefs/home/hannes_ldap/Software/leif/venv/lib/python3.11/site-packages/dask/dataframe/dask_expr/_collection.py", line 2437, in to_csv
    return to_csv(self, filename, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/homefs/home/hannes_ldap/Software/leif/venv/lib/python3.11/site-packages/dask/dataframe/io/csv.py", line 912, in to_csv
    dfs = df.to_delayed()
          ^^^^^^^^^^^^^^^
  File "/homefs/home/hannes_ldap/Software/leif/venv/lib/python3.11/site-packages/dask/dataframe/dask_expr/_collection.py", line 2512, in to_delayed
    graph = frame.__dask_graph__()
            ^^^^^^^^^^^^^^^^^^^^^^
  File "/homefs/home/hannes_ldap/Software/leif/venv/lib/python3.11/site-packages/dask/dataframe/dask_expr/_collection.py", line 543, in __dask_graph__
    return out.__dask_graph__()
           ^^^^^^^^^^^^^^^^^^^^
  File "/homefs/home/hannes_ldap/Software/leif/venv/lib/python3.11/site-packages/dask/_expr.py", line 578, in __dask_graph__
    layers.append(expr._layer())
                  ^^^^^^^^^^^^^
  File "/homefs/home/hannes_ldap/Software/leif/venv/lib/python3.11/site-packages/dask/_expr.py", line 291, in _layer
    return {
           ^
  File "/homefs/home/hannes_ldap/Software/leif/venv/lib/python3.11/site-packages/dask/_expr.py", line 292, in <dictcomp>
    (self._name, i): self._task((self._name, i), i)
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/homefs/home/hannes_ldap/Software/leif/venv/lib/python3.11/site-packages/dask/dataframe/dask_expr/_expr.py", line 3801, in _task
    t = _expr._task(subname, subname[1])
        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/homefs/home/hannes_ldap/Software/leif/venv/lib/python3.11/site-packages/dask/dataframe/dask_expr/_expr.py", line 3805, in _task
    return Task.fuse(*internal_tasks, key=name)  # type: ignore
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/homefs/home/hannes_ldap/Software/leif/venv/lib/python3.11/site-packages/dask/_task_spec.py", line 479, in fuse
    raise ValueError(f"Cannot fuse tasks with multiple outputs {leafs}")
ValueError: Cannot fuse tasks with multiple outputs {('getitem-7390cbbd3960fccaef7766e92c4a2ee4', 0), ('align-0735ebd8d27504c9a98729cfff7e2ff4', 0)}

Please note: this also fails with the same error when calling compute on masked_counts.to_dask_array() (see the commented-out lines in the example above).
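
Since masked_counts.compute() is fine (see the commented-out lines in the example), I assume the to_dask_array() path can be worked around by materialising the series first and converting the result with NumPy, for example:

# Assumed workaround, not verified: compute the dask series first, then convert
# the resulting pandas Series to a NumPy array instead of calling to_dask_array().
counts_array = np.asarray(masked_counts.compute())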

The relevant package versions in my venv:

pandas==2.2.2
dask==2025.5.1
dask-expr==2.0.0
dask-geopandas==0.5.0
shapely==2.1.1
pyproj==3.6.1

If someone can give me directions on how to investigate this, I am happy to do so. Your help and comments are much appreciated!
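
For what it is worth, the way I would try to narrow this down myself (just a sketch, assuming the intermediate collections from the example above are still available, e.g. returned from the function; none of this is verified) is to build the graph of each intermediate and see which one first hits the fusion error:

# Hedged sketch: __dask_graph__() is the same call that fails inside
# to_delayed()/to_csv(), so triggering it per intermediate collection should
# point at the first expression whose fusion breaks.
for name, coll in [
    ("joined_idx", joined_idx),
    ("aligned", aligned),
    ("aggregated", aggregated),
    ("masked_counts", masked_counts),
    ("result", result),
]:
    try:
        coll.__dask_graph__()
        print(name, "builds fine")
    except ValueError as err:
        print(name, "fails:", err)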

P.S.: For a similar operation I always get these failing task leaves:
{('set_geometry-a3bece751b6ae9af0d978bd25a007ae1', 0), ('to_crs-8e71f1c5205ec6601372a7d0e781a251', 0), ('to_crs-74966ca826756ec0cbf419f4f2fad7c3', 0), ('centroid-2aebd5ea4ac13099f27a27ae14dd0584', 0), ('set_geometry-60e6f9a0428aa8311886a768bfbfde13', 0)}
I have not yet been able to boil this down to a minimal working example, but it may be related.

P.P.S.: Writing to Parquet just works:

result_gdf.to_parquet("aggregated_data.pq")
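
Since result_gdf.compute() works as well, I assume the CSV could also be written from the materialised GeoDataFrame instead of going through dask's to_csv (which is where the graph construction fails):

# Assumed workaround, not verified: materialise first, then write with
# pandas/GeoPandas rather than dask's to_csv.
result_gdf.compute().to_csv("aggregated_data.csv")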
