Skip to content

Commit 15fdd4a

Browse files
committed
fix: submit mapnodes should add jobs regardless of slots and provenance writer fails in some situations with map nodes
1 parent 230ce4a commit 15fdd4a

File tree

2 files changed

+4
-5
lines changed

2 files changed

+4
-5
lines changed

nipype/pipeline/plugins/base.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -325,7 +325,7 @@ def _submit_mapnode(self, jobid):
325325
return False
326326

327327
def _send_procs_to_workers(self, updatehash=False, slots=None, graph=None):
328-
""" Sends jobs to workers using ipython's taskclient interface
328+
""" Sends jobs to workers
329329
"""
330330
while np.any(self.proc_done == False):
331331
# Check to see if a job is available
@@ -346,8 +346,6 @@ def _send_procs_to_workers(self, updatehash=False, slots=None, graph=None):
346346
self.proc_pending[jobid] = False
347347
continue
348348
if num_subnodes > 1:
349-
if num_subnodes > (self.max_jobs - len(self.pending_tasks)):
350-
break
351349
submit = self._submit_mapnode(jobid)
352350
if not submit:
353351
continue

nipype/pipeline/utils.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1101,11 +1101,12 @@ def write_workflow_prov(graph, filename=None, format='turtle'):
11011101
subresult = InterfaceResult(result.interface[idx],
11021102
runtime, outputs={})
11031103
if result.inputs:
1104-
subresult.inputs = result.inputs[idx]
1104+
if idx < len(result.inputs):
1105+
subresult.inputs = result.inputs[idx]
11051106
if result.outputs:
11061107
for key, value in result.outputs.items():
11071108
values = getattr(result.outputs, key)
1108-
if isdefined(values):
1109+
if isdefined(values) and idx < len(values):
11091110
subresult.outputs[key] = values[idx]
11101111
sub_bundle = ProvStore().add_results(subresult)
11111112
ps.g = merge_bundles(ps.g, sub_bundle)

0 commit comments

Comments
 (0)