Skip to content

Commit 18145f8

Browse files
committed
Refine entity support
- Still needs eventSent and eventRecieved implementations
1 parent 57de878 commit 18145f8

File tree

6 files changed

+64
-33
lines changed

6 files changed

+64
-33
lines changed

durabletask-azurefunctions/durabletask/azurefunctions/client.py

Lines changed: 23 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,12 @@
66
from datetime import timedelta
77
from typing import Any, Optional
88
import azure.functions as func
9+
from urllib.parse import urlparse, quote
910

1011
from durabletask.entities import EntityInstanceId
1112
from durabletask.client import TaskHubGrpcClient
1213
from durabletask.azurefunctions.internal.azurefunctions_grpc_interceptor import AzureFunctionsDefaultClientInterceptorImpl
14+
from durabletask.azurefunctions.http import HttpManagementPayload
1315

1416

1517
# Client class used for Durable Functions
@@ -56,30 +58,32 @@ def create_check_status_response(self, request: func.HttpRequest, instance_id: s
5658
request (func.HttpRequest): The incoming HTTP request.
5759
instance_id (str): The ID of the Durable Function instance.
5860
"""
59-
raise NotImplementedError("This method is not implemented yet.")
61+
location_url = self._get_instance_status_url(request, instance_id)
62+
return func.HttpResponse(
63+
body=str(self._get_client_response_links(request, instance_id)),
64+
status_code=501,
65+
headers={
66+
'content-type': 'application/json',
67+
'Location': location_url,
68+
},
69+
)
6070

61-
def create_http_management_payload(self, instance_id: str) -> dict[str, str]:
71+
def create_http_management_payload(self, request: func.HttpRequest, instance_id: str) -> HttpManagementPayload:
6272
"""Creates an HTTP management payload for a Durable Function instance.
6373
6474
Args:
6575
instance_id (str): The ID of the Durable Function instance.
6676
"""
67-
raise NotImplementedError("This method is not implemented yet.")
77+
return self._get_client_response_links(request, instance_id)
6878

69-
def read_entity_state(
70-
self,
71-
entity_id: EntityInstanceId,
72-
task_hub_name: Optional[str],
73-
connection_name: Optional[str]
74-
) -> tuple[bool, Any]:
75-
"""Reads the state of a Durable Entity.
79+
def _get_client_response_links(self, request: func.HttpRequest, instance_id: str) -> HttpManagementPayload:
80+
instance_status_url = self._get_instance_status_url(request, instance_id)
81+
return HttpManagementPayload(instance_id, instance_status_url, self.requiredQueryStringParameters)
7682

77-
Args:
78-
entity_id (str): The ID of the Durable Entity.
79-
task_hub_name (Optional[str]): The name of the task hub.
80-
connection_name (Optional[str]): The name of the connection.
81-
82-
Returns:
83-
(bool, Any): A tuple containing a boolean indicating if the entity exists and its state.
84-
"""
85-
raise NotImplementedError("This method is not implemented yet.")
83+
@staticmethod
84+
def _get_instance_status_url(request: func.HttpRequest, instance_id: str) -> str:
85+
request_url = urlparse(request.url)
86+
location_url = f"{request_url.scheme}://{request_url.netloc}{request_url.path}"
87+
encoded_instance_id = quote(instance_id)
88+
location_url = location_url + "/runtime/webhooks/durabletask/instances/" + encoded_instance_id
89+
return location_url

durabletask-azurefunctions/durabletask/azurefunctions/decorators/durable_app.py

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ def stub_complete(stub_response):
9696
# The Python worker returns the input as type "json", so double-encoding is necessary
9797
return '"' + base64.b64encode(response.SerializeToString()).decode('utf-8') + '"'
9898

99-
handle.orchestrator_function = orchestrator_func
99+
handle.orchestrator_function = orchestrator_func # type: ignore
100100

101101
# invoke next decorator, with the Orchestrator as input
102102
handle.__name__ = orchestrator_func.__name__
@@ -131,15 +131,7 @@ def handle(context) -> str:
131131
context_body = context
132132
orchestration_context = context_body
133133
request = EntityBatchRequest()
134-
request_2 = EntityRequest()
135-
try:
136-
request.ParseFromString(base64.b64decode(orchestration_context))
137-
except Exception:
138-
pass
139-
try:
140-
request_2.ParseFromString(base64.b64decode(orchestration_context))
141-
except Exception:
142-
pass
134+
request.ParseFromString(base64.b64decode(orchestration_context))
143135
stub = AzureFunctionsNullStub()
144136
worker = DurableFunctionsWorker()
145137
response: Optional[EntityBatchResult] = None
@@ -157,7 +149,7 @@ def stub_complete(stub_response: EntityBatchResult):
157149
# The Python worker returns the input as type "json", so double-encoding is necessary
158150
return '"' + base64.b64encode(response.SerializeToString()).decode('utf-8') + '"'
159151

160-
handle.entity_function = entity_func
152+
handle.entity_function = entity_func # type: ignore
161153

162154
# invoke next decorator, with the Entity as input
163155
handle.__name__ = entity_func.__name__
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
# Copyright (c) Microsoft Corporation.
2+
# Licensed under the MIT License.
3+
4+
from durabletask.azurefunctions.http.http_management_payload import HttpManagementPayload
5+
6+
__all__ = ["HttpManagementPayload"]
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
import json
2+
3+
4+
class HttpManagementPayload:
5+
def __init__(self, instance_id: str, instance_status_url: str, required_query_string_parameters: str):
6+
self.urls = {
7+
'id': instance_id,
8+
'purgeHistoryDeleteUri': instance_status_url + "?" + required_query_string_parameters,
9+
'restartPostUri': instance_status_url + "/restart?" + required_query_string_parameters,
10+
'sendEventPostUri': instance_status_url + "/raiseEvent/{eventName}?" + required_query_string_parameters,
11+
'statusQueryGetUri': instance_status_url + "?" + required_query_string_parameters,
12+
'terminatePostUri': instance_status_url + "/terminate?reason={text}&" + required_query_string_parameters,
13+
'resumePostUri': instance_status_url + "/resume?reason={text}&" + required_query_string_parameters,
14+
'suspendPostUri': instance_status_url + "/suspend?reason={text}&" + required_query_string_parameters
15+
}
16+
17+
def __str__(self):
18+
return json.dumps(self.urls)

durabletask/internal/helpers.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import traceback
55
from datetime import datetime
66
from typing import Optional
7+
import uuid
78

89
from google.protobuf import timestamp_pb2, wrappers_pb2
910

@@ -197,8 +198,9 @@ def new_schedule_task_action(id: int, name: str, encoded_input: Optional[str],
197198

198199

199200
def new_call_entity_action(id: int, parent_instance_id: str, entity_id: EntityInstanceId, operation: str, encoded_input: Optional[str]):
201+
request_id = str(uuid.uuid4())
200202
return pb.OrchestratorAction(id=id, sendEntityMessage=pb.SendEntityMessageAction(entityOperationCalled=pb.EntityOperationCalledEvent(
201-
requestId=f"{parent_instance_id}:{id}",
203+
requestId=request_id,
202204
operation=operation,
203205
scheduledTime=None,
204206
input=get_string_value(encoded_input),
@@ -209,8 +211,9 @@ def new_call_entity_action(id: int, parent_instance_id: str, entity_id: EntityIn
209211

210212

211213
def new_signal_entity_action(id: int, entity_id: EntityInstanceId, operation: str, encoded_input: Optional[str]):
214+
request_id = str(uuid.uuid4())
212215
return pb.OrchestratorAction(id=id, sendEntityMessage=pb.SendEntityMessageAction(entityOperationSignaled=pb.EntityOperationSignaledEvent(
213-
requestId=f"{entity_id}:{id}",
216+
requestId=request_id,
214217
operation=operation,
215218
scheduledTime=None,
216219
input=get_string_value(encoded_input),

durabletask/worker.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
from types import GeneratorType
1414
from enum import Enum
1515
from typing import Any, Generator, Optional, Sequence, TypeVar, Union
16+
import uuid
1617
from packaging.version import InvalidVersion, parse
1718

1819
import grpc
@@ -740,6 +741,7 @@ def _execute_entity_batch(
740741
stub: Union[stubs.TaskHubSidecarServiceStub, ProtoTaskHubSidecarServiceStub],
741742
completionToken,
742743
):
744+
operation_infos = None
743745
if isinstance(req, pb.EntityRequest):
744746
req, operation_infos = helpers.convert_to_entity_batch_request(req)
745747

@@ -1200,7 +1202,7 @@ def lock_entities_function_helper(self, id: int, entities: list[EntityInstanceId
12001202
if not transition_valid:
12011203
raise RuntimeError(error_message)
12021204

1203-
critical_section_id = f"{self.instance_id}:{id:04x}"
1205+
critical_section_id = str(uuid.uuid4())
12041206

12051207
request, target = self._entity_context.emit_acquire_message(critical_section_id, entities)
12061208

@@ -1747,6 +1749,12 @@ def process_event(
17471749
elif event.HasField("orchestratorCompleted"):
17481750
# Added in Functions only (for some reason) and does not affect orchestrator flow
17491751
pass
1752+
elif event.HasField("eventSent"):
1753+
# Added in Functions only (for some reason) and does not affect orchestrator flow
1754+
pass
1755+
elif event.HasField("eventRaised"):
1756+
# Added in Functions only (for some reason) and does not affect orchestrator flow
1757+
pass
17501758
else:
17511759
eventType = event.WhichOneof("eventType")
17521760
raise task.OrchestrationStateError(

0 commit comments

Comments
 (0)