
Commit 14182a3

Merge pull request #890 from oesteban/bug/fixRemoveNonEmptyDir
Non-empty folder errors on NFS
2 parents 016bc6d + 320b036 commit 14182a3

1 file changed: +71 -56 lines changed

nipype/pipeline/engine.py

@@ -26,6 +26,7 @@
 import os.path as op
 import re
 import shutil
+import errno
 from shutil import rmtree
 from socket import gethostname
 from string import Template
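
The new `errno` import exists so the cleanup code further down can compare `OSError.errno` against the symbolic `ENOTEMPTY` constant rather than parsing error strings. A minimal sketch of that pattern, independent of nipype (the path is hypothetical):

    import errno
    import shutil

    def try_rmtree(path):
        # Returns True if the tree was removed, False if the directory
        # was reported non-empty; any other OSError propagates.
        try:
            shutil.rmtree(path)
            return True
        except OSError as ex:
            if ex.errno == errno.ENOTEMPTY:
                return False
            raise

    try_rmtree('/tmp/scratch')  # hypothetical path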
@@ -512,7 +513,7 @@ def write_graph(self, dotfilename='graph.dot', graph2use='hierarchical',
             workflow nodes;
             flat - expands workflow nodes recursively;
             hierarchical - expands workflow nodes recursively with a
-            notion on hierarchy;
+                notion on hierarchy;
             colored - expands workflow nodes recursively with a
                 notion on hierarchy in color;
             exec - expands workflows to depict iterables
@@ -529,17 +530,17 @@ def write_graph(self, dotfilename='graph.dot', graph2use='hierarchical',
         if graph2use not in graphtypes:
             raise ValueError('Unknown graph2use keyword. Must be one of: ' +
                              str(graphtypes))
-        base_dir, dotfilename = os.path.split(dotfilename)
+        base_dir, dotfilename = op.split(dotfilename)
         if base_dir == '':
             if self.base_dir:
                 base_dir = self.base_dir
                 if self.name:
-                    base_dir = os.path.join(base_dir, self.name)
+                    base_dir = op.join(base_dir, self.name)
             else:
                 base_dir = os.getcwd()
         base_dir = make_output_dir(base_dir)
         if graph2use in ['hierarchical', 'colored']:
-            dotfilename = os.path.join(base_dir, dotfilename)
+            dotfilename = op.join(base_dir, dotfilename)
             self.write_hierarchical_dotfile(dotfilename=dotfilename,
                                             colored=graph2use == "colored",
                                             simple_form=simple_form)
@@ -697,7 +698,7 @@ def run(self, plugin=None, plugin_args=None, updatehash=False):
         runner.run(execgraph, updatehash=updatehash, config=self.config)
         datestr = datetime.utcnow().strftime('%Y%m%dT%H%M%S')
         if str2bool(self.config['execution']['write_provenance']):
-            prov_base = os.path.join(self.base_dir,
+            prov_base = op.join(self.base_dir,
                                      'workflow_provenance_%s' % datestr)
             logger.info('Provenance file prefix: %s' % prov_base)
             write_workflow_prov(execgraph, prov_base, format='all')
@@ -708,17 +709,17 @@ def run(self, plugin=None, plugin_args=None, updatehash=False):
     def _write_report_info(self, workingdir, name, graph):
         if workingdir is None:
             workingdir = os.getcwd()
-        report_dir = os.path.join(workingdir, name)
-        if not os.path.exists(report_dir):
+        report_dir = op.join(workingdir, name)
+        if not op.exists(report_dir):
             os.makedirs(report_dir)
-        shutil.copyfile(os.path.join(os.path.dirname(__file__),
+        shutil.copyfile(op.join(op.dirname(__file__),
                         'report_template.html'),
-                        os.path.join(report_dir, 'index.html'))
-        shutil.copyfile(os.path.join(os.path.dirname(__file__),
+                        op.join(report_dir, 'index.html'))
+        shutil.copyfile(op.join(op.dirname(__file__),
                         '..', 'external', 'd3.v3.min.js'),
-                        os.path.join(report_dir, 'd3.v3.min.js'))
+                        op.join(report_dir, 'd3.v3.min.js'))
         nodes, groups = topological_sort(graph, depth_first=True)
-        graph_file = os.path.join(report_dir, 'graph1.json')
+        graph_file = op.join(report_dir, 'graph1.json')
         json_dict = {'nodes': [], 'links': [], 'groups': [], 'maxN': 0}
         for i, node in enumerate(nodes):
             report_file = "%s/_report/report.rst" % \
@@ -745,7 +746,7 @@ def _write_report_info(self, workingdir, name, graph):
                                             target=nodes.index(v),
                                             value=1))
         save_json(graph_file, json_dict)
-        graph_file = os.path.join(report_dir, 'graph.json')
+        graph_file = op.join(report_dir, 'graph.json')
         template = '%%0%dd_' % np.ceil(np.log10(len(nodes))).astype(int)
         def getname(u, i):
             name_parts = u.fullname.split('.')
@@ -790,7 +791,7 @@ def _configure_exec_nodes(self, graph):
                 data = graph.get_edge_data(*edge)
                 for sourceinfo, field in sorted(data['connect']):
                     node.input_source[field] = \
-                        (os.path.join(edge[0].output_dir(),
+                        (op.join(edge[0].output_dir(),
                                       'result_%s.pklz' % edge[0].name),
                          sourceinfo)
 
@@ -1245,15 +1246,15 @@ def output_dir(self):
             self.base_dir = mkdtemp()
         outputdir = self.base_dir
         if self._hierarchy:
-            outputdir = os.path.join(outputdir, *self._hierarchy.split('.'))
+            outputdir = op.join(outputdir, *self._hierarchy.split('.'))
         if self.parameterization:
             if not str2bool(self.config['execution']['parameterize_dirs']):
                 param_dirs = [self._parameterization_dir(p) for p in
                               self.parameterization]
-                outputdir = os.path.join(outputdir, *param_dirs)
+                outputdir = op.join(outputdir, *param_dirs)
             else:
-                outputdir = os.path.join(outputdir, *self.parameterization)
-        return os.path.abspath(os.path.join(outputdir,
+                outputdir = op.join(outputdir, *self.parameterization)
+        return op.abspath(op.join(outputdir,
                                   self.name))
 
     def set_input(self, parameter, val):
@@ -1284,23 +1285,23 @@ def hash_exists(self, updatehash=False):
         # of the dictionary itself.
         hashed_inputs, hashvalue = self._get_hashval()
         outdir = self.output_dir()
-        if os.path.exists(outdir):
+        if op.exists(outdir):
             logger.debug(os.listdir(outdir))
-        hashfiles = glob(os.path.join(outdir, '_0x*.json'))
+        hashfiles = glob(op.join(outdir, '_0x*.json'))
         logger.debug(hashfiles)
         if len(hashfiles) > 1:
             logger.info(hashfiles)
             logger.info('Removing multiple hashfiles and forcing node to rerun')
             for hashfile in hashfiles:
                 os.unlink(hashfile)
-        hashfile = os.path.join(outdir, '_0x%s.json' % hashvalue)
+        hashfile = op.join(outdir, '_0x%s.json' % hashvalue)
         logger.debug(hashfile)
-        if updatehash and os.path.exists(outdir):
+        if updatehash and op.exists(outdir):
             logger.debug("Updating hash: %s" % hashvalue)
-            for file in glob(os.path.join(outdir, '_0x*.json')):
+            for file in glob(op.join(outdir, '_0x*.json')):
                 os.remove(file)
             self._save_hashfile(hashfile, hashed_inputs)
-        return os.path.exists(hashfile), hashvalue, hashfile, hashed_inputs
+        return op.exists(hashfile), hashvalue, hashfile, hashed_inputs
 
     def run(self, updatehash=False):
         """Execute the node in its directory.
@@ -1321,7 +1322,7 @@ def run(self, updatehash=False):
         self._got_inputs = True
         outdir = self.output_dir()
         logger.info("Executing node %s in dir: %s" % (self._id, outdir))
-        if os.path.exists(outdir):
+        if op.exists(outdir):
             logger.debug(os.listdir(outdir))
         hash_info = self.hash_exists(updatehash=updatehash)
         hash_exists, hashvalue, hashfile, hashed_inputs = hash_info
@@ -1337,7 +1338,7 @@ def run(self, updatehash=False):
         # by rerunning we mean only nodes that did finish to run previously
         json_pat = op.join(outdir, '_0x*.json')
         json_unfinished_pat = op.join(outdir, '_0x*_unfinished.json')
-        need_rerun = (os.path.exists(outdir)
+        need_rerun = (op.exists(outdir)
                       and not isinstance(self, MapNode)
                       and len(glob(json_pat)) != 0
                       and len(glob(json_unfinished_pat)) == 0)
@@ -1352,7 +1353,7 @@ def run(self, updatehash=False):
                            str(self.overwrite),
                            str(self._interface.always_run),
                            hashfile,
-                           str(os.path.exists(hashfile)),
+                           str(op.exists(hashfile)),
                            self.config['execution']['hash_method'].lower()))
             log_debug = config.get('logging', 'workflow_level') == 'DEBUG'
             if log_debug and not op.exists(hashfile):
@@ -1376,7 +1377,7 @@ def run(self, updatehash=False):
                 if cannot_rerun:
                     raise Exception(("Cannot rerun when 'stop_on_first_rerun' "
                                      "is set to True"))
-            hashfile_unfinished = os.path.join(outdir,
+            hashfile_unfinished = op.join(outdir,
                                           '_0x%s_unfinished.json' %
                                           hashvalue)
             if op.exists(hashfile):
@@ -1387,20 +1388,35 @@ def run(self, updatehash=False):
                              and not isinstance(self, MapNode))
                 if rm_outdir:
                     logger.debug("Removing old %s and its contents" % outdir)
-                    rmtree(outdir)
+                    try:
+                        rmtree(outdir)
+                    except OSError as ex:
+                        outdircont = os.listdir(outdir)
+                        if ((ex.errno == errno.ENOTEMPTY) and (len(outdircont) == 0)):
+                            logger.warn(('An exception was raised trying to remove old %s, '
+                                         'but the path seems empty. Is it an NFS mount?. '
+                                         'Passing the exception.') % outdir)
+                            pass
+                        elif ((ex.errno == errno.ENOTEMPTY) and (len(outdircont) != 0)):
+                            logger.debug(('Folder contents (%d items): '
+                                          '%s') % (len(outdircont), outdircont))
+                            raise ex
+                        else:
+                            raise ex
+
                 else:
                     logger.debug(("%s found and can_resume is True or Node is a "
                                   "MapNode - resuming execution") %
                                  hashfile_unfinished)
                     if isinstance(self, MapNode):
                         # remove old json files
-                        for filename in glob(os.path.join(outdir, '_0x*.json')):
+                        for filename in glob(op.join(outdir, '_0x*.json')):
                             os.unlink(filename)
             outdir = make_output_dir(outdir)
             self._save_hashfile(hashfile_unfinished, hashed_inputs)
             self.write_report(report_type='preexec', cwd=outdir)
-            savepkl(os.path.join(outdir, '_node.pklz'), self)
-            savepkl(os.path.join(outdir, '_inputs.pklz'),
+            savepkl(op.join(outdir, '_node.pklz'), self)
+            savepkl(op.join(outdir, '_inputs.pklz'),
                     self.inputs.get_traitsfree())
             try:
                 self._run_interface()
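
The new `try/except` works around a quirk of NFS clients: files still open in another process are silly-renamed to `.nfsXXXX` placeholders rather than unlinked, so `rmtree` can fail with ENOTEMPTY on a directory that a fresh `os.listdir` reports as empty. A self-contained sketch of the same defensive pattern, using only the standard library (names are illustrative, this is not the nipype implementation):

    import errno
    import logging
    import os
    import shutil

    logger = logging.getLogger(__name__)

    def rmtree_nfs_tolerant(path):
        """Remove a tree, tolerating the spurious ENOTEMPTY an NFS
        mount may raise for a directory that now lists as empty."""
        try:
            shutil.rmtree(path)
        except OSError as ex:
            if ex.errno != errno.ENOTEMPTY:
                raise
            remaining = os.listdir(path)
            if remaining:
                # genuinely non-empty: surface the contents and re-raise
                logger.debug('Folder contents (%d items): %s',
                             len(remaining), remaining)
                raise
            logger.warning('%s reported non-empty but lists no entries; '
                           'likely an NFS artifact, ignoring', path)

The branch order in the sketch mirrors the diff: only the empty-listing ENOTEMPTY case is swallowed, while a directory that still has entries, or any other errno, re-raises.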
@@ -1410,13 +1426,13 @@ def run(self, updatehash=False):
                 shutil.move(hashfile_unfinished, hashfile)
                 self.write_report(report_type='postexec', cwd=outdir)
         else:
-            if not os.path.exists(os.path.join(outdir, '_inputs.pklz')):
+            if not op.exists(op.join(outdir, '_inputs.pklz')):
                 logger.debug('%s: creating inputs file' % self.name)
-                savepkl(os.path.join(outdir, '_inputs.pklz'),
+                savepkl(op.join(outdir, '_inputs.pklz'),
                         self.inputs.get_traitsfree())
-            if not os.path.exists(os.path.join(outdir, '_node.pklz')):
+            if not op.exists(op.join(outdir, '_node.pklz')):
                 logger.debug('%s: creating node file' % self.name)
-                savepkl(os.path.join(outdir, '_node.pklz'), self)
+                savepkl(op.join(outdir, '_node.pklz'), self)
             logger.debug("Hashfile exists. Skipping execution")
             self._run_interface(execute=False, updatehash=updatehash)
         logger.debug('Finished running %s in dir: %s\n' % (self._id, outdir))
@@ -1517,7 +1533,7 @@ def _run_interface(self, execute=True, updatehash=False):
             os.chdir(old_cwd)
 
     def _save_results(self, result, cwd):
-        resultsfile = os.path.join(cwd, 'result_%s.pklz' % self.name)
+        resultsfile = op.join(cwd, 'result_%s.pklz' % self.name)
         if result.outputs:
             try:
                 outputs = result.outputs.get()
@@ -1550,10 +1566,10 @@ def _load_resultfile(self, cwd):
             rerun
         """
         aggregate = True
-        resultsoutputfile = os.path.join(cwd, 'result_%s.pklz' % self.name)
+        resultsoutputfile = op.join(cwd, 'result_%s.pklz' % self.name)
         result = None
         attribute_error = False
-        if os.path.exists(resultsoutputfile):
+        if op.exists(resultsoutputfile):
             pkl_file = gzip.open(resultsoutputfile, 'rb')
             try:
                 result = cPickle.load(pkl_file)
@@ -1589,7 +1605,7 @@ def _load_results(self, cwd):
         if aggregate:
             logger.debug('aggregating results')
             if attribute_error:
-                old_inputs = loadpkl(os.path.join(cwd, '_inputs.pklz'))
+                old_inputs = loadpkl(op.join(cwd, '_inputs.pklz'))
                 self.inputs.set(**old_inputs)
             if not isinstance(self, MapNode):
                 self._copyfiles_to_wd(cwd, True, linksonly=True)
@@ -1633,7 +1649,7 @@ def _run_command(self, execute, copyfiles=True):
             except Exception, msg:
                 self._result.runtime.stderr = msg
                 raise
-            cmdfile = os.path.join(cwd, 'command.txt')
+            cmdfile = op.join(cwd, 'command.txt')
             fd = open(cmdfile, 'wt')
             fd.writelines(cmd + "\n")
             fd.close()
@@ -1646,7 +1662,7 @@ def _run_command(self, execute, copyfiles=True):
 
             dirs2keep = None
             if isinstance(self, MapNode):
-                dirs2keep = [os.path.join(cwd, 'mapflow')]
+                dirs2keep = [op.join(cwd, 'mapflow')]
             result.outputs = clean_working_directory(result.outputs, cwd,
                                                      self._interface.inputs,
                                                      self.needed_outputs,
@@ -1670,7 +1686,7 @@ def _strip_temp(self, files, wd):
             if isinstance(f, list):
                 out.append(self._strip_temp(f, wd))
             else:
-                out.append(f.replace(os.path.join(wd, '_tempinput'), wd))
+                out.append(f.replace(op.join(wd, '_tempinput'), wd))
         return out
 
     def _copyfiles_to_wd(self, outdir, execute, linksonly=False):
@@ -1680,7 +1696,7 @@ def _copyfiles_to_wd(self, outdir, execute, linksonly=False):
                      (str(execute), str(linksonly)))
         if execute and linksonly:
             olddir = outdir
-            outdir = os.path.join(outdir, '_tempinput')
+            outdir = op.join(outdir, '_tempinput')
             os.makedirs(outdir)
         for info in self._interface._get_filecopy_info():
             files = self.inputs.get().get(info['key'])
@@ -1700,7 +1716,7 @@ def _copyfiles_to_wd(self, outdir, execute, linksonly=False):
                                              newpath=outdir)
                     newfiles = self._strip_temp(
                         newfiles,
-                        op.abspath(olddir).split(os.path.sep)[-1])
+                        op.abspath(olddir).split(op.sep)[-1])
                 else:
                     newfiles = copyfiles(infiles,
                                          [outdir],
@@ -1720,9 +1736,9 @@ def update(self, **opts):
     def write_report(self, report_type=None, cwd=None):
         if not str2bool(self.config['execution']['create_report']):
             return
-        report_dir = os.path.join(cwd, '_report')
-        report_file = os.path.join(report_dir, 'report.rst')
-        if not os.path.exists(report_dir):
+        report_dir = op.join(cwd, '_report')
+        report_file = op.join(report_dir, 'report.rst')
+        if not op.exists(report_dir):
             os.makedirs(report_dir)
         if report_type == 'preexec':
             logger.debug('writing pre-exec report to %s' % report_file)
@@ -2038,7 +2054,6 @@ def __init__(self, interface, iterfield, name, serial=False, **kwargs):
                                             fields=self.iterfield)
         self._inputs.on_trait_change(self._set_mapnode_input)
         self._got_inputs = False
-
         self._serial = serial
 
     def _create_dynamic_traits(self, basetraits, fields=None, nitems=None):
def _create_dynamic_traits(self, basetraits, fields=None, nitems=None):
@@ -2137,7 +2152,7 @@ def _make_nodes(self, cwd=None):
21372152
setattr(node.inputs, field,
21382153
fieldvals[i])
21392154
node.config = self.config
2140-
node.base_dir = os.path.join(cwd, 'mapflow')
2155+
node.base_dir = op.join(cwd, 'mapflow')
21412156
yield i, node
21422157

21432158
def _node_runner(self, nodes, updatehash=False):
@@ -2199,8 +2214,8 @@ def write_report(self, report_type=None, cwd=None):
21992214
super(MapNode, self).write_report(report_type=report_type, cwd=cwd)
22002215
if report_type == 'postexec':
22012216
super(MapNode, self).write_report(report_type=report_type, cwd=cwd)
2202-
report_dir = os.path.join(cwd, '_report')
2203-
report_file = os.path.join(report_dir, 'report.rst')
2217+
report_dir = op.join(cwd, '_report')
2218+
report_file = op.join(report_dir, 'report.rst')
22042219
fp = open(report_file, 'at')
22052220
fp.writelines(write_rst_header('Subnode reports', level=1))
22062221
nitems = len(filename_to_list(
@@ -2209,7 +2224,7 @@ def write_report(self, report_type=None, cwd=None):
             for i in range(nitems):
                 nodename = '_' + self.name + str(i)
                 subnode_report_files.insert(i, 'subnode %d' % i + ' : ' +
-                                            os.path.join(cwd,
+                                            op.join(cwd,
                                                     'mapflow',
                                                     nodename,
                                                     '_report',
@@ -2282,9 +2297,9 @@ def _run_interface(self, execute=True, updatehash=False):
             self._save_results(self._result, cwd)
             # remove any node directories no longer required
             dirs2remove = []
-            for path in glob(os.path.join(cwd, 'mapflow', '*')):
-                if os.path.isdir(path):
-                    if path.split(os.path.sep)[-1] not in nodenames:
+            for path in glob(op.join(cwd, 'mapflow', '*')):
+                if op.isdir(path):
+                    if path.split(op.sep)[-1] not in nodenames:
                         dirs2remove.append(path)
             for path in dirs2remove:
                 shutil.rmtree(path)
