Skip to content

Commit 62ae257

Browse files
author
Peter Amstutz
authored
Job cache fix (#1137)
* Fix local job caching, manage status with file locking. * Locking with fcntl.flock or mscvrt.locking
1 parent 3e9bca4 commit 62ae257

File tree

2 files changed

+56
-11
lines changed

2 files changed

+56
-11
lines changed

cwltool/command_line_tool.py

Lines changed: 34 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,8 @@
5252
from .stdfsaccess import StdFsAccess # pylint: disable=unused-import
5353
from .utils import (aslist, convert_pathsep_to_unix,
5454
docker_windows_path_adjust, json_dumps, onWindows,
55-
random_outdir, windows_default_container_id)
55+
random_outdir, windows_default_container_id,
56+
shared_file_lock, upgrade_lock)
5657
if TYPE_CHECKING:
5758
from .provenance import ProvenanceProfile # pylint: disable=unused-import
5859

@@ -353,8 +354,9 @@ def job(self,
353354

354355
interesting = {"DockerRequirement",
355356
"EnvVarRequirement",
356-
"CreateFileRequirement",
357-
"ShellCommandRequirement"}
357+
"InitialWorkDirRequirement",
358+
"ShellCommandRequirement",
359+
"NetworkAccess"}
358360
for rh in (self.original_requirements, self.original_hints):
359361
for r in reversed(rh):
360362
if r["class"] in interesting and r["class"] not in keydict:
@@ -369,34 +371,55 @@ def job(self,
369371
keydictstr, cachekey)
370372

371373
jobcache = os.path.join(runtimeContext.cachedir, cachekey)
372-
jobcachepending = "{}.{}.pending".format(
373-
jobcache, threading.current_thread().ident)
374374

375-
if os.path.isdir(jobcache) and not os.path.isfile(jobcachepending):
375+
# Create a lockfile to manage cache status.
376+
jobcachepending = "{}.status".format(jobcache)
377+
jobcachelock = None
378+
jobstatus = None
379+
380+
# Opens the file for read/write, or creates an empty file.
381+
jobcachelock = open(jobcachepending, "a+")
382+
383+
# get the shared lock to ensure no other process is trying
384+
# to write to this cache
385+
shared_file_lock(jobcachelock)
386+
jobcachelock.seek(0)
387+
jobstatus = jobcachelock.read()
388+
389+
if os.path.isdir(jobcache) and jobstatus == "success":
376390
if docker_req and runtimeContext.use_container:
377391
cachebuilder.outdir = runtimeContext.docker_outdir or random_outdir()
378392
else:
379393
cachebuilder.outdir = jobcache
380394

381395
_logger.info("[job %s] Using cached output in %s", jobname, jobcache)
382396
yield CallbackJob(self, output_callbacks, cachebuilder, jobcache)
397+
# we're done with the cache so release lock
398+
jobcachelock.close()
383399
return
384400
else:
385401
_logger.info("[job %s] Output of job will be cached in %s", jobname, jobcache)
402+
403+
# turn shared lock into an exclusive lock since we'll
404+
# be writing the cache directory
405+
upgrade_lock(jobcachelock)
406+
386407
shutil.rmtree(jobcache, True)
387408
os.makedirs(jobcache)
388409
runtimeContext = runtimeContext.copy()
389410
runtimeContext.outdir = jobcache
390-
open(jobcachepending, "w").close()
391411

392-
def rm_pending_output_callback(output_callbacks, jobcachepending,
412+
def update_status_output_callback(output_callbacks, jobcachelock,
393413
outputs, processStatus):
394-
if processStatus == "success":
395-
os.remove(jobcachepending)
414+
# save status to the lockfile then release the lock
415+
jobcachelock.seek(0)
416+
jobcachelock.truncate()
417+
jobcachelock.write(processStatus)
418+
jobcachelock.close()
396419
output_callbacks(outputs, processStatus)
397420

398421
output_callbacks = partial(
399-
rm_pending_output_callback, output_callbacks, jobcachepending)
422+
update_status_output_callback, output_callbacks, jobcachelock)
400423

401424
builder = self._init_job(job_order, runtimeContext)
402425

cwltool/utils.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -240,3 +240,25 @@ def random_outdir(): # type: () -> Text
240240
if not hasattr(random_outdir, 'outdir'):
241241
random_outdir.outdir = '/' + ''.join([random.choice(string.ascii_letters) for _ in range(6)]) # type: ignore # nosec
242242
return random_outdir.outdir # type: ignore
243+
244+
245+
#
246+
# Simple multi-platform (fcntl/msvrt) file locking wrapper
247+
#
248+
try:
249+
import fcntl # type: ignore
250+
251+
def shared_file_lock(fd): # type: (IO) -> None
252+
fcntl.flock(fd.fileno(), fcntl.LOCK_SH) # type: ignore
253+
254+
def upgrade_lock(fd): # type: (IO) -> None
255+
fcntl.flock(fd.fileno(), fcntl.LOCK_EX) # type: ignore
256+
257+
except ImportError:
258+
import msvcrt # type: ignore
259+
260+
def shared_file_lock(fd): # type: (IO) -> None
261+
msvcrt.locking(fd.fileno(), msvcrt.LK_LOCK, 1024) # type: ignore
262+
263+
def upgrade_lock(fd): # type: (IO) -> None
264+
pass

0 commit comments

Comments
 (0)