Skip to content

Commit 13f35a7

Browse files
authored
Optimise CLEM Data Transfer and Registration (#707)
* Added optional MachineConfig key to blacklist directories and files with certain string patterns from being transferred
* Added logic to blacklist files and folders based on provided substrings; added 'substrings_blacklist' as an attribute to the DirWatcher class
* Pass substrings blacklist from machine config to DirWatcher when initialising it
* Register a dataset as an atlas based on the image width/height instead of keywords in the file path
* Renamed 'CLEMAlignAndMergeParameters' to 'CLEMProcessingParameters', and added to it the threshold at which a dataset is considered an Atlas database entry
1 parent 52829f1 commit 13f35a7

File tree

6 files changed

+230
-14
lines changed

6 files changed

+230
-14
lines changed

src/murfey/client/multigrid_control.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -468,7 +468,13 @@ def rsync_result(update: RSyncerUpdate):
468468
session_id=self._environment.murfey_session,
469469
data=rsyncer_data,
470470
)
471-
self._environment.watchers[source] = DirWatcher(source, settling_time=30)
471+
self._environment.watchers[source] = DirWatcher(
472+
source,
473+
settling_time=30,
474+
substrings_blacklist=self._machine_config.get(
475+
"substrings_blacklist", {"directories": [], "files": []}
476+
),
477+
)
472478

473479
if not self.analysers.get(source) and analyse:
474480
log.info(f"Starting analyser for {source}")

src/murfey/client/watchdir.py

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ def __init__(
3232
path: str | os.PathLike,
3333
settling_time: float = 60,
3434
appearance_time: float | None = None,
35+
substrings_blacklist: dict[str, dict] = {},
3536
transfer_all: bool = True,
3637
status_bar: StatusBar | None = None,
3738
):
@@ -42,6 +43,7 @@ def __init__(
4243
self._statusbar = status_bar
4344
self.settling_time = settling_time
4445
self._appearance_time = appearance_time
46+
self._substrings_blacklist = substrings_blacklist
4547
self._transfer_all = transfer_all
4648
self._modification_overwrite: float | None = None
4749
self._init_time: float = time.time()
@@ -128,7 +130,7 @@ def scan(self, modification_time: float | None = None, transfer_all: bool = Fals
128130
settling_time=scan_completion
129131
)
130132

131-
# Create a list of files sroted based on their timestamps
133+
# Create a list of files sorted based on their timestamps
132134
files_for_transfer = []
133135
time_ordered_file_candidates = sorted(
134136
self._file_candidates,
@@ -150,8 +152,9 @@ def scan(self, modification_time: float | None = None, transfer_all: bool = Fals
150152
continue
151153

152154
if (
153-
self._file_candidates[x].settling_time + self.settling_time # type: ignore
154-
< time.time()
155+
current_file_settling_time := self._file_candidates[x].settling_time
156+
) is not None and (
157+
current_file_settling_time + self.settling_time < time.time()
155158
):
156159
try:
157160
file_stat = os.stat(x)
@@ -252,15 +255,28 @@ def _scan_directory(
252255
raise
253256
for entry in directory_contents:
254257
entry_name = os.path.join(path, entry.name)
255-
if entry.is_dir() and (
258+
# Skip any directories with matching blacklisted substrings
259+
if entry.is_dir() and any(
260+
char in entry.name
261+
for char in self._substrings_blacklist.get("directories", [])
262+
):
263+
log.debug(f"Skipping blacklisted directory {str(entry.name)!r}")
264+
continue
265+
elif entry.is_dir() and (
256266
modification_time is None or entry.stat().st_ctime >= modification_time
257267
):
258268
result.update(self._scan_directory(entry_name))
259269
else:
260270
# Exclude textual log
261271
if "textual" in str(entry):
262272
continue
263-
273+
# Exclude files with blacklisted substrings
274+
if any(
275+
char in entry.name
276+
for char in self._substrings_blacklist.get("files", [])
277+
):
278+
log.debug(f"Skipping blacklisted file {str(entry.name)!r}")
279+
continue
264280
# Get file statistics and append file to dictionary
265281
try:
266282
file_stat = entry.stat()

src/murfey/util/config.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,10 @@ class MachineConfig(BaseModel): # type: ignore
4848
analyse_created_directories: list[str] = []
4949
gain_reference_directory: Optional[Path] = None
5050
eer_fractionation_file_template: str = ""
51+
substrings_blacklist: dict[str, list] = {
52+
"directories": [],
53+
"files": [],
54+
}
5155

5256
# Data transfer setup -------------------------------------------------------------
5357
# Rsync setup

src/murfey/util/processing_params.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,14 +54,18 @@ def cryolo_model_path(visit: str, instrument_name: str) -> Path:
5454
return machine_config.default_model
5555

5656

57-
class CLEMAlignAndMergeParameters(BaseModel):
57+
class CLEMProcessingParameters(BaseModel):
58+
# Atlas vs GridSquare registration threshold
59+
atlas_threshold: float = 0.0015 # in m
60+
61+
# Image alignment and merging-specific parameters
5862
crop_to_n_frames: Optional[int] = 50
5963
align_self: Literal["enabled", ""] = "enabled"
6064
flatten: Literal["mean", "min", "max", ""] = "mean"
6165
align_across: Literal["enabled", ""] = "enabled"
6266

6367

64-
default_clem_align_and_merge_parameters = CLEMAlignAndMergeParameters()
68+
default_clem_processing_parameters = CLEMProcessingParameters()
6569

6670

6771
class SPAParameters(BaseModel):

src/murfey/workflows/clem/register_preprocessing_results.py

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
from murfey.server import _transport_object
2323
from murfey.util.models import GridSquareParameters
2424
from murfey.util.processing_params import (
25-
default_clem_align_and_merge_parameters as processing_params,
25+
default_clem_processing_parameters as processing_params,
2626
)
2727
from murfey.workflows.clem import get_db_entry
2828
from murfey.workflows.clem.align_and_merge import submit_cluster_request
@@ -51,6 +51,14 @@ class CLEMPreprocessingResult(BaseModel):
5151
extent: list[float] # [x0, x1, y0, y1]
5252

5353

54+
def _is_clem_atlas(result: CLEMPreprocessingResult) -> bool:
    """Decide whether a preprocessing result should be registered as an atlas.

    The physical width and height of the image are computed from its pixel
    counts and pixel size; the result counts as an atlas when the larger of
    the two reaches ``processing_params.atlas_threshold``.
    """
    # NOTE(review): assumes pixel_size uses the same length unit as
    # atlas_threshold -- confirm against the preprocessing output.
    image_width = result.pixels_x * result.pixel_size
    image_height = result.pixels_y * result.pixel_size
    largest_dimension = max(image_width, image_height)
    return largest_dimension >= processing_params.atlas_threshold
60+
61+
5462
def _register_clem_image_series(
5563
session_id: int,
5664
result: CLEMPreprocessingResult,
@@ -142,9 +150,7 @@ def _register_clem_image_series(
142150

143151
# Add metadata for this series
144152
clem_img_series.search_string = str(output_file.parent / "*tiff")
145-
clem_img_series.data_type = (
146-
"atlas" if "Overview_" in result.series_name else "grid_square"
147-
)
153+
clem_img_series.data_type = "atlas" if _is_clem_atlas(result) else "grid_square"
148154
clem_img_series.number_of_members = result.number_of_members
149155
clem_img_series.pixels_x = result.pixels_x
150156
clem_img_series.pixels_y = result.pixels_y
@@ -181,7 +187,7 @@ def _register_dcg_and_atlas(
181187
dcg_name += f"--{result.series_name.split('--')[1]}"
182188

183189
# Determine values for atlas
184-
if "Overview_" in result.series_name: # These are atlas datasets
190+
if _is_clem_atlas(result):
185191
output_file = list(result.output_files.values())[0]
186192
atlas_name = str(output_file.parent / "*.tiff")
187193
atlas_pixel_size = result.pixel_size
@@ -197,7 +203,7 @@ def _register_dcg_and_atlas(
197203
dcg_entry = dcg_search[0]
198204
# Update atlas if registering atlas dataset
199205
# and data collection group already exists
200-
if "Overview_" in result.series_name:
206+
if _is_clem_atlas(result):
201207
atlas_message = {
202208
"session_id": session_id,
203209
"dcgid": dcg_entry.id,

tests/client/test_watchdir.py

Lines changed: 180 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,180 @@
1+
import os
2+
import queue
3+
import threading
4+
from pathlib import Path
5+
6+
import pytest
7+
8+
from murfey.client.watchdir import DirWatcher
9+
from tests.conftest import ExampleVisit
10+
11+
12+
def test_dirwatcher_initialises(tmp_path: Path):
    """Verify the attributes of a DirWatcher created with default arguments."""
    watched_path = str(tmp_path)
    watcher = DirWatcher(path=watched_path)

    # Path and scan bookkeeping start out empty
    assert watcher._basepath == os.fspath(watched_path)
    assert watcher._lastscan == {}
    assert watcher._file_candidates == {}

    # Constructor defaults
    assert watcher._statusbar is None
    assert watcher.settling_time == 60
    assert watcher._appearance_time is None
    assert watcher._substrings_blacklist == {}
    assert watcher._transfer_all is True
    assert watcher._modification_overwrite is None

    # Runtime machinery created during initialisation
    assert isinstance(watcher._init_time, float)
    assert isinstance(watcher.queue, queue.Queue)
    assert isinstance(watcher.thread, threading.Thread)
    assert watcher._stopping is False
    assert watcher._halt_thread is False

    # The string representation embeds the watched path
    assert str(watcher) == f"<DirWatcher ({os.fspath(str(tmp_path))})>"
32+
33+
34+
@pytest.fixture
def clem_visit_dir(tmp_path: Path):
    """Create and return a CLEM visit directory beneath the pytest tmp_path.

    The visit name is assembled from the ExampleVisit proposal code, proposal
    number and visit number, and the directory is placed under
    ``clem/data/2025``.
    """
    visit_name = f"{ExampleVisit.proposal_code}{ExampleVisit.proposal_number}-{ExampleVisit.visit_number}"
    visit_dir = tmp_path.joinpath("clem", "data", "2025", visit_name)
    visit_dir.mkdir(parents=True, exist_ok=True)
    return visit_dir
40+
41+
42+
@pytest.fixture
def clem_test_files(clem_visit_dir: Path):
    """Create the files a DirWatcher scan is expected to pick up.

    Builds one example atlas collection and one example image stack (each with
    its metadata file) under ``images/test_grid`` in the visit directory, and
    returns the sorted list of created paths.
    """
    grid_dir = clem_visit_dir / "images" / "test_grid"

    # Example atlas collection: 20 stage tiles plus the metadata file
    atlas_dir = grid_dir / "Overview 1" / "Image 1"
    atlas_files = [
        atlas_dir / f"Image 1--Stage{str(s).zfill(2)}.tif" for s in range(20)
    ]
    atlas_files.append(atlas_dir / "Metadata" / "Image 1.xlif")

    # Example image stack: 3 channels x 10 z-slices plus the metadata file
    stack_dir = grid_dir / "TileScan 1" / "Position 1"
    stack_files = [
        stack_dir / f"Position 1--C{str(c).zfill(2)}--Z{str(z).zfill(2)}.tif"
        for c in range(3)
        for z in range(10)
    ]
    stack_files.append(stack_dir / "Metadata" / "Position 1.xlif")

    # Materialise every file, creating parent directories on demand
    wanted_files: list[Path] = atlas_files + stack_files
    for target in wanted_files:
        parent = target.parent
        if not parent.exists():
            parent.mkdir(parents=True)
        if not target.exists():
            target.touch()
    return sorted(wanted_files)
80+
81+
82+
@pytest.fixture
def clem_junk_files(clem_visit_dir: Path):
    """Create the junk files that the CLEM blacklist is meant to exclude.

    The files are written under ``images/test_grid`` in the visit directory,
    alongside the wanted test data, and the sorted list of created paths is
    returned.
    """
    grid_dir = clem_visit_dir / "images" / "test_grid"
    junk_files: list[Path] = []

    # Junk atlas data: five "_pmd_" series of 20 stage tiles plus metadata
    for n in range(5):
        series_dir = grid_dir / "Image 1" / f"Image 1_pmd_{n}"
        junk_files.extend(
            series_dir / f"Image 1_pmd_{n}--Stage{str(s).zfill(2)}.tif"
            for s in range(20)
        )
        junk_files.append(series_dir / "Metadata" / "Image 1.xlif")

    # Junk image data: five "_pmd_" series of 3 channels x 10 z-slices plus
    # metadata
    for n in range(5):
        series_dir = grid_dir / "Position 1" / f"Position 1_pmd_{n}"
        junk_files.extend(
            series_dir
            / f"Position 1_pmd_{n}--C{str(c).zfill(2)}--Z{str(z).zfill(2)}.tif"
            for c in range(3)
            for z in range(10)
        )
        junk_files.append(series_dir / "Metadata" / "Position 1.xlif")

    # Remaining junk files that sit beside or inside the wanted directories
    remaining_junk = (
        "1.xlef",
        "Metadata/IOManagerConfiguation.xlif",
        "Metadata/Overview 1.xlcf",
        "Metadata/TileScan 1.xlcf",
        "Overview 1/Image 1/Image 1_histo.lof",
        "TileScan 1/Position 1/Position 1_histo.lof",
        "Overview 1/Image 1/Metadata/Image 1_histo.xlif",
        "TileScan 1/Position 1/Metadata/Position 1_histo.xlif",
    )
    junk_files.extend(grid_dir / file_path for file_path in remaining_junk)

    # Create files and directories
    for target in junk_files:
        parent = target.parent
        if not parent.exists():
            parent.mkdir(parents=True)
        if not target.exists():
            target.touch()
    return sorted(junk_files)
139+
140+
141+
# Parameter sets for test_scan_directory; each entry is a pair of
# (workflow type, substrings blacklist), where the blacklist maps
# "directories" and "files" to the substrings to be skipped.
scan_directory_params_matrix: tuple[tuple[str, dict[str, list[str]]], ...] = (
    (
        "clem",
        {
            "directories": ["_pmd_"],
            "files": [
                ".xlef",
                ".xlcf",
                "_histo.lof",
                "_histo.xlif",
                "IOManagerConfiguation.xlif",
            ],
        },
    ),
)
159+
160+
161+
@pytest.mark.parametrize("test_params", scan_directory_params_matrix)
def test_scan_directory(
    clem_visit_dir: Path,
    clem_test_files: list[Path],
    clem_junk_files: list[Path],
    test_params: tuple[str, dict[str, list[str]]],
):
    """Check that a blacklist-aware directory scan returns only wanted files.

    ``clem_junk_files`` is requested only so that the junk files exist on disk
    while the scan runs; its return value is not used.
    """
    # Unpack test params
    workflow_type, substrings_blacklist = test_params

    # Initialise different watchers based on the workflow to test and run the scan
    if workflow_type == "clem":
        watcher = DirWatcher(
            path=str(clem_visit_dir),
            substrings_blacklist=substrings_blacklist,
        )
        result = watcher._scan_directory()
        expected_files = clem_test_files
    else:
        # Fail with a clear message instead of a NameError on 'result' below
        pytest.fail(f"Unsupported workflow type {workflow_type!r}")

    # Check that the result does not contain the junk files. Both sides are
    # sorted as strings: Path objects are ordered part by part, which can
    # disagree with plain string ordering when names contain characters that
    # sort below the path separator (e.g. spaces).
    assert sorted(str(file) for file in expected_files) == sorted(result.keys())

0 commit comments

Comments
 (0)