Skip to content

Commit 832eeac

Browse files
authored
Merge pull request #13 from DigitalHolography/12-pipeline-possible-danger-of-contamination
feature/Added_the_PipelineDescriptor_dataclass
2 parents d5ed5d5 + a51fac1 commit 832eeac

File tree

5 files changed

+138
-79
lines changed

5 files changed

+138
-79
lines changed

src/angio_eye.py

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
sv_ttk = None
1616

1717
from pipelines import (
18+
PipelineDescriptor,
1819
ProcessPipeline,
1920
ProcessResult,
2021
load_pipeline_catalog,
@@ -70,7 +71,7 @@ def __init__(self) -> None:
7071
self.title("HDF5 Process")
7172
self.geometry("800x600")
7273
self.h5_file: h5py.File | None = None
73-
self.pipeline_registry: dict[str, ProcessPipeline] = {}
74+
self.pipeline_registry: dict[str, PipelineDescriptor] = {}
7475
self.pipeline_check_vars: dict[str, tk.BooleanVar] = {}
7576
self.last_process_result: ProcessResult | None = None
7677
self.last_process_pipeline: ProcessPipeline | None = None
@@ -329,12 +330,12 @@ def _register_pipelines(self) -> None:
329330
self._populate_pipeline_checks(available, missing)
330331

331332
def _populate_pipeline_checks(
332-
self, available: list[ProcessPipeline], missing: list[ProcessPipeline]
333+
self, available: list[PipelineDescriptor], missing: list[PipelineDescriptor]
333334
) -> None:
334335
for child in self.pipeline_checks_inner.winfo_children():
335336
child.destroy()
336337
self.pipeline_check_vars = {}
337-
rows: list[ProcessPipeline] = [*available, *missing]
338+
rows: list[PipelineDescriptor] = [*available, *missing]
338339
for idx, pipeline in enumerate(rows):
339340
is_available = getattr(pipeline, "available", True)
340341
var = tk.BooleanVar(value=is_available)
@@ -422,8 +423,8 @@ def run_selected_pipeline(self) -> None:
422423
"Missing pipeline", "Select a pipeline before running."
423424
)
424425
return
425-
pipeline = self.pipeline_registry.get(name)
426-
if pipeline is None:
426+
pipeline_desc = self.pipeline_registry.get(name)
427+
if pipeline_desc is None:
427428
messagebox.showerror(
428429
"Pipeline missing", f"Pipeline '{name}' is not registered."
429430
)
@@ -432,6 +433,7 @@ def run_selected_pipeline(self) -> None:
432433
messagebox.showwarning("Missing file", "Load a .h5 file first.")
433434
return
434435
try:
436+
pipeline = pipeline_desc.instantiate()
435437
result = pipeline.run(self.h5_file)
436438
except Exception as exc: # noqa: BLE001
437439
messagebox.showerror("Pipeline error", f"Pipeline failed: {exc}")
@@ -522,7 +524,7 @@ def run_batch(self) -> None:
522524
)
523525
return
524526

525-
pipelines: list[ProcessPipeline] = []
527+
pipelines: list[PipelineDescriptor] = []
526528
missing: list[str] = []
527529
for name in selected_names:
528530
pipeline = self.pipeline_registry.get(name)
@@ -646,15 +648,16 @@ def _safe_pipeline_suffix(self, name: str) -> str:
646648
def _run_pipelines_on_file(
647649
self,
648650
h5_path: Path,
649-
pipelines: Sequence[ProcessPipeline],
651+
pipelines: Sequence[PipelineDescriptor],
650652
output_root: Path,
651653
) -> None:
652654
data_dir = output_root / h5_path.stem
653655
data_dir.mkdir(parents=True, exist_ok=True)
654656
combined_h5_out = data_dir / f"{h5_path.stem}_pipelines_result.h5"
655657
pipeline_results: list[tuple[str, ProcessResult]] = []
656658
with h5py.File(h5_path, "r") as h5file:
657-
for pipeline in pipelines:
659+
for pipeline_desc in pipelines:
660+
pipeline = pipeline_desc.instantiate()
658661
result = pipeline.run(h5file)
659662
pipeline_results.append((pipeline.name, result))
660663
self._log_batch(f"[OK] {h5_path.name} -> {pipeline.name}")

src/cli.py

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,25 +19,30 @@
1919
import sys
2020
import tempfile
2121
import zipfile
22-
from collections.abc import Iterable, Sequence
22+
from collections.abc import Sequence
2323
from pathlib import Path
2424

2525
import h5py
2626

27-
from pipelines import ProcessPipeline, ProcessResult, load_all_pipelines
27+
from pipelines import (
28+
PipelineDescriptor,
29+
ProcessResult,
30+
load_pipeline_catalog,
31+
)
2832
from pipelines.core.utils import write_combined_results_h5
2933

3034

31-
def _build_pipeline_registry() -> dict[str, ProcessPipeline]:
32-
pipelines = load_all_pipelines()
33-
return {p.name: p for p in pipelines}
35+
def _build_pipeline_registry() -> dict[str, PipelineDescriptor]:
36+
available, _ = load_pipeline_catalog()
37+
# pipelines = load_all_pipelines()
38+
return {p.name: p for p in available}
3439

3540

3641
def _load_pipeline_list(
37-
path: Path, registry: dict[str, ProcessPipeline]
38-
) -> list[ProcessPipeline]:
42+
path: Path, registry: dict[str, PipelineDescriptor]
43+
) -> list[PipelineDescriptor]:
3944
raw_lines = path.read_text(encoding="utf-8").splitlines()
40-
selected: list[ProcessPipeline] = []
45+
selected: list[PipelineDescriptor] = []
4146
missing: list[str] = []
4247
for line in raw_lines:
4348
name = line.strip()
@@ -92,7 +97,7 @@ def _prepare_data_root(
9297

9398
def _run_pipelines_on_file(
9499
h5_path: Path,
95-
pipelines: Sequence[ProcessPipeline],
100+
pipelines: Sequence[PipelineDescriptor],
96101
output_root: Path,
97102
) -> list[Path]:
98103
outputs: list[Path] = []
@@ -101,7 +106,8 @@ def _run_pipelines_on_file(
101106
combined_h5_out = data_dir / f"{h5_path.stem}_pipelines_result.h5"
102107
pipeline_results: list[tuple[str, ProcessResult]] = []
103108
with h5py.File(h5_path, "r") as h5file:
104-
for pipeline in pipelines:
109+
for pipeline_desc in pipelines:
110+
pipeline = pipeline_desc.instantiate()
105111
result = pipeline.run(h5file)
106112
pipeline_results.append((pipeline.name, result))
107113
print(f"[OK] {h5_path.name} -> {pipeline.name}")
@@ -200,7 +206,7 @@ def run_cli(
200206
shutil.rmtree(work_tempdir_path, ignore_errors=True)
201207

202208

203-
def main(argv: Iterable[str] | None = None) -> int:
209+
def main(argv: Sequence[str] | None = None) -> int:
204210
parser = argparse.ArgumentParser(
205211
description="Run AngioEye pipelines over a folder of HDF5 files."
206212
)

src/pipelines/__init__.py

Lines changed: 45 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -4,35 +4,16 @@
44
import pkgutil
55

66
# import inspect
7-
from .core.base import PIPELINE_REGISTRY, ProcessPipeline, ProcessResult
7+
from .core.base import (
8+
PIPELINE_REGISTRY,
9+
MissingPipeline,
10+
PipelineDescriptor,
11+
ProcessPipeline,
12+
ProcessResult,
13+
)
814
from .core.utils import write_combined_results_h5, write_result_h5
915

1016

11-
class MissingPipeline(ProcessPipeline):
12-
"""Placeholder for pipelines whose dependencies are missing."""
13-
14-
available = False
15-
missing_deps: list[str]
16-
requires: list[str]
17-
18-
def __init__(
19-
self, name: str, description: str, missing_deps: list[str], requires: list[str]
20-
) -> None:
21-
super().__init__()
22-
self.name = name
23-
self.description = description or "Pipeline unavailable (missing dependencies)."
24-
self.missing_deps = missing_deps
25-
self.requires = requires
26-
27-
def run(self, h5file):
28-
missing = ", ".join(
29-
self.missing_deps or self.requires or ["unknown dependency"]
30-
)
31-
raise ImportError(
32-
f"Pipeline '{self.name}' unavailable. Missing dependencies: {missing}"
33-
)
34-
35-
3617
def _module_docstring(module_name: str) -> str:
3718
spec = importlib.util.find_spec(module_name)
3819
if not spec or not spec.origin:
@@ -96,9 +77,9 @@ def _missing_requirements(requires: list[str]) -> list[str]:
9677
return missing
9778

9879

99-
def _discover_pipelines() -> tuple[list[ProcessPipeline], list[MissingPipeline]]:
100-
available: list[ProcessPipeline] = []
101-
missing: list[MissingPipeline] = []
80+
def _discover_pipelines() -> tuple[list[PipelineDescriptor], list[PipelineDescriptor]]:
81+
available: list[PipelineDescriptor] = []
82+
missing: list[PipelineDescriptor] = []
10283
# seen_classes = set()
10384

10485
for module_info in pkgutil.iter_modules(__path__):
@@ -124,28 +105,32 @@ def _discover_pipelines() -> tuple[list[ProcessPipeline], list[MissingPipeline]]
124105
except Exception as e:
125106
# Fallback for unknown failures (SyntaxError, etc.)
126107
missing.append(
127-
MissingPipeline(
128-
module_info.name, f"Error: {e}", ["Unknown"], ["Unknown"]
108+
PipelineDescriptor(
109+
name=module_info.name,
110+
description=f"Import Error: {e}",
111+
available=False,
112+
error_msg=str(e),
129113
)
130114
)
131115

132116
for _name, cls in PIPELINE_REGISTRY.items():
117+
desc = PipelineDescriptor(
118+
name=cls.name,
119+
description=cls.description,
120+
available=cls.available,
121+
requires=cls.requires,
122+
missing_deps=cls.missing_deps,
123+
pipeline_cls=cls,
124+
)
133125
if getattr(cls, "is_available", True):
134-
inst = cls()
126+
# inst = cls()
135127
# The GUI needs thoses values
136-
inst.name = cls.name
137-
inst.available = True
138-
inst.requires = cls.required_deps
139-
available.append(inst)
128+
# inst.name = cls.name
129+
# inst.available = True
130+
# inst.requires = cls.required_deps
131+
available.append(desc)
140132
else:
141-
missing.append(
142-
MissingPipeline(
143-
name=getattr(cls, "name", cls.__name__),
144-
description=getattr(cls, "description", ""),
145-
missing_deps=getattr(cls, "missing_deps", []),
146-
requires=getattr(cls, "required_deps", []),
147-
)
148-
)
133+
missing.append(desc)
149134

150135
# except ImportError as exc:
151136
# # Capture missing dependency if ModuleNotFoundError has a name.
@@ -192,15 +177,23 @@ def _discover_pipelines() -> tuple[list[ProcessPipeline], list[MissingPipeline]]
192177
return available, missing
193178

194179

195-
def load_all_pipelines(include_missing: bool = False) -> list[ProcessPipeline]:
196-
"""
197-
Discover and instantiate pipelines. Optionally include placeholders for missing deps.
198-
"""
199-
available, missing = _discover_pipelines()
200-
return available + missing if include_missing else available
180+
# def load_all_pipelines(
181+
# include_missing: bool = False,
182+
# ) -> list[type[ProcessPipeline] | MissingPipeline]:
183+
# """
184+
# Discover pipelines. Optionally include placeholders for missing deps.
185+
# """
186+
# available, missing = _discover_pipelines()
187+
# # Cast to a common list type for the type checker
188+
# combined: list[type[ProcessPipeline] | MissingPipeline] = list(available)
189+
# if include_missing:
190+
# combined.extend(missing)
191+
# return combined
201192

202193

203-
def load_pipeline_catalog() -> tuple[list[ProcessPipeline], list[MissingPipeline]]:
194+
def load_pipeline_catalog() -> tuple[
195+
list[PipelineDescriptor], list[PipelineDescriptor]
196+
]:
204197
"""Return (available, missing) pipelines for UI/CLI surfaces."""
205198
return _discover_pipelines()
206199

@@ -216,7 +209,7 @@ def load_pipeline_catalog() -> tuple[list[ProcessPipeline], list[MissingPipeline
216209
"ProcessResult",
217210
"write_result_h5",
218211
"write_combined_results_h5",
219-
"load_all_pipelines",
212+
# "load_all_pipelines",
220213
"load_pipeline_catalog",
221214
"MissingPipeline",
222215
*[_cls.__name__ for _cls in (p.__class__ for p in _AVAILABLE)],

src/pipelines/core/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from .base import ProcessPipeline, ProcessResult
1+
from .base import MissingPipeline, ProcessPipeline, ProcessResult
22
from .utils import (
33
safe_h5_key,
44
write_combined_results_h5,
@@ -7,6 +7,7 @@
77

88
__all__ = [
99
"ProcessPipeline",
10+
"MissingPipeline",
1011
"ProcessResult",
1112
"safe_h5_key",
1213
"write_result_h5",

0 commit comments

Comments
 (0)