diff --git a/cwltool/job.py b/cwltool/job.py
index e617fcd96..210c79bd3 100644
--- a/cwltool/job.py
+++ b/cwltool/job.py
@@ -693,6 +693,9 @@ def add_volumes(
         any_path_okay: bool = False,
     ) -> None:
         """Append volume mappings to the runtime option list."""
+        stage_source_dir = os.environ.get(
+            "STAGE_SRC_DIR", os.path.join(tempfile.gettempdir(), "cwl-stg-src-dir")
+        )
         container_outdir = self.builder.outdir
         for key, vol in (itm for itm in pathmapper.items() if itm[1].staged):
             host_outdir_tgt = None  # type: Optional[str]
@@ -706,6 +709,14 @@ def add_volumes(
                     "the designated output directory, also know as "
                     "$(runtime.outdir): {}".format(vol)
                 )
+            # Match on a separator-terminated prefix so that a sibling directory
+            # such as "<stage_source_dir>-old" is not mistaken for a staged path.
+            if stage_source_dir and vol.resolved.startswith(
+                os.path.join(stage_source_dir, "")
+            ):
+                # Already staged on the host: it is covered by the single
+                # directory mount appended at the end of this function.
+                continue
             if vol.type in ("File", "Directory"):
                 self.add_file_or_directory_volume(runtime, vol, host_outdir_tgt)
             elif vol.type == "WritableFile":
@@ -721,6 +732,11 @@ def add_volumes(
                     runtime, vol, host_outdir_tgt, secret_store, tmpdir_prefix
                 )
                 pathmapper.update(key, new_path, vol.target, vol.type, vol.staged)
+        if stage_source_dir and pathmapper.stagedir != container_outdir:
+            # mount a single host directory for all staged input files
+            self.append_volume(
+                runtime, stage_source_dir, pathmapper.stagedir, writable=True
+            )
 
     def run(
         self,
diff --git a/cwltool/pathmapper.py b/cwltool/pathmapper.py
index c89fac5f4..faa8aa517 100644
--- a/cwltool/pathmapper.py
+++ b/cwltool/pathmapper.py
@@ -1,6 +1,8 @@
 import collections
 import logging
 import os
+import tempfile
+import shutil
 import stat
 import urllib
 import uuid
@@ -93,7 +95,10 @@ def visit(
         basedir: str,
         copy: bool = False,
         staged: bool = False,
+        stage_source_dir: Optional[str] = None,
     ) -> None:
+        if stage_source_dir:
+            os.makedirs(stage_source_dir, exist_ok=True)
         stagedir = cast(Optional[str], obj.get("dirname")) or stagedir
         tgt = os.path.join(
             stagedir,
@@ -150,7 +155,23 @@ def visit(
                                 else os.path.join(os.path.dirname(deref), rl)
                             )
                             st = os.lstat(deref)
-
+                    if stage_source_dir:
+                        staged_source_file = os.path.join(
+                            stage_source_dir, os.path.basename(deref)
+                        )
+                        if copy:
+                            # A writable input must never alias the original: a
+                            # hard link shares the inode, so writes made through
+                            # the staged path would modify the source file.
+                            shutil.copyfile(deref, staged_source_file)
+                        else:
+                            try:
+                                os.link(deref, staged_source_file)
+                            except OSError:
+                                # Hard links cannot cross filesystems; fall back
+                                # to a real copy when linking is not possible.
+                                shutil.copyfile(deref, staged_source_file)
+                        deref = staged_source_file
                     self._pathmap[path] = MapperEnt(
                         deref, tgt, "WritableFile" if copy else "File", staged
                     )
@@ -163,19 +184,35 @@ def visit(
             )
 
     def setup(self, referenced_files: List[CWLObjectType], basedir: str) -> None:
-
         # Go through each file and set the target to its own directory along
         # with any secondary files.
         stagedir = self.stagedir
+        stage_source_dir = os.environ.get(
+            "STAGE_SRC_DIR", os.path.join(tempfile.gettempdir(), "cwl-stg-src-dir")
+        )
         for fob in referenced_files:
+            staging_uuid = str(uuid.uuid4())
             if self.separateDirs:
-                stagedir = os.path.join(self.stagedir, "stg%s" % uuid.uuid4())
+                # this is what the path will be inside of the container environment
+                stagedir = os.path.join(self.stagedir, "stg%s" % staging_uuid)
+            # Host-side directory for this input: a per-input subdirectory of
+            # STAGE_SRC_DIR, or of the temp-dir default when that is unset.
+            # NOTE(review): the "stg<uuid>" component is added here even when
+            # separateDirs is false, while the container-side stagedir is not
+            # changed in that case -- confirm the two layouts still line up.
+            unique_stage_source_dir = None
+            if stage_source_dir:
+                unique_stage_source_dir = os.path.join(
+                    stage_source_dir, "stg%s" % staging_uuid
+                )
+                os.makedirs(stage_source_dir, exist_ok=True)
             self.visit(
                 fob,
                 stagedir,
                 basedir,
                 copy=cast(bool, fob.get("writable", False)),
                 staged=True,
+                stage_source_dir=unique_stage_source_dir,
             )
 
     def mapper(self, src: str) -> MapperEnt: