5
5
import os
6
6
import re
7
7
import shutil
8
- import stat
9
8
import subprocess # nosec
10
9
import sys
11
10
import tempfile
17
16
from threading import Timer
18
17
from typing import (
19
18
IO ,
20
- Any ,
21
- AnyStr ,
22
19
Callable ,
23
20
Dict ,
24
21
Iterable ,
27
24
MutableMapping ,
28
25
MutableSequence ,
29
26
Optional ,
27
+ TextIO ,
30
28
Tuple ,
31
29
Union ,
32
30
cast ,
50
48
from .utils import (
51
49
DEFAULT_TMP_PREFIX ,
52
50
CWLObjectType ,
53
- Directory ,
51
+ CWLOutputType ,
52
+ DirectoryType ,
54
53
OutputCallbackType ,
55
54
bytes2str_in_dicts ,
56
55
copytree_with_merge ,
130
129
"""
131
130
132
131
133
- def deref_links (outputs : Any ) -> None :
134
- if isinstance (outputs , MutableMapping ):
135
- if outputs .get ("class" ) == "File" :
136
- st = os .lstat (outputs ["path" ])
137
- if stat .S_ISLNK (st .st_mode ):
138
- outputs ["basename" ] = os .path .basename (outputs ["path" ])
139
- outputs ["path" ] = os .readlink (outputs ["path" ])
140
- else :
141
- for v in outputs .values ():
142
- deref_links (v )
143
- if isinstance (outputs , MutableSequence ):
144
- for output in outputs :
145
- deref_links (output )
146
-
147
-
148
132
def relink_initialworkdir (
149
133
pathmapper : PathMapper ,
150
134
host_outdir : str ,
@@ -192,6 +176,9 @@ def neverquote(string: str, pos: int = 0, endpos: int = 0) -> Optional[Match[str
192
176
return None
193
177
194
178
179
+ CollectOutputsType = Union [Callable [[str , int ], CWLObjectType ], functools .partial ]
180
+
181
+
195
182
class JobBase (HasReqsHints , metaclass = ABCMeta ):
196
183
def __init__ (
197
184
self ,
@@ -221,9 +208,7 @@ def __init__(
221
208
self .generatemapper = None # type: Optional[PathMapper]
222
209
223
210
# set in CommandLineTool.job(i)
224
- self .collect_outputs = cast (
225
- Callable [[str , int ], MutableMapping [str , Any ]], None
226
- ) # type: Union[Callable[[str, int], MutableMapping[str, Any]], functools.partial[MutableMapping[str, Any]]]
211
+ self .collect_outputs = cast (CollectOutputsType , None )
227
212
self .output_callback = None # type: Optional[OutputCallbackType]
228
213
self .outdir = ""
229
214
self .tmpdir = ""
@@ -233,7 +218,7 @@ def __init__(
233
218
"class" : "Directory" ,
234
219
"listing" : [],
235
220
"basename" : "" ,
236
- } # type: Directory
221
+ } # type: DirectoryType
237
222
self .stagedir = None # type: Optional[str]
238
223
self .inplace_update = False
239
224
self .prov_obj = None # type: Optional[ProvenanceProfile]
@@ -269,7 +254,7 @@ def _setup(self, runtimeContext: RuntimeContext) -> None:
269
254
runtimeContext = runtimeContext .copy ()
270
255
runtimeContext .outdir = self .outdir
271
256
self .generatemapper = self .make_path_mapper (
272
- cast ( List [ Any ], self .generatefiles ["listing" ]) ,
257
+ self .generatefiles ["listing" ],
273
258
self .builder .outdir ,
274
259
runtimeContext ,
275
260
False ,
@@ -331,7 +316,7 @@ def _execute(
331
316
"or prov_obj is missing from runtimeContext: "
332
317
"{}" .format (runtimeContext )
333
318
)
334
- outputs = {} # type: MutableMapping[str,Any]
319
+ outputs = {} # type: CWLObjectType
335
320
try :
336
321
stdin_path = None
337
322
if self .stdin is not None :
@@ -361,8 +346,14 @@ def _execute(
361
346
362
347
commands = [str (x ) for x in runtime + self .command_line ]
363
348
if runtimeContext .secret_store is not None :
364
- commands = runtimeContext .secret_store .retrieve (commands )
365
- env = runtimeContext .secret_store .retrieve (env )
349
+ commands = cast (
350
+ List [str ],
351
+ runtimeContext .secret_store .retrieve (cast (CWLOutputType , commands )),
352
+ )
353
+ env = cast (
354
+ MutableMapping [str , str ],
355
+ runtimeContext .secret_store .retrieve (cast (CWLOutputType , env )),
356
+ )
366
357
367
358
job_script_contents = None # type: Optional[str]
368
359
builder = getattr (self , "builder" , None ) # type: Builder
@@ -653,10 +644,9 @@ def create_file_and_add_volume(
653
644
os .path .basename (volume .target ),
654
645
)
655
646
writable = True if volume .type == "CreateWritableFile" else False
647
+ contents = volume .resolved
656
648
if secret_store :
657
- contents = secret_store .retrieve (volume .resolved )
658
- else :
659
- contents = volume .resolved
649
+ contents = cast (str , secret_store .retrieve (volume .resolved ))
660
650
dirname = os .path .dirname (host_outdir_tgt or new_file )
661
651
if not os .path .exists (dirname ):
662
652
os .makedirs (dirname )
@@ -918,15 +908,15 @@ def _job_popen(
918
908
919
909
if job_script_contents is None and not FORCE_SHELLED_POPEN :
920
910
921
- stdin = subprocess .PIPE # type: Union[IO[Any ], int]
911
+ stdin = subprocess .PIPE # type: Union[IO[bytes ], int]
922
912
if stdin_path is not None :
923
913
stdin = open (stdin_path , "rb" )
924
914
925
- stdout = sys .stderr # type: IO[Any ]
915
+ stdout = sys .stderr # type: Union[ IO[bytes], TextIO ]
926
916
if stdout_path is not None :
927
917
stdout = open (stdout_path , "wb" )
928
918
929
- stderr = sys .stderr # type: IO[Any ]
919
+ stderr = sys .stderr # type: Union[ IO[bytes], TextIO ]
930
920
if stderr_path is not None :
931
921
stderr = open (stderr_path , "wb" )
932
922
@@ -986,7 +976,7 @@ def terminate(): # type: () -> None
986
976
job_script_contents = SHELL_COMMAND_TEMPLATE
987
977
988
978
env_copy = {}
989
- key = None # type: Any
979
+ key = None # type: Optional[str]
990
980
for key in env :
991
981
env_copy [key ] = env [key ]
992
982
0 commit comments