Skip to content

Commit 9925e9c

Browse files
committed
Handle partially written result files, and lockfiles that are deleted before they are detected
1 parent eca83ad commit 9925e9c

File tree

3 files changed

+30
-5
lines changed

3 files changed

+30
-5
lines changed

pydra/engine/job.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ class Job(ty.Generic[TaskType]):
8585
bindings: dict[str, ty.Any] | None = None # Bindings for the job environment
8686

8787
_inputs: dict[str, ty.Any] | None = None
88+
_run_start_time: datetime | None
8889

8990
def __init__(
9091
self,
@@ -145,6 +146,7 @@ def __init__(
145146
self.audit = submitter.audit
146147
self.cache_dir = submitter.cache_dir
147148
self.cache_locations = submitter.cache_locations
149+
self._run_start_time = None
148150

149151
@property
150152
def cache_dir(self):
@@ -430,9 +432,12 @@ def done(self):
430432
@property
431433
def run_start_time(self) -> datetime | None:
432434
"""Check whether the job is currently running."""
435+
if self._run_start_time is not None:
436+
return self._run_start_time
433437
if not self.lockfile.exists():
434438
return None
435-
return datetime.fromtimestamp(self.lockfile.stat().st_ctime)
439+
self._run_start_time = datetime.fromtimestamp(self.lockfile.stat().st_ctime)
440+
return self._run_start_time
436441

437442
def _combined_output(self, return_inputs=False):
438443
combined_results = []

pydra/engine/result.py

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
from pathlib import Path
44
import typing as ty
55
import attrs
6+
import pickle
7+
import time
68
import os
79
import cloudpickle as cp
810
import getpass
@@ -127,7 +129,12 @@ class RuntimeSpec:
127129
network: bool = False
128130

129131

130-
def load_result(checksum, cache_locations):
132+
def load_result(
133+
checksum: str,
134+
cache_locations: list[Path],
135+
retries: int = 10,
136+
polling_interval: float = 0.1,
137+
) -> Result | None:
131138
"""
132139
Restore a result from the cache.
133140
@@ -139,6 +146,11 @@ def load_result(checksum, cache_locations):
139146
List of cache directories, in order of priority, where
140147
the checksum will be looked for.
141148
149+
Returns
150+
-------
151+
result : :obj:`Result` | None
152+
The result object if found, otherwise None.
153+
142154
"""
143155
if not cache_locations:
144156
return None
@@ -148,8 +160,16 @@ def load_result(checksum, cache_locations):
148160
if (location / checksum).exists():
149161
result_file = location / checksum / "_result.pklz"
150162
if result_file.exists() and result_file.stat().st_size > 0:
151-
with open(result_file, "rb") as fp:
152-
return cp.load(fp)
163+
# Load the result file, retrying if necessary while waiting for the file
164+
# to be written completely.
165+
for _ in range(retries):
166+
try:
167+
with open(result_file, "rb") as fp:
168+
return cp.load(fp)
169+
except (pickle.UnpicklingError, EOFError):
170+
# if the file is not finished writing
171+
# wait and retry
172+
time.sleep(polling_interval)
153173
return None
154174
return None
155175

pydra/engine/submitter.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -738,7 +738,7 @@ def update_status(self) -> None:
738738
job.run_start_time,
739739
)
740740
# Check to see if any previously running tasks have completed
741-
for index, (job, start_time) in list(self.running.items()):
741+
for index, (job, _) in list(self.running.items()):
742742
if job.done:
743743
self.successful[job.state_index] = self.running.pop(index)[0]
744744
elif job.errored:

0 commit comments

Comments
 (0)