From f959f902c89b73cb1b08101e2702f725aee68f85 Mon Sep 17 00:00:00 2001 From: Albert Callarisa Date: Wed, 17 Sep 2025 13:05:07 +0200 Subject: [PATCH] Adds support for interceptors and concurrency_options arguments in the workflow engine Signed-off-by: Albert Callarisa --- .../dapr/ext/workflow/workflow_runtime.py | 20 ++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py b/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py index d1f02b35..9f4be622 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py @@ -15,7 +15,8 @@ import inspect from functools import wraps -from typing import Optional, TypeVar +from typing import Optional, TypeVar, Union, Sequence +import grpc from durabletask import worker, task @@ -34,6 +35,13 @@ TInput = TypeVar('TInput') TOutput = TypeVar('TOutput') +ClientInterceptor = Union[ + grpc.UnaryUnaryClientInterceptor, + grpc.UnaryStreamClientInterceptor, + grpc.StreamUnaryClientInterceptor, + grpc.StreamStreamClientInterceptor, +] + class WorkflowRuntime: """WorkflowRuntime is the entry point for registering workflows and activities.""" @@ -43,6 +51,10 @@ def __init__( host: Optional[str] = None, port: Optional[str] = None, logger_options: Optional[LoggerOptions] = None, + interceptors: Optional[Sequence[ClientInterceptor]] = None, + maximum_concurrent_activity_work_items: Optional[int] = None, + maximum_concurrent_orchestration_work_items: Optional[int] = None, + maximum_thread_pool_workers: Optional[int] = None, ): self._logger = Logger('WorkflowRuntime', logger_options) metadata = tuple() @@ -62,6 +74,12 @@ def __init__( secure_channel=uri.tls, log_handler=options.log_handler, log_formatter=options.log_formatter, + interceptors=interceptors, + concurrency_options=worker.ConcurrencyOptions( + maximum_concurrent_activity_work_items=maximum_concurrent_activity_work_items, + maximum_concurrent_orchestration_work_items=maximum_concurrent_orchestration_work_items, + maximum_thread_pool_workers=maximum_thread_pool_workers, + ), ) def register_workflow(self, fn: Workflow, *, name: Optional[str] = None):