Skip to content

Commit 57de878

Browse files
committed
Add entity support (needs extension change)
1 parent 3497148 commit 57de878

File tree

2 files changed

+45
-4
lines changed

2 files changed

+45
-4
lines changed

durabletask-azurefunctions/durabletask/azurefunctions/decorators/durable_app.py

Lines changed: 44 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
import base64
55
from functools import wraps
66

7-
from durabletask.internal.orchestrator_service_pb2 import OrchestratorRequest, OrchestratorResponse
7+
from durabletask.internal.orchestrator_service_pb2 import EntityRequest, EntityBatchRequest, EntityBatchResult, OrchestratorRequest, OrchestratorResponse
88
from .metadata import OrchestrationTrigger, ActivityTrigger, EntityTrigger, \
99
DurableClient
1010
from typing import Callable, Optional
@@ -119,8 +119,49 @@ def _configure_entity_callable(self, wrap) -> Callable:
119119
wrapped by the next decorator in the sequence.
120120
"""
121121
def decorator(entity_func):
122-
# TODO: Implement entity support - similar to orchestrators (?)
123-
raise NotImplementedError()
122+
# Construct an orchestrator based on the end-user code
123+
124+
# TODO: Move this logic somewhere better
125+
# TODO: Because this handle method is the one actually exposed to the Functions SDK decorator,
126+
# the parameter name will always be "context" here, even if the user specified a different name.
127+
# We need to find a way to allow custom context names (like "ctx").
128+
def handle(context) -> str:
129+
context_body = getattr(context, "body", None)
130+
if context_body is None:
131+
context_body = context
132+
orchestration_context = context_body
133+
request = EntityBatchRequest()
134+
request_2 = EntityRequest()
135+
try:
136+
request.ParseFromString(base64.b64decode(orchestration_context))
137+
except Exception:
138+
pass
139+
try:
140+
request_2.ParseFromString(base64.b64decode(orchestration_context))
141+
except Exception:
142+
pass
143+
stub = AzureFunctionsNullStub()
144+
worker = DurableFunctionsWorker()
145+
response: Optional[EntityBatchResult] = None
146+
147+
def stub_complete(stub_response: EntityBatchResult):
148+
nonlocal response
149+
response = stub_response
150+
stub.CompleteEntityTask = stub_complete
151+
152+
worker.add_entity(entity_func)
153+
worker._execute_entity_batch(request, stub, None)
154+
155+
if response is None:
156+
raise Exception("Entity execution did not produce a response.")
157+
# The Python worker returns the input as type "json", so double-encoding is necessary
158+
return '"' + base64.b64encode(response.SerializeToString()).decode('utf-8') + '"'
159+
160+
handle.entity_function = entity_func
161+
162+
# invoke next decorator, with the Entity as input
163+
handle.__name__ = entity_func.__name__
164+
return wrap(handle)
124165

125166
return decorator
126167

durabletask/worker.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -737,7 +737,7 @@ def _cancel_activity(
737737
def _execute_entity_batch(
738738
self,
739739
req: Union[pb.EntityBatchRequest, pb.EntityRequest],
740-
stub: stubs.TaskHubSidecarServiceStub,
740+
stub: Union[stubs.TaskHubSidecarServiceStub, ProtoTaskHubSidecarServiceStub],
741741
completionToken,
742742
):
743743
if isinstance(req, pb.EntityRequest):

0 commit comments

Comments
 (0)