Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -722,7 +722,8 @@ def _add_to_open_tasks(self, task: TaskBase):
task.id = self._sequence_number
self._sequence_number += 1
self.open_tasks[task.id] = task
elif task.id != -1:
elif task.id != -1 and self.open_tasks[task.id] != task:
# Case when returning task_any with multiple external events having the same ID
self.open_tasks[task.id].append(task)

if task.id in self.deferred_tasks:
Expand Down
8 changes: 4 additions & 4 deletions azure/durable_functions/models/TaskOrchestrationExecutor.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,14 +245,14 @@ def resume_user_code(self):

self.current_task = new_task
if not (new_task is None):
if not (self.current_task._is_scheduled):
# new task is received. it needs to be resolved to a value
self.context._add_to_actions(self.current_task.action_repr)
self._mark_as_scheduled(self.current_task)
if not (new_task.state is TaskState.RUNNING):
# user yielded the same task multiple times, continue executing code
# until a new/not-previously-yielded task is encountered
self.resume_user_code()
elif not (self.current_task._is_scheduled):
# new task is received. it needs to be resolved to a value
self.context._add_to_actions(self.current_task.action_repr)
self._mark_as_scheduled(self.current_task)

def _mark_as_scheduled(self, task: TaskBase):
if isinstance(task, CompoundTask):
Expand Down