diff --git a/README.md b/README.md index df576494..17e43480 100644 --- a/README.md +++ b/README.md @@ -53,7 +53,7 @@ pip3 install dapr-ext-fastapi ```sh # Install Dapr client sdk -pip3 install dapr-dev +pip3 install dapr # Install Dapr gRPC AppCallback service extension pip3 install dapr-ext-grpc-dev diff --git a/dapr/aio/clients/grpc/client.py b/dapr/aio/clients/grpc/client.py index 995b8268..1b76dcb0 100644 --- a/dapr/aio/clients/grpc/client.py +++ b/dapr/aio/clients/grpc/client.py @@ -153,7 +153,7 @@ def __init__( max_grpc_message_length (int, optional): The maximum grpc send and receive message length in bytes. """ - DaprHealth.wait_until_ready() + DaprHealth.wait_for_sidecar() self.retry_policy = retry_policy or RetryPolicy() useragent = f'dapr-sdk-python/{__version__}' diff --git a/dapr/aio/clients/grpc/subscription.py b/dapr/aio/clients/grpc/subscription.py index 9aabf8b2..e0e380ca 100644 --- a/dapr/aio/clients/grpc/subscription.py +++ b/dapr/aio/clients/grpc/subscription.py @@ -51,7 +51,7 @@ async def outgoing_request_iterator(): async def reconnect_stream(self): await self.close() - DaprHealth.wait_until_ready() + DaprHealth.wait_for_sidecar() print('Attempting to reconnect...') await self.start() diff --git a/dapr/clients/grpc/client.py b/dapr/clients/grpc/client.py index e4ffb264..6c276dd3 100644 --- a/dapr/clients/grpc/client.py +++ b/dapr/clients/grpc/client.py @@ -145,7 +145,7 @@ def __init__( message length in bytes. retry_policy (RetryPolicy optional): Specifies retry behaviour """ - DaprHealth.wait_until_ready() + DaprHealth.wait_for_sidecar() self.retry_policy = retry_policy or RetryPolicy() useragent = f'dapr-sdk-python/{__version__}' diff --git a/dapr/clients/grpc/subscription.py b/dapr/clients/grpc/subscription.py index 111946b1..6dcfcb4d 100644 --- a/dapr/clients/grpc/subscription.py +++ b/dapr/clients/grpc/subscription.py @@ -65,7 +65,7 @@ def outgoing_request_iterator(): def reconnect_stream(self): self.close() - DaprHealth.wait_until_ready() + DaprHealth.wait_for_sidecar() print('Attempting to reconnect...') self.start() diff --git a/dapr/clients/health.py b/dapr/clients/health.py index e3daec79..37c42a87 100644 --- a/dapr/clients/health.py +++ b/dapr/clients/health.py @@ -15,6 +15,7 @@ import urllib.request import urllib.error import time +from warnings import warn from dapr.clients.http.conf import DAPR_API_TOKEN_HEADER, USER_AGENT_HEADER, DAPR_USER_AGENT from dapr.clients.http.helpers import get_api_url @@ -24,6 +25,15 @@ class DaprHealth: @staticmethod def wait_until_ready(): + warn( + 'This method is deprecated. Use DaprHealth.wait_for_sidecar instead.', + DeprecationWarning, + stacklevel=2, + ) + DaprHealth.wait_for_sidecar() + + @staticmethod + def wait_for_sidecar(): health_url = f'{get_api_url()}/healthz/outbound' headers = {USER_AGENT_HEADER: DAPR_USER_AGENT} if settings.DAPR_API_TOKEN is not None: diff --git a/dapr/clients/http/client.py b/dapr/clients/http/client.py index 86e9ab6f..f6f95aa7 100644 --- a/dapr/clients/http/client.py +++ b/dapr/clients/http/client.py @@ -51,7 +51,7 @@ def __init__( timeout (int, optional): Timeout in seconds, defaults to 60. headers_callback (lambda: Dict[str, str]], optional): Generates header for each request. """ - DaprHealth.wait_until_ready() + DaprHealth.wait_for_sidecar() self._timeout = aiohttp.ClientTimeout(total=timeout) self._serializer = message_serializer diff --git a/dapr/version/version.py b/dapr/version/version.py index 112a2520..8c6c1296 100644 --- a/dapr/version/version.py +++ b/dapr/version/version.py @@ -13,4 +13,4 @@ limitations under the License. """ -__version__ = '1.15.0.dev' +__version__ = '1.16.1rc1' diff --git a/dev-requirements.txt b/dev-requirements.txt index cbd71985..461d9239 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -15,7 +15,7 @@ Flask>=1.1 # needed for auto fix ruff===0.2.2 # needed for dapr-ext-workflow -durabletask-dapr >= 0.2.0a7 +durabletask-dapr >= 0.2.0a9 # needed for .env file loading in examples python-dotenv>=1.0.0 # needed for enhanced schema generation from function features diff --git a/examples/demo_actor/demo_actor/requirements.txt b/examples/demo_actor/demo_actor/requirements.txt index 4c2215b5..9496602e 100644 --- a/examples/demo_actor/demo_actor/requirements.txt +++ b/examples/demo_actor/demo_actor/requirements.txt @@ -1 +1 @@ -dapr-ext-fastapi-dev>=1.15.0.dev +dapr-ext-fastapi>=1.16.1rc1 diff --git a/examples/demo_workflow/demo_workflow/requirements.txt b/examples/demo_workflow/demo_workflow/requirements.txt index 7f7a666d..a70b0269 100644 --- a/examples/demo_workflow/demo_workflow/requirements.txt +++ b/examples/demo_workflow/demo_workflow/requirements.txt @@ -1 +1 @@ -dapr-ext-workflow-dev>=1.15.0.dev \ No newline at end of file +dapr-ext-workflow>=1.16.1rc1 diff --git a/examples/invoke-simple/requirements.txt b/examples/invoke-simple/requirements.txt index ee0ce707..e77f5d6e 100644 --- a/examples/invoke-simple/requirements.txt +++ b/examples/invoke-simple/requirements.txt @@ -1,2 +1,2 @@ -dapr-ext-grpc-dev >= 1.15.0.dev -dapr-dev >= 1.15.0.dev +dapr-ext-grpc >= 1.16.1rc1 +dapr >= 1.16.1rc1 diff --git a/examples/w3c-tracing/requirements.txt b/examples/w3c-tracing/requirements.txt index cd15885b..514e2606 100644 --- a/examples/w3c-tracing/requirements.txt +++ b/examples/w3c-tracing/requirements.txt @@ -1,5 +1,5 @@ -dapr-ext-grpc-dev >= 1.15.0.dev -dapr-dev >= 1.15.0.dev +dapr-ext-grpc >= 1.16.1rc1 +dapr >= 1.16.1rc1 opentelemetry-sdk opentelemetry-instrumentation-grpc opentelemetry-exporter-zipkin diff --git a/examples/workflow/README.md b/examples/workflow/README.md index f5b901d1..2e09eeef 100644 --- a/examples/workflow/README.md +++ b/examples/workflow/README.md @@ -20,7 +20,7 @@ pip3 install -r requirements.txt Each of the examples in this directory can be run directly from the command line. ### Simple Workflow -This example represents a workflow that manages counters through a series of activities and child workflows. +This example represents a workflow that manages counters through a series of activities and child workflows. It shows several Dapr Workflow features including: - Basic activity execution with counter increments - Retryable activities with configurable retry policies @@ -57,7 +57,7 @@ timeout_seconds: 30 --> ```sh -dapr run --app-id wf-simple-example --dapr-grpc-port 50001 -- python3 simple.py +dapr run --app-id wf-simple-example -- python3 simple.py ``` @@ -99,7 +99,7 @@ timeout_seconds: 30 --> ```sh -dapr run --app-id wfexample --dapr-grpc-port 50001 -- python3 task_chaining.py +dapr run --app-id wfexample -- python3 task_chaining.py ``` @@ -146,7 +146,7 @@ timeout_seconds: 30 --> ```sh -dapr run --app-id wfexample --dapr-grpc-port 50001 -- python3 fan_out_fan_in.py +dapr run --app-id wfexample -- python3 fan_out_fan_in.py ``` @@ -186,7 +186,7 @@ This example demonstrates how to use a workflow to interact with a human user. T The Dapr CLI can be started using the following command: ```sh -dapr run --app-id wfexample --dapr-grpc-port 50001 +dapr run --app-id wfexample ``` In a separate terminal window, run the following command to start the Python workflow app: @@ -222,7 +222,7 @@ This example demonstrates how to eternally running workflow that polls an endpoi The Dapr CLI can be started using the following command: ```sh -dapr run --app-id wfexample --dapr-grpc-port 50001 +dapr run --app-id wfexample ``` In a separate terminal window, run the following command to start the Python workflow app: @@ -254,7 +254,7 @@ This workflow runs forever or until you press `ENTER` to stop it. Starting the a This example demonstrates how to call a child workflow. The Dapr CLI can be started using the following command: ```sh -dapr run --app-id wfexample --dapr-grpc-port 50001 +dapr run --app-id wfexample ``` In a separate terminal window, run the following command to start the Python workflow app: @@ -269,4 +269,129 @@ When you run the example, you will see output like this: *** Calling child workflow 29a7592a1e874b07aad2bb58de309a51-child *** Child workflow 6feadc5370184b4998e50875b20084f6 called ... -``` \ No newline at end of file +``` + + +### Cross-app Workflow + +This example demonstrates how to call child workflows and activities in different apps. The multiple Dapr CLI instances can be started using the following commands: + + + +```sh +dapr run --app-id wfexample3 python3 cross-app3.py & +dapr run --app-id wfexample2 python3 cross-app2.py & +dapr run --app-id wfexample1 python3 cross-app1.py +``` + + +When you run the apps, you will see output like this: +``` +... +app1 - triggering app2 workflow +app2 - triggering app3 activity +... +``` +among others. This shows that the workflow calls are working as expected. + + +#### Error handling on activity calls + +This example demonstrates how the error handling works on activity calls across apps. + +Error handling on activity calls across apps works as normal workflow activity calls. + +In this example we run `app3` in failing mode, which makes the activity call return error constantly. The activity call from `app2` will fail after the retry policy is exhausted. + + + +```sh +export ERROR_ACTIVITY_MODE=true +dapr run --app-id wfexample3 python3 cross-app3.py & +dapr run --app-id wfexample2 python3 cross-app2.py & +dapr run --app-id wfexample1 python3 cross-app1.py +``` + + + +When you run the apps with the `ERROR_ACTIVITY_MODE` environment variable set, you will see output like this: +``` +... +app3 - received activity call +app3 - raising error in activity due to error mode being enabled +app2 - received activity error from app3 +... +``` +among others. This shows that the activity calls are failing as expected, and they are being handled as expected too. + + +#### Error handling on workflow calls + +This example demonstrates how the error handling works on workflow calls across apps. + +Error handling on workflow calls across apps works as normal workflow calls. + +In this example we run `app2` in failing mode, which makes the workflow call return error constantly. The workflow call from `app1` will fail after the retry policy is exhausted. + + + +```sh +export ERROR_WORKFLOW_MODE=true +dapr run --app-id wfexample3 python3 cross-app3.py & +dapr run --app-id wfexample2 python3 cross-app2.py & +dapr run --app-id wfexample1 python3 cross-app1.py +``` + + +When you run the apps with the `ERROR_WORKFLOW_MODE` environment variable set, you will see output like this: +``` +... +app2 - received workflow call +app2 - raising error in workflow due to error mode being enabled +app1 - received workflow error from app2 +... +``` +among others. This shows that the workflow calls are failing as expected, and they are being handled as expected too. + diff --git a/examples/workflow/cross-app1.py b/examples/workflow/cross-app1.py new file mode 100644 index 00000000..f84de662 --- /dev/null +++ b/examples/workflow/cross-app1.py @@ -0,0 +1,58 @@ +# -*- coding: utf-8 -*- +# Copyright 2025 The Dapr Authors +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from datetime import timedelta + +from durabletask.task import TaskFailedError +import dapr.ext.workflow as wf +import time + +wfr = wf.WorkflowRuntime() + + +@wfr.workflow +def app1_workflow(ctx: wf.DaprWorkflowContext): + print(f'app1 - received workflow call', flush=True) + print(f'app1 - triggering app2 workflow', flush=True) + + try: + retry_policy = wf.RetryPolicy( + max_number_of_attempts=2, + first_retry_interval=timedelta(milliseconds=100), + max_retry_interval=timedelta(seconds=3), + ) + yield ctx.call_child_workflow( + workflow='app2_workflow', + input=None, + app_id='wfexample2', + retry_policy=retry_policy, + ) + print(f'app1 - received workflow result', flush=True) + except TaskFailedError as e: + print(f'app1 - received workflow error from app2', flush=True) + + print(f'app1 - returning workflow result', flush=True) + return 1 + + +if __name__ == '__main__': + wfr.start() + time.sleep(10) # wait for workflow runtime to start + + wf_client = wf.DaprWorkflowClient() + print(f'app1 - triggering app1 workflow', flush=True) + instance_id = wf_client.schedule_new_workflow(workflow=app1_workflow) + + # Wait for the workflow to complete + time.sleep(7) + + wfr.shutdown() diff --git a/examples/workflow/cross-app2.py b/examples/workflow/cross-app2.py new file mode 100644 index 00000000..4cb30874 --- /dev/null +++ b/examples/workflow/cross-app2.py @@ -0,0 +1,50 @@ +# -*- coding: utf-8 -*- +# Copyright 2025 The Dapr Authors +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from datetime import timedelta +import os + +from durabletask.task import TaskFailedError +import dapr.ext.workflow as wf +import time + +wfr = wf.WorkflowRuntime() + + +@wfr.workflow +def app2_workflow(ctx: wf.DaprWorkflowContext): + print(f'app2 - received workflow call', flush=True) + if os.getenv('ERROR_WORKFLOW_MODE', 'false') == 'true': + print(f'app2 - raising error in workflow due to error mode being enabled', flush=True) + raise ValueError('Error in workflow due to error mode being enabled') + print(f'app2 - triggering app3 activity', flush=True) + try: + retry_policy = wf.RetryPolicy( + max_number_of_attempts=2, + first_retry_interval=timedelta(milliseconds=100), + max_retry_interval=timedelta(seconds=3), + ) + result = yield ctx.call_activity( + 'app3_activity', input=None, app_id='wfexample3', retry_policy=retry_policy + ) + print(f'app2 - received activity result', flush=True) + except TaskFailedError as e: + print(f'app2 - received activity error from app3', flush=True) + + print(f'app2 - returning workflow result', flush=True) + return 2 + + +if __name__ == '__main__': + wfr.start() + time.sleep(15) # wait for workflow runtime to start + wfr.shutdown() diff --git a/examples/workflow/cross-app3.py b/examples/workflow/cross-app3.py new file mode 100644 index 00000000..ecc945ca --- /dev/null +++ b/examples/workflow/cross-app3.py @@ -0,0 +1,32 @@ +# -*- coding: utf-8 -*- +# Copyright 2025 The Dapr Authors +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import os +import dapr.ext.workflow as wf +import time + +wfr = wf.WorkflowRuntime() + + +@wfr.activity +def app3_activity(ctx: wf.DaprWorkflowContext) -> int: + print(f'app3 - received activity call', flush=True) + if os.getenv('ERROR_ACTIVITY_MODE', 'false') == 'true': + print(f'app3 - raising error in activity due to error mode being enabled', flush=True) + raise ValueError('Error in activity due to error mode being enabled') + print(f'app3 - returning activity result', flush=True) + return 3 + + +if __name__ == '__main__': + wfr.start() + time.sleep(15) # wait for workflow runtime to start + wfr.shutdown() diff --git a/examples/workflow/requirements.txt b/examples/workflow/requirements.txt index e220036d..fab86e72 100644 --- a/examples/workflow/requirements.txt +++ b/examples/workflow/requirements.txt @@ -1,2 +1,2 @@ -dapr-ext-workflow-dev>=1.15.0.dev -dapr-dev>=1.15.0.dev +dapr-ext-workflow>=1.16.1rc1 +dapr>=1.16.1rc1 diff --git a/ext/dapr-ext-fastapi/dapr/ext/fastapi/version.py b/ext/dapr-ext-fastapi/dapr/ext/fastapi/version.py index 112a2520..8c6c1296 100644 --- a/ext/dapr-ext-fastapi/dapr/ext/fastapi/version.py +++ b/ext/dapr-ext-fastapi/dapr/ext/fastapi/version.py @@ -13,4 +13,4 @@ limitations under the License. """ -__version__ = '1.15.0.dev' +__version__ = '1.16.1rc1' diff --git a/ext/dapr-ext-fastapi/setup.cfg b/ext/dapr-ext-fastapi/setup.cfg index 560a795f..8b6080eb 100644 --- a/ext/dapr-ext-fastapi/setup.cfg +++ b/ext/dapr-ext-fastapi/setup.cfg @@ -24,7 +24,7 @@ python_requires = >=3.9 packages = find_namespace: include_package_data = True install_requires = - dapr-dev >= 1.15.0.dev + dapr >= 1.16.1rc1 uvicorn >= 0.11.6 fastapi >= 0.60.1 @@ -32,5 +32,5 @@ install_requires = include = dapr.* -exclude = +exclude = tests diff --git a/ext/dapr-ext-grpc/dapr/ext/grpc/version.py b/ext/dapr-ext-grpc/dapr/ext/grpc/version.py index 112a2520..8c6c1296 100644 --- a/ext/dapr-ext-grpc/dapr/ext/grpc/version.py +++ b/ext/dapr-ext-grpc/dapr/ext/grpc/version.py @@ -13,4 +13,4 @@ limitations under the License. """ -__version__ = '1.15.0.dev' +__version__ = '1.16.1rc1' diff --git a/ext/dapr-ext-grpc/setup.cfg b/ext/dapr-ext-grpc/setup.cfg index caf84a2e..d08757c7 100644 --- a/ext/dapr-ext-grpc/setup.cfg +++ b/ext/dapr-ext-grpc/setup.cfg @@ -24,12 +24,12 @@ python_requires = >=3.9 packages = find_namespace: include_package_data = True install_requires = - dapr-dev >= 1.15.0.dev + dapr >= 1.16.1rc1 cloudevents >= 1.0.0 [options.packages.find] include = dapr.* -exclude = +exclude = tests diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_context.py b/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_context.py index 2dee46fe..476ab765 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_context.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_context.py @@ -63,11 +63,29 @@ def create_timer(self, fire_at: Union[datetime, timedelta]) -> task.Task: def call_activity( self, - activity: Callable[[WorkflowActivityContext, TInput], TOutput], + activity: Union[Callable[[WorkflowActivityContext, TInput], TOutput], str], *, input: TInput = None, retry_policy: Optional[RetryPolicy] = None, + app_id: Optional[str] = None, ) -> task.Task[TOutput]: + # Handle string activity names for cross-app scenarios + if isinstance(activity, str): + activity_name = activity + if app_id is not None: + self._logger.debug( + f'{self.instance_id}: Creating cross-app activity {activity_name} for app {app_id}' + ) + else: + self._logger.debug(f'{self.instance_id}: Creating activity {activity_name}') + + if retry_policy is None: + return self.__obj.call_activity(activity=activity_name, input=input, app_id=app_id) + return self.__obj.call_activity( + activity=activity_name, input=input, retry_policy=retry_policy.obj, app_id=app_id + ) + + # Handle function activity objects (original behavior) self._logger.debug(f'{self.instance_id}: Creating activity {activity.__name__}') if hasattr(activity, '_dapr_alternate_name'): act = activity.__dict__['_dapr_alternate_name'] @@ -75,17 +93,38 @@ def call_activity( # this case should ideally never happen act = activity.__name__ if retry_policy is None: - return self.__obj.call_activity(activity=act, input=input) - return self.__obj.call_activity(activity=act, input=input, retry_policy=retry_policy.obj) + return self.__obj.call_activity(activity=act, input=input, app_id=app_id) + return self.__obj.call_activity( + activity=act, input=input, retry_policy=retry_policy.obj, app_id=app_id + ) def call_child_workflow( self, - workflow: Workflow, + workflow: Union[Workflow, str], *, input: Optional[TInput] = None, instance_id: Optional[str] = None, retry_policy: Optional[RetryPolicy] = None, + app_id: Optional[str] = None, ) -> task.Task[TOutput]: + # Handle string workflow names for cross-app scenarios + if isinstance(workflow, str): + workflow_name = workflow + self._logger.debug(f'{self.instance_id}: Creating child workflow {workflow_name}') + + if retry_policy is None: + return self.__obj.call_sub_orchestrator( + workflow_name, input=input, instance_id=instance_id, app_id=app_id + ) + return self.__obj.call_sub_orchestrator( + workflow_name, + input=input, + instance_id=instance_id, + retry_policy=retry_policy.obj, + app_id=app_id, + ) + + # Handle function workflow objects (original behavior) self._logger.debug(f'{self.instance_id}: Creating child workflow {workflow.__name__}') def wf(ctx: task.OrchestrationContext, inp: TInput): @@ -100,9 +139,11 @@ def wf(ctx: task.OrchestrationContext, inp: TInput): # this case should ideally never happen wf.__name__ = workflow.__name__ if retry_policy is None: - return self.__obj.call_sub_orchestrator(wf, input=input, instance_id=instance_id) + return self.__obj.call_sub_orchestrator( + wf, input=input, instance_id=instance_id, app_id=app_id + ) return self.__obj.call_sub_orchestrator( - wf, input=input, instance_id=instance_id, retry_policy=retry_policy.obj + wf, input=input, instance_id=instance_id, retry_policy=retry_policy.obj, app_id=app_id ) def wait_for_external_event(self, name: str) -> task.Task: diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/version.py b/ext/dapr-ext-workflow/dapr/ext/workflow/version.py index 112a2520..8c6c1296 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/version.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/version.py @@ -13,4 +13,4 @@ limitations under the License. """ -__version__ = '1.15.0.dev' +__version__ = '1.16.1rc1' diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_context.py b/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_context.py index b4c85f6a..d6e6ba07 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_context.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_context.py @@ -107,18 +107,22 @@ def create_timer(self, fire_at: Union[datetime, timedelta]) -> task.Task: @abstractmethod def call_activity( - self, activity: Activity[TOutput], *, input: Optional[TInput] = None + self, + activity: Union[Activity[TOutput], str], + *, + input: Optional[TInput] = None, + app_id: Optional[str] = None, ) -> task.Task[TOutput]: """Schedule an activity for execution. Parameters ---------- - activity: Activity[TInput, TOutput] - A reference to the activity function to call. + activity: Activity[TInput, TOutput] | str + A reference to the activity function to call, or a string name for cross-app activities. input: TInput | None The JSON-serializable input (or None) to pass to the activity. - return_type: task.Task[TOutput] - The JSON-serializable output type to expect from the activity result. + app_id: str | None + The AppID that will execute the activity. Returns ------- @@ -130,22 +134,25 @@ def call_activity( @abstractmethod def call_child_workflow( self, - orchestrator: Workflow[TOutput], + orchestrator: Union[Workflow[TOutput], str], *, input: Optional[TInput] = None, instance_id: Optional[str] = None, + app_id: Optional[str] = None, ) -> task.Task[TOutput]: """Schedule child-workflow function for execution. Parameters ---------- - orchestrator: Orchestrator[TInput, TOutput] - A reference to the orchestrator function to call. + orchestrator: Orchestrator[TInput, TOutput] | str + A reference to the orchestrator function to call, or a string name for cross-app workflows. input: TInput The optional JSON-serializable input to pass to the orchestrator function. instance_id: str A unique ID to use for the sub-orchestration instance. If not specified, a random UUID will be used. + app_id: str + The AppID that will execute the workflow. Returns ------- diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py b/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py index d1f02b35..9f4be622 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py @@ -15,7 +15,8 @@ import inspect from functools import wraps -from typing import Optional, TypeVar +from typing import Optional, TypeVar, Union, Sequence +import grpc from durabletask import worker, task @@ -34,6 +35,13 @@ TInput = TypeVar('TInput') TOutput = TypeVar('TOutput') +ClientInterceptor = Union[ + grpc.UnaryUnaryClientInterceptor, + grpc.UnaryStreamClientInterceptor, + grpc.StreamUnaryClientInterceptor, + grpc.StreamStreamClientInterceptor, +] + class WorkflowRuntime: """WorkflowRuntime is the entry point for registering workflows and activities.""" @@ -43,6 +51,10 @@ def __init__( host: Optional[str] = None, port: Optional[str] = None, logger_options: Optional[LoggerOptions] = None, + interceptors: Optional[Sequence[ClientInterceptor]] = None, + maximum_concurrent_activity_work_items: Optional[int] = None, + maximum_concurrent_orchestration_work_items: Optional[int] = None, + maximum_thread_pool_workers: Optional[int] = None, ): self._logger = Logger('WorkflowRuntime', logger_options) metadata = tuple() @@ -62,6 +74,12 @@ def __init__( secure_channel=uri.tls, log_handler=options.log_handler, log_formatter=options.log_formatter, + interceptors=interceptors, + concurrency_options=worker.ConcurrencyOptions( + maximum_concurrent_activity_work_items=maximum_concurrent_activity_work_items, + maximum_concurrent_orchestration_work_items=maximum_concurrent_orchestration_work_items, + maximum_thread_pool_workers=maximum_thread_pool_workers, + ), ) def register_workflow(self, fn: Workflow, *, name: Optional[str] = None): diff --git a/ext/dapr-ext-workflow/setup.cfg b/ext/dapr-ext-workflow/setup.cfg index 3776ec89..83869566 100644 --- a/ext/dapr-ext-workflow/setup.cfg +++ b/ext/dapr-ext-workflow/setup.cfg @@ -24,12 +24,12 @@ python_requires = >=3.9 packages = find_namespace: include_package_data = True install_requires = - dapr-dev >= 1.15.0.dev - durabletask-dapr >= 0.2.0a7 + dapr >= 1.16.1rc1 + durabletask-dapr >= 0.2.0a9 [options.packages.find] include = dapr.* -exclude = +exclude = tests diff --git a/ext/dapr-ext-workflow/tests/test_dapr_workflow_context.py b/ext/dapr-ext-workflow/tests/test_dapr_workflow_context.py index 9fdfe044..3ae5fdaf 100644 --- a/ext/dapr-ext-workflow/tests/test_dapr_workflow_context.py +++ b/ext/dapr-ext-workflow/tests/test_dapr_workflow_context.py @@ -36,10 +36,10 @@ def __init__(self): def create_timer(self, fire_at): return mock_create_timer - def call_activity(self, activity, input): + def call_activity(self, activity, input, app_id): return mock_call_activity - def call_sub_orchestrator(self, orchestrator, input, instance_id): + def call_sub_orchestrator(self, orchestrator, input, instance_id, app_id): return mock_call_sub_orchestrator def set_custom_status(self, custom_status): diff --git a/ext/flask_dapr/flask_dapr/version.py b/ext/flask_dapr/flask_dapr/version.py index 112a2520..8c6c1296 100644 --- a/ext/flask_dapr/flask_dapr/version.py +++ b/ext/flask_dapr/flask_dapr/version.py @@ -13,4 +13,4 @@ limitations under the License. """ -__version__ = '1.15.0.dev' +__version__ = '1.16.1rc1' diff --git a/ext/flask_dapr/setup.cfg b/ext/flask_dapr/setup.cfg index 64d15941..531a9aea 100644 --- a/ext/flask_dapr/setup.cfg +++ b/ext/flask_dapr/setup.cfg @@ -26,4 +26,4 @@ include_package_data = true zip_safe = false install_requires = Flask >= 1.1 - dapr-dev >= 1.15.0.dev + dapr >= 1.16.1rc1 diff --git a/tests/clients/test_heatlhcheck.py b/tests/clients/test_heatlhcheck.py index f3be8a47..d447e072 100644 --- a/tests/clients/test_heatlhcheck.py +++ b/tests/clients/test_heatlhcheck.py @@ -24,13 +24,13 @@ class DaprHealthCheckTests(unittest.TestCase): @patch.object(settings, 'DAPR_HTTP_ENDPOINT', 'http://domain.com:3500') @patch('urllib.request.urlopen') - def test_wait_until_ready_success(self, mock_urlopen): + def test_wait_for_sidecar_success(self, mock_urlopen): mock_urlopen.return_value.__enter__.return_value = MagicMock(status=200) try: - DaprHealth.wait_until_ready() + DaprHealth.wait_for_sidecar() except Exception as e: - self.fail(f'wait_until_ready() raised an exception unexpectedly: {e}') + self.fail(f'wait_for_sidecar() raised an exception unexpectedly: {e}') mock_urlopen.assert_called_once() @@ -45,13 +45,13 @@ def test_wait_until_ready_success(self, mock_urlopen): @patch.object(settings, 'DAPR_HTTP_ENDPOINT', 'http://domain.com:3500') @patch.object(settings, 'DAPR_API_TOKEN', 'mytoken') @patch('urllib.request.urlopen') - def test_wait_until_ready_success_with_api_token(self, mock_urlopen): + def test_wait_for_sidecar_success_with_api_token(self, mock_urlopen): mock_urlopen.return_value.__enter__.return_value = MagicMock(status=200) try: - DaprHealth.wait_until_ready() + DaprHealth.wait_for_sidecar() except Exception as e: - self.fail(f'wait_until_ready() raised an exception unexpectedly: {e}') + self.fail(f'wait_for_sidecar() raised an exception unexpectedly: {e}') mock_urlopen.assert_called_once() @@ -64,13 +64,13 @@ def test_wait_until_ready_success_with_api_token(self, mock_urlopen): @patch.object(settings, 'DAPR_HEALTH_TIMEOUT', '2.5') @patch('urllib.request.urlopen') - def test_wait_until_ready_timeout(self, mock_urlopen): + def test_wait_for_sidecar_timeout(self, mock_urlopen): mock_urlopen.return_value.__enter__.return_value = MagicMock(status=500) start = time.time() with self.assertRaises(TimeoutError): - DaprHealth.wait_until_ready() + DaprHealth.wait_for_sidecar() self.assertGreaterEqual(time.time() - start, 2.5) self.assertGreater(mock_urlopen.call_count, 1)