Skip to content

Commit 97bfd31

Browse files
committed
fix tests
Signed-off-by: Fabian Martinez <[email protected]>
1 parent 01b1eff commit 97bfd31

File tree

2 files changed

+114
-55
lines changed

2 files changed

+114
-55
lines changed

durabletask/worker.py

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -188,8 +188,8 @@ def stop(self):
188188
def _execute_orchestrator(self, req: pb.OrchestratorRequest, stub: stubs.TaskHubSidecarServiceStub):
189189
try:
190190
executor = _OrchestrationExecutor(self._registry, self._logger)
191-
actions, custom_status = executor.execute(req.instanceId, req.pastEvents, req.newEvents)
192-
res = pb.OrchestratorResponse(instanceId=req.instanceId, actions=actions, customStatus=wrappers_pb2.StringValue(value=custom_status))
191+
result = executor.execute(req.instanceId, req.pastEvents, req.newEvents)
192+
res = pb.OrchestratorResponse(instanceId=req.instanceId, actions=result.actions, customStatus=wrappers_pb2.StringValue(value=result.custom_status))
193193
except Exception as ex:
194194
self._logger.exception(f"An error occurred while trying to execute instance '{req.instanceId}': {ex}")
195195
failure_details = pbh.new_failure_details(ex)
@@ -461,6 +461,14 @@ def continue_as_new(self, new_input, *, save_events: bool = False) -> None:
461461
self.set_continued_as_new(new_input, save_events)
462462

463463

464+
class ExecutionResults:
465+
actions: List[pb.OrchestratorAction]
466+
custom_status: str
467+
468+
def __init__(self, actions: List[pb.OrchestratorAction], custom_status: str):
469+
self.actions = actions
470+
self.custom_status = custom_status
471+
464472
class _OrchestrationExecutor:
465473
_generator: Optional[task.Orchestrator] = None
466474

@@ -470,7 +478,7 @@ def __init__(self, registry: _Registry, logger: logging.Logger):
470478
self._is_suspended = False
471479
self._suspended_events: List[pb.HistoryEvent] = []
472480

473-
def execute(self, instance_id: str, old_events: Sequence[pb.HistoryEvent], new_events: Sequence[pb.HistoryEvent]) -> Tuple[List[pb.OrchestratorAction],str]:
481+
def execute(self, instance_id: str, old_events: Sequence[pb.HistoryEvent], new_events: Sequence[pb.HistoryEvent]) -> ExecutionResults:
474482
if not new_events:
475483
raise task.OrchestrationStateError("The new history event list must have at least one event in it.")
476484

@@ -505,7 +513,7 @@ def execute(self, instance_id: str, old_events: Sequence[pb.HistoryEvent], new_e
505513
actions = ctx.get_actions()
506514
if self._logger.level <= logging.DEBUG:
507515
self._logger.debug(f"{instance_id}: Returning {len(actions)} action(s): {_get_action_summary(actions)}")
508-
return actions, ctx._custom_status
516+
return ExecutionResults(actions=actions, custom_status=ctx._custom_status)
509517

510518
def process_event(self, ctx: _RuntimeOrchestrationContext, event: pb.HistoryEvent) -> None:
511519
if self._is_suspended and _is_suspendable(event):

0 commit comments

Comments
 (0)