Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 25 additions & 27 deletions temporalio/worker/_workflow_instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -327,34 +327,32 @@ def activate(
self._is_replaying = act.is_replaying

activation_err: Optional[Exception] = None

# Split into job sets with patches, then signals + updates, then
# non-queries, then queries
job_sets: List[
List[temporalio.bridge.proto.workflow_activation.WorkflowActivationJob]
] = [[], [], [], []]
for job in act.jobs:
if job.HasField("notify_has_patch"):
job_sets[0].append(job)
elif job.HasField("signal_workflow") or job.HasField("do_update"):
job_sets[1].append(job)
elif not job.HasField("query_workflow"):
job_sets[2].append(job)
else:
job_sets[3].append(job)

for job_set in job_sets:
for job in job_set:
# Let errors bubble out of these to the caller to fail the task
self._apply(job)

try:
# Split into job sets with patches, then signals + updates, then
# non-queries, then queries
job_sets: List[
List[temporalio.bridge.proto.workflow_activation.WorkflowActivationJob]
] = [[], [], [], []]
for job in act.jobs:
if job.HasField("notify_has_patch"):
job_sets[0].append(job)
elif job.HasField("signal_workflow") or job.HasField("do_update"):
job_sets[1].append(job)
elif not job.HasField("query_workflow"):
job_sets[2].append(job)
else:
job_sets[3].append(job)

# Apply every job set, running after each set
for index, job_set in enumerate(job_sets):
if not job_set:
continue
for job in job_set:
# Let errors bubble out of these to the caller to fail the task
self._apply(job)

# Run one iteration of the loop. We do not allow conditions to
# be checked in patch jobs (first index) or query jobs (last
# index).
self._run_once(check_conditions=index == 1 or index == 2)
# Run one iteration of the loop. We do not allow conditions to
# be checked in patch jobs (first index) or query jobs (last
# index).
self._run_once(check_conditions=True)
except Exception as err:
# We want some errors during activation, like those that can happen
# during payload conversion, to be able to fail the workflow not the
Expand Down