Skip to content

Commit 209443e

Browse files
committed
Fixes and improvements
- Add new_uuid method to OrchestrationContext for deterministic replay-safe UUIDs - Fix entity locking behavior for Functions - Align _RuntimeOrchestrationContext param names with OrchestrationContext - Remap __init__.py files for new module - Update version to 0.0.1dev0 - Add docstrings to missing methods - Move code for executing orchestrators/entities to DurableFunctionsWorker - Add function metadata to triggers for detection by extension
1 parent 9965ba4 commit 209443e

File tree

12 files changed

+185
-105
lines changed

12 files changed

+185
-105
lines changed
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,7 @@
11
# Copyright (c) Microsoft Corporation.
22
# Licensed under the MIT License.
3+
4+
from durabletask.azurefunctions.decorators.durable_app import Blueprint, DFApp
5+
from durabletask.azurefunctions.client import DurableFunctionsClient
6+
7+
__all__ = ["Blueprint", "DFApp", "DurableFunctionsClient"]

durabletask-azurefunctions/durabletask/azurefunctions/client.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,12 @@
1616

1717
# Client class used for Durable Functions
1818
class DurableFunctionsClient(TaskHubGrpcClient):
19+
"""A gRPC client passed to Durable Functions durable client bindings.
20+
21+
Connects to the Durable Functions runtime using gRPC and provides methods
22+
for creating and managing Durable orchestrations, interacting with Durable entities,
23+
and creating HTTP management payloads and check status responses for use with Durable Functions invocations.
24+
"""
1925
taskHubName: str
2026
connectionName: str
2127
creationUrls: dict[str, str]
@@ -28,6 +34,16 @@ class DurableFunctionsClient(TaskHubGrpcClient):
2834
grpcHttpClientTimeout: timedelta
2935

3036
def __init__(self, client_as_string: str):
37+
"""Initializes a DurableFunctionsClient instance from a JSON string.
38+
39+
This string will be provided by the Durable Functions host extension upon invocation of the client trigger.
40+
41+
Args:
42+
client_as_string (str): A JSON string containing the Durable Functions client configuration.
43+
44+
Raises:
45+
json.JSONDecodeError: If the provided string is not valid JSON.
46+
"""
3147
client = json.loads(client_as_string)
3248

3349
self.taskHubName = client.get("taskHubName", "")

durabletask-azurefunctions/durabletask/azurefunctions/constants.py

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,6 @@
22
# Licensed under the MIT License.
33

44
"""Constants used to determine the local running context."""
5-
# TODO: Remove unused constants after module is complete
6-
DEFAULT_LOCAL_HOST: str = 'localhost:7071'
7-
DEFAULT_LOCAL_ORIGIN: str = f'http://{DEFAULT_LOCAL_HOST}'
8-
DATETIME_STRING_FORMAT = '%Y-%m-%dT%H:%M:%S.%fZ'
9-
HTTP_ACTION_NAME = 'BuiltIn::HttpActivity'
105
ORCHESTRATION_TRIGGER = "orchestrationTrigger"
116
ACTIVITY_TRIGGER = "activityTrigger"
127
ENTITY_TRIGGER = "entityTrigger"
Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,2 @@
11
# Copyright (c) Microsoft Corporation.
22
# Licensed under the MIT License.
3-
4-
"""Durable Task SDK for Python entities component"""
5-
6-
import durabletask.azurefunctions.decorators.durable_app as durable_app
7-
import durabletask.azurefunctions.decorators.metadata as metadata
8-
9-
__all__ = ["durable_app", "metadata"]
10-
11-
PACKAGE_NAME = "durabletask.azurefunctions.decorators"

durabletask-azurefunctions/durabletask/azurefunctions/decorators/durable_app.py

Lines changed: 3 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,16 @@
11
# Copyright (c) Microsoft Corporation. All rights reserved.
22
# Licensed under the MIT License.
33

4-
import base64
54
from functools import wraps
65

7-
from durabletask.internal.orchestrator_service_pb2 import EntityRequest, EntityBatchRequest, EntityBatchResult, OrchestratorRequest, OrchestratorResponse
86
from .metadata import OrchestrationTrigger, ActivityTrigger, EntityTrigger, \
97
DurableClient
108
from typing import Callable, Optional
119
from typing import Union
1210
from azure.functions import FunctionRegister, TriggerApi, BindingApi, AuthLevel
1311

14-
# TODO: Use __init__.py to optimize imports
1512
from durabletask.azurefunctions.client import DurableFunctionsClient
1613
from durabletask.azurefunctions.worker import DurableFunctionsWorker
17-
from durabletask.azurefunctions.internal.azurefunctions_null_stub import AzureFunctionsNullStub
1814

1915

2016
class Blueprint(TriggerApi, BindingApi):
@@ -61,40 +57,8 @@ def _configure_orchestrator_callable(self, wrap) -> Callable:
6157
def decorator(orchestrator_func):
6258
# Construct an orchestrator based on the end-user code
6359

64-
# TODO: Move this logic somewhere better
6560
def handle(context) -> str:
66-
context_body = getattr(context, "body", None)
67-
if context_body is None:
68-
context_body = context
69-
orchestration_context = context_body
70-
request = OrchestratorRequest()
71-
request.ParseFromString(base64.b64decode(orchestration_context))
72-
stub = AzureFunctionsNullStub()
73-
worker = DurableFunctionsWorker()
74-
response: Optional[OrchestratorResponse] = None
75-
76-
def stub_complete(stub_response):
77-
nonlocal response
78-
response = stub_response
79-
stub.CompleteOrchestratorTask = stub_complete
80-
execution_started_events = []
81-
for e in request.pastEvents:
82-
if e.HasField("executionStarted"):
83-
execution_started_events.append(e)
84-
for e in request.newEvents:
85-
if e.HasField("executionStarted"):
86-
execution_started_events.append(e)
87-
if len(execution_started_events) == 0:
88-
raise Exception("No ExecutionStarted event found in orchestration request.")
89-
90-
function_name = execution_started_events[-1].executionStarted.name
91-
worker.add_named_orchestrator(function_name, orchestrator_func)
92-
worker._execute_orchestrator(request, stub, None)
93-
94-
if response is None:
95-
raise Exception("Orchestrator execution did not produce a response.")
96-
# The Python worker returns the input as type "json", so double-encoding is necessary
97-
return '"' + base64.b64encode(response.SerializeToString()).decode('utf-8') + '"'
61+
return DurableFunctionsWorker()._execute_orchestrator(orchestrator_func, context)
9862

9963
handle.orchestrator_function = orchestrator_func # type: ignore
10064

@@ -121,33 +85,11 @@ def _configure_entity_callable(self, wrap) -> Callable:
12185
def decorator(entity_func):
12286
# Construct an orchestrator based on the end-user code
12387

124-
# TODO: Move this logic somewhere better
12588
# TODO: Because this handle method is the one actually exposed to the Functions SDK decorator,
12689
# the parameter name will always be "context" here, even if the user specified a different name.
12790
# We need to find a way to allow custom context names (like "ctx").
12891
def handle(context) -> str:
129-
context_body = getattr(context, "body", None)
130-
if context_body is None:
131-
context_body = context
132-
orchestration_context = context_body
133-
request = EntityBatchRequest()
134-
request.ParseFromString(base64.b64decode(orchestration_context))
135-
stub = AzureFunctionsNullStub()
136-
worker = DurableFunctionsWorker()
137-
response: Optional[EntityBatchResult] = None
138-
139-
def stub_complete(stub_response: EntityBatchResult):
140-
nonlocal response
141-
response = stub_response
142-
stub.CompleteEntityTask = stub_complete
143-
144-
worker.add_entity(entity_func)
145-
worker._execute_entity_batch(request, stub, None)
146-
147-
if response is None:
148-
raise Exception("Entity execution did not produce a response.")
149-
# The Python worker returns the input as type "json", so double-encoding is necessary
150-
return '"' + base64.b64encode(response.SerializeToString()).decode('utf-8') + '"'
92+
return DurableFunctionsWorker()._execute_entity_batch(entity_func, context)
15193

15294
handle.entity_function = entity_func # type: ignore
15395

@@ -157,8 +99,7 @@ def stub_complete(stub_response: EntityBatchResult):
15799

158100
return decorator
159101

160-
def _add_rich_client(self, fb, parameter_name,
161-
client_constructor):
102+
def _add_rich_client(self, fb, parameter_name, client_constructor):
162103
# Obtain user-code and force type annotation on the client-binding parameter to be `str`.
163104
# This ensures a passing type-check of that specific parameter,
164105
# circumventing a limitation of the worker in type-checking rich DF Client objects.

durabletask-azurefunctions/durabletask/azurefunctions/decorators/metadata.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ def get_binding_name() -> str:
2828
def __init__(self,
2929
name: str,
3030
orchestration: Optional[str] = None,
31+
durable_requires_grpc=True,
3132
) -> None:
3233
self.orchestration = orchestration
3334
super().__init__(name=name)
@@ -53,6 +54,7 @@ def get_binding_name() -> str:
5354
def __init__(self,
5455
name: str,
5556
activity: Optional[str] = None,
57+
durable_requires_grpc=True,
5658
) -> None:
5759
self.activity = activity
5860
super().__init__(name=name)
@@ -78,6 +80,7 @@ def get_binding_name() -> str:
7880
def __init__(self,
7981
name: str,
8082
entity_name: Optional[str] = None,
83+
durable_requires_grpc=True,
8184
) -> None:
8285
self.entity_name = entity_name
8386
super().__init__(name=name)
@@ -103,7 +106,8 @@ def get_binding_name() -> str:
103106
def __init__(self,
104107
name: str,
105108
task_hub: Optional[str] = None,
106-
connection_name: Optional[str] = None
109+
connection_name: Optional[str] = None,
110+
durable_requires_grpc=True,
107111
) -> None:
108112
self.task_hub = task_hub
109113
self.connection_name = connection_name

durabletask-azurefunctions/durabletask/azurefunctions/http/http_management_payload.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,20 @@
22

33

44
class HttpManagementPayload:
5+
"""A class representing the HTTP management payload for a Durable Function orchestration instance.
6+
7+
Contains URLs for managing the instance, such as querying status,
8+
sending events, terminating, restarting, etc.
9+
"""
10+
511
def __init__(self, instance_id: str, instance_status_url: str, required_query_string_parameters: str):
12+
"""Initializes the HttpManagementPayload with the necessary URLs.
13+
14+
Args:
15+
instance_id (str): The ID of the Durable Function instance.
16+
instance_status_url (str): The base URL for the instance status.
17+
required_query_string_parameters (str): The required URL parameters provided by the Durable extension.
18+
"""
619
self.urls = {
720
'id': instance_id,
821
'purgeHistoryDeleteUri': instance_status_url + "?" + required_query_string_parameters,

durabletask-azurefunctions/durabletask/azurefunctions/worker.py

Lines changed: 64 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,22 @@
11
# Copyright (c) Microsoft Corporation.
22
# Licensed under the MIT License.
33

4+
import base64
45
from threading import Event
6+
from typing import Optional
7+
from durabletask.internal.orchestrator_service_pb2 import EntityBatchRequest, EntityBatchResult, OrchestratorRequest, OrchestratorResponse
58
from durabletask.worker import _Registry, ConcurrencyOptions
69
from durabletask.internal import shared
710
from durabletask.worker import TaskHubGrpcWorker
11+
from durabletask.azurefunctions.internal.azurefunctions_null_stub import AzureFunctionsNullStub
812

913

1014
# Worker class used for Durable Task Scheduler (DTS)
1115
class DurableFunctionsWorker(TaskHubGrpcWorker):
12-
"""TODO: Docs
16+
"""A worker that can execute orchestrator and entity functions in the context of Azure Functions.
17+
18+
Used internally by the Durable Functions Python SDK, and should not be visible to functionapps directly.
19+
See TaskHubGrpcWorker for base class documentation.
1320
"""
1421

1522
def __init__(self):
@@ -27,6 +34,60 @@ def __init__(self):
2734
self._interceptors = None
2835

2936
def add_named_orchestrator(self, name: str, func):
30-
"""TODO: Docs
31-
"""
3237
self._registry.add_named_orchestrator(name, func)
38+
39+
def _execute_orchestrator(self, func, context) -> str:
40+
context_body = getattr(context, "body", None)
41+
if context_body is None:
42+
context_body = context
43+
orchestration_context = context_body
44+
request = OrchestratorRequest()
45+
request.ParseFromString(base64.b64decode(orchestration_context))
46+
stub = AzureFunctionsNullStub()
47+
response: Optional[OrchestratorResponse] = None
48+
49+
def stub_complete(stub_response):
50+
nonlocal response
51+
response = stub_response
52+
stub.CompleteOrchestratorTask = stub_complete
53+
execution_started_events = []
54+
for e in request.pastEvents:
55+
if e.HasField("executionStarted"):
56+
execution_started_events.append(e)
57+
for e in request.newEvents:
58+
if e.HasField("executionStarted"):
59+
execution_started_events.append(e)
60+
if len(execution_started_events) == 0:
61+
raise Exception("No ExecutionStarted event found in orchestration request.")
62+
63+
function_name = execution_started_events[-1].executionStarted.name
64+
self.add_named_orchestrator(function_name, func)
65+
super()._execute_orchestrator(request, stub, None)
66+
67+
if response is None:
68+
raise Exception("Orchestrator execution did not produce a response.")
69+
# The Python worker returns the input as type "json", so double-encoding is necessary
70+
return '"' + base64.b64encode(response.SerializeToString()).decode('utf-8') + '"'
71+
72+
def _execute_entity_batch(self, func, context) -> str:
73+
context_body = getattr(context, "body", None)
74+
if context_body is None:
75+
context_body = context
76+
orchestration_context = context_body
77+
request = EntityBatchRequest()
78+
request.ParseFromString(base64.b64decode(orchestration_context))
79+
stub = AzureFunctionsNullStub()
80+
response: Optional[EntityBatchResult] = None
81+
82+
def stub_complete(stub_response: EntityBatchResult):
83+
nonlocal response
84+
response = stub_response
85+
stub.CompleteEntityTask = stub_complete
86+
87+
self.add_entity(func)
88+
super()._execute_entity_batch(request, stub, None)
89+
90+
if response is None:
91+
raise Exception("Entity execution did not produce a response.")
92+
# The Python worker returns the input as type "json", so double-encoding is necessary
93+
return '"' + base64.b64encode(response.SerializeToString()).decode('utf-8') + '"'

durabletask-azurefunctions/pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ build-backend = "setuptools.build_meta"
99

1010
[project]
1111
name = "durabletask.azurefunctions"
12-
version = "0.1.0"
12+
version = "0.0.1dev0"
1313
description = "Durable Task Python SDK provider implementation for Durable Azure Functions"
1414
keywords = [
1515
"durable",

durabletask/internal/helpers.py

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
import traceback
55
from datetime import datetime
66
from typing import Optional
7-
import uuid
87

98
from google.protobuf import timestamp_pb2, wrappers_pb2
109

@@ -197,8 +196,11 @@ def new_schedule_task_action(id: int, name: str, encoded_input: Optional[str],
197196
))
198197

199198

200-
def new_call_entity_action(id: int, parent_instance_id: str, entity_id: EntityInstanceId, operation: str, encoded_input: Optional[str]):
201-
request_id = str(uuid.uuid4())
199+
def new_call_entity_action(id: int,
200+
parent_instance_id: str,
201+
entity_id: EntityInstanceId,
202+
operation: str, encoded_input: Optional[str],
203+
request_id: str):
202204
return pb.OrchestratorAction(id=id, sendEntityMessage=pb.SendEntityMessageAction(entityOperationCalled=pb.EntityOperationCalledEvent(
203205
requestId=request_id,
204206
operation=operation,
@@ -210,8 +212,11 @@ def new_call_entity_action(id: int, parent_instance_id: str, entity_id: EntityIn
210212
)))
211213

212214

213-
def new_signal_entity_action(id: int, entity_id: EntityInstanceId, operation: str, encoded_input: Optional[str]):
214-
request_id = str(uuid.uuid4())
215+
def new_signal_entity_action(id: int,
216+
entity_id: EntityInstanceId,
217+
operation: str,
218+
encoded_input: Optional[str],
219+
request_id: str):
215220
return pb.OrchestratorAction(id=id, sendEntityMessage=pb.SendEntityMessageAction(entityOperationSignaled=pb.EntityOperationSignaledEvent(
216221
requestId=request_id,
217222
operation=operation,

0 commit comments

Comments
 (0)