Commit db4b741

added pqdm version for multiple processes
1 parent fa1a925 commit db4b741

File tree

1 file changed: src/segger/validation/xenium_explorer.py (+169 -0 lines)
@@ -469,3 +469,172 @@ def generate_experiment_file(

    with open(output_path, "w") as f:
        json.dump(experiment, f, indent=2)


from pqdm.processes import pqdm  # process backend; use pqdm.threads for a thread-based pool
import os


def _process_one_cell(args):
    """Build the boundary polygon for a single cell; return None if it is filtered out."""
    seg_cell_id, seg_cell, area_low, area_high = args

    # Too few points to form a meaningful boundary
    if len(seg_cell) < 5:
        return None

    cell_convex_hull = generate_boundary(seg_cell)
    if cell_convex_hull is None or not isinstance(cell_convex_hull, Polygon):
        return None

    if not (area_low <= cell_convex_hull.area <= area_high):
        return None

    # Get original vertices and remove the duplicate closing vertex if present
    cell_vertices = list(cell_convex_hull.exterior.coords)
    if cell_vertices[0] == cell_vertices[-1]:
        cell_vertices = cell_vertices[:-1]

    n_vertices = len(cell_vertices)

    # Sample up to 16 vertices
    if n_vertices > 16:
        # Evenly sample 16 vertices from the original set
        indices = np.linspace(0, n_vertices - 1, 16, dtype=int)
        sampled_vertices = [cell_vertices[i] for i in indices]
    else:
        sampled_vertices = cell_vertices

    # Pad with the first vertex if needed
    if len(sampled_vertices) < 16:
        sampled_vertices += [sampled_vertices[0]] * (16 - len(sampled_vertices))

    return {
        "seg_cell_id": seg_cell_id,
        "cell_area": float(cell_convex_hull.area),
        "cell_vertices": sampled_vertices,
        "cell_num_vertices": len(sampled_vertices),
    }
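
# Note: with the process backend, pqdm pickles the worker function and every work
# item to ship them to child processes. _process_one_cell therefore has to be
# defined at module top level, and each (id, group slice, thresholds) tuple must
# itself be picklable (pandas group slices are).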

def seg2explorer_pqdm(
    seg_df: pd.DataFrame,
    source_path: str,
    output_dir: str,
    cells_filename: str = "seg_cells",
    analysis_filename: str = "seg_analysis",
    xenium_filename: str = "seg_experiment.xenium",
    analysis_df: Optional[pd.DataFrame] = None,
    draw: bool = False,
    cell_id_columns: str = "seg_cell_id",
    area_low: float = 10,
    area_high: float = 100,
    n_jobs: int = 1,
) -> None:
    source_path = Path(source_path)
    storage = Path(output_dir)
    storage.mkdir(parents=True, exist_ok=True)

    grouped_by = seg_df.groupby(cell_id_columns)

    # Build a lightweight iterable of work items (id, slice, thresholds).
    # NOTE: this still materializes each group slice, but avoids copying the whole DataFrame per worker.
    work_iter = ((seg_cell_id, seg_cell, area_low, area_high) for seg_cell_id, seg_cell in grouped_by)

    # Parallel map across processes. Tune n_jobs for your machine, e.g.:
    # n_jobs = min(32, os.cpu_count() or 8)
    results = pqdm(work_iter, _process_one_cell, n_jobs=n_jobs, desc="Cells", exception_behaviour="immediate")
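    # pqdm returns results in input order, so the enumerate() below yields
    # deterministic incremental ids; exception_behaviour="immediate" re-raises
    # a worker exception right away instead of storing it in `results`.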

    # Collate results
    cell_id2old_id: Dict[int, Any] = {}
    cell_id: List[int] = []
    polygon_num_vertices: List[int] = []
    polygon_vertices: List[List[Any]] = []

    # We need a stable incremental id: enumerate over the kept results
    kept = [r for r in results if r is not None]
    for cell_incremental_id, r in enumerate(kept):
        uint_cell_id = cell_incremental_id + 1
        cell_id2old_id[uint_cell_id] = r["seg_cell_id"]
        cell_id.append(uint_cell_id)
        polygon_num_vertices.append(r["cell_num_vertices"])
        polygon_vertices.append(r["cell_vertices"])

    # Flatten vertices exactly as before
    cell_polygon_vertices = get_flatten_version(polygon_vertices)

    source_zarr_store = ZipStore(source_path / "cells.zarr.zip", mode="r")
    existing_store = zarr.open(source_zarr_store, mode="r")
    new_store = zarr.open(storage / f"{cells_filename}.zarr.zip", mode="w")

    # Create polygon_sets group with the new structure
    polygon_group = new_store.create_group("polygon_sets")

    # Cell polygons go into polygon set 1
    cell_num_vertices = polygon_num_vertices  # Cell vertex counts

    # Flatten cell polygons to one row per cell (16 vertices * 2 coordinates = 32 values)
    n_cells = cell_polygon_vertices.shape[0]
    cell_vertices_flat = cell_polygon_vertices.reshape(n_cells, -1)[:, :32]

    set1 = polygon_group.create_group("1")
    set1["cell_index"] = np.arange(1, n_cells + 1, dtype=np.uint32)  # 1-based indexing
    set1["method"] = np.ones(n_cells, dtype=np.uint32)  # All method=1
    set1["num_vertices"] = np.array(cell_num_vertices, dtype=np.int32)
    set1["vertices"] = cell_vertices_flat.astype(np.float32)

    new_store.attrs.update(existing_store.attrs)
    new_store.attrs["number_cells"] = n_cells
    new_store.store.close()

    if analysis_df is None:
        analysis_df = pd.DataFrame([cell_id2old_id[i] for i in cell_id], columns=[cell_id_columns])
        analysis_df["default"] = "segger"

    zarr_df = pd.DataFrame([cell_id2old_id[i] for i in cell_id], columns=[cell_id_columns])
    clustering_df = pd.merge(zarr_df, analysis_df, how="left", on=cell_id_columns)
    clusters_names = [col for col in analysis_df.columns if col != cell_id_columns]

    # Map each cluster label to a 1-based group index (0 is reserved for unassigned cells)
    clusters_dict = {
        cluster: {
            label: idx + 1
            for idx, label in enumerate(sorted(np.unique(clustering_df[cluster].dropna())))
        }
        for cluster in clusters_names
    }

    new_zarr = zarr.open(storage / f"{analysis_filename}.zarr.zip", mode="w")
    new_zarr.create_group("/cell_groups")
    for i, cluster in enumerate(clusters_names):
        new_zarr["cell_groups"].create_group(str(i))
        group_values = [clusters_dict[cluster].get(x, 0) for x in clustering_df[cluster]]
        indices, indptr = get_indices_indptr(np.array(group_values))
        new_zarr["cell_groups"][str(i)]["indices"] = indices
        new_zarr["cell_groups"][str(i)]["indptr"] = indptr

    new_zarr["cell_groups"].attrs.update(
        {
            "major_version": 1,
            "minor_version": 0,
            "number_groupings": len(clusters_names),
            "grouping_names": clusters_names,
            "group_names": [
                sorted(clusters_dict[cluster], key=clusters_dict[cluster].get)
                for cluster in clusters_names
            ],
        }
    )
    new_zarr.store.close()

    generate_experiment_file(
        template_path=source_path / "experiment.xenium",
        output_path=storage / xenium_filename,
        cells_name=cells_filename,
        analysis_name=analysis_filename,
    )
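
For reference, a minimal sketch of how the new entry point might be invoked. The module path follows the file location above; the input file, directories, and n_jobs value are placeholders, not part of this commit:

from segger.validation.xenium_explorer import seg2explorer_pqdm
import pandas as pd

# Hypothetical segmentation table: one row per point, keyed by the cell id column
seg_df = pd.read_parquet("segmentation.parquet")  # placeholder path

seg2explorer_pqdm(
    seg_df=seg_df,
    source_path="/data/xenium_run",        # must contain cells.zarr.zip and experiment.xenium
    output_dir="/data/xenium_run/segger",  # placeholder output location
    cell_id_columns="seg_cell_id",
    area_low=10,
    area_high=100,
    n_jobs=8,  # number of pqdm worker processes
)

Note that source_path has to point at an original Xenium output bundle, since the function opens cells.zarr.zip via ZipStore and uses experiment.xenium as the template for generate_experiment_file.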
