2222#
2323"""A lightweight NiPype MultiProc execution plugin."""
2424
25- # Import packages
2625import os
2726import sys
2827from copy import deepcopy
3231from traceback import format_exception
3332import gc
3433
34+ from nipype .utils .misc import str2bool
35+
3536
3637# Run node
3738def run_node (node , updatehash , taskid ):
@@ -239,8 +240,6 @@ def _clear_task(self, taskid):
239240 raise NotImplementedError
240241
241242 def _clean_queue (self , jobid , graph , result = None ):
242- from mriqc import config
243-
244243 if self ._status_callback :
245244 self ._status_callback (self .procs [jobid ], "exception" )
246245 if result is None :
@@ -250,7 +249,7 @@ def _clean_queue(self, jobid, graph, result=None):
250249 }
251250
252251 crashfile = self ._report_crash (self .procs [jobid ], result = result )
253- if config . nipype . stop_on_first_crash :
252+ if str2bool ( self . _config [ "execution" ][ " stop_on_first_crash" ]) :
254253 raise RuntimeError ("" .join (result ["traceback" ]))
255254 if jobid in self .mapnodesubids :
256255 # remove current jobid
@@ -292,9 +291,7 @@ def _submit_mapnode(self, jobid):
292291 return False
293292
294293 def _local_hash_check (self , jobid , graph ):
295- from mriqc import config
296-
297- if not config .nipype .local_hash_check :
294+ if not str2bool (self .procs [jobid ].config ["execution" ]["local_hash_check" ]):
298295 return False
299296
300297 try :
@@ -368,9 +365,8 @@ def _remove_node_dirs(self):
368365 """Remove directories whose outputs have already been used up."""
369366 import numpy as np
370367 from shutil import rmtree
371- from mriqc import config
372368
373- if config . nipype . remove_node_directories :
369+ if str2bool ( self . _config [ "execution" ][ " remove_node_directories" ]) :
374370 indices = np .nonzero ((self .refidx .sum (axis = 1 ) == 0 ).__array__ ())[0 ]
375371 for idx in indices :
376372 if idx in self .mapnodesubids :
@@ -413,8 +409,6 @@ def __init__(self, pool=None, plugin_args=None):
413409 A Nipype-compatible dictionary of settings.
414410
415411 """
416- from mriqc import config
417-
418412 super ().__init__ (plugin_args = plugin_args )
419413 self ._taskresult = {}
420414 self ._task_obj = {}
@@ -424,6 +418,24 @@ def __init__(self, pool=None, plugin_args=None):
424418 # change to it when workers are set up
425419 self ._cwd = os .getcwd ()
426420
421+ # Retrieve a nipreps-style configuration object
422+ try :
423+ config = plugin_args ["app_config" ]
424+ except (KeyError , TypeError ):
425+ from types import SimpleNamespace
426+ from nipype .utils .profiler import get_system_total_memory_gb
427+
428+ config = SimpleNamespace (
429+ environment = SimpleNamespace (
430+ # Nipype default
431+ total_memory = get_system_total_memory_gb ()
432+ ),
433+ # concurrent.futures default
434+ _process_initializer = None ,
435+ # Just needs to exist
436+ file_path = None ,
437+ )
438+
427439 # Read in options or set defaults.
428440 self .processors = self .plugin_args .get ("n_procs" , mp .cpu_count ())
429441 self .memory_gb = self .plugin_args .get (
0 commit comments