
Commit 9a99b2f

revise handling of hashes
1 parent 056af08 commit 9a99b2f


3 files changed (+147, −117 lines)

nipype/pipeline/engine/nodes.py

Lines changed: 129 additions & 99 deletions
@@ -189,7 +189,7 @@ def __init__(self,
         self._hashvalue = None
         self._hashed_inputs = None
         self._needed_outputs = []
-        self.needed_outputs = sorted(needed_outputs)
+        self.needed_outputs = needed_outputs

     @property
     def interface(self):
@@ -297,83 +297,105 @@ def help(self):
         """Print interface help"""
         self._interface.help()

-    def hash_exists(self, updatehash=False):
+    def is_cached(self, rm_outdated=False):
         """
         Check if the interface has been run previously, and whether
-        cached results are viable for reuse
+        cached results are up-to-date.
         """
+        outdir = self.output_dir()

-        # Get a dictionary with hashed filenames and a hashvalue
-        # of the dictionary itself.
+        # Update hash
         hashed_inputs, hashvalue = self._get_hashval()
-        outdir = self.output_dir()
+
+        # The output folder does not exist: not cached
+        if not op.exists(outdir):
+            logger.debug('[Node] Directory not found "%s".', outdir)
+            return False, False
+
         hashfile = op.join(outdir, '_0x%s.json' % hashvalue)
-        hash_exists = op.exists(hashfile)
-
-        logger.debug('[Node] hash value=%s, exists=%s', hashvalue, hash_exists)
-
-        if op.exists(outdir):
-            # Find previous hashfiles
-            globhashes = glob(op.join(outdir, '_0x*.json'))
-            unfinished = [
-                path for path in globhashes
-                if path.endswith('_unfinished.json')
-            ]
-            hashfiles = list(set(globhashes) - set(unfinished))
-            if len(hashfiles) > 1:
-                for rmfile in hashfiles:
-                    os.remove(rmfile)
-
-                raise RuntimeError(
-                    '[Node] Cache ERROR - Found %d previous hashfiles indicating '
-                    'that the ``base_dir`` for this node went stale. Please re-run the '
-                    'workflow.' % len(hashfiles))
-
-            # This should not happen, but clean up and break if so.
-            if unfinished and updatehash:
-                for rmfile in unfinished:
-                    os.remove(rmfile)
-
-                raise RuntimeError(
-                    '[Node] Cache ERROR - Found unfinished hashfiles (%d) indicating '
-                    'that the ``base_dir`` for this node went stale. Please re-run the '
-                    'workflow.' % len(unfinished))
-
-            # Remove outdated hashfile
-            if hashfiles and hashfiles[0] != hashfile:
-                logger.info(
-                    '[Node] Outdated hashfile found for "%s", removing and forcing node '
-                    'to rerun.', self.fullname)
-
-                # If logging is more verbose than INFO (20), print diff between hashes
-                loglevel = logger.getEffectiveLevel()
-                if loglevel < 20:  # Lazy logging: only < INFO
-                    split_out = split_filename(hashfiles[0])
-                    exp_hash_file_base = split_out[1]
-                    exp_hash = exp_hash_file_base[len('_0x'):]
-                    logger.log(loglevel, "[Node] Old/new hashes = %s/%s",
-                               exp_hash, hashvalue)
-                    try:
-                        prev_inputs = load_json(hashfiles[0])
-                    except Exception:
-                        pass
-                    else:
-                        logger.log(loglevel,
-                                   dict_diff(prev_inputs, hashed_inputs, 10))
+        cached = op.exists(hashfile)
+
+        # Check if updated
+        globhashes = glob(op.join(outdir, '_0x*.json'))
+        unfinished = [
+            path for path in globhashes
+            if path.endswith('_unfinished.json')
+        ]
+        hashfiles = list(set(globhashes) - set(unfinished))
+        logger.debug('[Node] Hashes: %s, %s, %s, %s',
+                     hashed_inputs, hashvalue, hashfile, hashfiles)
+
+        # No previous hashfiles found, we're all set.
+        if cached and len(hashfiles) == 1:
+            assert(hashfile == hashfiles[0])
+            logger.debug('[Node] Up-to-date cache found for "%s".', self.fullname)
+            return True, True  # Cached and updated
+
+        if len(hashfiles) > 1:
+            if cached:
+                hashfiles.remove(hashfile)  # Do not clean up the node, if cached
+            logger.warning('[Node] Found %d previous hashfiles indicating that the working '
+                           'directory of node "%s" is stale, deleting old hashfiles.',
+                           len(hashfiles), self.fullname)
+            for rmfile in hashfiles:
+                os.remove(rmfile)
+
+            hashfiles = [hashfile] if cached else []
+
+        # At this point only one hashfile is in the folder
+        # and we directly check whether it is updated
+        if not hashfiles:
+            logger.debug('[Node] No hashfiles found in "%s".', outdir)
+            assert(not cached)
+            return False, False
+
+        updated = hashfile == hashfiles[0]
+        if not updated:  # Report differences depending on log verbosity
+            cached = True
+            logger.info('[Node] Outdated cache found for "%s".', self.fullname)
+            # If logging is more verbose than INFO (20), print diff between hashes
+            loglevel = logger.getEffectiveLevel()
+            if loglevel < 40:  # Lazy logging: only < INFO
+                exp_hash_file_base = split_filename(hashfiles[0])[1]
+                exp_hash = exp_hash_file_base[len('_0x'):]
+                logger.log(loglevel, "[Node] Old/new hashes = %s/%s",
+                           exp_hash, hashvalue)
+                try:
+                    prev_inputs = load_json(hashfiles[0])
+                except Exception:
+                    pass
+                else:
+                    logger.log(loglevel,
+                               dict_diff(prev_inputs, hashed_inputs, 10))

+            if rm_outdated:
                 os.remove(hashfiles[0])

+        assert(cached)  # At this point, node is cached (may not be up-to-date)
+        return cached, updated
+
+    def hash_exists(self, updatehash=False):
+        """
+        Decorate the new `is_cached` method with hash updating
+        to maintain backwards compatibility.
+        """
+
+        # Get a dictionary with hashed filenames and a hashvalue
+        # of the dictionary itself.
+        cached, updated = self.is_cached(rm_outdated=True)
+
+        outdir = self.output_dir()
+        hashfile = op.join(outdir, '_0x%s.json' % self._hashvalue)
+
+        if updated:
+            return True, self._hashvalue, hashfile, self._hashed_inputs
+
         # Update only possible if it exists
-        if hash_exists and updatehash:
-            logger.debug("[Node] Updating hash: %s", hashvalue)
-            _save_hashfile(hashfile, hashed_inputs)
+        if cached and updatehash:
+            logger.debug("[Node] Updating hash: %s", self._hashvalue)
+            _save_hashfile(hashfile, self._hashed_inputs)

-        logger.debug(
-            'updatehash=%s, overwrite=%s, always_run=%s, hash_exists=%s, '
-            'hash_method=%s', updatehash, self.overwrite,
-            self._interface.always_run, hash_exists,
-            self.config['execution']['hash_method'].lower())
-        return hash_exists, hashvalue, hashfile, hashed_inputs
+        return cached, self._hashvalue, hashfile, self._hashed_inputs

     def run(self, updatehash=False):
         """Execute the node in its directory.
@@ -390,23 +412,17 @@ def run(self, updatehash=False):
         if self.config is None:
             self.config = {}
         self.config = merge_dict(deepcopy(config._sections), self.config)
-        self._get_inputs()

-        # Check if output directory exists
         outdir = self.output_dir()
-        if op.exists(outdir):
-            logger.debug('Output directory (%s) exists and is %sempty,',
-                         outdir, 'not ' * bool(os.listdir(outdir)))
+        force_run = self.overwrite or (self.overwrite is None and
+                                       self._interface.always_run)

         # Check hash, check whether run should be enforced
         logger.info('[Node] Setting-up "%s" in "%s".', self.fullname, outdir)
-        hash_info = self.hash_exists(updatehash=updatehash)
-        hash_exists, hashvalue, hashfile, hashed_inputs = hash_info
-        force_run = self.overwrite or (self.overwrite is None and
-                                       self._interface.always_run)
+        cached, updated = self.is_cached()

         # If the node is cached, check on pklz files and finish
-        if hash_exists and (updatehash or not force_run):
+        if not force_run and (updated or (not updated and updatehash)):
             logger.debug("Only updating node hashes or skipping execution")
             inputs_file = op.join(outdir, '_inputs.pklz')
             if not op.exists(inputs_file):
@@ -418,46 +434,48 @@ def run(self, updatehash=False):
                 logger.debug('Creating node file %s', node_file)
                 savepkl(node_file, self)

-            result = self._run_interface(execute=False, updatehash=updatehash)
+            result = self._run_interface(execute=False,
+                                         updatehash=updatehash and not updated)
             logger.info('[Node] "%s" found cached%s.', self.fullname,
-                        ' (and hash updated)' * updatehash)
+                        ' (and hash updated)' * (updatehash and not updated))
             return result

-        # by rerunning we mean only nodes that did finish to run previously
-        if hash_exists and not isinstance(self, MapNode):
-            logger.debug('[Node] Rerunning "%s"', self.fullname)
+        if cached and updated and not isinstance(self, MapNode):
+            logger.debug('[Node] Rerunning cached, up-to-date node "%s"', self.fullname)
             if not force_run and str2bool(
                     self.config['execution']['stop_on_first_rerun']):
                 raise Exception(
                     'Cannot rerun when "stop_on_first_rerun" is set to True')

-        # Remove hashfile if it exists at this point (re-running)
-        if op.exists(hashfile):
-            os.remove(hashfile)
+        # Remove any hashfile that exists at this point (re)running.
+        if cached:
+            for outdatedhash in glob(op.join(self.output_dir(), '_0x*.json')):
+                os.remove(outdatedhash)
+

         # Hashfile while running
-        hashfile_unfinished = op.join(outdir,
-                                      '_0x%s_unfinished.json' % hashvalue)
+        hashfile_unfinished = op.join(
+            outdir, '_0x%s_unfinished.json' % self._hashvalue)

         # Delete directory contents if this is not a MapNode or can't resume
-        rm_outdir = not isinstance(self, MapNode) and not (
-            self._interface.can_resume and op.isfile(hashfile_unfinished))
-        if rm_outdir:
+        can_resume = not (self._interface.can_resume and op.isfile(hashfile_unfinished))
+        if can_resume and not isinstance(self, MapNode):
             emptydirs(outdir, noexist_ok=True)
         else:
             logger.debug('[%sNode] Resume - hashfile=%s',
                          'Map' * int(isinstance(self, MapNode)),
                          hashfile_unfinished)
-            if isinstance(self, MapNode):
-                # remove old json files
-                for filename in glob(op.join(outdir, '_0x*.json')):
-                    os.remove(filename)
+
+        if isinstance(self, MapNode):
+            # remove old json files
+            for filename in glob(op.join(outdir, '_0x*.json')):
+                os.remove(filename)

         # Make sure outdir is created
         makedirs(outdir, exist_ok=True)

         # Store runtime-hashfile, pre-execution report, the node and the inputs set.
-        _save_hashfile(hashfile_unfinished, hashed_inputs)
+        _save_hashfile(hashfile_unfinished, self._hashed_inputs)
         write_report(
             self, report_type='preexec', is_mapnode=isinstance(self, MapNode))
         savepkl(op.join(outdir, '_node.pklz'), self)
@@ -485,7 +503,8 @@ def run(self, updatehash=False):
             os.chdir(cwd)

         # Tear-up after success
-        shutil.move(hashfile_unfinished, hashfile)
+        shutil.move(hashfile_unfinished,
+                    hashfile_unfinished.replace('_unfinished', ''))
         write_report(
             self, report_type='postexec', is_mapnode=isinstance(self, MapNode))
         logger.info('[Node] Finished "%s".', self.fullname)
@@ -551,8 +570,14 @@ def _get_inputs(self):
         # Successfully set inputs
         self._got_inputs = True

+    def _update_hash(self):
+        for outdatedhash in glob(op.join(self.output_dir(), '_0x*.json')):
+            os.remove(outdatedhash)
+        _save_hashfile(self._hashvalue, self._hashed_inputs)
+
     def _run_interface(self, execute=True, updatehash=False):
         if updatehash:
+            self._update_hash()
             return self._load_results()
         return self._run_command(execute)

@@ -586,7 +611,6 @@ def _load_results(self):
         return result

     def _run_command(self, execute, copyfiles=True):
-
         if not execute:
             try:
                 result = self._load_results()
@@ -597,7 +621,8 @@ def _run_command(self, execute, copyfiles=True):
                 copyfiles = False  # OE: this was like this before,
                 execute = True  # I'll keep them for safety
             else:
-                logger.info("[Node] Cached - collecting precomputed outputs")
+                logger.info('[Node] Cached "%s" - collecting precomputed outputs',
+                            self.fullname)
                 return result

         # Run command: either execute is true or load_results failed.
@@ -1037,6 +1062,10 @@ def _set_mapnode_input(self, name, newvalue):
     def _get_hashval(self):
         """Compute hash including iterfield lists."""
         self._get_inputs()
+
+        if self._hashvalue is not None and self._hashed_inputs is not None:
+            return self._hashed_inputs, self._hashvalue
+
         self._check_iterfield()
         hashinputs = deepcopy(self._interface.inputs)
         for name in self.iterfield:
@@ -1061,7 +1090,8 @@ def _get_hashval(self):
             hashobject.update(str(sorted_outputs).encode())
             hashvalue = hashobject.hexdigest()
             hashed_inputs.append(('needed_outputs', sorted_outputs))
-        return hashed_inputs, hashvalue
+        self._hashed_inputs, self._hashvalue = hashed_inputs, hashvalue
+        return self._hashed_inputs, self._hashvalue

     @property
     def inputs(self):

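Note: for reference, a minimal sketch (not part of the commit) of how the revised caching API above could be exercised from user code. Only `is_cached(rm_outdated=...)`, its `(cached, updated)` return value, and the backwards-compatible `hash_exists()` wrapper come from the diff; the node, function, and input values below are illustrative assumptions.

    import os
    from nipype import Node
    from nipype.interfaces.utility import Function

    def add_one(x):
        return x + 1

    # Hypothetical node; any interface behaves the same way.
    node = Node(Function(input_names=['x'], output_names=['y'], function=add_one),
                name='add_one')
    node.base_dir = os.getcwd()
    node.inputs.x = 3

    # New API: a pair of booleans instead of the old 4-tuple.
    cached, updated = node.is_cached()   # (False, False): no working directory yet
    node.run()
    cached, updated = node.is_cached()   # (True, True): hashfile matches current inputs

    node.inputs.x = 4                    # changing an input makes the cache outdated
    cached, updated = node.is_cached()   # (True, False): cached, but not up-to-date

    # Backwards-compatible wrapper still returns the old 4-tuple and removes the
    # outdated hashfile internally via is_cached(rm_outdated=True).
    exists, hashvalue, hashfile, hashed_inputs = node.hash_exists()
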
nipype/pipeline/engine/tests/test_nodes.py

Lines changed: 13 additions & 13 deletions
@@ -187,6 +187,10 @@ def test_node_hash(tmpdir):
     from nipype.interfaces.utility import Function
     tmpdir.chdir()

+    config.set_default_config()
+    config.set('execution', 'stop_on_first_crash', True)
+    config.set('execution', 'crashdump_dir', os.getcwd())
+
     def func1():
         return 1

@@ -216,31 +220,27 @@ class EngineTestException(Exception):

     class RaiseError(DistributedPluginBase):
         def _submit_job(self, node, updatehash=False):
-            raise EngineTestException('Submit called')
+            raise EngineTestException(
+                'Submit called - cached=%s, updated=%s' % node.is_cached())

     # check if a proper exception is raised
     with pytest.raises(EngineTestException) as excinfo:
         w1.run(plugin=RaiseError())
-    assert 'Submit called' == str(excinfo.value)
+    assert str(excinfo.value).startswith('Submit called')

     # generate outputs
     w1.run(plugin='Linear')
     # ensure plugin is being called
-    w1.config['execution'] = {
-        'stop_on_first_crash': 'true',
-        'local_hash_check': 'false',
-        'crashdump_dir': os.getcwd()
-    }
+    config.set('execution', 'local_hash_check', False)

     # rerun to ensure we have outputs
     w1.run(plugin='Linear')
-    # set local check
-    w1.config['execution'] = {
-        'stop_on_first_crash': 'true',
-        'local_hash_check': 'true',
-        'crashdump_dir': os.getcwd()
-    }

+    # set local check
+    config.set('execution', 'local_hash_check', True)
+    w1 = pe.Workflow(name='test')
+    w1.connect(n1, ('a', modify), n2, 'a')
+    w1.base_dir = os.getcwd()
     w1.run(plugin=RaiseError())

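Note: a minimal sketch of the configuration pattern the revised test relies on. The `config.set_default_config()` and `config.set(...)` calls are the ones used in the diff; the surrounding workflow setup and run calls are assumed.

    from nipype import config

    # Reset to defaults, then toggle individual execution settings globally,
    # instead of overwriting each workflow's local config dict.
    config.set_default_config()
    config.set('execution', 'stop_on_first_crash', True)
    config.set('execution', 'local_hash_check', False)  # let the plugin check hashes
    # ... run the workflow ...
    config.set('execution', 'local_hash_check', True)   # check hashes before submission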