15 changes: 14 additions & 1 deletion dapr_agents/document/__init__.py
@@ -1,7 +1,11 @@
from typing import TYPE_CHECKING

from .embedder import NVIDIAEmbedder, OpenAIEmbedder, SentenceTransformerEmbedder
from .fetcher import ArxivFetcher
from .reader import PyMuPDFReader, PyPDFReader
from .splitter import TextSplitter

if TYPE_CHECKING:
from .splitter import TextSplitter

__all__ = [
"ArxivFetcher",
@@ -12,3 +16,12 @@
"SentenceTransformerEmbedder",
"NVIDIAEmbedder",
]


def __getattr__(name: str):
"""Lazy import for optional dependencies."""
if name == "TextSplitter":
from .splitter import TextSplitter

return TextSplitter
raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
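For orientation: this is the module-level `__getattr__` hook from PEP 562 (Python 3.7+), which runs only when normal attribute lookup fails, so `TextSplitter`'s optional dependencies are imported on first access rather than at package import time. A minimal sketch of the resulting behavior (grounded in the diff above; nothing else is assumed):

```python
import dapr_agents.document as doc

# Package import no longer pulls in .splitter eagerly; the first attribute
# access below triggers the module-level __getattr__, which performs
# `from .splitter import TextSplitter` lazily.
splitter_cls = doc.TextSplitter

# Unknown names still raise AttributeError, as the hook re-raises:
try:
    doc.NotAThing
except AttributeError as exc:
    print(exc)  # module 'dapr_agents.document' has no attribute 'NotAThing'
```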
71 changes: 32 additions & 39 deletions dapr_agents/llm/utils/request.py
@@ -1,12 +1,13 @@
from typing import Dict, Any, Optional, List, Type, Union, Iterable, Literal
from dapr_agents.prompt.prompty import Prompty, PromptyHelper
from dapr_agents.types.message import BaseMessage
from dapr_agents.llm.utils.structure import StructureHandler
from dapr_agents.tool.utils.tool import ToolHelper
import logging
from typing import Any, Dict, Iterable, List, Literal, Optional, Type, Union

from pydantic import BaseModel, ValidationError
from dapr_agents.tool.base import AgentTool

import logging
from dapr_agents.llm.utils.structure import StructureHandler
from dapr_agents.prompt.prompty import Prompty, PromptyHelper
from dapr_agents.tool.base import AgentTool
from dapr_agents.tool.utils.tool import ToolHelper
from dapr_agents.types.message import BaseMessage

logger = logging.getLogger(__name__)

@@ -100,46 +101,38 @@ def process_params(
Prepare request parameters for the language model.

Args:
params: Parameters for the request.
llm_provider: The LLM provider to use (e.g., 'openai').
tools: List of tools to include in the request.
response_format: Either a Pydantic model (for function calling)
or a JSON Schema definition/dict (for raw JSON structured output).
structured_mode: The mode of structured output: 'json' or 'function_call'.
Defaults to 'json'.
params: Raw request params (messages/inputs, model, etc.).
llm_provider: Provider key, e.g. "openai", "dapr".
tools: Tools to expose to the model (AgentTool or already-shaped dicts).
response_format:
- If structured_mode == "json": a JSON Schema dict or a Pydantic model
(we'll convert) to request raw JSON output.
- If structured_mode == "function_call": a Pydantic model describing
the function/tool signature for model-side function calling.
structured_mode: "json" for raw JSON structured output,
"function_call" for tool/function calling.

Returns:
Dict[str, Any]: Prepared request parameters.
A params dict ready for the target provider.
"""

# Tools
if tools:
logger.info("Tools are available in the request.")
# Convert AgentTool objects to dict format for the provider
tool_dicts = []
for tool in tools:
if isinstance(tool, AgentTool):
tool_dicts.append(
ToolHelper.format_tool(tool, tool_format=llm_provider)
)
else:
tool_dicts.append(
ToolHelper.format_tool(tool, tool_format=llm_provider)
)
params["tools"] = tool_dicts
params["tools"] = [
ToolHelper.format_tool(t, tool_format=llm_provider) for t in tools
]

# Structured output
if response_format:
logger.info(f"Structured Mode Activated! Mode={structured_mode}.")
# Add system message for JSON formatting
# This is necessary for the response formatting of the data to work correctly when a user has a function call response format.
inputs = params.get("inputs", [])
inputs.insert(
0,
{
"role": "system",
"content": "You must format your response as a valid JSON object matching the provided schema. Do not include any explanatory text or markdown formatting.",
},
)
params["inputs"] = inputs
logger.info(f"Structured Mode Activated! mode={structured_mode}")

# If we're on Dapr, we cannot rely on OpenAI-style `response_format`.
# Add a small system nudge to enforce JSON-only output so we can parse reliably.
if llm_provider == "dapr":
params = StructureHandler.ensure_json_only_system_prompt(params)

# Generate provider-specific request params
params = StructureHandler.generate_request(
response_format=response_format,
llm_provider=llm_provider,
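To ground the reworked flow, here is a hedged sketch of one call into `process_params` as documented above; the import path, schema, and payload are illustrative assumptions (the function sits in `dapr_agents/llm/utils/request.py`, likely on a request-handler class), not part of this diff:

```python
from pydantic import BaseModel

# Assumed import; adjust if process_params hangs off a differently named
# helper class in your checkout.
from dapr_agents.llm.utils.request import RequestHandler


class Summary(BaseModel):
    title: str
    bullets: list[str]


params = {"inputs": [{"role": "user", "content": "Summarize the release notes."}]}

# With llm_provider="dapr", the handler prepends the JSON-only system nudge
# (Dapr's chat client does not forward OpenAI-style response_format), then
# StructureHandler.generate_request shapes the provider-specific params.
prepared = RequestHandler.process_params(
    params,
    llm_provider="dapr",
    response_format=Summary,
    structured_mode="json",
)
```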
27 changes: 27 additions & 0 deletions dapr_agents/llm/utils/structure.py
@@ -627,3 +627,30 @@ def validate_against_signature(result: Any, expected_type: Any) -> Any:
return adapter.validate_python(result)
except ValidationError as e:
raise TypeError(f"Validation failed for type {expected_type}: {e}")

@staticmethod
def ensure_json_only_system_prompt(params: Dict[str, Any]) -> Dict[str, Any]:
"""
Dapr's chat client (today) does NOT forward OpenAI-style `response_format`
(e.g., {"type":"json_schema", ...}). That means the model won't be hard-constrained
to your schema. As a fallback, we prepend a system message that instructs the
model to return strict JSON so downstream parsing doesn't break.

Note:
- Dapr uses "inputs" (not "messages") for the message array.
- If "inputs" isn't present (future providers), we fall back to "messages".
"""
collection_key = "inputs" if "inputs" in params else "messages"
msgs = list(params.get(collection_key, []))
msgs.insert(
0,
{
"role": "system",
"content": (
"Return ONLY a valid JSON object that matches the provided schema. "
"No markdown, no code fences, no explanations—JSON object only."
),
},
)
params[collection_key] = msgs
return params
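A quick usage sketch for the helper above; the payloads are illustrative, the behavior is exactly what the code shows:

```python
from dapr_agents.llm.utils.structure import StructureHandler

params = {"inputs": [{"role": "user", "content": "Extract the invoice fields."}]}
out = StructureHandler.ensure_json_only_system_prompt(params)

assert out["inputs"][0]["role"] == "system"  # JSON-only nudge prepended
assert out["inputs"][1]["content"].startswith("Extract")  # original message intact

# Without "inputs", the helper falls back to the "messages" key:
out2 = StructureHandler.ensure_json_only_system_prompt({"messages": []})
assert out2["messages"][0]["role"] == "system"
```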
10 changes: 9 additions & 1 deletion dapr_agents/workflow/decorators/__init__.py
@@ -1,5 +1,13 @@
from .core import task, workflow
from .fastapi import route
from .messaging import message_router
from .activities import llm_activity, agent_activity

__all__ = ["workflow", "task", "route", "message_router"]
__all__ = [
"workflow",
"task",
"route",
"message_router",
"llm_activity",
"agent_activity",
]
193 changes: 193 additions & 0 deletions dapr_agents/workflow/decorators/activities.py
@@ -0,0 +1,193 @@
from __future__ import annotations

import asyncio
import functools
import inspect
import logging
from typing import Any, Callable, Literal, Optional

from dapr.ext.workflow import WorkflowActivityContext # type: ignore

from dapr_agents.agents.base import AgentBase
from dapr_agents.llm.chat import ChatClientBase
from dapr_agents.workflow.utils.activities import (
build_llm_params,
convert_result,
extract_ctx_and_payload,
format_agent_input,
format_prompt,
normalize_input,
strip_context_parameter,
validate_result,
)

logger = logging.getLogger(__name__)


def llm_activity(
*,
prompt: str,
llm: ChatClientBase,
structured_mode: Literal["json", "function_call"] = "json",
**task_kwargs: Any,
) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
"""Delegate an activity's implementation to an LLM.

The decorated function's body is not executed directly. Instead:
1) Build a prompt from the activity's signature + `prompt`
2) Call the provided LLM client
3) Validate the result against the activity's return annotation

Args:
prompt: Prompt template (e.g., "Summarize {text} in 3 bullets.")
llm: Chat client capable of `generate(**params)`.
structured_mode: Provider structured output mode ("json" or "function_call").
**task_kwargs: Reserved for future routing/provider knobs.

Returns:
A wrapper suitable to register as a Dapr activity.

Raises:
ValueError: If `prompt` is empty or `llm` is missing.
"""
if not prompt:
raise ValueError("@llm_activity requires a prompt template.")
if llm is None:
raise ValueError("@llm_activity requires an explicit `llm` client instance.")

def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
if not callable(func):
raise ValueError("@llm_activity must decorate a callable activity.")

original_sig = inspect.signature(func)
activity_sig = strip_context_parameter(original_sig)
effective_structured_mode = task_kwargs.get("structured_mode", structured_mode)

async def _execute(ctx: WorkflowActivityContext, payload: Any = None) -> Any:
"""Run the LLM pipeline inside the worker."""
normalized = (
normalize_input(activity_sig, payload) if payload is not None else {}
)

formatted_prompt = format_prompt(activity_sig, prompt, normalized)
params = build_llm_params(
activity_sig, formatted_prompt, effective_structured_mode
)

raw = llm.generate(**params)
if inspect.isawaitable(raw):
raw = await raw

converted = convert_result(raw)
validated = await validate_result(converted, activity_sig)
return validated

@functools.wraps(func)
def wrapper(*args: Any, **kwargs: Any) -> Any:
"""Sync activity wrapper: execute async pipeline to completion."""
ctx, payload = extract_ctx_and_payload(args, dict(kwargs))
result = _execute(ctx, payload) # coroutine

# If we're in a thread with an active loop, run thread-safely
try:
loop = asyncio.get_running_loop()
except RuntimeError:
loop = None

if loop and loop.is_running():
fut = asyncio.run_coroutine_threadsafe(result, loop)
return fut.result()

# Otherwise create and run a fresh loop
return asyncio.run(result)

# Useful metadata for debugging/inspection
wrapper._is_llm_activity = True # noqa: SLF001
wrapper._llm_activity_config = { # noqa: SLF001
"prompt": prompt,
"structured_mode": effective_structured_mode,
"task_kwargs": task_kwargs,
}
wrapper._original_activity = func # noqa: SLF001
return wrapper

return decorator


def agent_activity(
*,
agent: AgentBase,
prompt: Optional[str] = None,
**task_kwargs: Any,
) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
"""Route an activity through an `AgentBase`.

The agent receives either a formatted `prompt` or a natural-language
rendering of the payload. The result is validated against the activity's return
annotation.

Args:
agent: Agent to run the activity through.
prompt: Optional prompt template for the agent.
**task_kwargs: Reserved for future routing/provider knobs.

Returns:
A wrapper suitable to register as a Dapr activity.

Raises:
ValueError: If `agent` is missing.
"""
if agent is None:
raise ValueError("@agent_activity requires an AgentBase instance.")

def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
if not callable(func):
raise ValueError("@agent_activity must decorate a callable activity.")

original_sig = inspect.signature(func)
activity_sig = strip_context_parameter(original_sig)
prompt_template = prompt or ""

async def _execute(ctx: WorkflowActivityContext, payload: Any = None) -> Any:
normalized = (
normalize_input(activity_sig, payload) if payload is not None else {}
)

if prompt_template:
formatted_prompt = format_prompt(
activity_sig, prompt_template, normalized
)
else:
formatted_prompt = format_agent_input(payload, normalized)

raw = await agent.run(formatted_prompt)
converted = convert_result(raw)
validated = await validate_result(converted, activity_sig)
return validated

@functools.wraps(func)
def wrapper(*args: Any, **kwargs: Any) -> Any:
"""Sync activity wrapper: execute async pipeline to completion."""
ctx, payload = extract_ctx_and_payload(args, dict(kwargs))
result = _execute(ctx, payload) # coroutine

try:
loop = asyncio.get_running_loop()
except RuntimeError:
loop = None

if loop and loop.is_running():
fut = asyncio.run_coroutine_threadsafe(result, loop)
return fut.result()

return asyncio.run(result)

wrapper._is_agent_activity = True # noqa: SLF001
wrapper._agent_activity_config = { # noqa: SLF001
"prompt": prompt,
"task_kwargs": task_kwargs,
}
wrapper._original_activity = func # noqa: SLF001
return wrapper

return decorator
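To show how the two new decorators are meant to be consumed together, a hedged usage sketch; the chat-client constructor, agent instance, and prompt text are illustrative assumptions, not part of this diff:

```python
from dapr_agents import OpenAIChatClient  # assumed export; adjust to your setup
from dapr_agents.workflow.decorators import agent_activity, llm_activity

llm = OpenAIChatClient()  # hypothetical client configuration

@llm_activity(prompt="Summarize {text} in exactly 3 bullets.", llm=llm)
def summarize(ctx, text: str) -> list[str]:
    # Body never runs: the decorator formats the prompt from the signature,
    # calls llm.generate(...), and validates the result against list[str].
    ...

research_agent = ...  # hypothetical AgentBase instance wired elsewhere

@agent_activity(agent=research_agent, prompt="Investigate: {question}")
def investigate(ctx, question: str) -> str:
    # Routed through agent.run(...) with the formatted prompt; the result
    # is validated against the str return annotation before returning.
    ...
```

Both wrappers then register as ordinary Dapr workflow activities; `extract_ctx_and_payload` peels off the `WorkflowActivityContext` argument the runtime passes in, and the sync wrapper bridges into the async pipeline whether or not an event loop is already running.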