Commit a5ba813

committed: general cleanup
1 parent e420ceb commit a5ba813

5 files changed: +44, -31 lines

nipype/pipeline/engine/nodes.py

Lines changed: 4 additions & 4 deletions
@@ -327,7 +327,7 @@ def run(self, updatehash=False):
         makedirs(outdir, exist_ok=True)
         os.chdir(outdir)
 
-        logger.info('[Node] Executing "%s" (%s)', self.fullname, outdir)
+        logger.info('[Node] Setting-up "%s" in "%s".', self.fullname, outdir)
         hash_info = self.hash_exists(updatehash=updatehash)
         hash_exists, hashvalue, hashfile, hashed_inputs = hash_info
         force_run = self.overwrite or (self.overwrite is None and self._interface.always_run)
@@ -346,7 +346,7 @@ def run(self, updatehash=False):
             savepkl(node_file, self)
 
             self._run_interface(execute=False, updatehash=updatehash)
-            logger.info('[Node] Cached "%s" (%s)\n', self.fullname, outdir)
+            logger.info('[Node] Cached "%s".', self.fullname)
             os.chdir(cwd)
             return self.result
 
@@ -412,7 +412,7 @@ def run(self, updatehash=False):
         # Tear-up
         shutil.move(hashfile_unfinished, hashfile)
         self.write_report(report_type='postexec', cwd=outdir)
-        logger.info('[Node] Completed "%s" (%s)', self.fullname, outdir)
+        logger.info('[Node] Finished "%s".', self.fullname)
         os.chdir(cwd)
         return self._result
 
@@ -623,7 +623,7 @@ def _run_command(self, execute, copyfiles=True):
         if copyfiles:
             self._copyfiles_to_wd(cwd, execute)
 
-        message = 'Running node "%s" ("%s.%s")'
+        message = '[Node] Running "%s" ("%s.%s")'
         if issubclass(self._interface.__class__, CommandLine):
             try:
                 cmd = self._interface.cmdline
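
The reworded messages keep the logging module's lazy %-style formatting, so strings are only interpolated when a record is actually emitted. A minimal, self-contained sketch of what the new messages look like at runtime (the node name and output directory below are hypothetical placeholders, not values from this commit):

import logging

logging.basicConfig(level=logging.INFO, format='%(message)s')
logger = logging.getLogger('nipype.workflow')

# Hypothetical stand-ins for node.fullname and the node's output directory.
fullname = 'preproc.realign'
outdir = '/scratch/work/preproc/realign'

# Arguments are passed separately rather than pre-formatted, as in the diff above.
logger.info('[Node] Setting-up "%s" in "%s".', fullname, outdir)
logger.info('[Node] Cached "%s".', fullname)
logger.info('[Node] Finished "%s".', fullname)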

nipype/pipeline/engine/workflows.py

Lines changed: 3 additions & 3 deletions
@@ -47,11 +47,11 @@
 from ...utils.filemanip import (save_json, FileNotFoundError,
                                 filename_to_list, list_to_filename,
                                 copyfiles, fnames_presuffix, loadpkl,
-                                split_filename, load_json, savepkl,
+                                split_filename, load_json, makedirs, savepkl,
                                 write_rst_header, write_rst_dict,
                                 write_rst_list, to_str)
 from .utils import (generate_expanded_graph, modify_paths,
-                    export_graph, make_output_dir, write_workflow_prov,
+                    export_graph, write_workflow_prov,
                     write_workflow_resources,
                     clean_working_directory, format_dot, topological_sort,
                     get_print_name, merge_dict, evaluate_connect_function,
@@ -424,7 +424,7 @@ def write_graph(self, dotfilename='graph.dot', graph2use='hierarchical',
                     base_dir = op.join(base_dir, self.name)
             else:
                 base_dir = os.getcwd()
-        base_dir = make_output_dir(base_dir)
+        base_dir = makedirs(base_dir)
         if graph2use in ['hierarchical', 'colored']:
             if self.name[:1].isdigit():  # these graphs break if int
                 raise ValueError('{} graph failed, workflow name cannot begin '
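
write_graph() now builds its output directory with filemanip.makedirs (already used with exist_ok=True in nodes.py above) instead of the pipeline-utils helper make_output_dir. As a rough, hypothetical sketch only (not the actual nipype.utils.filemanip implementation), a wrapper with that calling convention could look like this:

import errno
import os

def makedirs(path, exist_ok=False):
    # Hypothetical stand-in for nipype.utils.filemanip.makedirs; the real
    # helper may add retries or logging.
    try:
        os.makedirs(path)
    except OSError as exc:
        # Only swallow "already exists" when the caller asked for it.
        if not (exist_ok and exc.errno == errno.EEXIST and os.path.isdir(path)):
            raise
    return path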

nipype/pipeline/plugins/base.py

Lines changed: 5 additions & 4 deletions
@@ -339,7 +339,7 @@ def _local_hash_check(self, jobid, graph):
             logger.debug('Skipping cached node %s with ID %s.',
                          self.procs[jobid]._id, jobid)
             try:
-                self._task_finished_cb(jobid)
+                self._task_finished_cb(jobid, cached=True)
                 self._remove_node_dirs()
             except Exception:
                 logger.debug('Error skipping cached node %s (%s).',
@@ -349,13 +349,14 @@ def _local_hash_check(self, jobid, graph):
             return True
         return False
 
-    def _task_finished_cb(self, jobid):
+    def _task_finished_cb(self, jobid, cached=False):
         """ Extract outputs and assign to inputs of dependent tasks
 
         This is called when a job is completed.
         """
-        logger.info('[Job finished] jobname: %s jobid: %d' %
-                    (self.procs[jobid]._id, jobid))
+        logger.info('[Job %d] %s (%s).', jobid,
+                    'Cached' if cached else 'Completed',
+                    self.procs[jobid].fullname)
         if self._status_callback:
             self._status_callback(self.procs[jobid], 'end')
         # Update job and worker queues
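
The new cached flag only changes the wording of the job-completion line: cache hits detected in _local_hash_check are reported as "Cached" rather than "Completed". An illustrative, standalone reproduction of the message (job IDs and node names are made up):

import logging

logging.basicConfig(level=logging.INFO, format='%(message)s')
logger = logging.getLogger('nipype.workflow')

def log_job_finished(jobid, fullname, cached=False):
    # Mirrors the message built in _task_finished_cb above.
    logger.info('[Job %d] %s (%s).', jobid,
                'Cached' if cached else 'Completed', fullname)

log_job_finished(3, 'preproc.bet')                   # [Job 3] Completed (preproc.bet).
log_job_finished(4, 'preproc.realign', cached=True)  # [Job 4] Cached (preproc.realign).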

nipype/pipeline/plugins/multiproc.py

Lines changed: 16 additions & 7 deletions
@@ -12,10 +12,11 @@
 from multiprocessing import Process, Pool, cpu_count, pool
 from traceback import format_exception
 import sys
+from textwrap import indent
+from logging import INFO
 
 from copy import deepcopy
 import numpy as np
-
 from ... import logging
 from ...utils.profiler import get_system_total_memory_gb
 from ..engine import MapNode
@@ -126,8 +127,8 @@ def __init__(self, plugin_args=None):
         self.raise_insufficient = self.plugin_args.get('raise_insufficient', True)
 
         # Instantiate different thread pools for non-daemon processes
-        logger.debug('MultiProcPlugin starting in "%sdaemon" mode (n_procs=%d, mem_gb=%0.2f)',
-                     'non' if non_daemon else '', self.processors, self.memory_gb)
+        logger.debug('[MultiProc] Starting in "%sdaemon" mode (n_procs=%d, mem_gb=%0.2f)',
+                     'non' * non_daemon, self.processors, self.memory_gb)
 
         NipypePool = NonDaemonPool if non_daemon else Pool
         try:
@@ -158,7 +159,7 @@ def _submit_job(self, node, updatehash=False):
             run_node, (node, updatehash, self._taskid),
             callback=self._async_callback)
 
-        logger.debug('MultiProc submitted task %s (taskid=%d).',
+        logger.debug('[MultiProc] Submitted task %s (taskid=%d).',
                      node.fullname, self._taskid)
         return self._taskid
 
@@ -214,9 +215,17 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
         stats = (len(self.pending_tasks), len(jobids), free_memory_gb,
                  self.memory_gb, free_processors, self.processors)
         if self._stats != stats:
-            logger.info('Currently running %d tasks, and %d jobs ready. Free '
-                        'memory (GB): %0.2f/%0.2f, Free processors: %d/%d',
-                        *stats)
+            tasks_list_msg = ''
+            if logger.level <= INFO:
+                running_tasks = [' * %s' % self.procs[jobid].fullname
+                                 for _, jobid in self.pending_tasks]
+                if running_tasks:
+                    tasks_list_msg = '\nCurrently running:\n'
+                    tasks_list_msg += '\n'.join(running_tasks)
+                    tasks_list_msg = indent(tasks_list_msg, ' ' * 21)
+            logger.info('[MultiProc] Running %d tasks, and %d jobs ready. Free '
+                        'memory (GB): %0.2f/%0.2f, Free processors: %d/%d.%s',
+                        *stats, tasks_list_msg)
             self._stats = stats
 
         if free_memory_gb < 0.01 or free_processors == 0:
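
When the effective log level is INFO or lower, the scheduler now appends an indented list of the nodes behind the pending tasks, using textwrap.indent so the extra lines stay aligned under the logger's prefix column. A small standalone sketch of how that message is assembled (the node names are invented; the real code reads them from self.procs and self.pending_tasks):

from textwrap import indent

pending = ['preproc.realign', 'preproc.smooth']  # hypothetical node full names

tasks_list_msg = ''
if pending:
    tasks_list_msg = '\nCurrently running:\n'
    tasks_list_msg += '\n'.join(' * %s' % name for name in pending)
    # Prefix every non-empty line with 21 spaces so it lines up with the log prefix.
    tasks_list_msg = indent(tasks_list_msg, ' ' * 21)

print('[MultiProc] Running %d tasks, and %d jobs ready.%s'
      % (len(pending), 0, tasks_list_msg))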

nipype/utils/filemanip.py

Lines changed: 16 additions & 13 deletions
@@ -699,16 +699,19 @@ def emptydirs(path):
         return True
 
     for el in pathconts:
-        try:
-            shutil.rmtree(el)
-        except OSError as ex:
-            elcont = os.listdir(el)
-            if ex.errno == errno.ENOTEMPTY and not elcont:
-                fmlogger.warning(
-                    'An exception was raised trying to remove old %s, but the path '
-                    'seems empty. Is it an NFS mount?. Passing the exception.', el)
-            elif ex.errno == errno.ENOTEMPTY and elcont:
-                fmlogger.debug('Folder %s contents (%d items).', el, len(elcont))
-                raise ex
-            else:
-                raise ex
+        if os.path.isfile(el):
+            os.remove(el)
+        else:
+            try:
+                shutil.rmtree(el)
+            except OSError as ex:
+                elcont = os.listdir(el)
+                if ex.errno == errno.ENOTEMPTY and not elcont:
+                    fmlogger.warning(
+                        'An exception was raised trying to remove old %s, but the path '
+                        'seems empty. Is it an NFS mount?. Passing the exception.', el)
+                elif ex.errno == errno.ENOTEMPTY and elcont:
+                    fmlogger.debug('Folder %s contents (%d items).', el, len(elcont))
+                    raise ex
+                else:
+                    raise ex
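
shutil.rmtree() raises an error when handed a plain file, so emptydirs now deletes files with os.remove() and reserves rmtree for directories. A simplified standalone version of the same pattern (clear_directory is an illustrative name, not nipype API; the real function also special-cases the ENOTEMPTY errors seen on NFS mounts):

import os
import shutil

def clear_directory(path):
    # Remove every entry inside `path` without deleting `path` itself.
    for name in os.listdir(path):
        el = os.path.join(path, name)
        if os.path.isfile(el):
            os.remove(el)       # rmtree refuses plain files
        else:
            shutil.rmtree(el)   # directories and their contents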
