@@ -46,35 +46,25 @@ def _safe_len(df: Any) -> int:
4646 Monitor: https://github.com/rapidsai/dask-cuda/issues and https://github.com/rapidsai/cudf/issues
4747 for fixes to groupby aggregation errors on empty DataFrames.
4848 """
49- # Check type module without importing dask_cudf (dask imports are slow)
5049 type_module = type (df ).__module__
5150 if 'dask_cudf' in type_module :
5251 try :
53- # Only import if we're reasonably sure it's a dask_cudf DataFrame
5452 import dask_cudf
5553 if isinstance (df , dask_cudf .DataFrame ):
56- # Use map_partitions to get length of each partition, then sum
57- # This avoids the problematic groupby aggregations that fail on lazy operations
5854 try :
59- # map_partitions(len) returns scalar per partition, forming a Series
60- # meta should be pd.Series with appropriate dtype, not bare int
6155 partition_lengths = df .map_partitions (len , meta = pd .Series ([], dtype = 'int64' ))
6256 total_length = partition_lengths .sum ().compute ()
6357 return int (total_length )
6458 except Exception as e :
6559 logger .warning ("Could not compute length for dask_cudf DataFrame via map_partitions: %s" , e )
66- # Fallback: try direct compute (may fail on empty DataFrames with lazy ops)
6760 return len (df .compute ())
6861 except ImportError as e :
69- # Unexpected: module name contains 'dask_cudf' but can't import - raise it
7062 logger .error ("DataFrame type from dask_cudf module but import failed: %s" , e )
7163 raise
7264 except AttributeError as e :
73- # Unexpected: imported dask_cudf but isinstance/attribute access failed
7465 logger .error ("Imported dask_cudf but attribute error occurred: %s" , e )
7566 raise
7667
77- # For all other DataFrame types, use standard len()
7868 return len (df )
7969
8070
@@ -171,39 +161,31 @@ def materialize_nodes(
171161
172162 g : Plottable = self
173163
174- # Handle cross-engine coercion when engine is explicitly set
175- # Use module string checks to avoid importing cudf when not installed
176164 if engine != EngineAbstract .AUTO :
177165 engine_val = Engine (engine .value )
178166 if engine_val == Engine .CUDF :
179- # Coerce pandas to cuDF (only if it's actually pandas, not dask/etc)
180167 if g ._nodes is not None and isinstance (g ._nodes , pd .DataFrame ):
181168 import cudf
182169 g = g .nodes (cudf .DataFrame .from_pandas (g ._nodes ), g ._node )
183170 if g ._edges is not None and isinstance (g ._edges , pd .DataFrame ):
184171 import cudf
185172 g = g .edges (cudf .DataFrame .from_pandas (g ._edges ), g ._source , g ._destination , edge = g ._edge )
186173 elif engine_val == Engine .PANDAS :
187- # Coerce cuDF to pandas (only if it's actually cudf, not dask_cudf/etc)
188174 if g ._nodes is not None and 'cudf' in type (g ._nodes ).__module__ and 'dask' not in type (g ._nodes ).__module__ :
189175 g = g .nodes (g ._nodes .to_pandas (), g ._node )
190176 if g ._edges is not None and 'cudf' in type (g ._edges ).__module__ and 'dask' not in type (g ._edges ).__module__ :
191177 g = g .edges (g ._edges .to_pandas (), g ._source , g ._destination , edge = g ._edge )
192178
193- # Check reuse first - if we have nodes and reuse is True, just return
194179 if reuse :
195180 if g ._nodes is not None and _safe_len (g ._nodes ) > 0 :
196181 if g ._node is None :
197182 logger .warning (
198183 "Must set node id binding, not just nodes; set via .bind() or .nodes()"
199184 )
200- # raise ValueError('Must set node id binding, not just nodes; set via .bind() or .nodes()')
201185 else :
202186 return g
203187
204- # Only check for edges if we actually need to materialize
205188 if g ._edges is None :
206- # If no edges but we have nodes via reuse, that's OK
207189 if reuse and g ._nodes is not None and _safe_len (g ._nodes ) > 0 :
208190 return g
209191 raise ValueError ("Missing edges" )
@@ -213,7 +195,6 @@ def materialize_nodes(
213195 )
214196 if _safe_len (g ._edges ) == 0 :
215197 return g
216- # TODO use built-ins for igraph/nx/...
217198
218199 node_id = g ._node if g ._node is not None else "id"
219200 engine_concrete : Engine
@@ -242,8 +223,6 @@ def raiser(df: Any):
242223 else :
243224 engine_concrete = Engine (engine .value )
244225
245- # Use engine-specific concat for Series
246- # Note: Cross-engine coercion is handled at the start of this function
247226 concat_fn = df_concat (engine_concrete )
248227 concat_df = concat_fn ([g ._edges [g ._source ], g ._edges [g ._destination ]])
249228 nodes_df = concat_df .rename (node_id ).drop_duplicates ().to_frame ().reset_index (drop = True )
@@ -254,13 +233,9 @@ def get_indegrees(self, col: str = "degree_in"):
254233 g = self
255234 g_nodes = g .materialize_nodes ()
256235
257- # Handle empty edges case - skip groupby for dask_cudf compatibility
258- # When edges are empty, all nodes have in-degree of 0
259236 if _safe_len (g ._edges ) == 0 :
260237 if col not in g_nodes ._nodes .columns :
261- # Use assign() for engine compatibility (pandas, cudf, dask, dask_cudf)
262238 nodes_df = g_nodes ._nodes .assign (** {col : 0 })
263- # Convert to int32 to match normal degree column dtype
264239 nodes_df = nodes_df .assign (** {col : nodes_df [col ].astype ("int32" )})
265240 else :
266241 nodes_df = g_nodes ._nodes .copy ()
@@ -274,7 +249,6 @@ def get_indegrees(self, col: str = "degree_in"):
274249 .rename (columns = {g ._source : col , g ._destination : g_nodes ._node })
275250 )
276251
277- # Use safe_merge for engine type coercion
278252 nodes_subset = g_nodes ._nodes [
279253 [c for c in g_nodes ._nodes .columns if c != col ]
280254 ]
@@ -359,7 +333,6 @@ def keep_nodes(self, nodes):
359333 """
360334 g = self .materialize_nodes ()
361335
362- #convert to Dict[Str, Union[Series, List-like]]
363336 if isinstance (nodes , dict ):
364337 pass
365338 elif isinstance (nodes , np .ndarray ) or isinstance (nodes , list ):
@@ -373,41 +346,28 @@ def keep_nodes(self, nodes):
373346 nodes = {g ._node : nodes .to_numpy ()}
374347 else :
375348 raise ValueError ('Unexpected nodes type: {}' .format (type (nodes )))
376- #convert to Dict[Str, List-like]
377- #print('nodes mid', nodes)
378349 nodes = {
379350 k : v if isinstance (v , np .ndarray ) or isinstance (v , list ) else v .to_numpy ()
380351 for k , v in nodes .items ()
381352 }
382353
383- #print('self nodes', g._nodes)
384- #print('pre nodes', nodes)
385- #print('keys', list(nodes.keys()))
386354 hits = g ._nodes [list (nodes .keys ())].isin (nodes )
387- #print('hits', hits)
388355 hits_s = hits [g ._node ]
389356 for c in hits .columns :
390357 if c != g ._node :
391358 hits_s = hits_s & hits [c ]
392- #print('hits_s', hits_s)
393359 new_nodes = g ._nodes [hits_s ]
394- #print(new_nodes)
395360 new_node_ids = new_nodes [g ._node ].to_numpy ()
396- #print('new_node_ids', new_node_ids)
397- #print('new node_ids', type(new_node_ids), len(g._nodes), '->', len(new_node_ids))
398361 new_edges_hits_df = (
399362 g ._edges [[g ._source , g ._destination ]]
400363 .isin ({
401364 g ._source : new_node_ids ,
402365 g ._destination : new_node_ids
403366 })
404367 )
405- #print('new_edges_hits_df', new_edges_hits_df)
406368 new_edges = g ._edges [
407369 new_edges_hits_df [g ._source ] & new_edges_hits_df [g ._destination ]
408370 ]
409- #print('new_edges', new_edges)
410- #print('new edges', len(g._edges), '->', len(new_edges))
411371 return g .nodes (new_nodes ).edges (new_edges )
412372
413373 def get_topological_levels (
@@ -456,7 +416,6 @@ def get_topological_levels(
456416 raise ValueError (
457417 "Cyclic graph in get_topological_levels(); remove cycles or set allow_cycles=True"
458418 )
459- # tie break by picking biggest node
460419 max_degree = g2 ._nodes ["degree" ].max ()
461420 roots = g2 ._nodes [g2 ._nodes ["degree" ] == max_degree ][:1 ]
462421 if warn_cycles :
@@ -479,7 +438,6 @@ def get_topological_levels(
479438 g2 = g2 .drop_nodes (roots [g2 ._node ])
480439 nodes_df0 = nodes_with_levels [0 ]
481440 if len (nodes_with_levels ) > 1 :
482- # Use engine-aware concat for cuDF/pandas compatibility
483441 engine = resolve_engine (EngineAbstract .AUTO , nodes_df0 )
484442 concat_fn = df_concat (engine )
485443 nodes_df = concat_fn ([nodes_df0 ] + nodes_with_levels [1 :])
@@ -489,8 +447,6 @@ def get_topological_levels(
489447 if self ._nodes is None :
490448 return self .nodes (nodes_df )
491449 else :
492- # use orig cols, esp. in case collisions like degree
493- # Use safe_merge for engine type coercion
494450 levels_df = nodes_df [[g2_base ._node , level_col ]]
495451 out_df = safe_merge (g2_base ._nodes , levels_df , on = g2_base ._node , how = 'left' )
496452 return self .nodes (out_df )
@@ -523,7 +479,6 @@ def collapse(
523479 :returns:A new Graphistry instance with nodes and edges DataFrame containing collapsed nodes and edges given by column attribute -- nodes and edges DataFrames contain six new columns `collapse_{node | edges}` and `final_{node | edges}`, while original (node, src, dst) columns are left untouched
524480 :rtype: Plottable
525481 """
526- # TODO FIXME CHECK SELF LOOPS?
527482 return collapse_by (
528483 self ,
529484 start_node = node ,
@@ -561,17 +516,7 @@ def chain(self, *args, **kwargs):
561516 stacklevel = 2
562517 )
563518 return chain_base (self , * args , ** kwargs )
564- # Preserve original docstring after deprecation notice
565519 chain .__doc__ = (chain .__doc__ or "" ) + "\n \n " + (chain_base .__doc__ or "" )
566-
567- # chain_let removed from public API - use gfql() instead
568- # (chain_let_base still available internally for gfql dispatch)
569-
570- # Commented out to remove from public API - use gfql() instead
571- # def chain_let(self, *args, **kwargs):
572- # """Execute a DAG of named graph operations with dependency resolution."""
573- # return chain_let_base(self, *args, **kwargs)
574- # chain_let.__doc__ = chain_let_base.__doc__
575520
576521 def gfql (self , * args , ** kwargs ):
577522 return gfql_base (self , * args , ** kwargs )
@@ -589,7 +534,6 @@ def chain_remote(self, *args, **kwargs) -> Plottable:
589534 stacklevel = 2
590535 )
591536 return chain_remote_base (self , * args , ** kwargs )
592- # Preserve original docstring after deprecation notice
593537 chain_remote .__doc__ = (chain_remote .__doc__ or "" ) + "\n \n " + (chain_remote_base .__doc__ or "" )
594538
595539 def chain_remote_shape (self , * args , ** kwargs ) -> pd .DataFrame :
@@ -604,7 +548,6 @@ def chain_remote_shape(self, *args, **kwargs) -> pd.DataFrame:
604548 stacklevel = 2
605549 )
606550 return chain_remote_shape_base (self , * args , ** kwargs )
607- # Preserve original docstring after deprecation notice
608551 chain_remote_shape .__doc__ = (chain_remote_shape .__doc__ or "" ) + "\n \n " + (chain_remote_shape_base .__doc__ or "" )
609552
610553 def gfql_remote (
0 commit comments