14
14
import shellescape
15
15
from schema_salad .ref_resolver import file_uri , uri_file_path
16
16
from schema_salad .sourceline import SourceLine , indent
17
- from typing import Any , Callable , cast , Generator , Text , Union
17
+ from typing import Any , Callable , cast , Generator , Text , Union , Dict
18
18
19
- from .builder import CONTENT_LIMIT , substitute , Builder , adjustFileObjs
20
- from .pathmapper import adjustDirObjs
19
+ from .builder import CONTENT_LIMIT , substitute , Builder
20
+ from .pathmapper import adjustFileObjs , adjustDirObjs , visit_class
21
21
from .errors import WorkflowException
22
- from .job import CommandLineJob
22
+ from .job import JobBase , CommandLineJob , DockerCommandLineJob
23
23
from .pathmapper import PathMapper , get_listing , trim_listing
24
- from .process import Process , shortname , uniquename , normalizeFilesDirs , compute_checksums
24
+ from .process import Process , shortname , uniquename , normalizeFilesDirs , compute_checksums , _logger_validation_warnings
25
25
from .stdfsaccess import StdFsAccess
26
26
from .utils import aslist
27
27
@@ -106,6 +106,8 @@ def revmap_file(builder, outdir, f):
106
106
revmap_f = builder .pathmapper .reversemap (path )
107
107
if revmap_f :
108
108
f ["location" ] = revmap_f [1 ]
109
+ elif path == builder .outdir :
110
+ f ["location" ] = outdir
109
111
elif path .startswith (builder .outdir ):
110
112
f ["location" ] = builder .fs_access .join (outdir , path [len (builder .outdir ) + 1 :])
111
113
return f
@@ -148,6 +150,7 @@ def run(self, **kwargs):
148
150
# walk over input as implicit reassignment doesn't reach everything in builder.bindings
149
151
def check_adjust (builder , f ):
150
152
# type: (Builder, Dict[Text, Any]) -> Dict[Text, Any]
153
+
151
154
f ["path" ] = builder .pathmapper .mapper (f ["location" ])[1 ]
152
155
f ["dirname" ], f ["basename" ] = os .path .split (f ["path" ])
153
156
if f ["class" ] == "File" :
@@ -156,26 +159,36 @@ def check_adjust(builder, f):
156
159
raise WorkflowException ("Invalid filename: '%s' contains illegal characters" % (f ["basename" ]))
157
160
return f
158
161
162
+ def check_valid_locations (fs_access , ob ):
163
+ if ob ["location" ].startswith ("_:" ):
164
+ pass
165
+ if ob ["class" ] == "File" and not fs_access .isfile (ob ["location" ]):
166
+ raise validate .ValidationException ("Does not exist or is not a File: '%s'" % ob ["location" ])
167
+ if ob ["class" ] == "Directory" and not fs_access .isdir (ob ["location" ]):
168
+ raise validate .ValidationException ("Does not exist or is not a Directory: '%s'" % ob ["location" ])
159
169
160
170
class CommandLineTool (Process ):
161
171
def __init__ (self , toolpath_object , ** kwargs ):
162
172
# type: (Dict[Text, Any], **Any) -> None
163
173
super (CommandLineTool , self ).__init__ (toolpath_object , ** kwargs )
164
174
165
- def makeJobRunner (self ): # type: () -> CommandLineJob
166
- return CommandLineJob ()
175
+ def makeJobRunner (self ): # type: () -> JobBase
176
+ dockerReq , _ = self .get_requirement ("DockerRequirement" )
177
+ if dockerReq :
178
+ return DockerCommandLineJob ()
179
+ else :
180
+ return CommandLineJob ()
167
181
168
182
def makePathMapper (self , reffiles , stagedir , ** kwargs ):
169
183
# type: (List[Any], Text, **Any) -> PathMapper
170
- dockerReq , _ = self .get_requirement ("DockerRequirement" )
171
184
return PathMapper (reffiles , kwargs ["basedir" ], stagedir )
172
185
173
186
def job (self ,
174
187
job_order , # type: Dict[Text, Text]
175
188
output_callbacks , # type: Callable[[Any, Any], Any]
176
189
** kwargs # type: Any
177
190
):
178
- # type: (...) -> Generator[Union[CommandLineJob , CallbackJob], None, None]
191
+ # type: (...) -> Generator[Union[JobBase , CallbackJob], None, None]
179
192
180
193
jobname = uniquename (kwargs .get ("name" , shortname (self .tool .get ("id" , "job" ))))
181
194
@@ -190,10 +203,9 @@ def job(self,
190
203
cachebuilder .stagedir ,
191
204
separateDirs = False )
192
205
_check_adjust = partial (check_adjust , cachebuilder )
193
- adjustFileObjs (cachebuilder .files , _check_adjust )
194
- adjustFileObjs (cachebuilder .bindings , _check_adjust )
195
- adjustDirObjs (cachebuilder .files , _check_adjust )
196
- adjustDirObjs (cachebuilder .bindings , _check_adjust )
206
+ visit_class ([cachebuilder .files , cachebuilder .bindings ],
207
+ ("File" , "Directory" ), _check_adjust )
208
+
197
209
cmdline = flatten (map (cachebuilder .generate_arg , cachebuilder .bindings ))
198
210
(docker_req , docker_is_req ) = self .get_requirement ("DockerRequirement" )
199
211
if docker_req and kwargs .get ("use_container" ) is not False :
@@ -290,10 +302,7 @@ def rm_pending_output_callback(output_callbacks, jobcachepending,
290
302
291
303
_check_adjust = partial (check_adjust , builder )
292
304
293
- adjustFileObjs (builder .files , _check_adjust )
294
- adjustFileObjs (builder .bindings , _check_adjust )
295
- adjustDirObjs (builder .files , _check_adjust )
296
- adjustDirObjs (builder .bindings , _check_adjust )
305
+ visit_class ([builder .files , builder .bindings ], ("File" , "Directory" ), _check_adjust )
297
306
298
307
if self .tool .get ("stdin" ):
299
308
with SourceLine (self .tool , "stdin" , validate .ValidationException ):
@@ -363,8 +372,39 @@ def rm_pending_output_callback(output_callbacks, jobcachepending,
363
372
ls [i ] = t ["entry" ]
364
373
j .generatefiles [u"listing" ] = ls
365
374
375
+ inplaceUpdateReq = self .get_requirement ("http://commonwl.org/cwltool#InplaceUpdateRequirement" )[0 ]
376
+
377
+ if inplaceUpdateReq :
378
+ j .inplace_update = inplaceUpdateReq ["inplaceUpdate" ]
366
379
normalizeFilesDirs (j .generatefiles )
367
380
381
+ readers = {}
382
+ muts = set ()
383
+
384
+ if builder .mutation_manager :
385
+ def register_mut (f ):
386
+ muts .add (f ["location" ])
387
+ builder .mutation_manager .register_mutation (j .name , f )
388
+
389
+ def register_reader (f ):
390
+ if f ["location" ] not in muts :
391
+ builder .mutation_manager .register_reader (j .name , f )
392
+ readers [f ["location" ]] = f
393
+
394
+ for li in j .generatefiles ["listing" ]:
395
+ li = cast (Dict [Text , Any ], li )
396
+ if li .get ("writable" ) and j .inplace_update :
397
+ adjustFileObjs (li , register_mut )
398
+ adjustDirObjs (li , register_mut )
399
+ else :
400
+ adjustFileObjs (li , register_reader )
401
+ adjustDirObjs (li , register_reader )
402
+
403
+ adjustFileObjs (builder .files , register_reader )
404
+ adjustFileObjs (builder .bindings , register_reader )
405
+ adjustDirObjs (builder .files , register_reader )
406
+ adjustDirObjs (builder .bindings , register_reader )
407
+
368
408
j .environment = {}
369
409
evr = self .get_requirement ("EnvVarRequirement" )[0 ]
370
410
if evr :
@@ -386,16 +426,17 @@ def rm_pending_output_callback(output_callbacks, jobcachepending,
386
426
j .pathmapper = builder .pathmapper
387
427
j .collect_outputs = partial (
388
428
self .collect_output_ports , self .tool ["outputs" ], builder ,
389
- compute_checksum = kwargs .get ("compute_checksum" , True ))
429
+ compute_checksum = kwargs .get ("compute_checksum" , True ),
430
+ jobname = jobname ,
431
+ readers = readers )
390
432
j .output_callback = output_callbacks
391
433
392
434
yield j
393
435
394
- def collect_output_ports (self , ports , builder , outdir , compute_checksum = True ):
395
- # type: (Set[Dict[Text, Any]], Builder, Text, bool) -> Dict[Text, Union[Text, List[Any], Dict[Text, Any]]]
436
+ def collect_output_ports (self , ports , builder , outdir , compute_checksum = True , jobname = "" , readers = None ):
437
+ # type: (Set[Dict[Text, Any]], Builder, Text, bool, Text, Dict[Text, Any] ) -> Dict[Text, Union[Text, List[Any], Dict[Text, Any]]]
396
438
ret = {} # type: Dict[Text, Union[Text, List[Any], Dict[Text, Any]]]
397
439
try :
398
-
399
440
fs_access = builder .make_fs_access (outdir )
400
441
custom_output = fs_access .join (outdir , "cwl.output.json" )
401
442
if fs_access .exists (custom_output ):
@@ -419,21 +460,27 @@ def collect_output_ports(self, ports, builder, outdir, compute_checksum=True):
419
460
% (shortname (port ["id" ]), indent (u (str (e )))))
420
461
421
462
if ret :
463
+ revmap = partial (revmap_file , builder , outdir )
422
464
adjustDirObjs (ret , trim_listing )
423
- adjustFileObjs (ret ,
424
- cast (Callable [[Any ], Any ], # known bug in mypy
425
- # https://github.com/python/mypy/issues/797
426
- partial (revmap_file , builder , outdir )))
427
- adjustFileObjs (ret , remove_path )
428
- adjustDirObjs (ret , remove_path )
465
+ visit_class (ret , ("File" , "Directory" ), cast (Callable [[Any ], Any ], revmap ))
466
+ visit_class (ret , ("File" , "Directory" ), remove_path )
429
467
normalizeFilesDirs (ret )
468
+ if builder .mutation_manager :
469
+ adjustFileObjs (ret , builder .mutation_manager .set_generation )
470
+ visit_class (ret , ("File" , "Directory" ), partial (check_valid_locations , fs_access ))
471
+
430
472
if compute_checksum :
431
473
adjustFileObjs (ret , partial (compute_checksums , fs_access ))
432
474
433
- validate .validate_ex (self .names .get_name ("outputs_record_schema" , "" ), ret )
475
+ validate .validate_ex (self .names .get_name ("outputs_record_schema" , "" ), ret ,
476
+ strict = False , logger = _logger_validation_warnings )
434
477
return ret if ret is not None else {}
435
478
except validate .ValidationException as e :
436
- raise WorkflowException ("Error validating output record, " + Text (e ) + "\n in " + json .dumps (ret , indent = 4 ))
479
+ raise WorkflowException ("Error validating output record. " + Text (e ) + "\n in " + json .dumps (ret , indent = 4 ))
480
+ finally :
481
+ if builder .mutation_manager and readers :
482
+ for r in readers .values ():
483
+ builder .mutation_manager .release_reader (jobname , r )
437
484
438
485
def collect_output (self , schema , builder , outdir , fs_access , compute_checksum = True ):
439
486
# type: (Dict[Text, Any], Builder, Text, StdFsAccess, bool) -> Union[Dict[Text, Any], List[Union[Dict[Text, Any], Text]]]
0 commit comments