Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions durabletask/internal/proto_task_hub_sidecar_service_stub.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
from typing import Any, Callable, Protocol


class ProtoTaskHubSidecarServiceStub(Protocol):
"""A stub class roughly matching the TaskHubSidecarServiceStub generated from the .proto file.
Used by Azure Functions during orchestration and entity executions to inject custom behavior,
as no real sidecar stub is available.
"""
Hello: Callable[..., Any]
StartInstance: Callable[..., Any]
GetInstance: Callable[..., Any]
RewindInstance: Callable[..., Any]
WaitForInstanceStart: Callable[..., Any]
WaitForInstanceCompletion: Callable[..., Any]
RaiseEvent: Callable[..., Any]
TerminateInstance: Callable[..., Any]
SuspendInstance: Callable[..., Any]
ResumeInstance: Callable[..., Any]
QueryInstances: Callable[..., Any]
PurgeInstances: Callable[..., Any]
GetWorkItems: Callable[..., Any]
CompleteActivityTask: Callable[..., Any]
CompleteOrchestratorTask: Callable[..., Any]
CompleteEntityTask: Callable[..., Any]
StreamInstanceHistory: Callable[..., Any]
CreateTaskHub: Callable[..., Any]
DeleteTaskHub: Callable[..., Any]
SignalEntity: Callable[..., Any]
GetEntity: Callable[..., Any]
QueryEntities: Callable[..., Any]
CleanEntityStorage: Callable[..., Any]
AbandonTaskActivityWorkItem: Callable[..., Any]
AbandonTaskOrchestratorWorkItem: Callable[..., Any]
AbandonTaskEntityWorkItem: Callable[..., Any]
13 changes: 7 additions & 6 deletions durabletask/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from durabletask.internal.helpers import new_timestamp
from durabletask.entities import DurableEntity, EntityLock, EntityInstanceId, EntityContext
from durabletask.internal.orchestration_entity_context import OrchestrationEntityContext
from durabletask.internal.proto_task_hub_sidecar_service_stub import ProtoTaskHubSidecarServiceStub
import durabletask.internal.helpers as ph
import durabletask.internal.exceptions as pe
import durabletask.internal.orchestrator_service_pb2 as pb
Expand Down Expand Up @@ -629,7 +630,7 @@ def stop(self):
def _execute_orchestrator(
self,
req: pb.OrchestratorRequest,
stub: stubs.TaskHubSidecarServiceStub,
stub: Union[stubs.TaskHubSidecarServiceStub, ProtoTaskHubSidecarServiceStub],
completionToken,
):
try:
Expand Down Expand Up @@ -677,7 +678,7 @@ def _execute_orchestrator(
def _cancel_orchestrator(
self,
req: pb.OrchestratorRequest,
stub: stubs.TaskHubSidecarServiceStub,
stub: Union[stubs.TaskHubSidecarServiceStub, ProtoTaskHubSidecarServiceStub],
completionToken,
):
stub.AbandonTaskOrchestratorWorkItem(
Expand All @@ -690,7 +691,7 @@ def _cancel_orchestrator(
def _execute_activity(
self,
req: pb.ActivityRequest,
stub: stubs.TaskHubSidecarServiceStub,
stub: Union[stubs.TaskHubSidecarServiceStub, ProtoTaskHubSidecarServiceStub],
completionToken,
):
instance_id = req.orchestrationInstance.instanceId
Expand Down Expand Up @@ -723,7 +724,7 @@ def _execute_activity(
def _cancel_activity(
self,
req: pb.ActivityRequest,
stub: stubs.TaskHubSidecarServiceStub,
stub: Union[stubs.TaskHubSidecarServiceStub, ProtoTaskHubSidecarServiceStub],
completionToken,
):
stub.AbandonTaskActivityWorkItem(
Expand All @@ -736,7 +737,7 @@ def _cancel_activity(
def _execute_entity_batch(
self,
req: Union[pb.EntityBatchRequest, pb.EntityRequest],
stub: stubs.TaskHubSidecarServiceStub,
stub: Union[stubs.TaskHubSidecarServiceStub, ProtoTaskHubSidecarServiceStub],
completionToken,
):
if isinstance(req, pb.EntityRequest):
Expand Down Expand Up @@ -804,7 +805,7 @@ def _execute_entity_batch(
def _cancel_entity_batch(
self,
req: Union[pb.EntityBatchRequest, pb.EntityRequest],
stub: stubs.TaskHubSidecarServiceStub,
stub: Union[stubs.TaskHubSidecarServiceStub, ProtoTaskHubSidecarServiceStub],
completionToken,
):
stub.AbandonTaskEntityWorkItem(
Expand Down
Loading