@@ -76,7 +76,6 @@ async def submit_from_call(self, runnable, rerun):
76
76
await self .expand_runnable (runnable , wait = True )
77
77
return True
78
78
79
-
80
79
async def expand_runnable (self , runnable , wait = False , rerun = False ):
81
80
"""
82
81
This coroutine handles state expansion.
@@ -117,10 +116,7 @@ async def expand_runnable(self, runnable, wait=False, rerun=False):
117
116
asyncio .create_task (load_and_run_async (task_pkl , sidx , self , rerun ))
118
117
)
119
118
else :
120
- futures .add (
121
- self .worker .run_el ((task_pkl , sidx , runnable ), rerun = rerun )
122
- )
123
-
119
+ futures .add (self .worker .run_el ((task_pkl , sidx , runnable ), rerun = rerun ))
124
120
125
121
if wait and futures :
126
122
# if wait is True, we are at the end of the graph / state expansion.
@@ -130,7 +126,6 @@ async def expand_runnable(self, runnable, wait=False, rerun=False):
130
126
# pass along futures to be awaited independently
131
127
return futures
132
128
133
-
134
129
async def expand_workflow (self , wf , rerun = False ):
135
130
"""
136
131
Expand and execute a stateless :class:`~pydra.engine.core.Workflow`.
@@ -278,11 +273,8 @@ def is_runnable(graph, obj):
278
273
return True
279
274
280
275
281
-
282
276
async def prepare_runnable_with_state (runnable ):
283
277
runnable .state .prepare_states (runnable .inputs , cont_dim = runnable .cont_dim )
284
278
runnable .state .prepare_inputs ()
285
- logger .debug (
286
- f"Expanding { runnable } into { len (runnable .state .states_val )} states"
287
- )
288
- return runnable .pickle_task ()
279
+ logger .debug (f"Expanding { runnable } into { len (runnable .state .states_val )} states" )
280
+ return runnable .pickle_task ()
0 commit comments