name: Durable Task Scheduler SDK (durabletask-azurefunctions) Dev Release

# Publishes a ".devN"-suffixed build of durabletask-azurefunctions to PyPI
# after the main CI workflow finishes on the main branch.
on:
  workflow_run:
    workflows: ["Durable Task Scheduler SDK (durabletask-azurefunctions)"]
    types:
      - completed
    branches:
      - main

jobs:
  publish-dev:
    # workflow_run "completed" fires for failed CI runs as well; only publish
    # when the triggering run actually succeeded.
    if: ${{ github.event.workflow_run.conclusion == 'success' }}
    runs-on: ubuntu-latest
    steps:
      - name: Checkout code
        uses: actions/checkout@v4

      # NOTE: the original "Extract version from tag" step was removed. Under a
      # workflow_run trigger GITHUB_REF is a branch ref, never an
      # "azurefunctions-v*" tag, so the VERSION it computed was meaningless —
      # and no later step consumed it.

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: "3.14" # Adjust Python version as needed

      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install build twine

      - name: Append dev to version in pyproject.toml
        working-directory: durabletask-azurefunctions
        run: |
          sed -i 's/^version = "\(.*\)"/version = "\1.dev${{ github.run_number }}"/' pyproject.toml

      - name: Build package from directory durabletask-azurefunctions
        working-directory: durabletask-azurefunctions
        run: |
          python -m build

      - name: Check package
        working-directory: durabletask-azurefunctions
        run: |
          twine check dist/*

      - name: Publish package to PyPI
        env:
          TWINE_USERNAME: __token__
          TWINE_PASSWORD: ${{ secrets.PYPI_API_TOKEN_AZUREFUNCTIONS }} # Store your PyPI API token in GitHub Secrets
        working-directory: durabletask-azurefunctions
        run: |
          twine upload dist/*
name: Durable Task Scheduler SDK (durabletask-azurefunctions) Experimental Release

# Publishes a 0.0.0.devN build from any topic branch (anything that is not
# main or a release/* branch).
on:
  push:
    branches-ignore:
      - main
      - release/*

jobs:
  publish-experimental:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout code
        uses: actions/checkout@v4

      # NOTE: the original "Extract version from tag" step was removed. This
      # workflow triggers on branch pushes, so GITHUB_REF is refs/heads/<branch>
      # and the tag-stripping expression produced a meaningless VERSION that
      # nothing below used.

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: "3.14" # Adjust Python version as needed

      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install build twine

      - name: Change the version in pyproject.toml to 0.0.0.dev{github.run_number}
        working-directory: durabletask-azurefunctions
        run: |
          sed -i 's/^version = ".*"/version = "0.0.0.dev${{ github.run_number }}"/' pyproject.toml
          # Use the PEP 440 normalized dev form (".dev1", not "dev1") in the pin.
          sed -i 's/"durabletask>=.*"/"durabletask>=0.0.0.dev1"/' pyproject.toml

      - name: Build package from directory durabletask-azurefunctions
        working-directory: durabletask-azurefunctions
        run: |
          python -m build

      - name: Check package
        working-directory: durabletask-azurefunctions
        run: |
          twine check dist/*

      - name: Publish package to PyPI
        env:
          TWINE_USERNAME: __token__
          TWINE_PASSWORD: ${{ secrets.PYPI_API_TOKEN_AZUREFUNCTIONS }} # Store your PyPI API token in GitHub Secrets
        working-directory: durabletask-azurefunctions
        run: |
          twine upload dist/*
name: Durable Task Scheduler SDK (durabletask-azurefunctions)

on:
  push:
    branches:
      - "main"
    tags:
      - "azurefunctions-v*" # Only run for tags starting with "azurefunctions-v"
  pull_request:
    branches:
      - "main"

jobs:
  lint:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Set up Python 3.14
        uses: actions/setup-python@v5
        with:
          # Quoted so YAML treats the version as a string, not a float.
          python-version: "3.14"
      - name: Install dependencies
        working-directory: durabletask-azurefunctions
        run: |
          python -m pip install --upgrade pip
          pip install setuptools wheel tox
          pip install flake8
      # Distinct step names: both steps were previously called
      # "Run flake8 Linter", which makes log navigation ambiguous.
      - name: Run flake8 Linter (package)
        working-directory: durabletask-azurefunctions
        run: flake8 .
      - name: Run flake8 Linter (tests)
        working-directory: tests/durabletask-azurefunctions
        run: flake8 .

  run-docker-tests:
    strategy:
      fail-fast: false
      matrix:
        python-version: ["3.10", "3.11", "3.12", "3.13", "3.14"]
    env:
      EMULATOR_VERSION: "latest"
    needs: lint
    runs-on: ubuntu-latest
    steps:
      - name: Checkout repository
        uses: actions/checkout@v4

      # FIX: the matrix previously had no effect — the job never selected an
      # interpreter, so every leg ran on the runner's default Python.
      - name: Set up Python ${{ matrix.python-version }}
        uses: actions/setup-python@v5
        with:
          python-version: ${{ matrix.python-version }}

      - name: Pull Docker image
        run: docker pull mcr.microsoft.com/dts/dts-emulator:$EMULATOR_VERSION

      - name: Run Docker container
        run: |
          docker run --name dtsemulator -d -p 8080:8080 mcr.microsoft.com/dts/dts-emulator:$EMULATOR_VERSION

      - name: Wait for container to be ready
        run: sleep 10 # Adjust if your service needs more time to start

      - name: Set environment variables
        run: |
          echo "TASKHUB=default" >> $GITHUB_ENV
          echo "ENDPOINT=http://localhost:8080" >> $GITHUB_ENV

      - name: Install durabletask dependencies
        run: |
          python -m pip install --upgrade pip
          pip install flake8 pytest
          pip install -r requirements.txt

      - name: Install durabletask-azurefunctions dependencies
        working-directory: examples
        run: |
          python -m pip install --upgrade pip
          pip install -r requirements.txt

      - name: Install durabletask-azurefunctions locally
        working-directory: durabletask-azurefunctions
        run: |
          pip install . --no-deps --force-reinstall

      - name: Install durabletask locally
        run: |
          pip install . --no-deps --force-reinstall

      - name: Run the tests
        working-directory: tests/durabletask-azurefunctions
        run: |
          pytest -m "dts" --verbose

  publish-release:
    if: startsWith(github.ref, 'refs/tags/azurefunctions-v') # Only run if a matching tag is pushed
    needs: run-docker-tests
    runs-on: ubuntu-latest
    steps:
      - name: Checkout code
        uses: actions/checkout@v4

      - name: Extract version from tag
        run: echo "VERSION=${GITHUB_REF#refs/tags/azurefunctions-v}" >> $GITHUB_ENV # Extract version from the tag

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: "3.14" # Adjust Python version as needed

      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install build twine

      - name: Build package from directory durabletask-azurefunctions
        working-directory: durabletask-azurefunctions
        run: |
          python -m build

      - name: Check package
        working-directory: durabletask-azurefunctions
        run: |
          twine check dist/*

      - name: Publish package to PyPI
        env:
          TWINE_USERNAME: __token__
          TWINE_PASSWORD: ${{ secrets.PYPI_API_TOKEN_AZUREFUNCTIONS }} # Store your PyPI API token in GitHub Secrets
        working-directory: durabletask-azurefunctions
        run: |
          twine upload dist/*
# Client class used for Durable Functions
class DurableFunctionsClient(TaskHubGrpcClient):
    """A gRPC client passed to Durable Functions durable client bindings.

    Connects to the Durable Functions runtime using gRPC and provides methods
    for creating and managing Durable orchestrations, interacting with Durable entities,
    and creating HTTP management payloads and check status responses for use with Durable Functions invocations.
    """
    # Settings deserialized from the JSON blob the Durable Functions host
    # extension passes to the durable-client binding (see __init__).
    taskHubName: str
    connectionName: str
    creationUrls: dict[str, str]
    managementUrls: dict[str, str]
    baseUrl: str
    requiredQueryStringParameters: str
    rpcBaseUrl: str
    httpBaseUrl: str
    maxGrpcMessageSizeInBytes: int
    grpcHttpClientTimeout: timedelta

    def __init__(self, client_as_string: str):
        """Initializes a DurableFunctionsClient instance from a JSON string.

        This string will be provided by the Durable Functions host extension upon invocation of the client trigger.

        Missing fields fall back to empty strings / empty dicts / zero, so a
        partial payload never raises KeyError here.

        Args:
            client_as_string (str): A JSON string containing the Durable Functions client configuration.

        Raises:
            json.JSONDecodeError: If the provided string is not valid JSON.
        """
        client = json.loads(client_as_string)

        self.taskHubName = client.get("taskHubName", "")
        self.connectionName = client.get("connectionName", "")
        self.creationUrls = client.get("creationUrls", {})
        self.managementUrls = client.get("managementUrls", {})
        self.baseUrl = client.get("baseUrl", "")
        self.requiredQueryStringParameters = client.get("requiredQueryStringParameters", "")
        self.rpcBaseUrl = client.get("rpcBaseUrl", "")
        self.httpBaseUrl = client.get("httpBaseUrl", "")
        self.maxGrpcMessageSizeInBytes = client.get("maxGrpcMessageSizeInBytes", 0)
        # TODO: convert the string value back to timedelta - annoying regex?
        # NOTE(review): when the host supplies this field it is stored as the raw
        # JSON value (a string), contradicting the timedelta annotation above;
        # only the absent-field default is an actual timedelta — confirm intent.
        self.grpcHttpClientTimeout = client.get("grpcHttpClientTimeout", timedelta(seconds=30))
        interceptors = [AzureFunctionsDefaultClientInterceptorImpl(self.taskHubName, self.requiredQueryStringParameters)]

        # We pass in None for the metadata so we don't construct an additional interceptor in the parent class
        # Since the parent class doesn't use anything metadata for anything else, we can set it as None
        super().__init__(
            host_address=self.rpcBaseUrl,
            secure_channel=False,
            metadata=None,
            interceptors=interceptors)

    def create_check_status_response(self, request: func.HttpRequest, instance_id: str) -> func.HttpResponse:
        """Creates an HTTP response for checking the status of a Durable Function instance.

        Args:
            request (func.HttpRequest): The incoming HTTP request.
            instance_id (str): The ID of the Durable Function instance.

        Returns:
            func.HttpResponse: A JSON response whose body contains the instance
            management URLs and whose Location header points at the instance
            status endpoint.
        """
        location_url = self._get_instance_status_url(request, instance_id)
        # NOTE(review): 501 (Not Implemented) is unusual for a check-status
        # response — the Durable Functions convention is 202 Accepted. Confirm
        # this is intentional and not a placeholder.
        return func.HttpResponse(
            body=str(self._get_client_response_links(request, instance_id)),
            status_code=501,
            headers={
                'content-type': 'application/json',
                'Location': location_url,
            },
        )

    def create_http_management_payload(self, request: func.HttpRequest, instance_id: str) -> HttpManagementPayload:
        """Creates an HTTP management payload for a Durable Function instance.

        Args:
            request (func.HttpRequest): The incoming HTTP request (used to derive the webhook base URL).
            instance_id (str): The ID of the Durable Function instance.

        Returns:
            HttpManagementPayload: The set of instance-management URLs.
        """
        return self._get_client_response_links(request, instance_id)

    def _get_client_response_links(self, request: func.HttpRequest, instance_id: str) -> HttpManagementPayload:
        # Build the management-URL payload from the request's host plus the
        # extension-supplied required query-string parameters (e.g. auth code).
        instance_status_url = self._get_instance_status_url(request, instance_id)
        return HttpManagementPayload(instance_id, instance_status_url, self.requiredQueryStringParameters)

    @staticmethod
    def _get_instance_status_url(request: func.HttpRequest, instance_id: str) -> str:
        # Derive scheme://host from the incoming request URL and append the
        # Durable extension's instances webhook path; the instance ID is
        # URL-encoded so arbitrary IDs survive the round trip.
        request_url = urlparse(request.url)
        location_url = f"{request_url.scheme}://{request_url.netloc}"
        encoded_instance_id = quote(instance_id)
        location_url = location_url + "/runtime/webhooks/durabletask/instances/" + encoded_instance_id
        return location_url
class Blueprint(TriggerApi, BindingApi):
    """Durable Functions (DF) Blueprint container.

    It allows functions to be declared via trigger and binding decorators,
    but does not automatically index/register these functions.

    To register these functions, utilize the `register_functions` method from any
    :class:`FunctionRegister` subclass, such as `DFApp`.
    """

    def __init__(self,
                 http_auth_level: Union[AuthLevel, str] = AuthLevel.FUNCTION):
        """Instantiate a Durable Functions app with which to register Functions.

        Parameters
        ----------
        http_auth_level: Union[AuthLevel, str]
            Authorization level required for Function invocation.
            Defaults to AuthLevel.Function.

        Returns
        -------
        DFApp
            New instance of a Durable Functions app
        """
        super().__init__(auth_level=http_auth_level)

    def _configure_orchestrator_callable(self, wrap) -> Callable:
        """Obtain decorator to construct an Orchestrator class from a user-defined Function.

        Parameters
        ----------
        wrap: Callable
            The next decorator to be applied.

        Returns
        -------
        Callable
            The function to construct an Orchestrator class from the user-defined Function,
            wrapped by the next decorator in the sequence.
        """
        def decorator(orchestrator_func):
            # Construct an orchestrator based on the end-user code

            # `handle` is what the Functions worker actually invokes; it routes
            # the host-supplied context through a DurableFunctionsWorker, which
            # executes the user's orchestrator in-process.
            def handle(context) -> str:
                return DurableFunctionsWorker()._execute_orchestrator(orchestrator_func, context)

            # Keep a reference to the unwrapped user function for introspection.
            handle.orchestrator_function = orchestrator_func  # type: ignore

            # invoke next decorator, with the Orchestrator as input
            handle.__name__ = orchestrator_func.__name__
            return wrap(handle)

        return decorator

    def _configure_entity_callable(self, wrap) -> Callable:
        """Obtain decorator to construct an Entity class from a user-defined Function.

        Parameters
        ----------
        wrap: Callable
            The next decorator to be applied.

        Returns
        -------
        Callable
            The function to construct an Entity class from the user-defined Function,
            wrapped by the next decorator in the sequence.
        """
        def decorator(entity_func):
            # Construct an orchestrator based on the end-user code

            # TODO: Because this handle method is the one actually exposed to the Functions SDK decorator,
            # the parameter name will always be "context" here, even if the user specified a different name.
            # We need to find a way to allow custom context names (like "ctx").
            def handle(context) -> str:
                return DurableFunctionsWorker()._execute_entity_batch(entity_func, context)

            # Keep a reference to the unwrapped user function for introspection.
            handle.entity_function = entity_func  # type: ignore

            # invoke next decorator, with the Entity as input
            handle.__name__ = entity_func.__name__
            return wrap(handle)

        return decorator

    def _add_rich_client(self, fb, parameter_name, client_constructor):
        """Wrap a client-binding function so its string binding payload is
        replaced with a rich client object before the user code runs.

        Parameters
        ----------
        fb
            The function builder whose wrapped function is being modified.
        parameter_name: str
            Name of the client-binding parameter in the user function.
        client_constructor: Callable[[str], Any]
            Constructor that turns the binding's JSON string into a client.
        """
        # Obtain user-code and force type annotation on the client-binding parameter to be `str`.
        # This ensures a passing type-check of that specific parameter,
        # circumventing a limitation of the worker in type-checking rich DF Client objects.
        # TODO: Once rich-binding type checking is possible, remove the annotation change.
        user_code = fb._function._func
        user_code.__annotations__[parameter_name] = str

        # `wraps` This ensures we re-export the same method-signature as the decorated method
        @wraps(user_code)
        async def df_client_middleware(*args, **kwargs):

            # Obtain JSON-string currently passed as DF Client,
            # construct rich object from it,
            # and assign parameter to that rich object
            starter = kwargs[parameter_name]
            client = client_constructor(starter)
            kwargs[parameter_name] = client

            # Invoke user code with rich DF Client binding
            return await user_code(*args, **kwargs)

        # TODO: Is there a better way to support retrieving the unwrapped user code?
        df_client_middleware.client_function = fb._function._func  # type: ignore

        user_code_with_rich_client = df_client_middleware
        fb._function._func = user_code_with_rich_client

    def orchestration_trigger(self, context_name: str,
                              orchestration: Optional[str] = None):
        """Register an Orchestrator Function.

        Parameters
        ----------
        context_name: str
            Parameter name of the DurableOrchestrationContext object.
        orchestration: Optional[str]
            Name of Orchestrator Function.
            The value is None by default, in which case the name of the method is used.
        """
        @self._configure_orchestrator_callable
        @self._configure_function_builder
        def wrap(fb):

            def decorator():
                fb.add_trigger(
                    trigger=OrchestrationTrigger(name=context_name,
                                                 orchestration=orchestration))
                return fb

            return decorator()

        return wrap

    def activity_trigger(self, input_name: str,
                         activity: Optional[str] = None):
        """Register an Activity Function.

        Parameters
        ----------
        input_name: str
            Parameter name of the Activity input.
        activity: Optional[str]
            Name of Activity Function.
            The value is None by default, in which case the name of the method is used.
        """
        @self._configure_function_builder
        def wrap(fb):
            def decorator():
                fb.add_trigger(
                    trigger=ActivityTrigger(name=input_name,
                                            activity=activity))
                return fb

            return decorator()

        return wrap

    def entity_trigger(self, context_name: str,
                       entity_name: Optional[str] = None):
        """Register an Entity Function.

        Parameters
        ----------
        context_name: str
            Parameter name of the Entity input.
        entity_name: Optional[str]
            Name of Entity Function.
            The value is None by default, in which case the name of the method is used.
        """
        @self._configure_entity_callable
        @self._configure_function_builder
        def wrap(fb):
            def decorator():
                fb.add_trigger(
                    trigger=EntityTrigger(name=context_name,
                                          entity_name=entity_name))
                return fb

            return decorator()

        return wrap

    def durable_client_input(self,
                             client_name: str,
                             task_hub: Optional[str] = None,
                             connection_name: Optional[str] = None
                             ):
        """Register a Durable-client Function.

        Parameters
        ----------
        client_name: str
            Parameter name of durable client.
        task_hub: Optional[str]
            Used in scenarios where multiple function apps share the same storage account
            but need to be isolated from each other. If not specified, the default value
            from host.json is used.
            This value must match the value used by the target orchestrator functions.
        connection_name: Optional[str]
            The name of an app setting that contains a storage account connection string.
            The storage account represented by this connection string must be the same one
            used by the target orchestrator functions. If not specified, the default storage
            account connection string for the function app is used.
        """

        @self._configure_function_builder
        def wrap(fb):
            def decorator():
                # Swap the string binding payload for a DurableFunctionsClient
                # before registering the binding itself.
                self._add_rich_client(fb, client_name, DurableFunctionsClient)

                fb.add_binding(
                    binding=DurableClient(name=client_name,
                                          task_hub=task_hub,
                                          connection_name=connection_name))
                return fb

            return decorator()

        return wrap


class DFApp(Blueprint, FunctionRegister):
    """Durable Functions (DF) app.

    Exports the decorators required to declare and index DF Function-types.
    """

    pass
+ """ + + @self._configure_function_builder + def wrap(fb): + def decorator(): + self._add_rich_client(fb, client_name, DurableFunctionsClient) + + fb.add_binding( + binding=DurableClient(name=client_name, + task_hub=task_hub, + connection_name=connection_name)) + return fb + + return decorator() + + return wrap + + +class DFApp(Blueprint, FunctionRegister): + """Durable Functions (DF) app. + + Exports the decorators required to declare and index DF Function-types. + """ + + pass diff --git a/durabletask-azurefunctions/durabletask/azurefunctions/decorators/metadata.py b/durabletask-azurefunctions/durabletask/azurefunctions/decorators/metadata.py new file mode 100644 index 0000000..21cd7f4 --- /dev/null +++ b/durabletask-azurefunctions/durabletask/azurefunctions/decorators/metadata.py @@ -0,0 +1,118 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +from typing import Optional + +from durabletask.azurefunctions.constants import ORCHESTRATION_TRIGGER, \ + ACTIVITY_TRIGGER, ENTITY_TRIGGER, DURABLE_CLIENT +from azure.functions.decorators.core import Trigger, InputBinding + + +class OrchestrationTrigger(Trigger): + """OrchestrationTrigger. + + Trigger representing an Orchestration Function. + """ + + @staticmethod + def get_binding_name() -> str: + """Get the name of this trigger, as a string. + + Returns + ------- + str + The string representation of this trigger. + """ + return ORCHESTRATION_TRIGGER + + def __init__(self, + name: str, + orchestration: Optional[str] = None, + durable_requires_grpc=True, + ) -> None: + self.orchestration = orchestration + self.durable_requires_grpc = durable_requires_grpc + super().__init__(name=name) + + +class ActivityTrigger(Trigger): + """ActivityTrigger. + + Trigger representing a Durable Functions Activity. + """ + + @staticmethod + def get_binding_name() -> str: + """Get the name of this trigger, as a string. 
+ + Returns + ------- + str + The string representation of this trigger. + """ + return ACTIVITY_TRIGGER + + def __init__(self, + name: str, + activity: Optional[str] = None, + durable_requires_grpc=True, + ) -> None: + self.activity = activity + self.durable_requires_grpc = durable_requires_grpc + super().__init__(name=name) + + +class EntityTrigger(Trigger): + """EntityTrigger. + + Trigger representing an Entity Function. + """ + + @staticmethod + def get_binding_name() -> str: + """Get the name of this trigger, as a string. + + Returns + ------- + str + The string representation of this trigger. + """ + return ENTITY_TRIGGER + + def __init__(self, + name: str, + entity_name: Optional[str] = None, + durable_requires_grpc=True, + ) -> None: + self.entity_name = entity_name + self.durable_requires_grpc = durable_requires_grpc + super().__init__(name=name) + + +class DurableClient(InputBinding): + """DurableClient. + + Binding representing a Durable-client object. + """ + + @staticmethod + def get_binding_name() -> str: + """Get the name of this Binding, as a string. + + Returns + ------- + str + The string representation of this binding. + """ + return DURABLE_CLIENT + + def __init__(self, + name: str, + task_hub: Optional[str] = None, + connection_name: Optional[str] = None, + durable_requires_grpc=True, + ) -> None: + self.task_hub = task_hub + self.connection_name = connection_name + self.durable_requires_grpc = durable_requires_grpc + super().__init__(name=name) diff --git a/durabletask-azurefunctions/durabletask/azurefunctions/http/__init__.py b/durabletask-azurefunctions/durabletask/azurefunctions/http/__init__.py new file mode 100644 index 0000000..fc1cb6b --- /dev/null +++ b/durabletask-azurefunctions/durabletask/azurefunctions/http/__init__.py @@ -0,0 +1,6 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. 
class HttpManagementPayload:
    """A class representing the HTTP management payload for a Durable Function orchestration instance.

    Contains URLs for managing the instance, such as querying status,
    sending events, terminating, restarting, etc.
    """

    def __init__(self, instance_id: str, instance_status_url: str, required_query_string_parameters: str):
        """Initializes the HttpManagementPayload with the necessary URLs.

        Args:
            instance_id (str): The ID of the Durable Function instance.
            instance_status_url (str): The base URL for the instance status.
            required_query_string_parameters (str): The required URL parameters provided by the Durable extension.
        """
        # Short aliases keep the URL templates below readable.
        base = instance_status_url
        qs = required_query_string_parameters
        # Literal {eventName}/{text} placeholders are part of the payload
        # contract and are substituted by the caller, not by this class.
        self.urls = {
            'id': instance_id,
            'purgeHistoryDeleteUri': f"{base}?{qs}",
            'restartPostUri': f"{base}/restart?{qs}",
            'sendEventPostUri': f"{base}/raiseEvent/{{eventName}}?{qs}",
            'statusQueryGetUri': f"{base}?{qs}",
            'terminatePostUri': f"{base}/terminate?reason={{text}}&{qs}",
            'resumePostUri': f"{base}/resume?reason={{text}}&{qs}",
            'suspendPostUri': f"{base}/suspend?reason={{text}}&{qs}",
        }

    def __str__(self):
        """Return the payload serialized as a JSON object string."""
        return json.dumps(self.urls)
class AzureFunctionsDefaultClientInterceptorImpl(DefaultClientInterceptorImpl):
    """The class implements a UnaryUnaryClientInterceptor, UnaryStreamClientInterceptor,
    StreamUnaryClientInterceptor and StreamStreamClientInterceptor from grpc to add an
    interceptor to add additional headers to all calls as needed."""
    # Query-string parameters (e.g. the auth code) required by the Durable
    # extension's webhooks; stored for callers that need them alongside the
    # gRPC metadata.
    required_query_string_parameters: str

    def __init__(self, taskhub_name: str, required_query_string_parameters: str):
        """Build the metadata headers attached to every outgoing gRPC call.

        Args:
            taskhub_name (str): Name of the task hub, sent as the "taskhub" header.
            required_query_string_parameters (str): Extension-provided query
                parameters, kept on the instance for later use.
        """
        self.required_query_string_parameters = required_query_string_parameters
        try:
            # Get the version of the azurefunctions package
            sdk_version = version('durabletask-azurefunctions')
        except Exception:
            # Fallback if version cannot be determined (e.g. package not installed)
            sdk_version = "unknown"
        user_agent = f"durabletask-python/{sdk_version}"
        self._metadata = [
            ("taskhub", taskhub_name),
            ("x-user-agent", user_agent)]  # 'user-agent' is a reserved header in grpc, so we use 'x-user-agent' instead
        super().__init__(self._metadata)
def _noop(*_args, **_kwargs):
    """Shared no-op used for every stub method; accepts anything, returns None."""
    return None


class AzureFunctionsNullStub(ProtoTaskHubSidecarServiceStub):
    """A task hub sidecar stub class that implements all methods as no-ops.

    Callers replace individual attributes on an instance (e.g.
    ``stub.CompleteOrchestratorTask = callback``) to capture the result of a
    single in-process execution; every other RPC is silently ignored.

    Implemented with one shared ``def`` instead of per-attribute lambdas:
    lambda assignment is a flake8 E731 violation, which would fail the lint
    job in CI. Call behavior is identical — each attribute is still a
    callable class attribute that swallows ``self`` via ``*args``.
    """
    Hello = _noop
    StartInstance = _noop
    GetInstance = _noop
    RewindInstance = _noop
    WaitForInstanceStart = _noop
    WaitForInstanceCompletion = _noop
    RaiseEvent = _noop
    TerminateInstance = _noop
    SuspendInstance = _noop
    ResumeInstance = _noop
    QueryInstances = _noop
    PurgeInstances = _noop
    GetWorkItems = _noop
    CompleteActivityTask = _noop
    CompleteOrchestratorTask = _noop
    CompleteEntityTask = _noop
    StreamInstanceHistory = _noop
    CreateTaskHub = _noop
    DeleteTaskHub = _noop
    SignalEntity = _noop
    GetEntity = _noop
    QueryEntities = _noop
    CleanEntityStorage = _noop
    AbandonTaskActivityWorkItem = _noop
    AbandonTaskOrchestratorWorkItem = _noop
    AbandonTaskEntityWorkItem = _noop
# Worker class used for Durable Functions
class DurableFunctionsWorker(TaskHubGrpcWorker):
    """A worker that can execute orchestrator and entity functions in the context of Azure Functions.

    Used internally by the Durable Functions Python SDK, and should not be visible to functionapps directly.
    See TaskHubGrpcWorker for base class documentation.
    """

    def __init__(self):
        # Don't call the parent constructor - we don't actually want to start an AsyncWorkerLoop
        # or receive work items from anywhere but the method that is creating this worker
        self._registry = _Registry()
        self._host_address = ""
        self._logger = shared.get_logger("worker")
        self._shutdown = Event()
        self._is_running = False
        self._secure_channel = False

        self._concurrency_options = ConcurrencyOptions()

        self._interceptors = None

    def add_named_orchestrator(self, name: str, func):
        """Register *func* as the orchestrator implementation for *name*."""
        self._registry.add_named_orchestrator(name, func)

    def _execute_orchestrator(self, func, context) -> str:
        """Execute one orchestrator work item delivered by the Functions host.

        Args:
            func: The user-defined orchestrator function.
            context: The host-supplied trigger payload; either an object with a
                ``body`` attribute or the base64-encoded OrchestratorRequest itself.

        Returns:
            str: The base64-encoded OrchestratorResponse, JSON-string-wrapped.

        Raises:
            Exception: If the request contains no ExecutionStarted event, or
                the execution produced no response.
        """
        # The host may hand us either a wrapper with a .body or the raw payload.
        context_body = getattr(context, "body", None)
        if context_body is None:
            context_body = context
        orchestration_context = context_body
        request = OrchestratorRequest()
        request.ParseFromString(base64.b64decode(orchestration_context))
        # Null stub: no sidecar exists here; we only intercept the completion
        # callback below to capture the response in-process.
        stub = AzureFunctionsNullStub()
        response: Optional[OrchestratorResponse] = None

        def stub_complete(stub_response):
            nonlocal response
            response = stub_response
        stub.CompleteOrchestratorTask = stub_complete
        # The orchestrator's registered name comes from the ExecutionStarted
        # event; scan both past and new events and use the most recent one.
        execution_started_events = []
        for e in request.pastEvents:
            if e.HasField("executionStarted"):
                execution_started_events.append(e)
        for e in request.newEvents:
            if e.HasField("executionStarted"):
                execution_started_events.append(e)
        if len(execution_started_events) == 0:
            raise Exception("No ExecutionStarted event found in orchestration request.")

        function_name = execution_started_events[-1].executionStarted.name
        self.add_named_orchestrator(function_name, func)
        super()._execute_orchestrator(request, stub, None)

        if response is None:
            raise Exception("Orchestrator execution did not produce a response.")
        # The Python worker returns the input as type "json", so double-encoding is necessary
        return '"' + base64.b64encode(response.SerializeToString()).decode('utf-8') + '"'

    def _execute_entity_batch(self, func, context) -> str:
        """Execute one entity batch work item delivered by the Functions host.

        Args:
            func: The user-defined entity function.
            context: The host-supplied trigger payload; either an object with a
                ``body`` attribute or the base64-encoded EntityBatchRequest itself.

        Returns:
            str: The base64-encoded EntityBatchResult, JSON-string-wrapped.

        Raises:
            Exception: If the execution produced no response.
        """
        # The host may hand us either a wrapper with a .body or the raw payload.
        context_body = getattr(context, "body", None)
        if context_body is None:
            context_body = context
        orchestration_context = context_body
        request = EntityBatchRequest()
        request.ParseFromString(base64.b64decode(orchestration_context))
        # Null stub with only the entity-completion callback intercepted.
        stub = AzureFunctionsNullStub()
        response: Optional[EntityBatchResult] = None

        def stub_complete(stub_response: EntityBatchResult):
            nonlocal response
            response = stub_response
        stub.CompleteEntityTask = stub_complete

        # add_entity is inherited from the base worker; registers the entity
        # implementation before the batch is executed.
        self.add_entity(func)
        super()._execute_entity_batch(request, stub, None)

        if response is None:
            raise Exception("Entity execution did not produce a response.")
        # The Python worker returns the input as type "json", so double-encoding is necessary
        return '"' + base64.b64encode(response.SerializeToString()).decode('utf-8') + '"'
+ +# For more information on pyproject.toml, see https://peps.python.org/pep-0621/ + +[build-system] +requires = ["setuptools", "wheel"] +build-backend = "setuptools.build_meta" + +[project] +name = "durabletask.azurefunctions" +version = "0.0.1dev0" +description = "Durable Task Python SDK provider implementation for Durable Azure Functions" +keywords = [ + "durable", + "task", + "workflow", + "azure", + "azure functions" +] +classifiers = [ + "Development Status :: 3 - Alpha", + "Programming Language :: Python :: 3", + "License :: OSI Approved :: MIT License", +] +requires-python = ">=3.9" +license = {file = "LICENSE"} +readme = "README.md" +dependencies = [ + "durabletask>=1.2.0dev0", + "azure-identity>=1.19.0", + "azure-functions>=1.11.0" +] + +[project.urls] +repository = "https://github.com/microsoft/durabletask-python" +changelog = "https://github.com/microsoft/durabletask-python/blob/main/CHANGELOG.md" + +[tool.setuptools.packages.find] +include = ["durabletask.azurefunctions", "durabletask.azurefunctions.*"] + +[tool.pytest.ini_options] +minversion = "6.0" diff --git a/durabletask/entities/entity_instance_id.py b/durabletask/entities/entity_instance_id.py index c3b76c1..c270f72 100644 --- a/durabletask/entities/entity_instance_id.py +++ b/durabletask/entities/entity_instance_id.py @@ -20,7 +20,7 @@ def __lt__(self, other): return str(self) < str(other) @staticmethod - def parse(entity_id: str) -> Optional["EntityInstanceId"]: + def parse(entity_id: str) -> "EntityInstanceId": """Parse a string representation of an entity ID into an EntityInstanceId object. 
Parameters diff --git a/durabletask/internal/helpers.py b/durabletask/internal/helpers.py index ccd8558..612915c 100644 --- a/durabletask/internal/helpers.py +++ b/durabletask/internal/helpers.py @@ -196,9 +196,13 @@ def new_schedule_task_action(id: int, name: str, encoded_input: Optional[str], )) -def new_call_entity_action(id: int, parent_instance_id: str, entity_id: EntityInstanceId, operation: str, encoded_input: Optional[str]): +def new_call_entity_action(id: int, + parent_instance_id: str, + entity_id: EntityInstanceId, + operation: str, encoded_input: Optional[str], + request_id: str): return pb.OrchestratorAction(id=id, sendEntityMessage=pb.SendEntityMessageAction(entityOperationCalled=pb.EntityOperationCalledEvent( - requestId=f"{parent_instance_id}:{id}", + requestId=request_id, operation=operation, scheduledTime=None, input=get_string_value(encoded_input), @@ -208,9 +212,13 @@ def new_call_entity_action(id: int, parent_instance_id: str, entity_id: EntityIn ))) -def new_signal_entity_action(id: int, entity_id: EntityInstanceId, operation: str, encoded_input: Optional[str]): +def new_signal_entity_action(id: int, + entity_id: EntityInstanceId, + operation: str, + encoded_input: Optional[str], + request_id: str): return pb.OrchestratorAction(id=id, sendEntityMessage=pb.SendEntityMessageAction(entityOperationSignaled=pb.EntityOperationSignaledEvent( - requestId=f"{entity_id}:{id}", + requestId=request_id, operation=operation, scheduledTime=None, input=get_string_value(encoded_input), diff --git a/durabletask/internal/proto_task_hub_sidecar_service_stub.py b/durabletask/internal/proto_task_hub_sidecar_service_stub.py new file mode 100644 index 0000000..f91a15c --- /dev/null +++ b/durabletask/internal/proto_task_hub_sidecar_service_stub.py @@ -0,0 +1,34 @@ +from typing import Any, Callable, Protocol + + +class ProtoTaskHubSidecarServiceStub(Protocol): + """A stub class roughly matching the TaskHubSidecarServiceStub generated from the .proto file. 
+ Used by Azure Functions during orchestration and entity executions to inject custom behavior, + as no real sidecar stub is available. + """ + Hello: Callable[..., Any] + StartInstance: Callable[..., Any] + GetInstance: Callable[..., Any] + RewindInstance: Callable[..., Any] + WaitForInstanceStart: Callable[..., Any] + WaitForInstanceCompletion: Callable[..., Any] + RaiseEvent: Callable[..., Any] + TerminateInstance: Callable[..., Any] + SuspendInstance: Callable[..., Any] + ResumeInstance: Callable[..., Any] + QueryInstances: Callable[..., Any] + PurgeInstances: Callable[..., Any] + GetWorkItems: Callable[..., Any] + CompleteActivityTask: Callable[..., Any] + CompleteOrchestratorTask: Callable[..., Any] + CompleteEntityTask: Callable[..., Any] + StreamInstanceHistory: Callable[..., Any] + CreateTaskHub: Callable[..., Any] + DeleteTaskHub: Callable[..., Any] + SignalEntity: Callable[..., Any] + GetEntity: Callable[..., Any] + QueryEntities: Callable[..., Any] + CleanEntityStorage: Callable[..., Any] + AbandonTaskActivityWorkItem: Callable[..., Any] + AbandonTaskOrchestratorWorkItem: Callable[..., Any] + AbandonTaskEntityWorkItem: Callable[..., Any] diff --git a/durabletask/task.py index 3570838..2f763bc 100644 --- a/durabletask/task.py +++ b/durabletask/task.py @@ -258,6 +258,22 @@ def continue_as_new(self, new_input: Any, *, save_events: bool = False) -> None: """ pass + @abstractmethod + def new_uuid(self) -> str: + """Create a new UUID that is safe for replay within an orchestration or operation. + + The default implementation of this method creates a name-based UUID + using the algorithm from RFC 4122 §4.3. The name input used to generate + this value is a combination of the orchestration instance ID and an + internally managed sequence number. + + Returns + ------- + str + New UUID that is safe for replay within an orchestration or operation.
+ """ + pass + @abstractmethod def _exit_critical_section(self) -> None: pass diff --git a/durabletask/worker.py b/durabletask/worker.py index fae345c..cd1f899 100644 --- a/durabletask/worker.py +++ b/durabletask/worker.py @@ -13,12 +13,14 @@ from types import GeneratorType from enum import Enum from typing import Any, Generator, Optional, Sequence, TypeVar, Union +import uuid from packaging.version import InvalidVersion, parse import grpc from google.protobuf import empty_pb2 from durabletask.internal import helpers +from durabletask.internal.proto_task_hub_sidecar_service_stub import ProtoTaskHubSidecarServiceStub from durabletask.internal.entity_state_shim import StateShim from durabletask.internal.helpers import new_timestamp from durabletask.entities import DurableEntity, EntityLock, EntityInstanceId, EntityContext @@ -33,6 +35,7 @@ TInput = TypeVar("TInput") TOutput = TypeVar("TOutput") +DATETIME_STRING_FORMAT = '%Y-%m-%dT%H:%M:%S.%fZ' class ConcurrencyOptions: @@ -629,7 +632,7 @@ def stop(self): def _execute_orchestrator( self, req: pb.OrchestratorRequest, - stub: stubs.TaskHubSidecarServiceStub, + stub: Union[stubs.TaskHubSidecarServiceStub, ProtoTaskHubSidecarServiceStub], completionToken, ): try: @@ -736,9 +739,10 @@ def _cancel_activity( def _execute_entity_batch( self, req: Union[pb.EntityBatchRequest, pb.EntityRequest], - stub: stubs.TaskHubSidecarServiceStub, + stub: Union[stubs.TaskHubSidecarServiceStub, ProtoTaskHubSidecarServiceStub], completionToken, ): + operation_infos = None if isinstance(req, pb.EntityRequest): req, operation_infos = helpers.convert_to_entity_batch_request(req) @@ -794,7 +798,7 @@ def _execute_entity_batch( stub.CompleteEntityTask(batch_result) except Exception as ex: self._logger.exception( - f"Failed to deliver entity response for '{entity_instance_id}' of orchestration ID '{instance_id}' to sidecar: {ex}" + f"Failed to deliver entity response for orchestration ID '{instance_id}' to sidecar: {ex}" ) # TODO: Reset context @@ 
-827,10 +831,11 @@ def __init__(self, instance_id: str, registry: _Registry): self._pending_actions: dict[int, pb.OrchestratorAction] = {} self._pending_tasks: dict[int, task.CompletableTask] = {} # Maps entity ID to task ID - self._entity_task_id_map: dict[str, tuple[EntityInstanceId, int]] = {} + self._entity_task_id_map: dict[str, tuple[EntityInstanceId, int, Optional[str]]] = {} # Maps criticalSectionId to task ID self._entity_lock_id_map: dict[str, int] = {} self._sequence_number = 0 + self._new_uuid_counter = 0 self._current_utc_datetime = datetime(1000, 1, 1) self._instance_id = instance_id self._registry = registry @@ -1038,14 +1043,14 @@ def call_activity( def call_entity( self, - entity_id: EntityInstanceId, + entity: EntityInstanceId, operation: str, input: Optional[TInput] = None, ) -> task.Task: id = self.next_sequence_number() self.call_entity_function_helper( - id, entity_id, operation, input=input + id, entity, operation, input=input ) return self._pending_tasks.get(id, task.CompletableTask()) @@ -1053,13 +1058,13 @@ def call_entity( def signal_entity( self, entity_id: EntityInstanceId, - operation: str, + operation_name: str, input: Optional[TInput] = None ) -> None: id = self.next_sequence_number() self.signal_entity_function_helper( - id, entity_id, operation, input + id, entity_id, operation_name, input ) def lock_entities(self, entities: list[EntityInstanceId]) -> task.Task[EntityLock]: @@ -1165,7 +1170,12 @@ def call_entity_function_helper( raise RuntimeError(error_message) encoded_input = shared.to_json(input) if input is not None else None - action = ph.new_call_entity_action(id, self.instance_id, entity_id, operation, encoded_input) + action = ph.new_call_entity_action(id, + self.instance_id, + entity_id, + operation, + encoded_input, + self.new_uuid()) self._pending_actions[id] = action fn_task = task.CompletableTask() @@ -1188,7 +1198,7 @@ def signal_entity_function_helper( encoded_input = shared.to_json(input) if input is not None else 
None - action = ph.new_signal_entity_action(id, entity_id, operation, encoded_input) + action = ph.new_signal_entity_action(id, entity_id, operation, encoded_input, self.new_uuid()) self._pending_actions[id] = action def lock_entities_function_helper(self, id: int, entities: list[EntityInstanceId]) -> None: @@ -1199,7 +1209,7 @@ def lock_entities_function_helper(self, id: int, entities: list[EntityInstanceId if not transition_valid: raise RuntimeError(error_message) - critical_section_id = f"{self.instance_id}:{id:04x}" + critical_section_id = self.new_uuid() request, target = self._entity_context.emit_acquire_message(critical_section_id, entities) @@ -1251,6 +1261,17 @@ def continue_as_new(self, new_input, *, save_events: bool = False) -> None: self.set_continued_as_new(new_input, save_events) + def new_uuid(self) -> str: + URL_NAMESPACE: str = "9e952958-5e33-4daf-827f-2fa12937b875" + + uuid_name_value = \ + f"{self._instance_id}" \ + f"_{self.current_utc_datetime.strftime(DATETIME_STRING_FORMAT)}" \ + f"_{self._new_uuid_counter}" + self._new_uuid_counter += 1 + namespace_uuid = uuid.uuid5(uuid.NAMESPACE_OID, URL_NAMESPACE) + return str(uuid.uuid5(namespace_uuid, uuid_name_value)) + class ExecutionResults: actions: list[pb.OrchestratorAction] @@ -1590,33 +1611,61 @@ def process_event( else: raise TypeError("Unexpected sub-orchestration task type") elif event.HasField("eventRaised"): - # event names are case-insensitive - event_name = event.eventRaised.name.casefold() - if not ctx.is_replaying: - self._logger.info(f"{ctx.instance_id} Event raised: {event_name}") - task_list = ctx._pending_events.get(event_name, None) - decoded_result: Optional[Any] = None - if task_list: - event_task = task_list.pop(0) + if event.eventRaised.name in ctx._entity_task_id_map: + # This eventRaised represents the result of an entity operation after being translated to the old + # entity protocol by the Durable WebJobs extension + entity_id, task_id, action_type = 
ctx._entity_task_id_map.get(event.eventRaised.name, (None, None, None)) + if entity_id is None: + raise RuntimeError(f"Could not retrieve entity ID for entity-related eventRaised with ID '{event.eventId}'") + if task_id is None: + raise RuntimeError(f"Could not retrieve task ID for entity-related eventRaised with ID '{event.eventId}'") + entity_task = ctx._pending_tasks.pop(task_id, None) + if not entity_task: + raise RuntimeError(f"Could not retrieve entity task for entity-related eventRaised with ID '{event.eventId}'") + result = None if not ph.is_empty(event.eventRaised.input): - decoded_result = shared.from_json(event.eventRaised.input.value) - event_task.complete(decoded_result) - if not task_list: - del ctx._pending_events[event_name] - ctx.resume() + # TODO: Investigate why the event result is wrapped in a dict with "result" key + result = shared.from_json(event.eventRaised.input.value)["result"] + if action_type == "entityOperationCalled": + ctx._entity_context.recover_lock_after_call(entity_id) + entity_task.complete(result) + ctx.resume() + elif action_type == "entityLockRequested": + ctx._entity_context.complete_acquire(event.eventRaised.name) + entity_task.complete(EntityLock(ctx)) + ctx.resume() + else: + raise RuntimeError(f"Unknown action type '{action_type}' for entity-related eventRaised " + f"with ID '{event.eventId}'") + else: - # buffer the event - event_list = ctx._received_events.get(event_name, None) - if not event_list: - event_list = [] - ctx._received_events[event_name] = event_list - if not ph.is_empty(event.eventRaised.input): - decoded_result = shared.from_json(event.eventRaised.input.value) - event_list.append(decoded_result) + # event names are case-insensitive + event_name = event.eventRaised.name.casefold() if not ctx.is_replaying: - self._logger.info( - f"{ctx.instance_id}: Event '{event_name}' has been buffered as there are no tasks waiting for it." 
- ) + self._logger.info(f"{ctx.instance_id} Event raised: {event_name}") + task_list = ctx._pending_events.get(event_name, None) + decoded_result: Optional[Any] = None + if task_list: + event_task = task_list.pop(0) + if not ph.is_empty(event.eventRaised.input): + decoded_result = shared.from_json(event.eventRaised.input.value) + event_task.complete(decoded_result) + if not task_list: + del ctx._pending_events[event_name] + ctx.resume() + else: + # buffer the event + event_list = ctx._received_events.get(event_name, None) + if not event_list: + event_list = [] + ctx._received_events[event_name] = event_list + if not ph.is_empty(event.eventRaised.input): + decoded_result = shared.from_json(event.eventRaised.input.value) + event_list.append(decoded_result) + if not ctx.is_replaying: + self._logger.info( + f"{ctx.instance_id}: Event '{event_name}' has been buffered as there are no tasks waiting for it." + ) elif event.HasField("executionSuspended"): if not self._is_suspended and not ctx.is_replaying: self._logger.info(f"{ctx.instance_id}: Execution suspended.") @@ -1659,7 +1708,7 @@ def process_event( entity_id = EntityInstanceId.parse(event.entityOperationCalled.targetInstanceId.value) if not entity_id: raise RuntimeError(f"Could not parse entity ID from targetInstanceId '{event.entityOperationCalled.targetInstanceId.value}'") - ctx._entity_task_id_map[event.entityOperationCalled.requestId] = (entity_id, entity_call_id) + ctx._entity_task_id_map[event.entityOperationCalled.requestId] = (entity_id, entity_call_id, None) elif event.HasField("entityOperationSignaled"): # This history event confirms that the entity signal was successfully scheduled. 
# Remove the entityOperationSignaled event from the pending action list so we don't schedule it @@ -1720,7 +1769,7 @@ def process_event( ctx.resume() elif event.HasField("entityOperationCompleted"): request_id = event.entityOperationCompleted.requestId - entity_id, task_id = ctx._entity_task_id_map.pop(request_id, (None, None)) + entity_id, task_id, _ = ctx._entity_task_id_map.pop(request_id, (None, None, None)) if not entity_id: raise RuntimeError(f"Could not parse entity ID from request ID '{request_id}'") if not task_id: @@ -1743,6 +1792,25 @@ def process_event( self._logger.info(f"{ctx.instance_id}: Entity operation failed.") self._logger.info(f"Data: {json.dumps(event.entityOperationFailed)}") pass + elif event.HasField("orchestratorCompleted"): + # Added in Functions only (for some reason) and does not affect orchestrator flow + pass + elif event.HasField("eventSent"): + # Check if this eventSent corresponds to an entity operation call after being translated to the old + # entity protocol by the Durable WebJobs extension. If so, treat this message similarly to + # entityOperationCalled and remove the pending action. 
Also store the entity id and event id for later + action = ctx._pending_actions.pop(event.eventId, None) + if action and action.HasField("sendEntityMessage"): + if action.sendEntityMessage.HasField("entityOperationCalled"): + action_type = "entityOperationCalled" + elif action.sendEntityMessage.HasField("entityLockRequested"): + action_type = "entityLockRequested" + else: + return + + entity_id = EntityInstanceId.parse(event.eventSent.instanceId) + event_id = json.loads(event.eventSent.input.value)["id"] + ctx._entity_task_id_map[event_id] = (entity_id, event.eventId, action_type) else: eventType = event.WhichOneof("eventType") raise task.OrchestrationStateError( diff --git a/examples/entities/function_based_entity.py b/examples/entities/function_based_entity.py index a43b86d..32d9469 100644 --- a/examples/entities/function_based_entity.py +++ b/examples/entities/function_based_entity.py @@ -13,7 +13,7 @@ def counter(ctx: entities.EntityContext, input: int) -> Optional[int]: if ctx.operation == "set": ctx.set_state(input) - if ctx.operation == "add": + elif ctx.operation == "add": current_state = ctx.get_state(int, 0) new_state = current_state + (input or 1) ctx.set_state(new_state) diff --git a/pyproject.toml b/pyproject.toml index 111693c..958981e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -9,7 +9,7 @@ build-backend = "setuptools.build_meta" [project] name = "durabletask" -version = "1.1.0" +version = "1.2.0dev0" description = "A Durable Task Client SDK for Python" keywords = [ "durable",