|
@@ -26,7 +26,7 @@
 from segger.prediction.boundary import generate_boundaries

 from scipy.sparse.csgraph import connected_components as cc
-from typing import Union, Dict
+from typing import Union, Dict, Tuple
 import dask.dataframe as dd
 from dask import delayed
 from dask.diagnostics import ProgressBar
@@ -287,14 +287,19 @@ def sparse_multiply(embeddings, edge_index, shape) -> coo_matrix: |
     # shape = batch[from_type].x.shape[0], batch[to_type].x.shape[0]
     indices = torch.argwhere(edge_index != -1).T
     indices[1] = edge_index[edge_index != -1]
-    rows = cp.fromDlpack(to_dlpack(indices[0, :].to("cuda")))
-    columns = cp.fromDlpack(to_dlpack(indices[1, :].to("cuda")))
+    indices_gpu = indices.to("cuda")  # Keep a named reference for the DLPack exports below
+    rows = cp.fromDlpack(to_dlpack(indices_gpu[0, :]))
+    columns = cp.fromDlpack(to_dlpack(indices_gpu[1, :]))
+    del indices_gpu  # Drop the reference only once the CuPy arrays exist
+    stream = cp.cuda.get_current_stream()
+    stream.synchronize()  # Wait for the device copy to finish before the tensors are freed
     # print(rows)
     del indices
     values = similarity[edge_index != -1].flatten()
     sparse_result = coo_matrix(
         (cp.fromDlpack(to_dlpack(values)), (rows, columns)), shape=shape
     )
+    stream.synchronize()
     return sparse_result
     # Free GPU memory after computation

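For context on this hunk: `to_dlpack`/`cp.fromDlpack` hand the tensor's CUDA buffer to CuPy without copying, which is why the patch pins a named `indices_gpu` reference for the duration of the export and synchronizes the stream before the memory is reused. The same pattern in isolation, as a minimal sketch (requires a CUDA device; the function name is illustrative, not from this module):

    import cupy as cp
    import torch
    from torch.utils.dlpack import to_dlpack

    def row_to_cupy(mat: torch.Tensor) -> cp.ndarray:
        # Zero-copy hand-off of one row of a CUDA tensor to CuPy via DLPack.
        mat_gpu = mat.to("cuda")  # named reference outlives the export
        row = cp.fromDlpack(to_dlpack(mat_gpu[0, :]))  # shares memory, no copy
        cp.cuda.get_current_stream().synchronize()  # order GPU work before host use
        return row

    x = torch.arange(6, dtype=torch.float32).reshape(2, 3)
    print(row_to_cupy(x))  # [0. 1. 2.]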
|
@@ -364,21 +369,23 @@ def _get_id(): |
     # Convert sparse matrix to dense format (on GPU)
     dense_scores = scores.toarray()  # Convert to a dense CuPy array
     del scores  # Remove from memory
-    cp.get_default_memory_pool().free_all_blocks()  # Free CuPy memory
+    cp.cuda.Stream.null.synchronize()
+    # cp.get_default_memory_pool().free_all_blocks()  # Free CuPy memory

     # Step 2: Maximize score and assign transcripts based on score threshold
     belongs = cp.max(dense_scores, axis=1)  # Max score per transcript
     assignments["score"] = cp.asnumpy(belongs)  # Move back to CPU

     mask = assignments["score"] >= score_cut  # Mask for assigned transcripts
+    cp.cuda.Stream.null.synchronize()
     all_ids = np.concatenate(batch["bd"].id)  # Boundary IDs as NumPy array
     assignments["segger_cell_id"] = np.where(
         mask, all_ids[cp.argmax(dense_scores, axis=1).get()], None
     )

     # Clear memory after score processing
     del dense_scores
-    cp.get_default_memory_pool().free_all_blocks()  # Free CuPy memory
+    # cp.get_default_memory_pool().free_all_blocks()  # Free CuPy memory
     torch.cuda.empty_cache()

     assignments["bound"] = np.where(
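The step this hunk hardens (max score per transcript, threshold mask, then an argmax lookup into the boundary IDs) can be exercised standalone. A minimal sketch with toy data and a made-up `score_cut`, synchronizing before results cross over to NumPy as the patch now does:

    import cupy as cp
    import numpy as np

    score_cut = 0.5
    dense_scores = cp.asarray([[0.1, 0.9], [0.3, 0.2]])  # transcripts x cells
    all_ids = np.asarray(["cell_a", "cell_b"])  # one ID per cell

    best_scores = cp.asnumpy(cp.max(dense_scores, axis=1))  # best score per transcript
    cp.cuda.Stream.null.synchronize()  # finish GPU work before host-side reads
    mask = best_scores >= score_cut
    best_cells = cp.argmax(dense_scores, axis=1).get()  # .get() copies indices to host
    assigned = np.where(mask, all_ids[best_cells], None)
    print(assigned)  # ['cell_b' None]
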
@@ -470,7 +477,7 @@ def _get_id(): |
     delayed_write_output_ddf.persist()  # Schedule writing

     # Free memory after computation
-    cp.get_default_memory_pool().free_all_blocks()  # Free CuPy memory
+    # cp.get_default_memory_pool().free_all_blocks()  # Free CuPy memory
     torch.cuda.empty_cache()

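Here, as in the earlier hunk, the eager `free_all_blocks()` call is commented out in favor of synchronization plus `torch.cuda.empty_cache()`, presumably to avoid trimming the pool while asynchronous work is still in flight. If the pool must still be trimmed once a batch completes, a safe ordering is: drop references, synchronize, then free. A minimal sketch of that ordering (the helper name is ours, not the repo's):

    import cupy as cp

    def release_cupy_memory() -> None:
        # Trim CuPy's pools only after all queued GPU work has finished.
        cp.cuda.Stream.null.synchronize()  # let in-flight kernels complete
        cp.get_default_memory_pool().free_all_blocks()  # return unused device blocks
        cp.get_default_pinned_memory_pool().free_all_blocks()  # and pinned host blocks

    buf = cp.zeros(1_000_000, dtype=cp.float32)
    del buf  # blocks still referenced elsewhere are never reclaimed
    release_cupy_memory()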
|
|