
import shellescape
from prov.model import PROV
-from schema_salad.sourceline import SourceLine
from six import PY2, with_metaclass
from typing_extensions import (TYPE_CHECKING,  # pylint: disable=unused-import
                               Text)
-# move to a regular typing import when Python 3.3-3.6 is no longer supported
+from schema_salad.sourceline import SourceLine

from .builder import Builder, HasReqsHints  # pylint: disable=unused-import
from .context import RuntimeContext  # pylint: disable=unused-import
from .context import getdefault
from .errors import WorkflowException
from .loghandler import _logger
-from .pathmapper import (MapperEnt, PathMapper, ensure_writable,
-                         ensure_non_writable)
-from .process import UnsupportedRequirement, stageFiles
+from .pathmapper import (MapperEnt, PathMapper,  # pylint: disable=unused-import
+                         ensure_writable, ensure_non_writable)
+from .process import UnsupportedRequirement, stage_files
from .secrets import SecretStore  # pylint: disable=unused-import
-from .utils import \
-    bytes2str_in_dicts  # pylint: disable=unused-import; pylint: disable=unused-import
-from .utils import (DEFAULT_TMP_PREFIX, Directory, copytree_with_merge,
-                    json_dump, json_dumps, onWindows, processes_to_kill,
-                    subprocess)
+from .utils import (DEFAULT_TMP_PREFIX, Directory, bytes2str_in_dicts,
+                    copytree_with_merge, json_dump, json_dumps, onWindows,
+                    processes_to_kill, subprocess)

if TYPE_CHECKING:
-    from .provenance import CreateProvProfile  # pylint: disable=unused-import
+    from .provenance import ProvenanceProfile  # pylint: disable=unused-import
needs_shell_quoting_re = re.compile(r"""(^$|[\s|&;()<>\'"$@])""")

FORCE_SHELLED_POPEN = os.getenv("CWLTOOL_FORCE_SHELL_POPEN", "0") == "1"
@@ -194,8 +191,8 @@ def __init__(self,
        self.generatefiles = {"class": "Directory", "listing": [], "basename": ""}  # type: Directory
        self.stagedir = None  # type: Optional[Text]
        self.inplace_update = False
-        self.prov_obj = None  # type: Optional[CreateProvProfile]
-        self.parent_wf = None  # type: Optional[CreateProvProfile]
+        self.prov_obj = None  # type: Optional[ProvenanceProfile]
+        self.parent_wf = None  # type: Optional[ProvenanceProfile]
        self.timelimit = None  # type: Optional[int]
        self.networkaccess = False  # type: bool

@@ -265,7 +262,6 @@ def _execute(self,
                else:
                    stdin_path = rmap[1]

-
            stderr_path = None
            if self.stderr is not None:
                abserr = os.path.join(self.outdir, self.stderr)
@@ -344,11 +340,8 @@ def _execute(self,
            if runtimeContext.research_obj is not None and self.prov_obj is not None and \
                    runtimeContext.process_run_id is not None:
                #creating entities for the outputs produced by each step (in the provenance document)
-                self.prov_obj.generate_output_prov(
-                    outputs, runtimeContext.process_run_id, str(self.name))
-                self.prov_obj.document.wasEndedBy(
-                    runtimeContext.process_run_id, None, self.prov_obj.workflow_run_uri,
-                    datetime.datetime.now())
+                self.prov_obj.record_process_end(str(self.name), runtimeContext.process_run_id,
+                                                 outputs, datetime.datetime.now())
            if processStatus != "success":
                _logger.warning(u"[job %s] completed %s", self.name, processStatus)
            else:
@@ -381,7 +374,7 @@ def _execute(self,
            _logger.debug(u"[job %s] Removing input staging directory %s", self.name, self.stagedir)
            shutil.rmtree(self.stagedir, True)

-        if runtimeContext.rm_tmpdir is not None:
+        if runtimeContext.rm_tmpdir:
            _logger.debug(u"[job %s] Removing temporary directory %s", self.name, self.tmpdir)
            shutil.rmtree(self.tmpdir, True)

@@ -410,14 +403,17 @@ def run(self,
        if "PATH" not in env:
            env["PATH"] = str(os.environ["PATH"]) if onWindows() else os.environ["PATH"]
        if "SYSTEMROOT" not in env and "SYSTEMROOT" in os.environ:
-            env["SYSTEMROOT"] = str(os.environ["SYSTEMROOT"]) if onWindows() else os.environ["SYSTEMROOT"]
+            env["SYSTEMROOT"] = str(os.environ["SYSTEMROOT"]) if onWindows() \
+                else os.environ["SYSTEMROOT"]

-        stageFiles(self.pathmapper, ignoreWritable=True, symLink=True, secret_store=runtimeContext.secret_store)
+        stage_files(self.pathmapper, ignore_writable=True, symlink=True,
+                    secret_store=runtimeContext.secret_store)
        if self.generatemapper is not None:
-            stageFiles(self.generatemapper, ignoreWritable=self.inplace_update,
-                       symLink=True, secret_store=runtimeContext.secret_store)
-            relink_initialworkdir(self.generatemapper, self.outdir,
-                                  self.builder.outdir, inplace_update=self.inplace_update)
+            stage_files(self.generatemapper, ignore_writable=self.inplace_update,
+                        symlink=True, secret_store=runtimeContext.secret_store)
+            relink_initialworkdir(
+                self.generatemapper, self.outdir, self.builder.outdir,
+                inplace_update=self.inplace_update)

        self._execute([], env, runtimeContext)

@@ -481,12 +477,12 @@ def create_file_and_add_volume(self,
                                   volume,           # type: MapperEnt
                                   host_outdir_tgt,  # type: Optional[Text]
                                   secret_store      # type: Optional[SecretStore]
-                                  ):  # type: (...) -> None
+                                  ):  # type: (...) -> Text
        """Create the file and add a mapping."""
        if not host_outdir_tgt:
            new_file = os.path.join(
                tempfile.mkdtemp(dir=self.tmpdir),
-                os.path.basename(volume.resolved))
+                os.path.basename(volume.target))
        writable = True if volume.type == "CreateWritableFile" else False
        if secret_store:
            contents = secret_store.retrieve(volume.resolved)
@@ -504,7 +500,7 @@ def create_file_and_add_volume(self,
            ensure_writable(host_outdir_tgt or new_file)
        else:
            ensure_non_writable(host_outdir_tgt or new_file)
-
+        return host_outdir_tgt or new_file


    def add_volumes(self,
@@ -516,7 +512,7 @@ def add_volumes(self,
        """Append volume mappings to the runtime option list."""

        container_outdir = self.builder.outdir
-        for vol in (itm[1] for itm in pathmapper.items() if itm[1].staged):
+        for key, vol in (itm for itm in pathmapper.items() if itm[1].staged):
            host_outdir_tgt = None  # type: Optional[Text]
            if vol.target.startswith(container_outdir + "/"):
                host_outdir_tgt = os.path.join(
@@ -535,11 +531,12 @@ def add_volumes(self,
                self.add_writable_directory_volume(
                    runtime, vol, host_outdir_tgt)
            elif vol.type in ["CreateFile", "CreateWritableFile"]:
-                self.create_file_and_add_volume(
+                new_path = self.create_file_and_add_volume(
                    runtime, vol, host_outdir_tgt, secret_store)
+                pathmapper.update(
+                    key, new_path, vol.target, vol.type, vol.staged)

-    def run(self, runtimeContext):
-        # type: (RuntimeContext) -> None
+    def run(self, runtimeContext):  # type: (RuntimeContext) -> None
        if not os.path.exists(self.tmpdir):
            os.makedirs(self.tmpdir)
        (docker_req, docker_is_req) = self.get_requirement("DockerRequirement")