diff --git a/temporalio/worker/_workflow_instance.py b/temporalio/worker/_workflow_instance.py index 801719785..a36fc2170 100644 --- a/temporalio/worker/_workflow_instance.py +++ b/temporalio/worker/_workflow_instance.py @@ -327,34 +327,32 @@ def activate( self._is_replaying = act.is_replaying activation_err: Optional[Exception] = None + + # Split into job sets with patches, then signals + updates, then + # non-queries, then queries + job_sets: List[ + List[temporalio.bridge.proto.workflow_activation.WorkflowActivationJob] + ] = [[], [], [], []] + for job in act.jobs: + if job.HasField("notify_has_patch"): + job_sets[0].append(job) + elif job.HasField("signal_workflow") or job.HasField("do_update"): + job_sets[1].append(job) + elif not job.HasField("query_workflow"): + job_sets[2].append(job) + else: + job_sets[3].append(job) + + for job_set in job_sets: + for job in job_set: + # Let errors bubble out of these to the caller to fail the task + self._apply(job) + try: - # Split into job sets with patches, then signals + updates, then - # non-queries, then queries - job_sets: List[ - List[temporalio.bridge.proto.workflow_activation.WorkflowActivationJob] - ] = [[], [], [], []] - for job in act.jobs: - if job.HasField("notify_has_patch"): - job_sets[0].append(job) - elif job.HasField("signal_workflow") or job.HasField("do_update"): - job_sets[1].append(job) - elif not job.HasField("query_workflow"): - job_sets[2].append(job) - else: - job_sets[3].append(job) - - # Apply every job set, running after each set - for index, job_set in enumerate(job_sets): - if not job_set: - continue - for job in job_set: - # Let errors bubble out of these to the caller to fail the task - self._apply(job) - - # Run one iteration of the loop. We do not allow conditions to - # be checked in patch jobs (first index) or query jobs (last - # index). - self._run_once(check_conditions=index == 1 or index == 2) + # Run one iteration of the loop. We do not allow conditions to + # be checked in patch jobs (first index) or query jobs (last + # index). + self._run_once(check_conditions=True) except Exception as err: # We want some errors during activation, like those that can happen # during payload conversion, to be able to fail the workflow not the