Skip to content

Commit 86e0cff

Browse files
Cyb3rWard0gsicoyle
andauthored
Support configurable workflow gRPC payload limits for agents and orchestrators (#250)
* feat(grpc): add workflow gRPC options model and helper Signed-off-by: Roberto Rodriguez <[email protected]> * refactor(components): plumb workflow gRPC options through bases Signed-off-by: Roberto Rodriguez <[email protected]> * feat(workflows): honor gRPC limits in durable agents & orchestrators Signed-off-by: Roberto Rodriguez <[email protected]> * test(grpc): cover durabletask channel patching helper Signed-off-by: Roberto Rodriguez <[email protected]> * Basic quickstart to show how to use the new config Signed-off-by: Roberto Rodriguez <[email protected]> * Make lint happy Signed-off-by: Roberto Rodriguez <[email protected]> * refactor: Clarify gRPC options check to explicitly support setting either or both limits Signed-off-by: Roberto Rodriguez <[email protected]> * docs: Add reference to original durabletask get_grpc_channel implementation Signed-off-by: Roberto Rodriguez <[email protected]> * fix: rm extra quickstart used for testing Signed-off-by: Samantha Coyle <[email protected]> --------- Signed-off-by: Roberto Rodriguez <[email protected]> Signed-off-by: Samantha Coyle <[email protected]> Co-authored-by: Samantha Coyle <[email protected]>
1 parent 588c460 commit 86e0cff

File tree

10 files changed

+255
-0
lines changed

10 files changed

+255
-0
lines changed

dapr_agents/agents/base.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
AgentRegistryConfig,
1313
AgentStateConfig,
1414
AgentExecutionConfig,
15+
WorkflowGrpcOptions,
1516
DEFAULT_AGENT_WORKFLOW_BUNDLE,
1617
)
1718
from dapr_agents.agents.prompting import AgentProfileConfig, PromptingAgentBase
@@ -65,6 +66,7 @@ def __init__(
6566
tools: Optional[Iterable[Any]] = None,
6667
# Metadata
6768
agent_metadata: Optional[Dict[str, Any]] = None,
69+
workflow_grpc: Optional[WorkflowGrpcOptions] = None,
6870
# Execution
6971
execution: Optional[AgentExecutionConfig] = None,
7072
) -> None:
@@ -95,6 +97,7 @@ def __init__(
9597
tools: Optional tool callables or `AgentTool` instances.
9698
9799
agent_metadata: Extra metadata to store in the registry.
100+
workflow_grpc: Optional gRPC overrides for the workflow runtime channel.
98101
"""
99102
# Resolve and validate profile (ensures non-empty name).
100103
resolved_profile = self._build_profile(
@@ -118,6 +121,7 @@ def __init__(
118121
base_metadata=base_metadata,
119122
max_etag_attempts=max_etag_attempts,
120123
default_bundle=DEFAULT_AGENT_WORKFLOW_BUNDLE,
124+
workflow_grpc_options=workflow_grpc,
121125
)
122126

123127
# -----------------------------

dapr_agents/agents/components.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
AgentRegistryConfig,
1515
AgentStateConfig,
1616
DEFAULT_AGENT_WORKFLOW_BUNDLE,
17+
WorkflowGrpcOptions,
1718
StateModelBundle,
1819
)
1920
from dapr_agents.agents.schemas import AgentWorkflowEntry
@@ -44,6 +45,7 @@ def __init__(
4445
registry: Optional[AgentRegistryConfig] = None,
4546
base_metadata: Optional[Dict[str, Any]] = None,
4647
max_etag_attempts: int = 10,
48+
workflow_grpc_options: Optional[WorkflowGrpcOptions] = None,
4749
default_bundle: Optional[StateModelBundle] = None,
4850
) -> None:
4951
"""
@@ -59,6 +61,7 @@ def __init__(
5961
default_bundle: Default state schema bundle (injected by agent/orchestrator class).
6062
"""
6163
self.name = name
64+
self._workflow_grpc_options = workflow_grpc_options
6265

6366
# -----------------------------
6467
# Pub/Sub configuration (copy)
@@ -179,6 +182,11 @@ def workflow_state(self) -> BaseModel:
179182
"""Return the in-memory workflow state model (customizable model)."""
180183
return self._state_model
181184

185+
@property
186+
def workflow_grpc_options(self) -> Optional[WorkflowGrpcOptions]:
187+
"""Return workflow gRPC tuning options if provided."""
188+
return self._workflow_grpc_options
189+
182190
@property
183191
def state(self) -> Dict[str, Any]:
184192
"""Return the workflow state as a JSON-serializable dict."""

dapr_agents/agents/configs.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,32 @@ class StateModelBundle:
5454
)
5555

5656

57+
@dataclass
58+
class WorkflowGrpcOptions:
59+
"""
60+
Optional overrides for Durable Task gRPC channel limits.
61+
62+
Allows agents/orchestrators to lift the default ~4 MB message size
63+
ceiling when sending or receiving large payloads through the workflow
64+
runtime channel.
65+
"""
66+
67+
max_send_message_length: Optional[int] = None
68+
max_receive_message_length: Optional[int] = None
69+
70+
def __post_init__(self) -> None:
71+
if (
72+
self.max_send_message_length is not None
73+
and self.max_send_message_length <= 0
74+
):
75+
raise ValueError("max_send_message_length must be greater than 0")
76+
if (
77+
self.max_receive_message_length is not None
78+
and self.max_receive_message_length <= 0
79+
):
80+
raise ValueError("max_receive_message_length must be greater than 0")
81+
82+
5783
@dataclass
5884
class AgentStateConfig:
5985
"""

dapr_agents/agents/durable.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
AgentRegistryConfig,
1414
AgentStateConfig,
1515
AgentExecutionConfig,
16+
WorkflowGrpcOptions,
1617
)
1718
from dapr_agents.agents.prompting import AgentProfileConfig
1819
from dapr_agents.agents.schemas import (
@@ -31,6 +32,7 @@
3132
)
3233
from dapr_agents.types.workflow import DaprWorkflowStatus
3334
from dapr_agents.workflow.decorators.routers import message_router
35+
from dapr_agents.workflow.utils.grpc import apply_grpc_options
3436
from dapr_agents.workflow.utils.pubsub import broadcast_message, send_message_to_agent
3537

3638
logger = logging.getLogger(__name__)
@@ -71,6 +73,7 @@ def __init__(
7173
execution: Optional[AgentExecutionConfig] = None,
7274
# Misc
7375
agent_metadata: Optional[Dict[str, Any]] = None,
76+
workflow_grpc: Optional[WorkflowGrpcOptions] = None,
7477
runtime: Optional[wf.WorkflowRuntime] = None,
7578
) -> None:
7679
"""
@@ -96,6 +99,7 @@ def __init__(
9699
tools: Optional tool callables or `AgentTool` instances.
97100
98101
agent_metadata: Extra metadata to publish to the registry.
102+
workflow_grpc: Optional gRPC overrides for the workflow runtime channel.
99103
runtime: Optional pre-existing workflow runtime to attach to.
100104
"""
101105
super().__init__(
@@ -112,11 +116,14 @@ def __init__(
112116
registry=registry,
113117
execution=execution,
114118
agent_metadata=agent_metadata,
119+
workflow_grpc=workflow_grpc,
115120
llm=llm,
116121
tools=tools,
117122
prompt_template=prompt_template,
118123
)
119124

125+
apply_grpc_options(self.workflow_grpc_options)
126+
120127
self._runtime: wf.WorkflowRuntime = runtime or wf.WorkflowRuntime()
121128
self._runtime_owned = runtime is None
122129
self._registered = False

dapr_agents/agents/orchestrators/base.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,11 @@
1313
AgentPubSubConfig,
1414
AgentRegistryConfig,
1515
AgentStateConfig,
16+
WorkflowGrpcOptions,
1617
StateModelBundle,
1718
)
1819
from dapr_agents.agents.utils.text_printer import ColorTextFormatter
20+
from dapr_agents.workflow.utils.grpc import apply_grpc_options
1921

2022
logger = logging.getLogger(__name__)
2123

@@ -40,6 +42,7 @@ def __init__(
4042
registry: Optional[AgentRegistryConfig] = None,
4143
execution: Optional[AgentExecutionConfig] = None,
4244
agent_metadata: Optional[Dict[str, Any]] = None,
45+
workflow_grpc: Optional[WorkflowGrpcOptions] = None,
4346
runtime: Optional[wf.WorkflowRuntime] = None,
4447
workflow_client: Optional[wf.DaprWorkflowClient] = None,
4548
default_bundle: Optional[StateModelBundle] = None,
@@ -54,6 +57,7 @@ def __init__(
5457
registry: Agent registry configuration for discovery.
5558
agent_metadata: Extra metadata to store in the registry; ``orchestrator=True``
5659
is enforced automatically.
60+
workflow_grpc: Optional gRPC overrides for the workflow runtime channel.
5761
runtime: Optional pre-existing workflow runtime to attach to.
5862
workflow_client: Optional DaprWorkflowClient for dependency injection/testing.
5963
default_bundle: Optional state schema bundle (injected by orchestrator subclass).
@@ -63,6 +67,7 @@ def __init__(
6367
pubsub=pubsub,
6468
state=state,
6569
registry=registry,
70+
workflow_grpc_options=workflow_grpc,
6671
default_bundle=default_bundle,
6772
)
6873

@@ -84,6 +89,8 @@ def __init__(
8489
)
8590

8691
# Runtime wiring
92+
apply_grpc_options(self.workflow_grpc_options)
93+
8794
self._runtime: wf.WorkflowRuntime = runtime or wf.WorkflowRuntime()
8895
self._runtime_owned = runtime is None
8996
self._registered = False

dapr_agents/agents/orchestrators/llm/base.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
AgentRegistryConfig,
1313
AgentStateConfig,
1414
AgentExecutionConfig,
15+
WorkflowGrpcOptions,
1516
)
1617
from dapr_agents.agents.orchestrators.base import OrchestratorBase
1718
from dapr_agents.agents.orchestrators.llm.configs import build_llm_state_bundle
@@ -49,6 +50,7 @@ def __init__(
4950
agent_metadata: Optional[Dict[str, Any]] = None,
5051
memory: Optional[AgentMemoryConfig] = None,
5152
llm: Optional[ChatClientBase] = None,
53+
workflow_grpc: Optional[WorkflowGrpcOptions] = None,
5254
runtime: Optional[wf.WorkflowRuntime] = None,
5355
workflow_client: Optional[wf.DaprWorkflowClient] = None,
5456
) -> None:
@@ -64,6 +66,7 @@ def __init__(
6466
agent_metadata (Optional[Dict[str, Any]]): Metadata to store alongside the registry entry.
6567
memory (Optional[AgentMemoryConfig]): Memory configuration for the orchestrator.
6668
llm (Optional[ChatClientBase]): LLM client instance.
69+
workflow_grpc (Optional[WorkflowGrpcOptions]): gRPC overrides for the workflow runtime channel.
6770
runtime (Optional[wf.WorkflowRuntime]): Workflow runtime configuration.
6871
workflow_client (Optional[wf.DaprWorkflowClient]): Dapr workflow client.
6972
"""
@@ -74,6 +77,7 @@ def __init__(
7477
registry=registry,
7578
execution=execution,
7679
agent_metadata=agent_metadata,
80+
workflow_grpc=workflow_grpc,
7781
runtime=runtime,
7882
workflow_client=workflow_client,
7983
default_bundle=build_llm_state_bundle(),

dapr_agents/agents/orchestrators/random.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
AgentRegistryConfig,
1414
AgentStateConfig,
1515
AgentExecutionConfig,
16+
WorkflowGrpcOptions,
1617
)
1718
from dapr_agents.agents.orchestrators.base import OrchestratorBase
1819
from dapr_agents.agents.schemas import (
@@ -49,6 +50,7 @@ def __init__(
4950
registry: Optional[AgentRegistryConfig] = None,
5051
agent_metadata: Optional[Dict[str, Any]] = None,
5152
execution: Optional[AgentExecutionConfig] = None,
53+
workflow_grpc: Optional[WorkflowGrpcOptions] = None,
5254
timeout_seconds: int = 60,
5355
runtime: Optional[wf.WorkflowRuntime] = None,
5456
) -> None:
@@ -59,6 +61,7 @@ def __init__(
5961
registry=registry,
6062
execution=execution,
6163
agent_metadata=agent_metadata,
64+
workflow_grpc=workflow_grpc,
6265
runtime=runtime,
6366
)
6467
self.timeout = max(1, timeout_seconds)

dapr_agents/agents/orchestrators/roundrobin.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
AgentRegistryConfig,
1313
AgentStateConfig,
1414
AgentExecutionConfig,
15+
WorkflowGrpcOptions,
1516
)
1617
from dapr_agents.agents.orchestrators.base import OrchestratorBase
1718
from dapr_agents.agents.schemas import (
@@ -43,6 +44,7 @@ def __init__(
4344
registry: Optional[AgentRegistryConfig] = None,
4445
execution: Optional[AgentExecutionConfig] = None,
4546
agent_metadata: Optional[Dict[str, Any]] = None,
47+
workflow_grpc: Optional[WorkflowGrpcOptions] = None,
4648
timeout_seconds: int = 60,
4749
runtime: Optional[wf.WorkflowRuntime] = None,
4850
) -> None:
@@ -53,6 +55,7 @@ def __init__(
5355
registry=registry,
5456
execution=execution,
5557
agent_metadata=agent_metadata,
58+
workflow_grpc=workflow_grpc,
5659
runtime=runtime,
5760
)
5861
self.timeout = max(1, timeout_seconds)

dapr_agents/workflow/utils/grpc.py

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
from __future__ import annotations
2+
3+
import logging
4+
from typing import Optional, Sequence
5+
6+
from dapr_agents.agents.configs import WorkflowGrpcOptions
7+
8+
logger = logging.getLogger(__name__)
9+
10+
11+
# This is a copy of the original get_grpc_channel function in durabletask.internal.shared at
12+
# https://github.com/dapr/durabletask-python/blob/7070cb07d07978d079f8c099743ee4a66ae70e05/durabletask/internal/shared.py#L30C1-L61C19
13+
# but with my option overrides applied above.
14+
def apply_grpc_options(options: Optional[WorkflowGrpcOptions]) -> None:
15+
"""
16+
Patch Durable Task's gRPC channel factory with custom message size limits.
17+
18+
Durable Task (and therefore Dapr Workflows) creates its gRPC channels via
19+
``durabletask.internal.shared.get_grpc_channel``. This helper monkey patches
20+
that factory so that subsequent runtime/client instances honour the provided
21+
``grpc.max_send_message_length`` / ``grpc.max_receive_message_length`` values.
22+
23+
Users can set either or both options; any non-None value will be applied.
24+
"""
25+
if not options:
26+
return
27+
# Early return if neither option is set
28+
if (
29+
options.max_send_message_length is None
30+
and options.max_receive_message_length is None
31+
):
32+
return
33+
34+
try:
35+
import grpc
36+
from durabletask.internal import shared
37+
except ImportError as exc:
38+
logger.error(
39+
"Failed to import grpc/durabletask for channel configuration: %s", exc
40+
)
41+
raise
42+
43+
grpc_options = []
44+
if options.max_send_message_length:
45+
grpc_options.append(
46+
("grpc.max_send_message_length", options.max_send_message_length)
47+
)
48+
if options.max_receive_message_length:
49+
grpc_options.append(
50+
("grpc.max_receive_message_length", options.max_receive_message_length)
51+
)
52+
53+
def get_grpc_channel_with_options(
54+
host_address: Optional[str],
55+
secure_channel: bool = False,
56+
interceptors: Optional[Sequence["grpc.ClientInterceptor"]] = None,
57+
):
58+
if host_address is None:
59+
host_address = shared.get_default_host_address()
60+
61+
for protocol in getattr(shared, "SECURE_PROTOCOLS", []):
62+
if host_address.lower().startswith(protocol):
63+
secure_channel = True
64+
host_address = host_address[len(protocol) :]
65+
break
66+
67+
for protocol in getattr(shared, "INSECURE_PROTOCOLS", []):
68+
if host_address.lower().startswith(protocol):
69+
secure_channel = False
70+
host_address = host_address[len(protocol) :]
71+
break
72+
73+
if secure_channel:
74+
credentials = grpc.ssl_channel_credentials()
75+
channel = grpc.secure_channel(
76+
host_address, credentials, options=grpc_options
77+
)
78+
else:
79+
channel = grpc.insecure_channel(host_address, options=grpc_options)
80+
81+
if interceptors:
82+
channel = grpc.intercept_channel(channel, *interceptors)
83+
84+
return channel
85+
86+
shared.get_grpc_channel = get_grpc_channel_with_options
87+
logger.debug(
88+
"Applied gRPC options to durabletask channel factory: %s", dict(grpc_options)
89+
)

0 commit comments

Comments
 (0)