Skip to content

Commit 88d656c

Browse files
stxue1dependabot[bot]mr-cadamnovak
authored
Add caching support for CWL (#5187)
* Add cachedir argument and make sure directories exist * Add a test * Do a copy when not bypassing filestore and when caching is enabled * Fix test * Add cache test * Bump mypy from 1.13.0 to 1.14.1 (#5193) * Bump mypy from 1.13.0 to 1.14.1 Bumps [mypy](https://github.com/python/mypy) from 1.13.0 to 1.14.1. - [Changelog](https://github.com/python/mypy/blob/master/CHANGELOG.md) - [Commits](python/mypy@v1.13.0...v1.14.1) --- updated-dependencies: - dependency-name: mypy dependency-type: direct:development update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] <support@github.com> * type stubs: upgrade for newer stricter mypy 1.14 https://typing.readthedocs.io/en/latest/spec/enums.html#defining-members --------- Signed-off-by: dependabot[bot] <support@github.com> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: Michael R. Crusoe <michael.crusoe@gmail.com> * Update cwltool * Add cachedir argument and make sure directories exist * Add a test * Do a copy when not bypassing filestore and when caching is enabled * Fix test * Add cache test * Update options documentation and enable bypassing file store when CWL task caching is enabled * Fix CWL cachedir test * Remove dead code and add clarifications * Apply suggestions from code review Co-authored-by: Adam Novak <anovak@soe.ucsc.edu> --------- Signed-off-by: dependabot[bot] <support@github.com> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: Michael R. Crusoe <michael.crusoe@gmail.com> Co-authored-by: Adam Novak <anovak@soe.ucsc.edu>
1 parent fba1732 commit 88d656c

File tree

5 files changed

+78
-5
lines changed

5 files changed

+78
-5
lines changed

docs/running/cliOptions.rst

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -300,10 +300,9 @@ Allows configuring Toil's data storage.
300300
copies the files into the output directory. Applies to
301301
filesystem-based job stores only. (Default=False)
302302
--caching BOOL
303-
Set caching options. This must be set to "false"
304-
to use a batch system that does not support
305-
cleanup. Set to "true" if caching
306-
is desired.
303+
Enable or disable worker-level file caching. Set to "true" if
304+
caching is desired. By default, caching is enabled on supported
305+
batch systems. Does not affect CWL or WDL task caching.
307306
--symlinkJobStoreReads BOOL
308307
Allow reads and container mounts from a JobStore's
309308
shared filesystem directly via symlink. Can be turned

src/toil/cwl/cwltoil.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3599,6 +3599,7 @@ def fill_in_files(
35993599
"""
36003600
Given a mapping of filenames to Toil file IDs, replace the filename with the file IDs throughout the CWL object.
36013601
"""
3602+
36023603
def fill_in_file(filename: str) -> FileID:
36033604
"""
36043605
Return the file name's associated Toil file ID
@@ -4261,8 +4262,18 @@ def main(args: Optional[list[str]] = None, stdout: TextIO = sys.stdout) -> int:
42614262
runtime_context.move_outputs = "leave"
42624263
runtime_context.rm_tmpdir = False
42634264
runtime_context.streaming_allowed = not options.disable_streaming
4265+
if options.cachedir is not None:
4266+
runtime_context.cachedir = os.path.abspath(options.cachedir)
4267+
# Automatically bypass the file store to be compatible with cwltool caching
4268+
# Otherwise, the CWL caching code makes links to temporary local copies
4269+
# of filestore files and caches those.
4270+
logger.debug("CWL task caching is turned on. Bypassing file store.")
4271+
options.bypass_file_store = True
42644272
if options.mpi_config_file is not None:
42654273
runtime_context.mpi_config = MpiConfig.load(options.mpi_config_file)
4274+
if cwltool.main.check_working_directories(runtime_context) is not None:
4275+
logger.error("Failed to create directory. If using tmpdir_prefix, tmpdir_outdir_prefix, or cachedir, consider changing directory locations.")
4276+
return 1
42664277
setattr(runtime_context, "bypass_file_store", options.bypass_file_store)
42674278
if options.bypass_file_store and options.destBucket:
42684279
# We use the file store to write to buckets, so we can't do this (yet?)

src/toil/options/common.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -466,7 +466,8 @@ def is_within(x: Union[int, float]) -> bool:
466466
)
467467

468468
caching = file_store_options.add_mutually_exclusive_group()
469-
caching_help = "Enable or disable caching for your workflow, specifying this overrides default from job store"
469+
caching_help = ("Enable or disable worker level file caching for your workflow, specifying this overrides default from batch system. "
470+
"Does not affect CWL or WDL task caching.")
470471
caching.add_argument(
471472
"--caching",
472473
dest="caching",

src/toil/options/cwl.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -419,3 +419,13 @@ def add_cwl_options(parser: ArgumentParser, suppress: bool = True) -> None:
419419
type=str,
420420
help=suppress_help or "Specify a cloud bucket endpoint for output files.",
421421
)
422+
parser.add_argument(
423+
"--cachedir",
424+
type=str,
425+
help=suppress_help
426+
or "Directory to cache intermediate workflow outputs to avoid "
427+
"recomputing steps. Can be very helpful in the development and "
428+
"troubleshooting of CWL documents. This automatically bypasses the file store."
429+
" Not to be confused with --caching.",
430+
dest="cachedir"
431+
)

src/toil/test/cwl/cwlTest.py

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -703,6 +703,50 @@ def path_with_bogus_rev() -> str:
703703
except subprocess.CalledProcessError:
704704
pass
705705

706+
def test_caching(self) -> None:
    """
    Check that ``--cachedir`` fills the CWL task cache and that a rerun reuses it.

    The revsort workflow is run twice against the same cache directory. The
    first run must leave the expected number of entries in the cache; the
    second run writes its worker logs to a directory, and every one of those
    logs must report that a cached output was used.
    """
    log.info("Running CWL caching test.")
    from toil.cwl import cwltoil

    outDir = self._createTempDir()
    cacheDir = self._createTempDir()

    cwlDir = os.path.join(self._projectRootPath(), "src", "toil", "test", "cwl")
    log_path = os.path.join(outDir, "log")
    cmd = [
        "--outdir",
        outDir,
        "--jobStore",
        os.path.join(outDir, "jobStore"),
        "--clean=always",
        "--no-container",
        "--cachedir",
        cacheDir,
        os.path.join(cwlDir, "revsort.cwl"),
        os.path.join(cwlDir, "revsort-job.json"),
    ]
    st = StringIO()
    ret = cwltoil.main(cmd, stdout=st)
    assert ret == 0
    # cwltool hashes certain steps into directories, ensure it exists
    # since cwltool caches per task and revsort has 2 cwl tasks, there should be 2 directories and 2 status files
    assert len(os.listdir(cacheDir)) == 4

    # Rerun the workflow to ensure there is a cache hit and that we don't rerun the tools
    st = StringIO()
    cmd = [
        "--writeLogsFromAllJobs=True",
        "--writeLogs",
        log_path,
    ] + cmd
    ret = cwltoil.main(cmd, stdout=st)
    assert ret == 0

    # Without at least one log the loop below would pass without checking anything.
    log_names = os.listdir(log_path)
    assert log_names, "Expected worker logs to be written on the cached rerun"

    # Ensure all of the worker logs are using their cached outputs
    for log_name in log_names:
        # Use a context manager so each log file handle is closed promptly.
        with open(os.path.join(log_path, log_name), encoding="utf-8") as log_file:
            assert "Using cached output" in log_file.read()
747+
748+
749+
706750
@needs_aws_s3
707751
def test_streamable(self, extra_args: Optional[list[str]] = None) -> None:
708752
"""
@@ -1203,6 +1247,14 @@ def test_run_conformance_with_caching(self) -> None:
12031247
junit_file=os.path.join(self.rootDir, "caching-conformance-1.2.junit.xml"),
12041248
)
12051249

1250+
@slow
@pytest.mark.timeout(CONFORMANCE_TEST_TIMEOUT)
def test_run_conformance_with_task_caching(self) -> None:
    """
    Run the conformance tests with ``--cachedir`` pointing at a fresh temp directory.

    Results are written to a junit file named for the task-caching variant.
    """
    junit_path = os.path.join(self.rootDir, "task-caching-conformance-1.2.junit.xml")
    task_cache_dir = self._createTempDir("task_cache")
    self.test_run_conformance(
        junit_file=junit_path,
        extra_args=["--cachedir", task_cache_dir],
    )
1257+
12061258
@slow
12071259
@pytest.mark.timeout(CONFORMANCE_TEST_TIMEOUT)
12081260
def test_run_conformance_with_in_place_update(self) -> None:

0 commit comments

Comments
 (0)