From febabb04db3614cea5cb461840e617358ea039dd Mon Sep 17 00:00:00 2001 From: Andy Staples Date: Fri, 5 Dec 2025 11:25:43 -0700 Subject: [PATCH 1/3] Allow passing custom stub to execute operations --- .../proto_task_hub_sidecar_service_stub.py | 38 +++++++++++++++++++ durabletask/worker.py | 13 ++++--- 2 files changed, 45 insertions(+), 6 deletions(-) create mode 100644 durabletask/internal/proto_task_hub_sidecar_service_stub.py diff --git a/durabletask/internal/proto_task_hub_sidecar_service_stub.py b/durabletask/internal/proto_task_hub_sidecar_service_stub.py new file mode 100644 index 0000000..7ccfd58 --- /dev/null +++ b/durabletask/internal/proto_task_hub_sidecar_service_stub.py @@ -0,0 +1,38 @@ +from typing import Callable + + +class ProtoTaskHubSidecarServiceStub(object): + """A stub class roughly matching the TaskHubSidecarServiceStub generated from the .proto file. + Used by Azure Functions during orchestration and entity executions to inject custom behavior, + as no real sidecar stub is available. + """ + + def __init__(self): + """Constructor. + """ + self.Hello: Callable[..., None] + self.StartInstance: Callable[..., None] + self.GetInstance: Callable[..., None] + self.RewindInstance: Callable[..., None] + self.WaitForInstanceStart: Callable[..., None] + self.WaitForInstanceCompletion: Callable[..., None] + self.RaiseEvent: Callable[..., None] + self.TerminateInstance: Callable[..., None] + self.SuspendInstance: Callable[..., None] + self.ResumeInstance: Callable[..., None] + self.QueryInstances: Callable[..., None] + self.PurgeInstances: Callable[..., None] + self.GetWorkItems: Callable[..., None] + self.CompleteActivityTask: Callable[..., None] + self.CompleteOrchestratorTask: Callable[..., None] + self.CompleteEntityTask: Callable[..., None] + self.StreamInstanceHistory: Callable[..., None] + self.CreateTaskHub: Callable[..., None] + self.DeleteTaskHub: Callable[..., None] + self.SignalEntity: Callable[..., None] + self.GetEntity: Callable[..., None] + self.QueryEntities: Callable[..., None] + self.CleanEntityStorage: Callable[..., None] + self.AbandonTaskActivityWorkItem: Callable[..., None] + self.AbandonTaskOrchestratorWorkItem: Callable[..., None] + self.AbandonTaskEntityWorkItem: Callable[..., None] diff --git a/durabletask/worker.py b/durabletask/worker.py index fae345c..5e95ca8 100644 --- a/durabletask/worker.py +++ b/durabletask/worker.py @@ -23,6 +23,7 @@ from durabletask.internal.helpers import new_timestamp from durabletask.entities import DurableEntity, EntityLock, EntityInstanceId, EntityContext from durabletask.internal.orchestration_entity_context import OrchestrationEntityContext +from durabletask.internal.proto_task_hub_sidecar_service_stub import ProtoTaskHubSidecarServiceStub import durabletask.internal.helpers as ph import durabletask.internal.exceptions as pe import durabletask.internal.orchestrator_service_pb2 as pb @@ -629,7 +630,7 @@ def stop(self): def _execute_orchestrator( self, req: pb.OrchestratorRequest, - stub: stubs.TaskHubSidecarServiceStub, + stub: Union[stubs.TaskHubSidecarServiceStub, ProtoTaskHubSidecarServiceStub], completionToken, ): try: @@ -677,7 +678,7 @@ def _execute_orchestrator( def _cancel_orchestrator( self, req: pb.OrchestratorRequest, - stub: stubs.TaskHubSidecarServiceStub, + stub: Union[stubs.TaskHubSidecarServiceStub, ProtoTaskHubSidecarServiceStub], completionToken, ): stub.AbandonTaskOrchestratorWorkItem( @@ -690,7 +691,7 @@ def _cancel_orchestrator( def _execute_activity( self, req: pb.ActivityRequest, - stub: stubs.TaskHubSidecarServiceStub, + stub: Union[stubs.TaskHubSidecarServiceStub, ProtoTaskHubSidecarServiceStub], completionToken, ): instance_id = req.orchestrationInstance.instanceId @@ -723,7 +724,7 @@ def _execute_activity( def _cancel_activity( self, req: pb.ActivityRequest, - stub: stubs.TaskHubSidecarServiceStub, + stub: Union[stubs.TaskHubSidecarServiceStub, ProtoTaskHubSidecarServiceStub], completionToken, ): stub.AbandonTaskActivityWorkItem( @@ -736,7 +737,7 @@ def _cancel_activity( def _execute_entity_batch( self, req: Union[pb.EntityBatchRequest, pb.EntityRequest], - stub: stubs.TaskHubSidecarServiceStub, + stub: Union[stubs.TaskHubSidecarServiceStub, ProtoTaskHubSidecarServiceStub], completionToken, ): if isinstance(req, pb.EntityRequest): @@ -804,7 +805,7 @@ def _execute_entity_batch( def _cancel_entity_batch( self, req: Union[pb.EntityBatchRequest, pb.EntityRequest], - stub: stubs.TaskHubSidecarServiceStub, + stub: Union[stubs.TaskHubSidecarServiceStub, ProtoTaskHubSidecarServiceStub], completionToken, ): stub.AbandonTaskEntityWorkItem( From acf3ec39c09ad5b823abc415b47ed1d98743069c Mon Sep 17 00:00:00 2001 From: Andy Staples Date: Fri, 5 Dec 2025 11:56:01 -0700 Subject: [PATCH 2/3] Typehint with Any instead of None --- .../proto_task_hub_sidecar_service_stub.py | 54 +++++++++---------- 1 file changed, 27 insertions(+), 27 deletions(-) diff --git a/durabletask/internal/proto_task_hub_sidecar_service_stub.py b/durabletask/internal/proto_task_hub_sidecar_service_stub.py index 7ccfd58..bb49684 100644 --- a/durabletask/internal/proto_task_hub_sidecar_service_stub.py +++ b/durabletask/internal/proto_task_hub_sidecar_service_stub.py @@ -1,4 +1,4 @@ -from typing import Callable +from typing import Any, Callable class ProtoTaskHubSidecarServiceStub(object): @@ -10,29 +10,29 @@ class ProtoTaskHubSidecarServiceStub(object): def __init__(self): """Constructor. """ - self.Hello: Callable[..., None] - self.StartInstance: Callable[..., None] - self.GetInstance: Callable[..., None] - self.RewindInstance: Callable[..., None] - self.WaitForInstanceStart: Callable[..., None] - self.WaitForInstanceCompletion: Callable[..., None] - self.RaiseEvent: Callable[..., None] - self.TerminateInstance: Callable[..., None] - self.SuspendInstance: Callable[..., None] - self.ResumeInstance: Callable[..., None] - self.QueryInstances: Callable[..., None] - self.PurgeInstances: Callable[..., None] - self.GetWorkItems: Callable[..., None] - self.CompleteActivityTask: Callable[..., None] - self.CompleteOrchestratorTask: Callable[..., None] - self.CompleteEntityTask: Callable[..., None] - self.StreamInstanceHistory: Callable[..., None] - self.CreateTaskHub: Callable[..., None] - self.DeleteTaskHub: Callable[..., None] - self.SignalEntity: Callable[..., None] - self.GetEntity: Callable[..., None] - self.QueryEntities: Callable[..., None] - self.CleanEntityStorage: Callable[..., None] - self.AbandonTaskActivityWorkItem: Callable[..., None] - self.AbandonTaskOrchestratorWorkItem: Callable[..., None] - self.AbandonTaskEntityWorkItem: Callable[..., None] + self.Hello: Callable[..., Any] + self.StartInstance: Callable[..., Any] + self.GetInstance: Callable[..., Any] + self.RewindInstance: Callable[..., Any] + self.WaitForInstanceStart: Callable[..., Any] + self.WaitForInstanceCompletion: Callable[..., Any] + self.RaiseEvent: Callable[..., Any] + self.TerminateInstance: Callable[..., Any] + self.SuspendInstance: Callable[..., Any] + self.ResumeInstance: Callable[..., Any] + self.QueryInstances: Callable[..., Any] + self.PurgeInstances: Callable[..., Any] + self.GetWorkItems: Callable[..., Any] + self.CompleteActivityTask: Callable[..., Any] + self.CompleteOrchestratorTask: Callable[..., Any] + self.CompleteEntityTask: Callable[..., Any] + self.StreamInstanceHistory: Callable[..., Any] + self.CreateTaskHub: Callable[..., Any] + self.DeleteTaskHub: Callable[..., Any] + self.SignalEntity: Callable[..., Any] + self.GetEntity: Callable[..., Any] + self.QueryEntities: Callable[..., Any] + self.CleanEntityStorage: Callable[..., Any] + self.AbandonTaskActivityWorkItem: Callable[..., Any] + self.AbandonTaskOrchestratorWorkItem: Callable[..., Any] + self.AbandonTaskEntityWorkItem: Callable[..., Any] From e5487002f027b58a3faf2de08dc8aaf235f54200 Mon Sep 17 00:00:00 2001 From: Andy Staples Date: Fri, 5 Dec 2025 12:14:40 -0700 Subject: [PATCH 3/3] Use Protocol instead of init with properties --- .../proto_task_hub_sidecar_service_stub.py | 60 +++++++++---------- 1 file changed, 28 insertions(+), 32 deletions(-) diff --git a/durabletask/internal/proto_task_hub_sidecar_service_stub.py b/durabletask/internal/proto_task_hub_sidecar_service_stub.py index bb49684..f91a15c 100644 --- a/durabletask/internal/proto_task_hub_sidecar_service_stub.py +++ b/durabletask/internal/proto_task_hub_sidecar_service_stub.py @@ -1,38 +1,34 @@ -from typing import Any, Callable +from typing import Any, Callable, Protocol -class ProtoTaskHubSidecarServiceStub(object): +class ProtoTaskHubSidecarServiceStub(Protocol): """A stub class roughly matching the TaskHubSidecarServiceStub generated from the .proto file. Used by Azure Functions during orchestration and entity executions to inject custom behavior, as no real sidecar stub is available. """ - - def __init__(self): - """Constructor. - """ - self.Hello: Callable[..., Any] - self.StartInstance: Callable[..., Any] - self.GetInstance: Callable[..., Any] - self.RewindInstance: Callable[..., Any] - self.WaitForInstanceStart: Callable[..., Any] - self.WaitForInstanceCompletion: Callable[..., Any] - self.RaiseEvent: Callable[..., Any] - self.TerminateInstance: Callable[..., Any] - self.SuspendInstance: Callable[..., Any] - self.ResumeInstance: Callable[..., Any] - self.QueryInstances: Callable[..., Any] - self.PurgeInstances: Callable[..., Any] - self.GetWorkItems: Callable[..., Any] - self.CompleteActivityTask: Callable[..., Any] - self.CompleteOrchestratorTask: Callable[..., Any] - self.CompleteEntityTask: Callable[..., Any] - self.StreamInstanceHistory: Callable[..., Any] - self.CreateTaskHub: Callable[..., Any] - self.DeleteTaskHub: Callable[..., Any] - self.SignalEntity: Callable[..., Any] - self.GetEntity: Callable[..., Any] - self.QueryEntities: Callable[..., Any] - self.CleanEntityStorage: Callable[..., Any] - self.AbandonTaskActivityWorkItem: Callable[..., Any] - self.AbandonTaskOrchestratorWorkItem: Callable[..., Any] - self.AbandonTaskEntityWorkItem: Callable[..., Any] + Hello: Callable[..., Any] + StartInstance: Callable[..., Any] + GetInstance: Callable[..., Any] + RewindInstance: Callable[..., Any] + WaitForInstanceStart: Callable[..., Any] + WaitForInstanceCompletion: Callable[..., Any] + RaiseEvent: Callable[..., Any] + TerminateInstance: Callable[..., Any] + SuspendInstance: Callable[..., Any] + ResumeInstance: Callable[..., Any] + QueryInstances: Callable[..., Any] + PurgeInstances: Callable[..., Any] + GetWorkItems: Callable[..., Any] + CompleteActivityTask: Callable[..., Any] + CompleteOrchestratorTask: Callable[..., Any] + CompleteEntityTask: Callable[..., Any] + StreamInstanceHistory: Callable[..., Any] + CreateTaskHub: Callable[..., Any] + DeleteTaskHub: Callable[..., Any] + SignalEntity: Callable[..., Any] + GetEntity: Callable[..., Any] + QueryEntities: Callable[..., Any] + CleanEntityStorage: Callable[..., Any] + AbandonTaskActivityWorkItem: Callable[..., Any] + AbandonTaskOrchestratorWorkItem: Callable[..., Any] + AbandonTaskEntityWorkItem: Callable[..., Any]