Skip to content

Commit 782da09

Browse files
committed
feat: enhance spectralis processing with parallel file handling and improved error management
1 parent 66b7bfe commit 782da09

File tree

1 file changed

+154
-86
lines changed

1 file changed

+154
-86
lines changed

spectralis_enface_flipped_combined.py

Lines changed: 154 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515

1616
import os
1717
import tempfile
18+
import threading
19+
from concurrent.futures import ThreadPoolExecutor, as_completed
1820
from contextlib import suppress
1921

2022
import azure.storage.filedatalake as azurelake # type: ignore
@@ -43,6 +45,12 @@
4345
# Process only this patient ID when set (None = all patients)
4446
PATIENT_ID_FILTER = None
4547

48+
# Number of parallel workers for download/transform/upload (per device)
49+
MAX_WORKERS = 8
50+
51+
# Lock so print from workers doesn't interleave
52+
_print_lock = threading.Lock()
53+
4654

4755
def get_file_system_client():
4856
"""Connect to Azure Data Lake (b2aistaging) using project config."""
@@ -93,6 +101,11 @@ def list_files_in_folder(fs_client, folder_path):
93101
return files
94102

95103

104+
def _safe_print(msg: str) -> None:
    """Emit *msg* to stdout while holding the module print lock.

    Guarantees that lines printed from concurrent worker threads never
    interleave mid-line.
    """
    _print_lock.acquire()
    try:
        print(msg)
    finally:
        _print_lock.release()
107+
108+
96109
def flip_and_rotate_spectralis(input_path: str, output_path: str) -> None:
97110
"""Load DICOM, flip horizontally and rotate 180°. Handles 2D and 3D (multi-frame)."""
98111
ds = pydicom.dcmread(input_path)
@@ -118,11 +131,63 @@ def flip_and_rotate_spectralis(input_path: str, output_path: str) -> None:
118131
ds.save_as(output_path, write_like_original=False)
119132

120133

134+
def _process_one_heidelberg_file(
    fs_client: azurelake.FileSystemClient, patient_id: str, remote_path: str
) -> tuple[bool, str]:
    """Download one Heidelberg DICOM, flip+rotate it, and upload the result.

    Args:
        fs_client: Data Lake file-system client used for both download and upload.
        patient_id: Patient folder name; becomes part of the output blob path.
        remote_path: Full remote path of the source file.

    Returns:
        ``(True, out_blob_path)`` on success, or ``(False, error_message)`` on
        failure of any stage. Never raises — per-stage exceptions are folded
        into the returned message so one bad file cannot kill the worker pool.
    """
    original_name = os.path.basename(remote_path)
    out_name = HEIDELBERG_PREFIX + original_name
    out_blob_path = f"{BASE_OUTPUT}/{patient_id}/{out_name}"

    # Create both scratch files up front and close the handles immediately so
    # the paths can be reopened by name (required on Windows, harmless elsewhere).
    # NOTE: reading fd.name cannot raise, so no try/finally is needed here.
    download_fd = tempfile.NamedTemporaryFile(
        delete=False, suffix=".dcm", prefix="enface_dl_"
    )
    write_fd = tempfile.NamedTemporaryFile(
        delete=False, suffix=".dcm", prefix="enface_out_"
    )
    download_path = download_fd.name
    write_path = write_fd.name
    download_fd.close()
    write_fd.close()

    # A single try/finally covers all three stages, so the temp files are
    # removed on every exit path instead of repeating the cleanup loop in each
    # failure branch.
    try:
        # Stage 1: download the source blob to local disk.
        try:
            file_client = fs_client.get_file_client(file_path=remote_path)
            with open(download_path, "wb") as f:
                f.write(file_client.download_file().readall())
        except Exception as e:
            return False, f"Download failed {remote_path}: {e}"

        # Stage 2: apply the horizontal flip + 180° rotation.
        try:
            flip_and_rotate_spectralis(download_path, write_path)
        except Exception as e:
            return False, f"Transform failed {remote_path}: {e}"

        # Stage 3: upload the transformed file, overwriting any previous run.
        try:
            out_client = fs_client.get_file_client(file_path=out_blob_path)
            with open(write_path, "rb") as f:
                out_client.upload_data(f.read(), overwrite=True)
            return True, out_blob_path
        except Exception as e:
            return False, f"Upload failed {out_blob_path}: {e}"
    finally:
        for p in (download_path, write_path):
            with suppress(FileNotFoundError):
                os.unlink(p)
184+
185+
121186
def process_heidelberg_spectralis(fs_client):
122187
"""
123188
Process heidelberg_spectralis: for each patient folder, download each image,
124189
flip + rotate, save as heidelberg_spectralis_<original_filename>, upload to
125-
enface-flipped-combined/{patient_id}/.
190+
enface-flipped-combined/{patient_id}/. Runs file tasks in parallel.
126191
"""
127192
input_prefix = f"{BASE_ORIGINAL}/{HEIDELBERG_SPECTRALIS_SUBFOLDER}"
128193
patient_ids = list_patient_folders(fs_client, input_prefix)
@@ -136,62 +201,74 @@ def process_heidelberg_spectralis(fs_client):
136201
return
137202
print(f" (filter: only patient {PATIENT_ID_FILTER})")
138203

139-
for patient_id in patient_ids:
140-
patient_path = f"{input_prefix}/{patient_id}"
141-
file_paths = list_files_in_folder(fs_client, patient_path)
142-
for remote_path in file_paths:
143-
original_name = os.path.basename(remote_path)
144-
out_name = HEIDELBERG_PREFIX + original_name
145-
out_blob_path = f"{BASE_OUTPUT}/{patient_id}/{out_name}"
146-
147-
download_fd = tempfile.NamedTemporaryFile(
148-
delete=False, suffix=".dcm", prefix="enface_dl_"
149-
)
150-
write_fd = tempfile.NamedTemporaryFile(
151-
delete=False, suffix=".dcm", prefix="enface_out_"
152-
)
153-
try:
154-
download_path = download_fd.name
155-
write_path = write_fd.name
156-
finally:
157-
download_fd.close()
158-
write_fd.close()
159-
160-
try:
161-
file_client = fs_client.get_file_client(file_path=remote_path)
162-
with open(download_path, "wb") as f:
163-
f.write(file_client.download_file().readall())
164-
except Exception as e:
165-
print(f" [SKIP] Download failed {remote_path}: {e}")
166-
os.unlink(download_path)
167-
os.unlink(write_path)
168-
continue
169-
170-
try:
171-
flip_and_rotate_spectralis(download_path, write_path)
172-
except Exception as e:
173-
print(f" [SKIP] Transform failed {remote_path}: {e}")
174-
os.unlink(download_path)
175-
os.unlink(write_path)
176-
continue
177-
178-
try:
179-
out_client = fs_client.get_file_client(file_path=out_blob_path)
180-
with open(write_path, "rb") as f:
181-
out_client.upload_data(f.read(), overwrite=True)
182-
print(f" [OK] {out_blob_path}")
183-
except Exception as e:
184-
print(f" [SKIP] Upload failed {out_blob_path}: {e}")
185-
finally:
186-
os.unlink(download_path)
187-
os.unlink(write_path)
204+
tasks = [
205+
(patient_id, remote_path)
206+
for patient_id in patient_ids
207+
for remote_path in list_files_in_folder(
208+
fs_client, f"{input_prefix}/{patient_id}"
209+
)
210+
]
211+
212+
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
213+
futures = {
214+
executor.submit(
215+
_process_one_heidelberg_file, fs_client, patient_id, remote_path
216+
): (patient_id, remote_path)
217+
for patient_id, remote_path in tasks
218+
}
219+
for future in as_completed(futures):
220+
success, msg = future.result()
221+
if success:
222+
_safe_print(f" [OK] {msg}")
223+
else:
224+
_safe_print(f" [SKIP] {msg}")
225+
226+
227+
def _process_one_other_file(
    fs_client: azurelake.FileSystemClient,
    patient_id: str,
    remote_path: str,
    file_prefix: str,
) -> tuple[bool, str]:
    """Download one file and re-upload it under a prefixed name (no transform).

    Args:
        fs_client: Data Lake file-system client used for both download and upload.
        patient_id: Patient folder name; becomes part of the output blob path.
        remote_path: Full remote path of the source file.
        file_prefix: Device-specific prefix prepended to the original filename.

    Returns:
        ``(True, out_blob_path)`` on success, or ``(False, error_message)`` on
        failure. Never raises — errors are folded into the returned message.
    """
    original_name = os.path.basename(remote_path)
    out_name = file_prefix + original_name
    out_blob_path = f"{BASE_OUTPUT}/{patient_id}/{out_name}"

    # Keep the original extension on the scratch file when there is one.
    # Close the handle immediately so the path can be reopened by name
    # (required on Windows, harmless elsewhere). Reading fd.name cannot
    # raise, so no try/finally is needed around the assignment.
    fd = tempfile.NamedTemporaryFile(
        delete=False, suffix=os.path.splitext(original_name)[1] or ".bin"
    )
    download_path = fd.name
    fd.close()

    # One try/finally covers both stages, so the temp file is removed on every
    # exit path instead of duplicating the unlink in the download-failure branch.
    try:
        # Stage 1: download the source blob to local disk.
        try:
            file_client = fs_client.get_file_client(file_path=remote_path)
            with open(download_path, "wb") as f:
                f.write(file_client.download_file().readall())
        except Exception as e:
            return False, f"Download failed {remote_path}: {e}"

        # Stage 2: upload unchanged bytes under the prefixed name,
        # overwriting any previous run.
        try:
            out_client = fs_client.get_file_client(file_path=out_blob_path)
            with open(download_path, "rb") as f:
                out_client.upload_data(f.read(), overwrite=True)
            return True, out_blob_path
        except Exception as e:
            return False, f"Upload failed {out_blob_path}: {e}"
    finally:
        with suppress(FileNotFoundError):
            os.unlink(download_path)
188265

189266

190267
def process_other_device(fs_client, subfolder_name, file_prefix):
191268
"""
192269
Process one of topcon_maestro2, topcon_triton, zeiss_cirrus: for each patient,
193270
download each file, rename to <file_prefix><original_filename>, upload to
194-
enface-flipped-combined/{patient_id}/ (no image transform).
271+
enface-flipped-combined/{patient_id}/ (no image transform). Runs file tasks in parallel.
195272
"""
196273
input_prefix = f"{BASE_ORIGINAL}/{subfolder_name}"
197274
patient_ids = list_patient_folders(fs_client, input_prefix)
@@ -205,40 +282,31 @@ def process_other_device(fs_client, subfolder_name, file_prefix):
205282
return
206283
print(f" (filter: only patient {PATIENT_ID_FILTER})")
207284

208-
for patient_id in patient_ids:
209-
patient_path = f"{input_prefix}/{patient_id}"
210-
file_paths = list_files_in_folder(fs_client, patient_path)
211-
for remote_path in file_paths:
212-
original_name = os.path.basename(remote_path)
213-
out_name = file_prefix + original_name
214-
out_blob_path = f"{BASE_OUTPUT}/{patient_id}/{out_name}"
215-
216-
fd = tempfile.NamedTemporaryFile(
217-
delete=False, suffix=os.path.splitext(original_name)[1] or ".bin"
218-
)
219-
try:
220-
download_path = fd.name
221-
finally:
222-
fd.close()
223-
224-
try:
225-
file_client = fs_client.get_file_client(file_path=remote_path)
226-
with open(download_path, "wb") as f:
227-
f.write(file_client.download_file().readall())
228-
except Exception as e:
229-
print(f" [SKIP] Download failed {remote_path}: {e}")
230-
os.unlink(download_path)
231-
continue
232-
233-
try:
234-
out_client = fs_client.get_file_client(file_path=out_blob_path)
235-
with open(download_path, "rb") as f:
236-
out_client.upload_data(f.read(), overwrite=True)
237-
print(f" [OK] {out_blob_path}")
238-
except Exception as e:
239-
print(f" [SKIP] Upload failed {out_blob_path}: {e}")
240-
finally:
241-
os.unlink(download_path)
285+
tasks = [
286+
(patient_id, remote_path)
287+
for patient_id in patient_ids
288+
for remote_path in list_files_in_folder(
289+
fs_client, f"{input_prefix}/{patient_id}"
290+
)
291+
]
292+
293+
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
294+
futures = {
295+
executor.submit(
296+
_process_one_other_file,
297+
fs_client,
298+
patient_id,
299+
remote_path,
300+
file_prefix,
301+
): (patient_id, remote_path)
302+
for patient_id, remote_path in tasks
303+
}
304+
for future in as_completed(futures):
305+
success, msg = future.result()
306+
if success:
307+
_safe_print(f" [OK] {msg}")
308+
else:
309+
_safe_print(f" [SKIP] {msg}")
242310

243311

244312
def main():
@@ -251,7 +319,7 @@ def main():
251319
# 2) Other devices: rename only, upload to same enface-flipped-combined folder
252320
for subfolder, prefix in OTHER_DEVICES:
253321
print(f"--- {subfolder} (rename only) ---")
254-
# process_other_device(fs_client, subfolder, prefix)
322+
process_other_device(fs_client, subfolder, prefix)
255323

256324
print("Done.")
257325

0 commit comments

Comments
 (0)