1
- from azure .durable_functions .models .Task import TaskBase , TaskState , AtomicTask
1
+ from azure .durable_functions .models .Task import TaskBase , TaskState , AtomicTask , CompoundTask
2
2
from azure .durable_functions .models .OrchestratorState import OrchestratorState
3
3
from azure .durable_functions .models .DurableOrchestrationContext import DurableOrchestrationContext
4
4
from typing import Any , List , Optional , Union
@@ -229,7 +229,8 @@ def resume_user_code(self):
229
229
task_succeeded = current_task .state is TaskState .SUCCEEDED
230
230
new_task = self .generator .send (
231
231
task_value ) if task_succeeded else self .generator .throw (task_value )
232
- self .context ._add_to_open_tasks (new_task )
232
+ if isinstance (new_task , TaskBase ) and not (new_task ._is_scheduled ):
233
+ self .context ._add_to_open_tasks (new_task )
233
234
except StopIteration as stop_exception :
234
235
# the orchestration returned,
235
236
# flag it as such and capture its output
@@ -245,9 +246,17 @@ def resume_user_code(self):
245
246
# user yielded the same task multiple times, continue executing code
246
247
# until a new/not-previously-yielded task is encountered
247
248
self .resume_user_code ()
248
- else :
249
+ elif not ( self . current_task . _is_scheduled ) :
249
250
# new task is received. it needs to be resolved to a value
250
251
self .context ._add_to_actions (self .current_task .action_repr )
252
+ self ._mark_as_scheduled (self .current_task )
253
+
254
+ def _mark_as_scheduled (self , task : TaskBase ):
255
+ if isinstance (task , CompoundTask ):
256
+ for task in task .children :
257
+ self ._mark_as_scheduled (task )
258
+ else :
259
+ task ._set_is_scheduled (True )
251
260
252
261
def get_orchestrator_state_str (self ) -> str :
253
262
"""Obtain a JSON-formatted string representing the orchestration's state.
0 commit comments