Skip to content

Commit a669a72

Browse files
committed
reverting some of the changes; the state from intermediate tasks are not passed if final splitter is empty
1 parent eee43b7 commit a669a72

File tree

2 files changed

+12
-13
lines changed

2 files changed

+12
-13
lines changed

pydra/engine/core.py

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,7 @@ def checksum(self):
222222
self.inputs._graph_checksums = [nd.checksum for nd in self.graph_sorted]
223223

224224
input_hash = self.inputs.hash
225-
if self.state is None or self.state.splitter_rpn == []:
225+
if self.state is None:
226226
self._checksum = create_checksum(self.__class__.__name__, input_hash)
227227
else:
228228
# including splitter in the hash
@@ -329,7 +329,7 @@ def cache_locations(self, locations):
329329
@property
330330
def output_dir(self):
331331
"""Get the filesystem path where outputs will be written."""
332-
if self.state and self.state.splitter_rpn:
332+
if self.state:
333333
return [self._cache_dir / checksum for checksum in self.checksum_states()]
334334
return self._cache_dir / self.checksum
335335

@@ -342,7 +342,7 @@ def __call__(self, submitter=None, plugin=None, rerun=False, **kwargs):
342342
plugin = plugin or self.plugin
343343
if plugin:
344344
submitter = Submitter(plugin=plugin)
345-
elif self.state and self.state.splitter_rpn:
345+
elif self.state:
346346
submitter = Submitter()
347347

348348
if submitter:
@@ -512,7 +512,7 @@ def done(self):
512512
# if any of the field is lazy, there is no need to check results
513513
if is_lazy(self.inputs):
514514
return False
515-
if self.state and self.state.splitter_rpn:
515+
if self.state:
516516
# TODO: only check for needed state result
517517
if self.result() and all(self.result()):
518518
return True
@@ -556,7 +556,7 @@ def result(self, state_index=None):
556556
"""
557557
# TODO: check if result is available in load_result and
558558
# return a future if not
559-
if self.state and self.state.splitter_rpn:
559+
if self.state:
560560
if state_index is None:
561561
# if state_index=None, collecting all results
562562
if self.state.combiner:
@@ -753,7 +753,10 @@ def create_connections(self, task):
753753
self.graph.add_edges((getattr(self, val.name), task))
754754
logger.debug("Connecting %s to %s", val.name, task.name)
755755

756-
if getattr(self, val.name).state:
756+
if (
757+
getattr(self, val.name).state
758+
and getattr(self, val.name).state.splitter_rpn_final
759+
):
757760
# adding a state from the previous task to other_states
758761
other_states[val.name] = (
759762
getattr(self, val.name).state,

pydra/engine/submitter.py

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,7 @@ def __call__(self, runnable, cache_locations=None, rerun=False):
5050
runnable.inputs._graph_checksums = [
5151
nd.checksum for nd in runnable.graph_sorted
5252
]
53-
if is_workflow(runnable) and (
54-
runnable.state is None or runnable.state.splitter_rpn == []
55-
):
53+
if is_workflow(runnable) and runnable.state is None:
5654
self.loop.run_until_complete(self.submit_workflow(runnable, rerun=rerun))
5755
else:
5856
self.loop.run_until_complete(self.submit(runnable, wait=True, rerun=rerun))
@@ -93,7 +91,7 @@ async def submit(self, runnable, wait=False, rerun=False):
9391
9492
"""
9593
futures = set()
96-
if runnable.state and runnable.state.splitter_rpn:
94+
if runnable.state:
9795
runnable.state.prepare_states(runnable.inputs)
9896
runnable.state.prepare_inputs()
9997
logger.debug(
@@ -156,9 +154,7 @@ async def _run_workflow(self, wf, rerun=False):
156154
task.inputs.retrieve_values(wf)
157155
# checksum has to be updated, so resetting
158156
task._checksum = None
159-
if is_workflow(task) and (
160-
not task.state or task.state.splitter_rpn == []
161-
):
157+
if is_workflow(task) and not task.state:
162158
await self.submit_workflow(task, rerun=rerun)
163159
else:
164160
for fut in await self.submit(task, rerun=rerun):

0 commit comments

Comments
 (0)