@@ -123,18 +123,18 @@ def deref_links(outputs): # type: (Any) -> None
123
123
deref_links (output )
124
124
125
125
126
- def relink_initialworkdir (pathmapper , # type: PathMapper
127
- host_outdir , # type: Text
128
- container_outdir , # type: Text
126
+ def relink_initialworkdir (pathmapper , # type: PathMapper
127
+ host_outdir , # type: Text
128
+ container_outdir , # type: Text
129
129
inplace_update = False # type: bool
130
130
): # type: (...) -> None
131
131
for _ , vol in pathmapper .items ():
132
132
if not vol .staged :
133
133
continue
134
134
135
135
if (vol .type in ("File" , "Directory" ) or (
136
- inplace_update and vol .type in
137
- ("WritableFile" , "WritableDirectory" ))):
136
+ inplace_update and vol .type in
137
+ ("WritableFile" , "WritableDirectory" ))):
138
138
if not vol .target .startswith (container_outdir ):
139
139
# this is an input file written outside of the working
140
140
# directory, so therefor ineligable for being an output file.
@@ -162,12 +162,12 @@ def relink_initialworkdir(pathmapper, # type: PathMapper
162
162
163
163
class JobBase (with_metaclass (ABCMeta , HasReqsHints )):
164
164
def __init__ (self ,
165
- builder , # type: Builder
166
- joborder , # type: Dict[Text, Union[Dict[Text, Any], List, Text, None]]
165
+ builder , # type: Builder
166
+ joborder , # type: Dict[Text, Union[Dict[Text, Any], List, Text, None]]
167
167
make_path_mapper , # type: Callable[..., PathMapper]
168
- requirements , # type: List[Dict[Text, Text]]
169
- hints , # type: List[Dict[Text, Text]]
170
- name , # type: Text
168
+ requirements , # type: List[Dict[Text, Text]]
169
+ hints , # type: List[Dict[Text, Text]]
170
+ name , # type: Text
171
171
): # type: (...) -> None
172
172
self .builder = builder
173
173
self .joborder = joborder
@@ -231,9 +231,9 @@ def _setup(self, runtimeContext): # type: (RuntimeContext) -> None
231
231
for p in self .generatemapper .files ()}, indent = 4 ))
232
232
233
233
def _execute (self ,
234
- runtime , # type: List[Text]
235
- env , # type: MutableMapping[Text, Text]
236
- runtimeContext , # type: RuntimeContext
234
+ runtime , # type: List[Text]
235
+ env , # type: MutableMapping[Text, Text]
236
+ runtimeContext , # type: RuntimeContext
237
237
monitor_function = None # type: Optional[Callable]
238
238
): # type: (...) -> None
239
239
@@ -341,8 +341,9 @@ def _execute(self,
341
341
except Exception as e :
342
342
_logger .exception (u"Exception while running job" )
343
343
processStatus = "permanentFail"
344
- if runtimeContext .research_obj is not None and self .prov_obj is not None and \
345
- runtimeContext .process_run_id is not None :
344
+ if runtimeContext .research_obj is not None \
345
+ and self .prov_obj is not None \
346
+ and runtimeContext .process_run_id is not None :
346
347
# creating entities for the outputs produced by each step (in the provenance document)
347
348
self .prov_obj .record_process_end (str (self .name ), runtimeContext .process_run_id ,
348
349
outputs , datetime .datetime .now ())
@@ -431,16 +432,16 @@ class ContainerCommandLineJob(with_metaclass(ABCMeta, JobBase)):
431
432
432
433
@abstractmethod
433
434
def get_from_requirements (self ,
434
- r , # type: Dict[Text, Text]
435
- pull_image , # type: bool
436
- force_pull = False , # type: bool
435
+ r , # type: Dict[Text, Text]
436
+ pull_image , # type: bool
437
+ force_pull = False , # type: bool
437
438
tmp_outdir_prefix = DEFAULT_TMP_PREFIX # type: Text
438
439
): # type: (...) -> Optional[Text]
439
440
pass
440
441
441
442
@abstractmethod
442
443
def create_runtime (self ,
443
- env , # type: MutableMapping[Text, Text]
444
+ env , # type: MutableMapping[Text, Text]
444
445
runtime_context # type: RuntimeContext
445
446
): # type: (...) -> Tuple[List[Text], Optional[Text]]
446
447
""" Return the list of commands to run the selected container engine."""
@@ -455,39 +456,39 @@ def append_volume(runtime, source, target, writable=False):
455
456
456
457
@abstractmethod
457
458
def add_file_or_directory_volume (self ,
458
- runtime , # type: List[Text]
459
- volume , # type: MapperEnt
459
+ runtime , # type: List[Text]
460
+ volume , # type: MapperEnt
460
461
host_outdir_tgt # type: Optional[Text]
461
462
): # type: (...) -> None
462
463
"""Append volume a file/dir mapping to the runtime option list."""
463
464
pass
464
465
465
466
@abstractmethod
466
467
def add_writable_file_volume (self ,
467
- runtime , # type: List[Text]
468
- volume , # type: MapperEnt
468
+ runtime , # type: List[Text]
469
+ volume , # type: MapperEnt
469
470
host_outdir_tgt , # type: Optional[Text]
470
- tmpdir_prefix # type: Text
471
+ tmpdir_prefix # type: Text
471
472
): # type: (...) -> None
472
473
"""Append a writable file mapping to the runtime option list."""
473
474
pass
474
475
475
476
@abstractmethod
476
477
def add_writable_directory_volume (self ,
477
- runtime , # type: List[Text]
478
- volume , # type: MapperEnt
478
+ runtime , # type: List[Text]
479
+ volume , # type: MapperEnt
479
480
host_outdir_tgt , # type: Optional[Text]
480
- tmpdir_prefix # type: Text
481
+ tmpdir_prefix # type: Text
481
482
): # type: (...) -> None
482
483
"""Append a writable directory mapping to the runtime option list."""
483
484
pass
484
485
485
486
def create_file_and_add_volume (self ,
486
- runtime , # type: List[Text]
487
- volume , # type: MapperEnt
487
+ runtime , # type: List[Text]
488
+ volume , # type: MapperEnt
488
489
host_outdir_tgt , # type: Optional[Text]
489
- secret_store , # type: Optional[SecretStore]
490
- tmpdir_prefix # type: Text
490
+ secret_store , # type: Optional[SecretStore]
491
+ tmpdir_prefix # type: Text
491
492
): # type: (...) -> Text
492
493
"""Create the file and add a mapping."""
493
494
if not host_outdir_tgt :
@@ -514,10 +515,10 @@ def create_file_and_add_volume(self,
514
515
return host_outdir_tgt or new_file
515
516
516
517
def add_volumes (self ,
517
- pathmapper , # type: PathMapper
518
- runtime , # type: List[Text]
519
- tmpdir_prefix , # type: Text
520
- secret_store = None , # type: Optional[SecretStore]
518
+ pathmapper , # type: PathMapper
519
+ runtime , # type: List[Text]
520
+ tmpdir_prefix , # type: Text
521
+ secret_store = None , # type: Optional[SecretStore]
521
522
any_path_okay = False # type: bool
522
523
): # type: (...) -> None
523
524
"""Append volume mappings to the runtime option list."""
@@ -543,7 +544,10 @@ def add_volumes(self,
543
544
self .add_writable_directory_volume (
544
545
runtime , vol , host_outdir_tgt , tmpdir_prefix )
545
546
elif vol .type in ["CreateFile" , "CreateWritableFile" ]:
546
- self .create_file_and_add_volume (runtime , vol , host_outdir_tgt , secret_store , tmpdir_prefix )
547
+ new_path = self .create_file_and_add_volume (
548
+ runtime , vol , host_outdir_tgt , secret_store , tmpdir_prefix )
549
+ pathmapper .update (
550
+ key , new_path , vol .target , vol .type , vol .staged )
547
551
548
552
def run (self , runtimeContext ): # type: (RuntimeContext) -> None
549
553
if not os .path .exists (self .tmpdir ):
@@ -615,7 +619,8 @@ def run(self, runtimeContext): # type: (RuntimeContext) -> None
615
619
monitor_function = None
616
620
if cidfile :
617
621
monitor_function = functools .partial (
618
- self .docker_monitor , cidfile , runtimeContext .tmpdir_prefix )
622
+ self .docker_monitor , cidfile , runtimeContext .tmpdir_prefix ,
623
+ not bool (runtimeContext .cidfile_dir ))
619
624
self ._execute (runtime , env , runtimeContext , monitor_function )
620
625
621
626
@staticmethod
@@ -624,7 +629,7 @@ def docker_get_memory(cid): # type: (Text) -> int
624
629
try :
625
630
memory = subprocess .check_output (
626
631
['docker' , 'inspect' , '--type' , 'container' , '--format' ,
627
- '{{.HostConfig.Memory}}' , cid ])
632
+ '{{.HostConfig.Memory}}' , cid ], stderr = subprocess . DEVNULL ) # type: ignore
628
633
except subprocess .CalledProcessError :
629
634
pass
630
635
if memory :
@@ -633,11 +638,21 @@ def docker_get_memory(cid): # type: (Text) -> int
633
638
return value
634
639
return psutil .virtual_memory ().total
635
640
636
- def docker_monitor (self , cidfile , tmpdir_prefix , process ):
637
- # type: (Text, Text, subprocess.Popen) -> None
641
+ def docker_monitor (self , cidfile , tmpdir_prefix , cleanup_cidfile , process ):
642
+ # type: (Text, Text, bool, subprocess.Popen) -> None
643
+ """Record memory usage of the running Docker container."""
644
+ # Todo: consider switching to `docker create` / `docker start`
645
+ # instead of `docker run` as `docker create` outputs the container ID
646
+ # to stdout, but the container is frozen, thus allowing us to start the
647
+ # monitoring process without dealing with the cidfile or too-fast
648
+ # container execution
638
649
cid = None
639
650
while cid is None :
640
651
time .sleep (1 )
652
+ if process .returncode is not None :
653
+ if cleanup_cidfile :
654
+ os .remove (cidfile )
655
+ return
641
656
try :
642
657
with open (cidfile ) as cidhandle :
643
658
cid = cidhandle .readline ().strip ()
@@ -663,21 +678,22 @@ def docker_monitor(self, cidfile, tmpdir_prefix, process):
663
678
break
664
679
_logger .info (u"[job %s] Max memory used: %iMiB" , self .name ,
665
680
int ((max_mem_percent * max_mem ) / (2 ** 20 )))
666
-
667
-
668
- def _job_popen (
669
- commands , # type: List[Text]
670
- stdin_path , # type: Optional[Text]
671
- stdout_path , # type: Optional[Text]
672
- stderr_path , # type: Optional[Text]
673
- env , # type: MutableMapping[AnyStr, AnyStr]
674
- cwd , # type: Text
675
- job_dir , # type: Text
676
- job_script_contents = None , # type: Text
677
- timelimit = None , # type: int
678
- name = None , # type: Text
679
- monitor_function = None # type: Optional[Callable]
680
- ): # type: (...) -> int
681
+ if cleanup_cidfile :
682
+ os .remove (cidfile )
683
+
684
+
685
+ def _job_popen (commands , # type: List[Text]
686
+ stdin_path , # type: Optional[Text]
687
+ stdout_path , # type: Optional[Text]
688
+ stderr_path , # type: Optional[Text]
689
+ env , # type: MutableMapping[AnyStr, AnyStr]
690
+ cwd , # type: Text
691
+ job_dir , # type: Text
692
+ job_script_contents = None , # type: Text
693
+ timelimit = None , # type: int
694
+ name = None , # type: Text
695
+ monitor_function = None # type: Optional[Callable]
696
+ ): # type: (...) -> int
681
697
682
698
if job_script_contents is None and not FORCE_SHELLED_POPEN :
683
699
0 commit comments