Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions dapr_agents/agents/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
AgentRegistryConfig,
AgentStateConfig,
AgentExecutionConfig,
WorkflowGrpcOptions,
DEFAULT_AGENT_WORKFLOW_BUNDLE,
)
from dapr_agents.agents.prompting import AgentProfileConfig, PromptingAgentBase
Expand Down Expand Up @@ -65,6 +66,7 @@ def __init__(
tools: Optional[Iterable[Any]] = None,
# Metadata
agent_metadata: Optional[Dict[str, Any]] = None,
workflow_grpc: Optional[WorkflowGrpcOptions] = None,
# Execution
execution: Optional[AgentExecutionConfig] = None,
) -> None:
Expand Down Expand Up @@ -95,6 +97,7 @@ def __init__(
tools: Optional tool callables or `AgentTool` instances.

agent_metadata: Extra metadata to store in the registry.
workflow_grpc: Optional gRPC overrides for the workflow runtime channel.
"""
# Resolve and validate profile (ensures non-empty name).
resolved_profile = self._build_profile(
Expand All @@ -118,6 +121,7 @@ def __init__(
base_metadata=base_metadata,
max_etag_attempts=max_etag_attempts,
default_bundle=DEFAULT_AGENT_WORKFLOW_BUNDLE,
workflow_grpc_options=workflow_grpc,
)

# -----------------------------
Expand Down
8 changes: 8 additions & 0 deletions dapr_agents/agents/components.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
AgentRegistryConfig,
AgentStateConfig,
DEFAULT_AGENT_WORKFLOW_BUNDLE,
WorkflowGrpcOptions,
)
from dapr_agents.agents.schemas import AgentWorkflowEntry
from dapr_agents.storage.daprstores.stateservice import StateStoreError
Expand Down Expand Up @@ -47,6 +48,7 @@ def __init__(
base_metadata: Optional[Dict[str, Any]] = None,
max_etag_attempts: int = 10,
default_bundle: Optional["StateModelBundle"] = None,
workflow_grpc_options: Optional["WorkflowGrpcOptions"] = None,
) -> None:
"""
Initialize component wiring.
Expand All @@ -61,6 +63,7 @@ def __init__(
default_bundle: Default state schema bundle (injected by agent/orchestrator class).
"""
self.name = name
self._workflow_grpc_options = workflow_grpc_options

# -----------------------------
# Pub/Sub configuration (copy)
Expand Down Expand Up @@ -173,6 +176,11 @@ def workflow_state(self) -> BaseModel:
"""Return the in-memory workflow state model (customizable model)."""
return self._state_model

@property
def workflow_grpc_options(self) -> Optional[WorkflowGrpcOptions]:
"""Return workflow gRPC tuning options if provided."""
return self._workflow_grpc_options

@property
def state(self) -> Dict[str, Any]:
"""Return the workflow state as a JSON-serializable dict."""
Expand Down
26 changes: 26 additions & 0 deletions dapr_agents/agents/configs.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,32 @@ class StateModelBundle:
)


@dataclass
class WorkflowGrpcOptions:
"""
Optional overrides for Durable Task gRPC channel limits.

Allows agents/orchestrators to lift the default ~4 MB message size
ceiling when sending or receiving large payloads through the workflow
runtime channel.
"""

max_send_message_length: Optional[int] = None
max_receive_message_length: Optional[int] = None

def __post_init__(self) -> None:
if (
self.max_send_message_length is not None
and self.max_send_message_length <= 0
):
raise ValueError("max_send_message_length must be greater than 0")
if (
self.max_receive_message_length is not None
and self.max_receive_message_length <= 0
):
raise ValueError("max_receive_message_length must be greater than 0")


@dataclass
class AgentStateConfig:
"""
Expand Down
7 changes: 7 additions & 0 deletions dapr_agents/agents/durable.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
AgentRegistryConfig,
AgentStateConfig,
AgentExecutionConfig,
WorkflowGrpcOptions,
)
from dapr_agents.agents.prompting import AgentProfileConfig
from dapr_agents.agents.schemas import (
Expand All @@ -31,6 +32,7 @@
)
from dapr_agents.types.workflow import DaprWorkflowStatus
from dapr_agents.workflow.decorators.routers import message_router
from dapr_agents.workflow.utils.grpc import apply_grpc_options
from dapr_agents.workflow.utils.pubsub import broadcast_message, send_message_to_agent

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -71,6 +73,7 @@ def __init__(
execution: Optional[AgentExecutionConfig] = None,
# Misc
agent_metadata: Optional[Dict[str, Any]] = None,
workflow_grpc: Optional[WorkflowGrpcOptions] = None,
runtime: Optional[wf.WorkflowRuntime] = None,
) -> None:
"""
Expand All @@ -96,6 +99,7 @@ def __init__(
tools: Optional tool callables or `AgentTool` instances.

agent_metadata: Extra metadata to publish to the registry.
workflow_grpc: Optional gRPC overrides for the workflow runtime channel.
runtime: Optional pre-existing workflow runtime to attach to.
"""
super().__init__(
Expand All @@ -112,11 +116,14 @@ def __init__(
registry=registry,
execution=execution,
agent_metadata=agent_metadata,
workflow_grpc=workflow_grpc,
llm=llm,
tools=tools,
prompt_template=prompt_template,
)

apply_grpc_options(self.workflow_grpc_options)

self._runtime: wf.WorkflowRuntime = runtime or wf.WorkflowRuntime()
self._runtime_owned = runtime is None
self._registered = False
Expand Down
7 changes: 7 additions & 0 deletions dapr_agents/agents/orchestrators/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@
AgentPubSubConfig,
AgentRegistryConfig,
AgentStateConfig,
WorkflowGrpcOptions,
)
from dapr_agents.agents.utils.text_printer import ColorTextFormatter
from dapr_agents.workflow.utils.grpc import apply_grpc_options

if TYPE_CHECKING:
from dapr_agents.agents.configs import StateModelBundle
Expand Down Expand Up @@ -42,6 +44,7 @@ def __init__(
registry: Optional[AgentRegistryConfig] = None,
execution: Optional[AgentExecutionConfig] = None,
agent_metadata: Optional[Dict[str, Any]] = None,
workflow_grpc: Optional[WorkflowGrpcOptions] = None,
runtime: Optional[wf.WorkflowRuntime] = None,
workflow_client: Optional[wf.DaprWorkflowClient] = None,
default_bundle: Optional["StateModelBundle"] = None,
Expand All @@ -56,6 +59,7 @@ def __init__(
registry: Agent registry configuration for discovery.
agent_metadata: Extra metadata to store in the registry; ``orchestrator=True``
is enforced automatically.
workflow_grpc: Optional gRPC overrides for the workflow runtime channel.
runtime: Optional pre-existing workflow runtime to attach to.
workflow_client: Optional DaprWorkflowClient for dependency injection/testing.
default_bundle: Optional state schema bundle (injected by orchestrator subclass).
Expand All @@ -65,6 +69,7 @@ def __init__(
pubsub=pubsub,
state=state,
registry=registry,
workflow_grpc_options=workflow_grpc,
default_bundle=default_bundle,
)

Expand All @@ -86,6 +91,8 @@ def __init__(
)

# Runtime wiring
apply_grpc_options(self.workflow_grpc_options)

self._runtime: wf.WorkflowRuntime = runtime or wf.WorkflowRuntime()
self._runtime_owned = runtime is None
self._registered = False
Expand Down
4 changes: 4 additions & 0 deletions dapr_agents/agents/orchestrators/llm/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
AgentRegistryConfig,
AgentStateConfig,
AgentExecutionConfig,
WorkflowGrpcOptions,
)
from dapr_agents.agents.orchestrators.base import OrchestratorBase
from dapr_agents.agents.orchestrators.llm.configs import build_llm_state_bundle
Expand Down Expand Up @@ -49,6 +50,7 @@ def __init__(
agent_metadata: Optional[Dict[str, Any]] = None,
memory: Optional[AgentMemoryConfig] = None,
llm: Optional[ChatClientBase] = None,
workflow_grpc: Optional[WorkflowGrpcOptions] = None,
runtime: Optional[wf.WorkflowRuntime] = None,
workflow_client: Optional[wf.DaprWorkflowClient] = None,
) -> None:
Expand All @@ -64,6 +66,7 @@ def __init__(
agent_metadata (Optional[Dict[str, Any]]): Metadata to store alongside the registry entry.
memory (Optional[AgentMemoryConfig]): Memory configuration for the orchestrator.
llm (Optional[ChatClientBase]): LLM client instance.
workflow_grpc (Optional[WorkflowGrpcOptions]): gRPC overrides for the workflow runtime channel.
runtime (Optional[wf.WorkflowRuntime]): Workflow runtime configuration.
workflow_client (Optional[wf.DaprWorkflowClient]): Dapr workflow client.
"""
Expand All @@ -74,6 +77,7 @@ def __init__(
registry=registry,
execution=execution,
agent_metadata=agent_metadata,
workflow_grpc=workflow_grpc,
runtime=runtime,
workflow_client=workflow_client,
default_bundle=build_llm_state_bundle(),
Expand Down
3 changes: 3 additions & 0 deletions dapr_agents/agents/orchestrators/random.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
AgentRegistryConfig,
AgentStateConfig,
AgentExecutionConfig,
WorkflowGrpcOptions,
)
from dapr_agents.agents.orchestrators.base import OrchestratorBase
from dapr_agents.agents.schemas import (
Expand Down Expand Up @@ -49,6 +50,7 @@ def __init__(
registry: Optional[AgentRegistryConfig] = None,
agent_metadata: Optional[Dict[str, Any]] = None,
execution: Optional[AgentExecutionConfig] = None,
workflow_grpc: Optional[WorkflowGrpcOptions] = None,
timeout_seconds: int = 60,
runtime: Optional[wf.WorkflowRuntime] = None,
) -> None:
Expand All @@ -59,6 +61,7 @@ def __init__(
registry=registry,
execution=execution,
agent_metadata=agent_metadata,
workflow_grpc=workflow_grpc,
runtime=runtime,
)
self.timeout = max(1, timeout_seconds)
Expand Down
3 changes: 3 additions & 0 deletions dapr_agents/agents/orchestrators/roundrobin.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
AgentRegistryConfig,
AgentStateConfig,
AgentExecutionConfig,
WorkflowGrpcOptions,
)
from dapr_agents.agents.orchestrators.base import OrchestratorBase
from dapr_agents.agents.schemas import (
Expand Down Expand Up @@ -43,6 +44,7 @@ def __init__(
registry: Optional[AgentRegistryConfig] = None,
execution: Optional[AgentExecutionConfig] = None,
agent_metadata: Optional[Dict[str, Any]] = None,
workflow_grpc: Optional[WorkflowGrpcOptions] = None,
timeout_seconds: int = 60,
runtime: Optional[wf.WorkflowRuntime] = None,
) -> None:
Expand All @@ -53,6 +55,7 @@ def __init__(
registry=registry,
execution=execution,
agent_metadata=agent_metadata,
workflow_grpc=workflow_grpc,
runtime=runtime,
)
self.timeout = max(1, timeout_seconds)
Expand Down
83 changes: 83 additions & 0 deletions dapr_agents/workflow/utils/grpc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
from __future__ import annotations

import logging
from typing import Optional, Sequence

from dapr_agents.agents.configs import WorkflowGrpcOptions

logger = logging.getLogger(__name__)


def apply_grpc_options(options: Optional[WorkflowGrpcOptions]) -> None:
"""
Patch Durable Task's gRPC channel factory with custom message size limits.

Durable Task (and therefore Dapr Workflows) creates its gRPC channels via
``durabletask.internal.shared.get_grpc_channel``. This helper monkey patches
that factory so that subsequent runtime/client instances honour the provided
``grpc.max_send_message_length`` / ``grpc.max_receive_message_length`` values.
"""
if not options:
return
if not options.max_send_message_length and not options.max_receive_message_length:
return

try:
import grpc
from durabletask.internal import shared
except ImportError as exc:
logger.error(
"Failed to import grpc/durabletask for channel configuration: %s", exc
)
raise

grpc_options = []
if options.max_send_message_length:
grpc_options.append(
("grpc.max_send_message_length", options.max_send_message_length)
)
if options.max_receive_message_length:
grpc_options.append(
("grpc.max_receive_message_length", options.max_receive_message_length)
)

if not grpc_options:
return

def get_grpc_channel_with_options(
host_address: Optional[str],
secure_channel: bool = False,
interceptors: Optional[Sequence["grpc.ClientInterceptor"]] = None,
):
if host_address is None:
host_address = shared.get_default_host_address()

for protocol in getattr(shared, "SECURE_PROTOCOLS", []):
if host_address.lower().startswith(protocol):
secure_channel = True
host_address = host_address[len(protocol) :]
break

for protocol in getattr(shared, "INSECURE_PROTOCOLS", []):
if host_address.lower().startswith(protocol):
secure_channel = False
host_address = host_address[len(protocol) :]
break

if secure_channel:
credentials = grpc.ssl_channel_credentials()
channel = grpc.secure_channel(
host_address, credentials, options=grpc_options
)
else:
channel = grpc.insecure_channel(host_address, options=grpc_options)

if interceptors:
channel = grpc.intercept_channel(channel, *interceptors)

return channel

shared.get_grpc_channel = get_grpc_channel_with_options
logger.debug(
"Applied gRPC options to durabletask channel factory: %s", dict(grpc_options)
)
41 changes: 41 additions & 0 deletions quickstarts/12-test-durable-agent-grpc/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# DurableAgent with custom gRPC limits

## Prerequisites

- Python 3.10+
- Dapr CLI & Docker
- OpenAI-compatible API key (for the `llm_activity` decorators)

## Setup

Install dependencies in your virtual environment (see repository root instructions) and ensure Dapr is initialised:

```bash
dapr init
```

Provide your OpenAI key via `.env` or by editing `components/openai.yaml`, identical to the earlier quickstart.

This variant mirrors **10-test-durable-agent** but demonstrates how to raise the
workflow gRPC message size limits (defaults are ~4 MB). The app configures
`WorkflowGrpcOptions` with 32 MB send/receive ceilings before the workflow runtime
starts, which you should see logged when the service boots.

## Run the app

```bash
# Terminal 1 – run the workflow app
dapr run \
--app-id blog-app-agent \
--resources-path ./components \
-- python app.py

# Terminal 2 – publish a message to start the workflow
dapr run \
--app-id blog-app-client \
--resources-path ./components \
-- python message_client.py
```

You should see the workflow start, log the gRPC override, and print the generated
blog post in the app logs.
Loading