-
Notifications
You must be signed in to change notification settings - Fork 70
feat: support durabletask-azurefunctions #309
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: dev
Are you sure you want to change the base?
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,62 @@ | ||
| # Copyright (c) Microsoft Corporation. All rights reserved. | ||
| # Licensed under the MIT License. | ||
| import logging | ||
|
|
||
| _logger = logging.getLogger('azure.functions.AsgiMiddleware') | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. All of the info logs will be removed in the future, just for debugging purposes now
hallvictoria marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| df = None | ||
|
|
||
|
|
||
| def get_durable_package(): | ||
| """Determines which Durable SDK is being used. | ||
| If the `azure-functions-durable` package is installed, we | ||
| log a warning that this legacy package | ||
| is deprecated. | ||
| If both the legacy and current packages are installed, | ||
| we log a warning and prefer the current package. | ||
| If neither package is installed, we raise an exception. | ||
| """ | ||
| _logger.info("Attempting to import Durable Functions package.") | ||
hallvictoria marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| using_legacy = False | ||
| using_durable_task = False | ||
| global df | ||
| if df: | ||
| _logger.info("Durable Functions package already loaded. DF: %s", df) | ||
| return df | ||
|
|
||
| try: | ||
| import azure.durable_functions as durable_functions | ||
| using_legacy = True | ||
| _logger.warning("`azure-functions-durable` is deprecated. " \ | ||
| "Please migrate to the new `durabletask-azurefunctions` package. " \ | ||
| "See <AKA.MS LINK HERE> for more details.") | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Log a warning if
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This message is probably too aggressive before we post the Azure deprecation notice, would suggest for now an info message informing about the new package only. |
||
| except ImportError: | ||
| _logger.info("`azure-functions-durable` package not found.") | ||
hallvictoria marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| pass | ||
| try: | ||
| import durabletask.azurefunctions as durable_functions | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why do we need to check for both? If durabletask is added, lets directly use it and just add a log that durable task is being used.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We still need to maintain backwards compatibility with |
||
| using_durable_task = True | ||
| except ImportError: | ||
| _logger.info("`durabletask-azurefunctions` package not found.") | ||
| pass | ||
|
|
||
| if using_durable_task and using_legacy: | ||
| # Both packages are installed; prefer `durabletask-azurefunctions`. | ||
| _logger.warning("Both `azure-functions-durable` and " \ | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In the case that the customer has both durable packages, log a warning & prioritize
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There will be more unexpected behavior from the Durable extension if both modules are present, this warning suggests that we will safely default to the new module but that might not always be the case. Recommend changing this message to. |
||
| "`durabletask-azurefunctions` packages are installed. " \ | ||
| "The `durabletask-azurefunctions` package will be used.") | ||
|
|
||
| if not using_durable_task and not using_legacy: | ||
| error_message = \ | ||
| "Attempted to use a Durable Functions decorator, " \ | ||
| "but the `durabletask-azurefunctions` SDK package could not be " \ | ||
| "found. Please install `durabletask-azurefunctions` to use " \ | ||
| "Durable Functions." | ||
| raise Exception(error_message) | ||
|
|
||
| df = durable_functions | ||
|
|
||
| return durable_functions | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -24,6 +24,7 @@ | |
| DaprBindingTrigger, DaprInvokeOutput, DaprPublishOutput, \ | ||
| DaprSecretInput, DaprServiceInvocationTrigger, DaprStateInput, \ | ||
| DaprStateOutput, DaprTopicTrigger | ||
| from azure.functions.decorators.durable_functions import get_durable_package | ||
| from azure.functions.decorators.eventgrid import EventGridTrigger, \ | ||
| EventGridOutput | ||
| from azure.functions.decorators.eventhub import EventHubTrigger, EventHubOutput | ||
|
|
@@ -57,6 +58,7 @@ | |
| from azure.functions.decorators.mysql import MySqlInput, MySqlOutput, \ | ||
| MySqlTrigger | ||
|
|
||
| _logger = logging.getLogger('azure.functions.AsgiMiddleware') | ||
|
|
||
| class Function(object): | ||
| """ | ||
|
|
@@ -347,17 +349,11 @@ def _get_durable_blueprint(self): | |
| """Attempt to import the Durable Functions SDK from which DF | ||
| decorators are implemented. | ||
| """ | ||
| try: | ||
| import azure.durable_functions as df | ||
| df_bp = df.Blueprint() | ||
| return df_bp | ||
| except ImportError: | ||
| error_message = \ | ||
| "Attempted to use a Durable Functions decorator, " \ | ||
| "but the `azure-functions-durable` SDK package could not be " \ | ||
| "found. Please install `azure-functions-durable` to use " \ | ||
| "Durable Functions." | ||
| raise Exception(error_message) | ||
| _logger.info("Getting Durable Functions blueprint.") | ||
| df = get_durable_package() | ||
| df_bp = df.Blueprint() | ||
|
||
| return df_bp | ||
|
|
||
|
|
||
|
||
| @property | ||
| def app_script_file(self) -> str: | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -5,13 +5,17 @@ | |
| import json | ||
|
|
||
| from azure.functions import _durable_functions | ||
| from azure.functions.decorators.durable_functions import get_durable_package | ||
| from . import meta | ||
|
|
||
| import logging | ||
| _logger = logging.getLogger('azure.functions.AsgiMiddleware') | ||
|
|
||
| # Durable Function Orchestration Trigger | ||
| class OrchestrationTriggerConverter(meta.InConverter, | ||
| # ---------------- Legacy Durable Functions Converters ---------------- # | ||
| # Legacy Durable Function Orchestration Trigger | ||
| class LegacyOrchestrationTriggerConverter(meta.InConverter, | ||
| meta.OutConverter, | ||
| binding='orchestrationTrigger', | ||
| binding=None, | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Previous converters are renamed to "Legacy" with no changes. |
||
| trigger=True): | ||
| @classmethod | ||
| def check_input_type_annotation(cls, pytype): | ||
|
|
@@ -39,9 +43,196 @@ def has_implicit_output(cls) -> bool: | |
| return True | ||
|
|
||
|
|
||
| # Legacy Durable Function Entity Trigger | ||
| class LegacyEnitityTriggerConverter(meta.InConverter, | ||
| meta.OutConverter, | ||
| binding=None, | ||
| trigger=True): | ||
| @classmethod | ||
| def check_input_type_annotation(cls, pytype): | ||
| return issubclass(pytype, _durable_functions.EntityContext) | ||
|
|
||
| @classmethod | ||
| def check_output_type_annotation(cls, pytype): | ||
| # Implicit output should accept any return type | ||
| return True | ||
|
|
||
| @classmethod | ||
| def decode(cls, | ||
| data: meta.Datum, *, | ||
| trigger_metadata) -> _durable_functions.EntityContext: | ||
| return _durable_functions.EntityContext(data.value) | ||
|
|
||
| @classmethod | ||
| def encode(cls, obj: typing.Any, *, | ||
| expected_type: typing.Optional[type]) -> meta.Datum: | ||
| # Durable function context should be a json | ||
| return meta.Datum(type='json', value=obj) | ||
|
|
||
| @classmethod | ||
| def has_implicit_output(cls) -> bool: | ||
| return True | ||
|
|
||
|
|
||
| # Legacy Durable Function Activity Trigger | ||
| class LegacyActivityTriggerConverter(meta.InConverter, | ||
| meta.OutConverter, | ||
| binding=None, | ||
| trigger=True): | ||
| @classmethod | ||
| def check_input_type_annotation(cls, pytype): | ||
| # Activity Trigger's arguments should accept any types | ||
| return True | ||
|
|
||
| @classmethod | ||
| def check_output_type_annotation(cls, pytype): | ||
| # The activity trigger should accept any JSON serializable types | ||
| return True | ||
|
|
||
| @classmethod | ||
| def decode(cls, | ||
| data: meta.Datum, *, | ||
| trigger_metadata) -> typing.Any: | ||
| data_type = data.type | ||
|
|
||
| # Durable functions extension always returns a string of json | ||
| # See durable functions library's call_activity_task docs | ||
| if data_type in ['string', 'json']: | ||
| try: | ||
| callback = _durable_functions._deserialize_custom_object | ||
| result = json.loads(data.value, object_hook=callback) | ||
| except json.JSONDecodeError: | ||
| # String failover if the content is not json serializable | ||
| result = data.value | ||
| except Exception as e: | ||
| raise ValueError( | ||
| 'activity trigger input must be a string or a ' | ||
| f'valid json serializable ({data.value})') from e | ||
| else: | ||
| raise NotImplementedError( | ||
| f'unsupported activity trigger payload type: {data_type}') | ||
|
|
||
| return result | ||
|
|
||
| @classmethod | ||
| def encode(cls, obj: typing.Any, *, | ||
| expected_type: typing.Optional[type]) -> meta.Datum: | ||
| try: | ||
| callback = _durable_functions._serialize_custom_object | ||
| result = json.dumps(obj, default=callback) | ||
| except TypeError as e: | ||
| raise ValueError( | ||
| f'activity trigger output must be json serializable ({obj})') from e | ||
|
|
||
| return meta.Datum(type='json', value=result) | ||
|
|
||
| @classmethod | ||
| def has_implicit_output(cls) -> bool: | ||
| return True | ||
|
|
||
|
|
||
| # Legacy Durable Functions Durable Client Bindings | ||
| class LegacyDurableClientConverter(meta.InConverter, | ||
| meta.OutConverter, | ||
| binding=None): | ||
| @classmethod | ||
| def has_implicit_output(cls) -> bool: | ||
| return False | ||
|
|
||
| @classmethod | ||
| def has_trigger_support(cls) -> bool: | ||
| return False | ||
|
|
||
| @classmethod | ||
| def check_input_type_annotation(cls, pytype: type) -> bool: | ||
| return issubclass(pytype, (str, bytes)) | ||
|
|
||
| @classmethod | ||
| def check_output_type_annotation(cls, pytype: type) -> bool: | ||
| return issubclass(pytype, (str, bytes, bytearray)) | ||
|
|
||
| @classmethod | ||
| def encode(cls, obj: typing.Any, *, | ||
| expected_type: typing.Optional[type]) -> meta.Datum: | ||
| if isinstance(obj, str): | ||
| return meta.Datum(type='string', value=obj) | ||
|
|
||
| elif isinstance(obj, (bytes, bytearray)): | ||
| return meta.Datum(type='bytes', value=bytes(obj)) | ||
| elif obj is None: | ||
| return meta.Datum(type=None, value=obj) | ||
| elif isinstance(obj, dict): | ||
| return meta.Datum(type='dict', value=obj) | ||
| elif isinstance(obj, list): | ||
| return meta.Datum(type='list', value=obj) | ||
| elif isinstance(obj, bool): | ||
| return meta.Datum(type='bool', value=obj) | ||
| elif isinstance(obj, int): | ||
| return meta.Datum(type='int', value=obj) | ||
| elif isinstance(obj, float): | ||
| return meta.Datum(type='double', value=obj) | ||
| else: | ||
| raise NotImplementedError | ||
|
|
||
| @classmethod | ||
| def decode(cls, data: meta.Datum, *, trigger_metadata) -> typing.Any: | ||
| if data is None: | ||
| return None | ||
| data_type = data.type | ||
|
|
||
| if data_type == 'string': | ||
| result = data.value | ||
| elif data_type == 'bytes': | ||
| result = data.value | ||
| elif data_type == 'json': | ||
| result = data.value | ||
| elif data_type is None: | ||
| result = None | ||
| else: | ||
| raise ValueError( | ||
| 'unexpected type of data received for the "generic" binding ', | ||
| repr(data_type) | ||
| ) | ||
|
|
||
| return result | ||
|
|
||
|
|
||
| # ---------------- Durable Task Durable Functions Converters ---------------- # | ||
| # Durable Function Orchestration Trigger | ||
| class OrchestrationTriggerConverter(meta.InConverter, | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Currently, converters for durable task package are named |
||
| meta.OutConverter, | ||
| binding=None, | ||
| trigger=True): | ||
| @classmethod | ||
| def check_input_type_annotation(cls, pytype): | ||
| return issubclass(pytype, _durable_functions.OrchestrationContext) | ||
|
|
||
| @classmethod | ||
| def check_output_type_annotation(cls, pytype): | ||
| # Implicit output should accept any return type | ||
| return True | ||
|
|
||
| @classmethod | ||
| def decode(cls, | ||
| data: meta.Datum, *, | ||
| trigger_metadata) -> _durable_functions.OrchestrationContext: | ||
| return _durable_functions.OrchestrationContext(data.value) | ||
|
|
||
| @classmethod | ||
| def encode(cls, obj: typing.Any, *, | ||
| expected_type: typing.Optional[type]) -> meta.Datum: | ||
| # Durable function context should be a string | ||
| return meta.Datum(type='string', value=obj) | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. New converters for durable task are currently identical to legacy converters except for this change based on microsoft/durabletask-python#75 (comment). Will update these converters as needed to be compatible with durable task |
||
|
|
||
| @classmethod | ||
| def has_implicit_output(cls) -> bool: | ||
| return True | ||
|
|
||
|
|
||
| # Durable Function Entity Trigger | ||
| class EnitityTriggerConverter(meta.InConverter, | ||
| meta.OutConverter, | ||
| binding='entityTrigger', | ||
| binding=None, | ||
| trigger=True): | ||
| @classmethod | ||
| def check_input_type_annotation(cls, pytype): | ||
|
|
@@ -72,7 +263,7 @@ def has_implicit_output(cls) -> bool: | |
| # Durable Function Activity Trigger | ||
| class ActivityTriggerConverter(meta.InConverter, | ||
| meta.OutConverter, | ||
| binding='activityTrigger', | ||
| binding=None, | ||
| trigger=True): | ||
| @classmethod | ||
| def check_input_type_annotation(cls, pytype): | ||
|
|
@@ -129,7 +320,7 @@ def has_implicit_output(cls) -> bool: | |
| # Durable Functions Durable Client Bindings | ||
| class DurableClientConverter(meta.InConverter, | ||
| meta.OutConverter, | ||
| binding='durableClient'): | ||
| binding=None): | ||
| @classmethod | ||
| def has_implicit_output(cls) -> bool: | ||
| return False | ||
|
|
@@ -190,3 +381,34 @@ def decode(cls, data: meta.Datum, *, trigger_metadata) -> typing.Any: | |
| ) | ||
|
|
||
| return result | ||
|
|
||
|
|
||
| def register_durable_converters(): | ||
| _logger.info("Registering Durable Functions converters based on ") | ||
| pkg = get_durable_package() | ||
| if pkg is None: | ||
| # Durable library not installed → do nothing | ||
| return | ||
|
|
||
| _logger.info("Durable Functions package loaded: %s", pkg.__name__) | ||
| _logger.info("Current bindings before registration: %s", meta._ConverterMeta._bindings) | ||
| # Clear existing bindings if they exist | ||
| meta._ConverterMeta._bindings.pop("orchestrationTrigger", None) | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This should never happen, but just in case - if there are bindings already registered for durable, remove them. Then we add the appropriate converter (Legacy vs. new) based on the durable package brought by the customer |
||
| meta._ConverterMeta._bindings.pop("entityTrigger", None) | ||
| meta._ConverterMeta._bindings.pop("activityTrigger", None) | ||
| meta._ConverterMeta._bindings.pop("durableClient", None) | ||
|
|
||
| if pkg.__name__ == "azure.durable_functions": | ||
| _logger.info("Registering Legacy Durable Functions converters.") | ||
| meta._ConverterMeta._bindings["orchestrationTrigger"] = LegacyOrchestrationTriggerConverter | ||
| meta._ConverterMeta._bindings["entityTrigger"] = LegacyEnitityTriggerConverter | ||
| meta._ConverterMeta._bindings["activityTrigger"] = LegacyActivityTriggerConverter | ||
| meta._ConverterMeta._bindings["durableClient"] = LegacyDurableClientConverter | ||
| else: | ||
| _logger.info("Registering Durable Task Durable Functions converters.") | ||
| meta._ConverterMeta._bindings["orchestrationTrigger"] = OrchestrationTriggerConverter | ||
| meta._ConverterMeta._bindings["entityTrigger"] = EnitityTriggerConverter | ||
| meta._ConverterMeta._bindings["activityTrigger"] = ActivityTriggerConverter | ||
| meta._ConverterMeta._bindings["durableClient"] = DurableClientConverter | ||
| _logger.info("Durable Functions converters registered.") | ||
| _logger.info("Current bindings after registration: %s", meta._ConverterMeta._bindings) | ||
Uh oh!
There was an error while loading. Please reload this page.