diff --git a/durabletask/internal/proto_task_hub_sidecar_service_stub.py b/durabletask/internal/proto_task_hub_sidecar_service_stub.py new file mode 100644 index 0000000..f91a15c --- /dev/null +++ b/durabletask/internal/proto_task_hub_sidecar_service_stub.py @@ -0,0 +1,34 @@ +from typing import Any, Callable, Protocol + + +class ProtoTaskHubSidecarServiceStub(Protocol): + """A stub class roughly matching the TaskHubSidecarServiceStub generated from the .proto file. + Used by Azure Functions during orchestration and entity executions to inject custom behavior, + as no real sidecar stub is available. + """ + Hello: Callable[..., Any] + StartInstance: Callable[..., Any] + GetInstance: Callable[..., Any] + RewindInstance: Callable[..., Any] + WaitForInstanceStart: Callable[..., Any] + WaitForInstanceCompletion: Callable[..., Any] + RaiseEvent: Callable[..., Any] + TerminateInstance: Callable[..., Any] + SuspendInstance: Callable[..., Any] + ResumeInstance: Callable[..., Any] + QueryInstances: Callable[..., Any] + PurgeInstances: Callable[..., Any] + GetWorkItems: Callable[..., Any] + CompleteActivityTask: Callable[..., Any] + CompleteOrchestratorTask: Callable[..., Any] + CompleteEntityTask: Callable[..., Any] + StreamInstanceHistory: Callable[..., Any] + CreateTaskHub: Callable[..., Any] + DeleteTaskHub: Callable[..., Any] + SignalEntity: Callable[..., Any] + GetEntity: Callable[..., Any] + QueryEntities: Callable[..., Any] + CleanEntityStorage: Callable[..., Any] + AbandonTaskActivityWorkItem: Callable[..., Any] + AbandonTaskOrchestratorWorkItem: Callable[..., Any] + AbandonTaskEntityWorkItem: Callable[..., Any] diff --git a/durabletask/worker.py b/durabletask/worker.py index fae345c..5e95ca8 100644 --- a/durabletask/worker.py +++ b/durabletask/worker.py @@ -23,6 +23,7 @@ from durabletask.internal.helpers import new_timestamp from durabletask.entities import DurableEntity, EntityLock, EntityInstanceId, EntityContext from durabletask.internal.orchestration_entity_context import OrchestrationEntityContext +from durabletask.internal.proto_task_hub_sidecar_service_stub import ProtoTaskHubSidecarServiceStub import durabletask.internal.helpers as ph import durabletask.internal.exceptions as pe import durabletask.internal.orchestrator_service_pb2 as pb @@ -629,7 +630,7 @@ def stop(self): def _execute_orchestrator( self, req: pb.OrchestratorRequest, - stub: stubs.TaskHubSidecarServiceStub, + stub: Union[stubs.TaskHubSidecarServiceStub, ProtoTaskHubSidecarServiceStub], completionToken, ): try: @@ -677,7 +678,7 @@ def _execute_orchestrator( def _cancel_orchestrator( self, req: pb.OrchestratorRequest, - stub: stubs.TaskHubSidecarServiceStub, + stub: Union[stubs.TaskHubSidecarServiceStub, ProtoTaskHubSidecarServiceStub], completionToken, ): stub.AbandonTaskOrchestratorWorkItem( @@ -690,7 +691,7 @@ def _cancel_orchestrator( def _execute_activity( self, req: pb.ActivityRequest, - stub: stubs.TaskHubSidecarServiceStub, + stub: Union[stubs.TaskHubSidecarServiceStub, ProtoTaskHubSidecarServiceStub], completionToken, ): instance_id = req.orchestrationInstance.instanceId @@ -723,7 +724,7 @@ def _execute_activity( def _cancel_activity( self, req: pb.ActivityRequest, - stub: stubs.TaskHubSidecarServiceStub, + stub: Union[stubs.TaskHubSidecarServiceStub, ProtoTaskHubSidecarServiceStub], completionToken, ): stub.AbandonTaskActivityWorkItem( @@ -736,7 +737,7 @@ def _cancel_activity( def _execute_entity_batch( self, req: Union[pb.EntityBatchRequest, pb.EntityRequest], - stub: stubs.TaskHubSidecarServiceStub, + stub: Union[stubs.TaskHubSidecarServiceStub, ProtoTaskHubSidecarServiceStub], completionToken, ): if isinstance(req, pb.EntityRequest): @@ -804,7 +805,7 @@ def _execute_entity_batch( def _cancel_entity_batch( self, req: Union[pb.EntityBatchRequest, pb.EntityRequest], - stub: stubs.TaskHubSidecarServiceStub, + stub: Union[stubs.TaskHubSidecarServiceStub, ProtoTaskHubSidecarServiceStub], completionToken, ): stub.AbandonTaskEntityWorkItem(