Skip to content

Commit cb35f7d

Browse files
More CLEM fixes (#306)
Pull request to resolve some of the issues mentioned in issue #304. * More robust sorting of TIFF files once the stack exceeds 100 images * Better logic to distinguish between image series that start with the same position number * Skipped processing of binned "Position X_pmd_X--" TIFF files * Skipped processing of "Position X_histo" XLIF files * Optimised rsync parser logic to read rsync output correctly * Corrected incorrect variable in tiff_to_stack function being passed to zocalo cluster --------- Co-authored-by: Daniel Hatton <[email protected]>
1 parent 2f62f2d commit cb35f7d

File tree

7 files changed

+170
-56
lines changed

7 files changed

+170
-56
lines changed

src/murfey/cli/tiff_to_stack.py

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -39,16 +39,10 @@ def run():
3939
f.resolve()
4040
for f in tiff_file.parent.glob("./*")
4141
if f.suffix in {".tif", ".tiff"}
42-
and f.stem.startswith(tiff_file.stem.split("--")[0])
42+
# Handle cases where series start with the same position number,
43+
# but deviate afterwards
44+
and f.stem.startswith(tiff_file.stem.split("--")[0] + "--")
4345
]
44-
# Sort by series, then channel, then frame
45-
tiff_list.sort(
46-
key=lambda e: (
47-
e.stem.split("--")[0],
48-
e.stem.split("--")[2],
49-
e.stem.split("--")[1],
50-
)
51-
)
5246

5347
# Resolve for metadata argument
5448
if not args.metadata:

src/murfey/client/analyser.py

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ def _find_context(self, file_path: Path) -> bool:
141141
self._context = CLEMContext("leica", self._basepath)
142142
return True
143143

144-
# Checks for tomography and SPA workflows
144+
# Tomography and SPA workflow checks
145145
split_file_name = file_path.name.split("_")
146146
if split_file_name:
147147
# Files starting with "FoilHole" belong to the SPA workflow
@@ -237,15 +237,18 @@ def post_transfer(self, transferred_file: Path):
237237
transferred_file, role=self._role, environment=self._environment
238238
)
239239
except Exception as e:
240-
logger.error(
241-
f"An exception was encountered posting the file for transfer: {e}"
242-
)
240+
logger.error(f"An exception was encountered post transfer: {e}")
243241

244242
def _analyse(self):
245243
logger.info("Analyser thread started")
246244
mdoc_for_reading = None
247245
while not self._halt_thread:
248246
transferred_file = self.queue.get()
247+
transferred_file = (
248+
Path(transferred_file)
249+
if isinstance(transferred_file, str)
250+
else transferred_file
251+
)
249252
if not transferred_file:
250253
self._halt_thread = True
251254
continue
@@ -341,9 +344,15 @@ def _analyse(self):
341344
),
342345
}
343346
)
347+
344348
# If a file with a CLEM context is identified, immediately post it
345349
elif isinstance(self._context, CLEMContext):
350+
logger.info(
351+
f"File {transferred_file.name!r} will be processed as part of CLEM workflow"
352+
)
346353
self.post_transfer(transferred_file)
354+
355+
# Handle files with tomography and SPA context differently
347356
elif not self._extension or self._unseen_xml:
348357
self._find_extension(transferred_file)
349358
if self._extension:

src/murfey/client/contexts/clem.py

Lines changed: 76 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
the CLEM workflow should be processed.
44
"""
55

6-
# import requests
76
import logging
87
from datetime import datetime
98
from pathlib import Path
@@ -13,7 +12,7 @@
1312

1413
from murfey.client.context import Context
1514
from murfey.client.instance_environment import MurfeyInstanceEnvironment
16-
from murfey.util import capture_post, get_machine_config, sanitise
15+
from murfey.util import capture_post, get_machine_config
1716
from murfey.util.clem.xml import get_image_elements
1817

1918
# Create logger object
@@ -74,24 +73,27 @@ def post_transfer(
7473
environment: Optional[MurfeyInstanceEnvironment] = None,
7574
**kwargs,
7675
) -> bool:
76+
7777
super().post_transfer(
7878
transferred_file, role=role, environment=environment, **kwargs
7979
)
8080

8181
# Process files generated by "auto-save" acquisition mode
8282
# These include TIF/TIFF and XLIF files
8383
if transferred_file.suffix in (".tif", ".tiff", ".xlif"):
84+
logger.debug(f"File extension {transferred_file.suffix!r} detected")
85+
8486
# Type checking to satisfy MyPy
8587
if not environment:
8688
logger.warning("No environment passed in")
87-
return True
89+
return False
8890

8991
# Location of the file on the client PC
9092
source = _get_source(transferred_file, environment)
9193
# Type checking to satisfy MyPy
9294
if not source:
9395
logger.warning(f"No source found for file {transferred_file}")
94-
return True
96+
return False
9597

9698
# Get the Path on the DLS file system
9799
file_path = _file_transferred_to(
@@ -101,23 +103,54 @@ def post_transfer(
101103
)
102104
if not file_path:
103105
logger.warning(
104-
f"File associated with {sanitise(str(transferred_file))} not found on the storage system"
106+
f"File {transferred_file.name!r} not found on the storage system"
105107
)
106108
return False
107109

110+
# Skip processing of binned "_pmd" image series
111+
if "_pmd_" in transferred_file.stem:
112+
logger.debug(
113+
f"File {transferred_file.name!r} belongs to the '_pmd_' series of binned images; skipping processing"
114+
)
115+
return True
116+
108117
# Process TIF/TIFF files
109118
if transferred_file.suffix in (".tif", ".tiff"):
119+
logger.debug("Detected a TIFF file")
120+
110121
# Files should be named "PositionX--ZXX--CXX.tif" by default
111-
if not len(transferred_file.stem.split("--")) == 3:
122+
# If Position is repeated, it will add an additional --00X to the end
123+
if len(transferred_file.stem.split("--")) not in [3, 4]:
112124
logger.warning(
113-
"This TIFF file is likely not part of the CLEM workflow"
125+
f"File {transferred_file.name!r} is likely not part of the CLEM workflow"
114126
)
115127
return False
116128

117-
# Get series name from file name
118-
series_name = "/".join(
119-
[*file_path.parent.parts[-2:], file_path.stem.split("--")[0]]
120-
) # The previous 2 parent directories should be unique enough
129+
# Create a unique name for the series
130+
# For standard file name
131+
if len(transferred_file.stem.split("--")) == 3:
132+
series_name = "/".join(
133+
[
134+
*file_path.parent.parts[-2:], # Upper 2 parent directories
135+
file_path.stem.split("--")[0],
136+
]
137+
)
138+
# When this is a repeated position
139+
elif len(transferred_file.stem.split("--")) == 4:
140+
series_name = "/".join(
141+
[
142+
*file_path.parent.parts[-2:], # Upper 2 parent directories
143+
"--".join(file_path.stem.split("--")[i] for i in [0, -1]),
144+
]
145+
)
146+
else:
147+
logger.error(
148+
f"Series name could not be generated from file {transferred_file.name!r}"
149+
)
150+
return False
151+
logger.debug(
152+
f"File {transferred_file.name!r} given the series identifier {series_name!r}"
153+
)
121154

122155
# Create key-value pairs containing empty list if not already present
123156
if series_name not in self._tiff_series.keys():
@@ -132,15 +165,28 @@ def post_transfer(
132165
self._tiff_timestamps[series_name].append(
133166
transferred_file.stat().st_ctime
134167
)
168+
logger.debug(
169+
f"Created TIFF file dictionary entries for {series_name!r}"
170+
)
135171

136172
# Process XLIF files
137173
if transferred_file.suffix == ".xlif":
174+
logger.debug("Detected an XLIF file")
175+
176+
# Skip processing of "_histo" histogram XLIF files
177+
if transferred_file.stem.endswith("_histo"):
178+
logger.debug(
179+
f"File {transferred_file.name!r} contains histogram metadata; skipping processing"
180+
)
138181

139182
# XLIF files don't have the "--ZXX--CXX" additions in the file name
140183
# But they have "/Metadata/" as the immediate parent
141184
series_name = "/".join(
142185
[*file_path.parent.parent.parts[-2:], file_path.stem]
143186
) # The previous 2 parent directories should be unique enough
187+
logger.debug(
188+
f"File {transferred_file.name!r} given the series identifier {series_name!r}"
189+
)
144190

145191
# Extract metadata to get the expected size of the series
146192
metadata = parse(transferred_file).getroot()
@@ -162,18 +208,30 @@ def post_transfer(
162208
else 1
163209
)
164210
num_files = num_channels * num_frames
211+
logger.debug(
212+
f"Expected number of files in {series_name!r}: {num_files}"
213+
)
165214

166215
# Update dictionary entries
167216
self._files_in_series[series_name] = num_files
168217
self._series_metadata[series_name] = str(file_path)
169218
self._metadata_size[series_name] = transferred_file.stat().st_size
170219
self._metadata_timestamp[series_name] = transferred_file.stat().st_ctime
220+
logger.debug(f"Created dictionary entries for {series_name!r} metadata")
171221

172222
# Post message if all files for the associated series have been collected
173-
if len(self._tiff_series[series_name]) == self._files_in_series.get(
174-
series_name, 0 # Return 0 if the key hasn't been generated yet
223+
# .get(series_name, 0) returns 0 if no associated key is found
224+
if len(self._tiff_series[series_name]) == 0:
225+
logger.debug(f"TIFF series {series_name!r} not yet loaded")
226+
return True
227+
elif self._files_in_series.get(series_name, 0) == 0:
228+
logger.debug(
229+
f"Metadata for TIFF series {series_name!r} not yet processed"
230+
)
231+
return True
232+
elif len(self._tiff_series[series_name]) == self._files_in_series.get(
233+
series_name, 0
175234
):
176-
177235
# Construct URL for Murfey server to communicate with
178236
url = f"{str(environment.url.geturl())}/sessions/{environment.murfey_session}/tiff_to_stack"
179237
if not url:
@@ -194,6 +252,9 @@ def post_transfer(
194252
"description": "",
195253
},
196254
)
255+
return True
256+
else:
257+
logger.debug(f"TIFF series {series_name!r} is still being processed")
197258

198259
# Process LIF files
199260
if transferred_file.suffix == ".lif":
@@ -233,4 +294,5 @@ def post_transfer(
233294
"description": "",
234295
},
235296
)
297+
return True
236298
return True

src/murfey/client/rsync.py

Lines changed: 42 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,10 @@
1+
"""
2+
rsync is a Linux process that does fast, versatile remote (and local) file copying.
3+
This module allows Murfey to use rsync to facilitate its file transfers, and to
4+
parse the logs outputted by rsync in order to confirm that the file has been correctly
5+
transferred.
6+
"""
7+
18
from __future__ import annotations
29

310
import logging
@@ -251,22 +258,39 @@ def _fake_transfer(self, files: list[Path]) -> bool:
251258

252259
def _transfer(self, files: list[Path]) -> bool:
253260
"""
254-
Actually transfer files in an rsync subprocess
261+
Transfer files via an rsync sub-process, and parse the rsync stdout to verify
262+
the success of the transfer.
255263
"""
256-
previously_transferred = self._files_transferred
257264

258-
next_file: RSyncerUpdate | None = None
265+
# Set up initial variables
266+
files = [f for f in files if f.is_file()]
267+
previously_transferred = self._files_transferred
259268
transfer_success: set[Path] = set()
260269
successful_updates: list[RSyncerUpdate] = []
261-
262-
files = [f for f in files if f.is_file()]
270+
next_file: RSyncerUpdate | None = None
263271

264272
def parse_stdout(line: str):
273+
"""
274+
Reads the stdout from rsync in order to verify the status of transferred
275+
files and other things.
276+
"""
265277
nonlocal next_file
266278

279+
# Miscellaneous rsync stdout lines to skip
267280
if not line:
268281
return
282+
if line.startswith(("building file list", "created directory", "sending")):
283+
return
284+
if line.startswith("sent "):
285+
# sent 6,676 bytes received 397 bytes 4,715.33 bytes/sec
286+
return
287+
if line.startswith("total "):
288+
# total size is 315,265,653 speedup is 44,573.12 (DRY RUN)
289+
return
290+
if line.startswith(("cd", ".d")):
291+
return
269292

293+
# Lines with chr(13), \r, contain file transfer information
270294
if chr(13) in line:
271295
# partial transfer
272296
#
@@ -288,15 +312,8 @@ def parse_stdout(line: str):
288312
successful_updates.append(next_file._replace(file_size=size_bytes))
289313
next_file = None
290314
return
291-
if line.startswith(("building file list", "created directory", "sending")):
292-
return
293-
if line.startswith("sent "):
294-
# sent 6,676 bytes received 397 bytes 4,715.33 bytes/sec
295-
return
296-
if line.startswith("total "):
297-
# total size is 315,265,653 speedup is 44,573.12 (DRY RUN)
298-
return
299315

316+
# Lines starting with "*f" contain file transfer information
300317
if line.startswith((".f", ">f", "<f")):
301318
# .d ./
302319
# .f README.md
@@ -317,7 +334,9 @@ def parse_stdout(line: str):
317334
self._files_transferred - previously_transferred
318335
)
319336
update = RSyncerUpdate(
320-
file_path=Path(line[12:].replace(" ", "")),
337+
file_path=Path(
338+
line[12:].rstrip()
339+
), # Remove trailing newlines, spaces, and carriage returns
321340
file_size=0,
322341
outcome=TransferResult.SUCCESS,
323342
transfer_total=self._files_transferred - previously_transferred,
@@ -334,18 +353,17 @@ def parse_stdout(line: str):
334353
next_file = update
335354
return
336355

337-
if line.startswith(("cd", ".d")):
338-
return
339-
340356
def parse_stderr(line: str):
341-
logger.warning(line)
357+
logger.warning(f"rsync stderr: {line!r}")
342358

343-
relative_filenames = []
359+
# Generate list of relative filenames for this batch of transferred files
360+
relative_filenames: List[Path] = []
344361
for f in files:
345362
try:
346363
relative_filenames.append(f.relative_to(self._basepath))
347364
except ValueError:
348365
raise ValueError(f"File '{f}' is outside of {self._basepath}") from None
366+
349367
if self._remove_files:
350368
if self._required_substrings_for_removal:
351369
rsync_stdin_remove = b"\n".join(
@@ -372,6 +390,9 @@ def parse_stderr(line: str):
372390
else:
373391
rsync_stdin_remove = b""
374392
rsync_stdin = b"\n".join(os.fsencode(f) for f in relative_filenames)
393+
394+
# Create and run rsync subprocesses
395+
# rsync commands to pass to subprocess
375396
rsync_cmd = [
376397
"rsync",
377398
"-iiv",
@@ -381,7 +402,6 @@ def parse_stderr(line: str):
381402
"--files-from=-",
382403
"-p", # preserve permissions
383404
]
384-
385405
rsync_cmd.extend([".", self._remote])
386406

387407
result: subprocess.CompletedProcess | None = None
@@ -415,7 +435,9 @@ def parse_stderr(line: str):
415435

416436
self.notify(successful_updates, secondary=True)
417437

438+
# Compare files from rsync stdout to original list to verify transfer
418439
for f in set(relative_filenames) - transfer_success:
440+
logger.warning(f"Transfer of file {f.name!r} considered a failure")
419441
self._files_transferred += 1
420442
current_outstanding = self.queue.unfinished_tasks - (
421443
self._files_transferred - previously_transferred

0 commit comments

Comments
 (0)