6
6
import re
7
7
import shutil
8
8
import tempfile
9
- from six .moves import urllib
10
- from six import string_types , u
11
9
from functools import partial
10
+ from typing import Any , Callable , Dict , Generator , Optional , Text , Union , cast
11
+
12
+ from six import string_types , u
12
13
13
14
import schema_salad .validate as validate
14
15
import shellescape
15
16
from schema_salad .ref_resolver import file_uri , uri_file_path
16
17
from schema_salad .sourceline import SourceLine , indent
17
- from typing import Any , Callable , cast , Generator , Text , Union
18
+ from six . moves import urllib
18
19
19
- from .builder import CONTENT_LIMIT , substitute , Builder
20
- from .pathmapper import adjustFileObjs , adjustDirObjs , visit_class
20
+ from .builder import CONTENT_LIMIT , Builder , substitute
21
21
from .errors import WorkflowException
22
- from .job import CommandLineJob
23
- from .pathmapper import PathMapper , get_listing , trim_listing
24
- from .process import Process , shortname , uniquename , normalizeFilesDirs , compute_checksums
22
+ from .flatten import flatten
23
+ from .job import CommandLineJob , DockerCommandLineJob , JobBase
24
+ from .pathmapper import (PathMapper , adjustDirObjs , adjustFileObjs ,
25
+ get_listing , trim_listing , visit_class )
26
+ from .process import (Process , UnsupportedRequirement ,
27
+ _logger_validation_warnings , compute_checksums ,
28
+ normalizeFilesDirs , shortname , uniquename )
25
29
from .stdfsaccess import StdFsAccess
26
30
from .utils import aslist
27
31
28
32
ACCEPTLIST_EN_STRICT_RE = re .compile (r"^[a-zA-Z0-9._+-]+$" )
29
33
ACCEPTLIST_EN_RELAXED_RE = re .compile (r".*" ) # Accept anything
30
34
ACCEPTLIST_RE = ACCEPTLIST_EN_STRICT_RE
31
35
32
- from .flatten import flatten
33
36
34
37
_logger = logging .getLogger ("cwltool" )
35
38
@@ -150,6 +153,7 @@ def run(self, **kwargs):
150
153
# walk over input as implicit reassignment doesn't reach everything in builder.bindings
151
154
def check_adjust (builder , f ):
152
155
# type: (Builder, Dict[Text, Any]) -> Dict[Text, Any]
156
+
153
157
f ["path" ] = builder .pathmapper .mapper (f ["location" ])[1 ]
154
158
f ["dirname" ], f ["basename" ] = os .path .split (f ["path" ])
155
159
if f ["class" ] == "File" :
@@ -171,20 +175,28 @@ def __init__(self, toolpath_object, **kwargs):
171
175
# type: (Dict[Text, Any], **Any) -> None
172
176
super (CommandLineTool , self ).__init__ (toolpath_object , ** kwargs )
173
177
174
- def makeJobRunner (self ): # type: () -> CommandLineJob
175
- return CommandLineJob ()
178
+ def makeJobRunner (self , use_container = True ): # type: (Optional[bool]) -> JobBase
179
+ dockerReq , _ = self .get_requirement ("DockerRequirement" )
180
+ if dockerReq and use_container :
181
+ return DockerCommandLineJob ()
182
+ else :
183
+ for t in reversed (self .requirements ):
184
+ if t ["class" ] == "DockerRequirement" :
185
+ raise UnsupportedRequirement (
186
+ "--no-container, but this CommandLineTool has "
187
+ "DockerRequirement under 'requirements'." )
188
+ return CommandLineJob ()
176
189
177
190
def makePathMapper (self , reffiles , stagedir , ** kwargs ):
178
191
# type: (List[Any], Text, **Any) -> PathMapper
179
- dockerReq , _ = self .get_requirement ("DockerRequirement" )
180
192
return PathMapper (reffiles , kwargs ["basedir" ], stagedir )
181
193
182
194
def job (self ,
183
195
job_order , # type: Dict[Text, Text]
184
196
output_callbacks , # type: Callable[[Any, Any], Any]
185
197
** kwargs # type: Any
186
198
):
187
- # type: (...) -> Generator[Union[CommandLineJob , CallbackJob], None, None]
199
+ # type: (...) -> Generator[Union[JobBase , CallbackJob], None, None]
188
200
189
201
jobname = uniquename (kwargs .get ("name" , shortname (self .tool .get ("id" , "job" ))))
190
202
@@ -199,9 +211,9 @@ def job(self,
199
211
cachebuilder .stagedir ,
200
212
separateDirs = False )
201
213
_check_adjust = partial (check_adjust , cachebuilder )
202
-
203
214
visit_class ([cachebuilder .files , cachebuilder .bindings ],
204
215
("File" , "Directory" ), _check_adjust )
216
+
205
217
cmdline = flatten (map (cachebuilder .generate_arg , cachebuilder .bindings ))
206
218
(docker_req , docker_is_req ) = self .get_requirement ("DockerRequirement" )
207
219
if docker_req and kwargs .get ("use_container" ) is not False :
@@ -264,7 +276,7 @@ def rm_pending_output_callback(output_callbacks, jobcachepending,
264
276
265
277
reffiles = copy .deepcopy (builder .files )
266
278
267
- j = self .makeJobRunner ()
279
+ j = self .makeJobRunner (kwargs . get ( "use_container" ) )
268
280
j .builder = builder
269
281
j .joborder = builder .job
270
282
j .stdin = None
@@ -368,8 +380,39 @@ def rm_pending_output_callback(output_callbacks, jobcachepending,
368
380
ls [i ] = t ["entry" ]
369
381
j .generatefiles [u"listing" ] = ls
370
382
383
+ inplaceUpdateReq = self .get_requirement ("http://commonwl.org/cwltool#InplaceUpdateRequirement" )[0 ]
384
+
385
+ if inplaceUpdateReq :
386
+ j .inplace_update = inplaceUpdateReq ["inplaceUpdate" ]
371
387
normalizeFilesDirs (j .generatefiles )
372
388
389
+ readers = {}
390
+ muts = set ()
391
+
392
+ if builder .mutation_manager :
393
+ def register_mut (f ):
394
+ muts .add (f ["location" ])
395
+ builder .mutation_manager .register_mutation (j .name , f )
396
+
397
+ def register_reader (f ):
398
+ if f ["location" ] not in muts :
399
+ builder .mutation_manager .register_reader (j .name , f )
400
+ readers [f ["location" ]] = f
401
+
402
+ for li in j .generatefiles ["listing" ]:
403
+ li = cast (Dict [Text , Any ], li )
404
+ if li .get ("writable" ) and j .inplace_update :
405
+ adjustFileObjs (li , register_mut )
406
+ adjustDirObjs (li , register_mut )
407
+ else :
408
+ adjustFileObjs (li , register_reader )
409
+ adjustDirObjs (li , register_reader )
410
+
411
+ adjustFileObjs (builder .files , register_reader )
412
+ adjustFileObjs (builder .bindings , register_reader )
413
+ adjustDirObjs (builder .files , register_reader )
414
+ adjustDirObjs (builder .bindings , register_reader )
415
+
373
416
j .environment = {}
374
417
evr = self .get_requirement ("EnvVarRequirement" )[0 ]
375
418
if evr :
@@ -391,16 +434,17 @@ def rm_pending_output_callback(output_callbacks, jobcachepending,
391
434
j .pathmapper = builder .pathmapper
392
435
j .collect_outputs = partial (
393
436
self .collect_output_ports , self .tool ["outputs" ], builder ,
394
- compute_checksum = kwargs .get ("compute_checksum" , True ))
437
+ compute_checksum = kwargs .get ("compute_checksum" , True ),
438
+ jobname = jobname ,
439
+ readers = readers )
395
440
j .output_callback = output_callbacks
396
441
397
442
yield j
398
443
399
- def collect_output_ports (self , ports , builder , outdir , compute_checksum = True ):
400
- # type: (Set[Dict[Text, Any]], Builder, Text, bool) -> Dict[Text, Union[Text, List[Any], Dict[Text, Any]]]
444
+ def collect_output_ports (self , ports , builder , outdir , compute_checksum = True , jobname = "" , readers = None ):
445
+ # type: (Set[Dict[Text, Any]], Builder, Text, bool, Text, Dict[Text, Any] ) -> Dict[Text, Union[Text, List[Any], Dict[Text, Any]]]
401
446
ret = {} # type: Dict[Text, Union[Text, List[Any], Dict[Text, Any]]]
402
447
try :
403
-
404
448
fs_access = builder .make_fs_access (outdir )
405
449
custom_output = fs_access .join (outdir , "cwl.output.json" )
406
450
if fs_access .exists (custom_output ):
@@ -429,14 +473,22 @@ def collect_output_ports(self, ports, builder, outdir, compute_checksum=True):
429
473
visit_class (ret , ("File" , "Directory" ), cast (Callable [[Any ], Any ], revmap ))
430
474
visit_class (ret , ("File" , "Directory" ), remove_path )
431
475
normalizeFilesDirs (ret )
476
+ if builder .mutation_manager :
477
+ adjustFileObjs (ret , builder .mutation_manager .set_generation )
432
478
visit_class (ret , ("File" , "Directory" ), partial (check_valid_locations , fs_access ))
479
+
433
480
if compute_checksum :
434
481
adjustFileObjs (ret , partial (compute_checksums , fs_access ))
435
482
436
- validate .validate_ex (self .names .get_name ("outputs_record_schema" , "" ), ret )
483
+ validate .validate_ex (self .names .get_name ("outputs_record_schema" , "" ), ret ,
484
+ strict = False , logger = _logger_validation_warnings )
437
485
return ret if ret is not None else {}
438
486
except validate .ValidationException as e :
439
487
raise WorkflowException ("Error validating output record. " + Text (e ) + "\n in " + json .dumps (ret , indent = 4 ))
488
+ finally :
489
+ if builder .mutation_manager and readers :
490
+ for r in readers .values ():
491
+ builder .mutation_manager .release_reader (jobname , r )
440
492
441
493
def collect_output (self , schema , builder , outdir , fs_access , compute_checksum = True ):
442
494
# type: (Dict[Text, Any], Builder, Text, StdFsAccess, bool) -> Union[Dict[Text, Any], List[Union[Dict[Text, Any], Text]]]
0 commit comments