1414
1515import zarr
1616from anndata import AnnData
17+ from dask ._task_spec import Task
1718from dask .array import Array as DaskArray
1819from dask .dataframe import DataFrame as DaskDataFrame
1920from geopandas import GeoDataFrame
@@ -301,6 +302,19 @@ def _get_backing_files(element: DaskArray | DaskDataFrame) -> list[str]:
301302 return files
302303
303304
def _find_piece_dict(
    obj: dict[str, tuple[str | None, ...]] | Task,
) -> dict[str, tuple[str | None, ...]] | None:
    """Recursively search a Dask task spec for the dict that holds the ``"piece"`` key.

    The value stored under ``"piece"`` is the tuple from which the caller reads the parquet file path.

    Parameters
    ----------
    obj
        Either a dict, or an object exposing an ``args`` attribute (``dask._task_spec.*`` objects such as
        ``Task`` and ``List``). Any other object is treated as a leaf that contains no match.

    Returns
    -------
    The first dict containing the key ``"piece"`` found in a depth-first walk over ``args``, or ``None`` if
    there is no such dict. Dicts that do not contain ``"piece"`` are not searched any further.
    """
    if isinstance(obj, dict):
        # A dict is either the match itself or a dead end; its values are never descended into.
        return obj if "piece" in obj else None
    # dask._task_spec.* objects like Task and List expose their operands through ``args``;
    # objects without that attribute contribute nothing to the search.
    for arg in getattr(obj, "args", ()):
        found = _find_piece_dict(arg)
        if found is not None:
            return found
    return None
316+
317+
304318def _search_for_backing_files_recursively (subgraph : Any , files : list [str ]) -> None :
305319 # see the types allowed for the dask graph here: https://docs.dask.org/en/stable/spec.html
306320
@@ -327,25 +341,31 @@ def _search_for_backing_files_recursively(subgraph: Any, files: list[str]) -> No
327341 path = getattr (v .store , "path" , None ) if getattr (v .store , "path" , None ) else v .store .root
328342 files .append (str (UPath (path ).resolve ()))
329343 elif name .startswith ("read-parquet" ) or name .startswith ("read_parquet" ):
330- if hasattr (v , "creation_info" ):
331- # https://github.com/dask/dask/blob/ff2488aec44d641696e0b7aa41ed9e995c710705/dask/dataframe/io/parquet/core.py#L625
332- t = v .creation_info ["args" ]
333- if not isinstance (t , tuple ) or len (t ) != 1 :
334- raise ValueError (
335- f"Unable to parse the parquet file from the dask subgraph { subgraph } . Please "
336- f"report this bug."
337- )
338- parquet_file = t [0 ]
339- files .append (str (UPath (parquet_file ).resolve ()))
340- elif isinstance (v , tuple ) and len (v ) > 1 and isinstance (v [1 ], dict ) and "piece" in v [1 ]:
344+ # Here v is a read_parquet task with arguments and the only value is a dictionary.
345+ if "piece" in v .args [0 ]:
341346 # https://github.com/dask/dask/blob/ff2488aec44d641696e0b7aa41ed9e995c710705/dask/dataframe/io/parquet/core.py#L870
342- parquet_file , check0 , check1 = v [ 1 ]["piece" ]
347+ parquet_file , check0 , check1 = v . args [ 0 ]["piece" ]
343348 if not parquet_file .endswith (".parquet" ) or check0 is not None or check1 is not None :
344349 raise ValueError (
345350 f"Unable to parse the parquet file from the dask subgraph { subgraph } . Please "
346351 f"report this bug."
347352 )
348353 files .append (os .path .realpath (parquet_file ))
354+ else :
355+ # This occurs when for example points and images are mixed, the main task still starts with
356+ # read_parquet, but the execution happens through a subgraph which we iterate over to get the
357+ # actual read_parquet task.
358+ for task in v .args [0 ].values ():
359+ # Recursively go through tasks; this is required because of differences between dask versions.
360+ piece_dict = _find_piece_dict (task )
361+ if isinstance (piece_dict , dict ) and "piece" in piece_dict :
362+ parquet_file , check0 , check1 = piece_dict ["piece" ] # type: ignore[misc]
363+ if not parquet_file .endswith (".parquet" ) or check0 is not None or check1 is not None :
364+ raise ValueError (
365+ f"Unable to parse the parquet file from the dask subgraph { subgraph } . Please "
366+ f"report this bug."
367+ )
368+ files .append (os .path .realpath (parquet_file ))
349369
350370
351371def _backed_elements_contained_in_path (path : Path , object : SpatialData | SpatialElement | AnnData ) -> list [bool ]:
0 commit comments