diff --git a/dapr_agents/document/__init__.py b/dapr_agents/document/__init__.py index fb4b45ce..c86ec2c7 100644 --- a/dapr_agents/document/__init__.py +++ b/dapr_agents/document/__init__.py @@ -1,7 +1,11 @@ +from typing import TYPE_CHECKING + from .embedder import NVIDIAEmbedder, OpenAIEmbedder, SentenceTransformerEmbedder from .fetcher import ArxivFetcher from .reader import PyMuPDFReader, PyPDFReader -from .splitter import TextSplitter + +if TYPE_CHECKING: + from .splitter import TextSplitter __all__ = [ "ArxivFetcher", @@ -12,3 +16,12 @@ "SentenceTransformerEmbedder", "NVIDIAEmbedder", ] + + +def __getattr__(name: str): + """Lazy import for optional dependencies.""" + if name == "TextSplitter": + from .splitter import TextSplitter + + return TextSplitter + raise AttributeError(f"module {__name__!r} has no attribute {name!r}") diff --git a/dapr_agents/llm/utils/request.py b/dapr_agents/llm/utils/request.py index 9cf7ce0f..b719a86a 100644 --- a/dapr_agents/llm/utils/request.py +++ b/dapr_agents/llm/utils/request.py @@ -1,12 +1,13 @@ -from typing import Dict, Any, Optional, List, Type, Union, Iterable, Literal -from dapr_agents.prompt.prompty import Prompty, PromptyHelper -from dapr_agents.types.message import BaseMessage -from dapr_agents.llm.utils.structure import StructureHandler -from dapr_agents.tool.utils.tool import ToolHelper +import logging +from typing import Any, Dict, Iterable, List, Literal, Optional, Type, Union + from pydantic import BaseModel, ValidationError -from dapr_agents.tool.base import AgentTool -import logging +from dapr_agents.llm.utils.structure import StructureHandler +from dapr_agents.prompt.prompty import Prompty, PromptyHelper +from dapr_agents.tool.base import AgentTool +from dapr_agents.tool.utils.tool import ToolHelper +from dapr_agents.types.message import BaseMessage logger = logging.getLogger(__name__) @@ -100,46 +101,38 @@ def process_params( Prepare request parameters for the language model. 
Args: - params: Parameters for the request. - llm_provider: The LLM provider to use (e.g., 'openai'). - tools: List of tools to include in the request. - response_format: Either a Pydantic model (for function calling) - or a JSON Schema definition/dict (for raw JSON structured output). - structured_mode: The mode of structured output: 'json' or 'function_call'. - Defaults to 'json'. + params: Raw request params (messages/inputs, model, etc.). + llm_provider: Provider key, e.g. "openai", "dapr". + tools: Tools to expose to the model (AgentTool or already-shaped dicts). + response_format: + - If structured_mode == "json": a JSON Schema dict or a Pydantic model + (we'll convert) to request raw JSON output. + - If structured_mode == "function_call": a Pydantic model describing + the function/tool signature for model-side function calling. + structured_mode: "json" for raw JSON structured output, + "function_call" for tool/function calling. Returns: - Dict[str, Any]: Prepared request parameters. + A params dict ready for the target provider. """ + + # Tools if tools: logger.info("Tools are available in the request.") - # Convert AgentTool objects to dict format for the provider - tool_dicts = [] - for tool in tools: - if isinstance(tool, AgentTool): - tool_dicts.append( - ToolHelper.format_tool(tool, tool_format=llm_provider) - ) - else: - tool_dicts.append( - ToolHelper.format_tool(tool, tool_format=llm_provider) - ) - params["tools"] = tool_dicts + params["tools"] = [ + ToolHelper.format_tool(t, tool_format=llm_provider) for t in tools + ] + # Structured output if response_format: - logger.info(f"Structured Mode Activated! Mode={structured_mode}.") - # Add system message for JSON formatting - # This is necessary for the response formatting of the data to work correctly when a user has a function call response format. 
- inputs = params.get("inputs", []) - inputs.insert( - 0, - { - "role": "system", - "content": "You must format your response as a valid JSON object matching the provided schema. Do not include any explanatory text or markdown formatting.", - }, - ) - params["inputs"] = inputs + logger.info(f"Structured Mode Activated! mode={structured_mode}") + + # If we're on Dapr, we cannot rely on OpenAI-style `response_format`. + # Add a small system nudge to enforce JSON-only output so we can parse reliably. + if llm_provider == "dapr": + params = StructureHandler.ensure_json_only_system_prompt(params) + # Generate provider-specific request params params = StructureHandler.generate_request( response_format=response_format, llm_provider=llm_provider, diff --git a/dapr_agents/llm/utils/structure.py b/dapr_agents/llm/utils/structure.py index de0cb21e..f08199e2 100644 --- a/dapr_agents/llm/utils/structure.py +++ b/dapr_agents/llm/utils/structure.py @@ -627,3 +627,30 @@ def validate_against_signature(result: Any, expected_type: Any) -> Any: return adapter.validate_python(result) except ValidationError as e: raise TypeError(f"Validation failed for type {expected_type}: {e}") + + @staticmethod + def ensure_json_only_system_prompt(params: Dict[str, Any]) -> Dict[str, Any]: + """ + Dapr's chat client (today) does NOT forward OpenAI-style `response_format` + (e.g., {"type":"json_schema", ...}). That means the model won't be hard-constrained + to your schema. As a fallback, we prepend a system message that instructs the + model to return strict JSON so downstream parsing doesn't break. + + Note: + - Dapr uses "inputs" (not "messages") for the message array. + - If "inputs" isn't present (future providers), we fall back to "messages". + """ + collection_key = "inputs" if "inputs" in params else "messages" + msgs = list(params.get(collection_key, [])) + msgs.insert( + 0, + { + "role": "system", + "content": ( + "Return ONLY a valid JSON object that matches the provided schema. 
" + "No markdown, no code fences, no explanations—JSON object only." + ), + }, + ) + params[collection_key] = msgs + return params diff --git a/dapr_agents/workflow/decorators/__init__.py b/dapr_agents/workflow/decorators/__init__.py index 1a5a7943..5809f6c3 100644 --- a/dapr_agents/workflow/decorators/__init__.py +++ b/dapr_agents/workflow/decorators/__init__.py @@ -1,5 +1,13 @@ from .core import task, workflow from .fastapi import route from .messaging import message_router +from .activities import llm_activity, agent_activity -__all__ = ["workflow", "task", "route", "message_router"] +__all__ = [ + "workflow", + "task", + "route", + "message_router", + "llm_activity", + "agent_activity", +] diff --git a/dapr_agents/workflow/decorators/activities.py b/dapr_agents/workflow/decorators/activities.py new file mode 100644 index 00000000..a40b4338 --- /dev/null +++ b/dapr_agents/workflow/decorators/activities.py @@ -0,0 +1,193 @@ +from __future__ import annotations + +import asyncio +import functools +import inspect +import logging +from typing import Any, Callable, Literal, Optional + +from dapr.ext.workflow import WorkflowActivityContext # type: ignore + +from dapr_agents.agents.base import AgentBase +from dapr_agents.llm.chat import ChatClientBase +from dapr_agents.workflow.utils.activities import ( + build_llm_params, + convert_result, + extract_ctx_and_payload, + format_agent_input, + format_prompt, + normalize_input, + strip_context_parameter, + validate_result, +) + +logger = logging.getLogger(__name__) + + +def llm_activity( + *, + prompt: str, + llm: ChatClientBase, + structured_mode: Literal["json", "function_call"] = "json", + **task_kwargs: Any, +) -> Callable[[Callable[..., Any]], Callable[..., Any]]: + """Delegate an activity's implementation to an LLM. + + The decorated function's body is not executed directly. 
Instead: + 1) Build a prompt from the activity's signature + `prompt` + 2) Call the provided LLM client + 3) Validate the result against the activity's return annotation + + Args: + prompt: Prompt template (e.g., "Summarize {text} in 3 bullets.") + llm: Chat client capable of `generate(**params)`. + structured_mode: Provider structured output mode ("json" or "function_call"). + **task_kwargs: Reserved for future routing/provider knobs. + + Returns: + A wrapper suitable to register as a Dapr activity. + + Raises: + ValueError: If `prompt` is empty or `llm` is missing. + """ + if not prompt: + raise ValueError("@llm_activity requires a prompt template.") + if llm is None: + raise ValueError("@llm_activity requires an explicit `llm` client instance.") + + def decorator(func: Callable[..., Any]) -> Callable[..., Any]: + if not callable(func): + raise ValueError("@llm_activity must decorate a callable activity.") + + original_sig = inspect.signature(func) + activity_sig = strip_context_parameter(original_sig) + effective_structured_mode = task_kwargs.get("structured_mode", structured_mode) + + async def _execute(ctx: WorkflowActivityContext, payload: Any = None) -> Any: + """Run the LLM pipeline inside the worker.""" + normalized = ( + normalize_input(activity_sig, payload) if payload is not None else {} + ) + + formatted_prompt = format_prompt(activity_sig, prompt, normalized) + params = build_llm_params( + activity_sig, formatted_prompt, effective_structured_mode + ) + + raw = llm.generate(**params) + if inspect.isawaitable(raw): + raw = await raw + + converted = convert_result(raw) + validated = await validate_result(converted, activity_sig) + return validated + + @functools.wraps(func) + def wrapper(*args: Any, **kwargs: Any) -> Any: + """Sync activity wrapper: execute async pipeline to completion.""" + ctx, payload = extract_ctx_and_payload(args, dict(kwargs)) + result = _execute(ctx, payload) # coroutine + + # If we're in a thread with an active loop, run 
thread-safely + try: + loop = asyncio.get_running_loop() + except RuntimeError: + loop = None + + if loop and loop.is_running(): + fut = asyncio.run_coroutine_threadsafe(result, loop) + return fut.result() + + # Otherwise create and run a fresh loop + return asyncio.run(result) + + # Useful metadata for debugging/inspection + wrapper._is_llm_activity = True # noqa: SLF001 + wrapper._llm_activity_config = { # noqa: SLF001 + "prompt": prompt, + "structured_mode": effective_structured_mode, + "task_kwargs": task_kwargs, + } + wrapper._original_activity = func # noqa: SLF001 + return wrapper + + return decorator + + +def agent_activity( + *, + agent: AgentBase, + prompt: Optional[str] = None, + **task_kwargs: Any, +) -> Callable[[Callable[..., Any]], Callable[..., Any]]: + """Route an activity through an `AgentBase`. + + The agent receives either a formatted `prompt` or a natural-language + rendering of the payload. The result is validated against the activity's return + annotation. + + Args: + agent: Agent to run the activity through. + prompt: Optional prompt template for the agent. + **task_kwargs: Reserved for future routing/provider knobs. + + Returns: + A wrapper suitable to register as a Dapr activity. + + Raises: + ValueError: If `agent` is missing. 
+ """ + if agent is None: + raise ValueError("@agent_activity requires an AgentBase instance.") + + def decorator(func: Callable[..., Any]) -> Callable[..., Any]: + if not callable(func): + raise ValueError("@agent_activity must decorate a callable activity.") + + original_sig = inspect.signature(func) + activity_sig = strip_context_parameter(original_sig) + prompt_template = prompt or "" + + async def _execute(ctx: WorkflowActivityContext, payload: Any = None) -> Any: + normalized = ( + normalize_input(activity_sig, payload) if payload is not None else {} + ) + + if prompt_template: + formatted_prompt = format_prompt( + activity_sig, prompt_template, normalized + ) + else: + formatted_prompt = format_agent_input(payload, normalized) + + raw = await agent.run(formatted_prompt) + converted = convert_result(raw) + validated = await validate_result(converted, activity_sig) + return validated + + @functools.wraps(func) + def wrapper(*args: Any, **kwargs: Any) -> Any: + """Sync activity wrapper: execute async pipeline to completion.""" + ctx, payload = extract_ctx_and_payload(args, dict(kwargs)) + result = _execute(ctx, payload) # coroutine + + try: + loop = asyncio.get_running_loop() + except RuntimeError: + loop = None + + if loop and loop.is_running(): + fut = asyncio.run_coroutine_threadsafe(result, loop) + return fut.result() + + return asyncio.run(result) + + wrapper._is_agent_activity = True # noqa: SLF001 + wrapper._agent_activity_config = { # noqa: SLF001 + "prompt": prompt, + "task_kwargs": task_kwargs, + } + wrapper._original_activity = func # noqa: SLF001 + return wrapper + + return decorator diff --git a/dapr_agents/workflow/decorators/core.py b/dapr_agents/workflow/decorators/core.py index a8b8ea8d..f3971968 100644 --- a/dapr_agents/workflow/decorators/core.py +++ b/dapr_agents/workflow/decorators/core.py @@ -1,6 +1,20 @@ +import logging import functools +import warnings from typing import Any, Callable, Optional -import logging + + 
+_TASK_DEPRECATION_MESSAGE = ( + "@task is deprecated and will be removed in a future release. " + "Switch to native Dapr activity registration (WorkflowRuntime.activity) and, " + "for LLM/agent helpers, prefer the new @llm_activity or @agent_activity decorators." +) + +_WORKFLOW_DEPRECATION_MESSAGE = ( + "@workflow is deprecated and will be removed in a future release. " + "Switch to native Dapr workflow registration (WorkflowRuntime.workflow) and combine with " + "@message_router or other decorators as needed." +) def task( @@ -41,6 +55,12 @@ def decorator(f: Callable) -> Callable: if not callable(f): raise ValueError(f"@task must be applied to a function, got {type(f)}.") + warnings.warn( + _TASK_DEPRECATION_MESSAGE, + DeprecationWarning, + stacklevel=2, + ) + # Attach task metadata f._is_task = True f._task_name = name or f.__name__ @@ -102,6 +122,12 @@ def decorator(f: Callable) -> Callable: if not callable(f): raise ValueError(f"@workflow must be applied to a function, got {type(f)}.") + warnings.warn( + _WORKFLOW_DEPRECATION_MESSAGE, + DeprecationWarning, + stacklevel=2, + ) + f._is_workflow = True f._workflow_name = name or f.__name__ diff --git a/dapr_agents/workflow/utils/activities.py b/dapr_agents/workflow/utils/activities.py new file mode 100644 index 00000000..95aead2a --- /dev/null +++ b/dapr_agents/workflow/utils/activities.py @@ -0,0 +1,314 @@ +from __future__ import annotations + +import inspect +from dataclasses import asdict, is_dataclass +from types import SimpleNamespace +from typing import Any, Dict, List, Mapping, Tuple + +from dapr.ext.workflow import WorkflowActivityContext # type: ignore +from pydantic import BaseModel + +from dapr_agents.llm.utils import StructureHandler +from dapr_agents.types import BaseMessage, LLMChatResponse, UserMessage + +# Public alias for normalized/filtered inputs passed to activities. 
+NormalizedInput = Dict[str, Any] + + +def normalize_input( + signature: inspect.Signature, + raw_input: Any, + *, + strict: bool = True, +) -> NormalizedInput: + """Normalize a workflow payload to kwargs that match an activity signature. + + Prefer structured forms (dataclass, ``SimpleNamespace``, Pydantic, mapping). + Fall back to assigning the first positional parameter if the payload is a + scalar or otherwise not mapping-like. + + When ``strict`` is True, raise ``TypeError`` if the payload includes keys + not present in the activity signature. This detects mismatches early. + + Args: + signature: Activity signature (minus context parameter). + raw_input: Original payload provided by the workflow. + strict: Whether to raise on unexpected keys (default: True). + + Returns: + Mapping aligned to parameter names for the activity. + + Raises: + TypeError: If ``strict`` is True and unknown keys are detected. + """ + if raw_input is None: + return {} + + # Coerce common structured inputs to dicts. + if is_dataclass(raw_input): + data = asdict(raw_input) + elif isinstance(raw_input, SimpleNamespace): + data = vars(raw_input) + elif isinstance(raw_input, BaseModel): + data = raw_input.model_dump() + elif isinstance(raw_input, Mapping): + data = dict(raw_input) + else: + # Scalar / positional fallback: bind to the first parameter if present. + if not signature.parameters: + return {} + first_param = next(iter(signature.parameters)) + return {first_param: raw_input} + + param_names = set(signature.parameters.keys()) + unknown = set(data.keys()) - param_names + if strict and unknown: + raise TypeError(f"Unexpected input keys for activity: {sorted(unknown)}") + + return {k: v for k, v in data.items() if k in param_names} + + +def format_prompt( + signature: inspect.Signature, template: str, data: Dict[str, Any] +) -> str: + """Render a prompt template using defaults from the signature and provided data. 
+ + The provided ``data`` should already be normalized via :func:`normalize_input`, + which filters unknown keys. This function binds partial arguments against the + signature to apply parameter defaults before formatting the template. + + Args: + signature: Activity signature used for default resolution. + template: Prompt template (may refer to argument names). + data: Normalized payload for formatting. + + Returns: + Rendered prompt string. Returns an empty string if ``template`` is falsy. + + Raises: + ValueError: If binding or template formatting fails due to missing keys. + """ + if not template: + return "" + + # Bind partially to compute defaults for any omitted optional parameters. + try: + bound = signature.bind_partial(**data) + except TypeError as exc: + raise ValueError( + f"Failed to bind prompt arguments to signature: {exc}" + ) from exc + + bound.apply_defaults() + + # Format with the bound arguments. Report the first missing key clearly. + try: + return template.format(**bound.arguments) + except KeyError as exc: + missing_key = exc.args[0] + raise ValueError( + f"Prompt template expects missing key: '{missing_key}'" + ) from exc + + +def format_agent_input(payload: Any, data: Dict[str, Any]) -> str: + """Create a simple natural-language string for agent input from payload/data. + + This is a best-effort formatter for agent-facing inputs when no explicit + prompt template is used. + + Args: + payload: Original input object. + data: Normalized mapping (post-filtering) of payload fields. + + Returns: + A concise, readable string for agent consumption. + """ + if payload is None: + return "" + + # Fast-path scalars. + if isinstance(payload, (str, int, float, bool)): + return str(payload) + + # If there's a single normalized field, return its string form. + if data and len(data) == 1: + value = next(iter(data.values())) + return "" if value is None else str(value) + + # Otherwise, render a multi-line key: value list. 
+ if data: + parts = [f"{key}: {value}" for key, value in data.items() if value is not None] + return "\n".join(parts) + + # Last-resort stringification of the raw payload. + return str(payload) + + +def build_llm_params( + signature: inspect.Signature, + prompt: str, + structured_mode: str, +) -> Dict[str, Any]: + """Build keyword arguments for ``ChatClientBase.generate``. + + If the activity return type is annotated, add a ``response_format`` so + structured output can be requested from providers that support it. + + Args: + signature: Activity signature; its return annotation drives structure. + prompt: Final prompt string (already formatted). + structured_mode: Provider-specific structured mode (e.g., ``"json"``). + + Returns: + Dict of parameters for the chat client, including messages and, when + applicable, structured output hints. + """ + messages: List[BaseMessage] = [] + if prompt: + messages.append(UserMessage(prompt)) + + params: Dict[str, Any] = {"messages": messages} + + # If a concrete return annotation is present, try to resolve a model. + if signature.return_annotation is not inspect.Signature.empty: + model_cls = StructureHandler.resolve_response_model(signature.return_annotation) + if model_cls: + params["response_format"] = model_cls + params["structured_mode"] = structured_mode + + return params + + +def convert_result(result: Any) -> Any: + """Convert LLM/agent results to plain Python containers. + + Normalization rules: + - ``LLMChatResponse`` → extract underlying message content. + - Pydantic model → ``dict`` via ``model_dump()``. + - List of Pydantic models → ``list[dict]``. + - Otherwise → pass through unchanged. + + Args: + result: Raw result returned by the chat client or agent. + + Returns: + A Python container suitable for validation/serialization. 
+ """ + if isinstance(result, LLMChatResponse): + message = result.get_message() + return getattr(message, "content", None) + + if isinstance(result, BaseMessage): + return result.model_dump() + + if isinstance(result, BaseModel): + return result.model_dump() + + if isinstance(result, list) and all(isinstance(item, BaseModel) for item in result): + return [item.model_dump() for item in result] + + return result + + +async def validate_result(result: Any, signature: inspect.Signature) -> Any: + """Validate/transform a result against an activity's return annotation. + + If the result is awaitable, it is awaited first. If no return annotation is + present, the value is returned as-is. Otherwise, the value is validated and + coerced according to the annotation via :class:`StructureHandler`. + + Args: + result: Raw (possibly awaitable) result. + signature: Activity signature with return annotation. + + Returns: + Validated/converted value compatible with the annotation. + """ + if inspect.isawaitable(result): + result = await result + + if ( + not signature.return_annotation + or signature.return_annotation is inspect.Signature.empty + ): + return result + + return StructureHandler.validate_against_signature( + result, signature.return_annotation + ) + + +def strip_context_parameter(signature: inspect.Signature) -> inspect.Signature: + """Remove the leading workflow context parameter, when present. + + Activities often accept a Dapr workflow context (``ctx``) as the first + argument; removing it simplifies payload binding/formatting, which should + only consider user-provided inputs. + + Args: + signature: Original callable signature. + + Returns: + A new signature without the context parameter (if detected). 
+ """ + params = list(signature.parameters.values()) + if not params: + return signature + + first = params[0] + if ( + first.annotation is WorkflowActivityContext # explicit type annotation + or first.name in {"ctx", "context", "workflow_ctx"} # common names + ): + params = params[1:] + + return inspect.Signature( + parameters=params, return_annotation=signature.return_annotation + ) + + +def extract_ctx_and_payload( + args: Tuple[Any, ...], kwargs: Dict[str, Any] +) -> Tuple[Any, Any]: + """Extract ``(ctx, payload)`` from wrapper ``args``/``kwargs``. + + Accepts either positional ``(ctx, payload)`` or keyword forms + (``ctx=...``, and one of ``input=``/``payload=``). Any remaining keyword + arguments are interpreted as the payload mapping. + + Args: + args: Positional args from the activity invocation. + kwargs: Keyword args from the activity invocation (may be consumed). + + Returns: + ``(ctx, payload)`` pair suitable for execution. + + Raises: + ValueError: If the workflow context is missing. + """ + ctx = None + payload: Any = None + + # Positional: (ctx[, payload]) + if args: + ctx = args[0] + if len(args) > 1: + payload = args[1] + + # Keyword fallback for ctx + if ctx is None: + ctx = kwargs.pop("ctx", kwargs.pop("context", None)) + + # Keyword payload (prefer 'payload', then 'input') + if payload is None: + payload = kwargs.pop("payload", kwargs.pop("input", None)) + + # If there are still kwargs left, treat them as a payload mapping. 
+ if payload is None and kwargs: + payload = kwargs # remaining named fields + + if ctx is None: + raise ValueError("Workflow context is required for activity execution.") + + return ctx, payload diff --git a/pyproject.toml b/pyproject.toml index a2d4a512..aad21017 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -38,7 +38,6 @@ dependencies = [ "websockets>=15.0.0,<16.0.0", "python-dotenv>=1.1.1,<2.0.0", "posthog<6.0.0", - "nltk>=3.8.0,<4.0.0", "protobuf>=6.31.0,<7.0.0", ] classifiers = [ @@ -80,6 +79,7 @@ vectorstore = [ "torch>=2.7.0", "sentence-transformers>=4.1.0,<5.0.0", "chromadb>=0.4.22,<2.0.0", + "nltk>=3.8.0,<4.0.0", ] [project.urls] diff --git a/quickstarts/04-agent-based-workflows/01_sequential_workflow.py b/quickstarts/04-agent-based-workflows/01_sequential_workflow.py new file mode 100644 index 00000000..c765c7a0 --- /dev/null +++ b/quickstarts/04-agent-based-workflows/01_sequential_workflow.py @@ -0,0 +1,131 @@ +from __future__ import annotations + +import logging +import time + +import dapr.ext.workflow as wf +from dapr.ext.workflow import DaprWorkflowContext +from dotenv import load_dotenv + +from dapr_agents import Agent +from dapr_agents.llm.dapr import DaprChatClient +from dapr_agents.workflow.decorators import agent_activity + +# ----------------------------------------------------------------------------- +# Setup +# ----------------------------------------------------------------------------- +load_dotenv() +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) +runtime = wf.WorkflowRuntime() +llm = DaprChatClient(component_name="openai") + +# ----------------------------------------------------------------------------- +# Agents +# ----------------------------------------------------------------------------- +extractor = Agent( + name="DestinationExtractor", + role="Extract destination", + instructions=[ + "Extract the main city from the user's message.", + "Return only the city name, nothing else.", + ], + llm=llm, 
+) + +planner = Agent( + name="PlannerAgent", + role="Trip planner", + instructions=[ + "Create a concise 3-day outline for the given destination.", + "Balance culture, food, and leisure activities.", + ], + llm=llm, +) + +expander = Agent( + name="ItineraryAgent", + role="Itinerary expander", + llm=llm, + instructions=[ + "Expand a 3-day outline into a detailed itinerary.", + "Include Morning, Afternoon, and Evening sections each day.", + ], +) + + +# ----------------------------------------------------------------------------- +# Workflow +# ----------------------------------------------------------------------------- + + +@runtime.workflow(name="chained_planner_workflow") +def chained_planner_workflow(ctx: DaprWorkflowContext, user_msg: str) -> str: + """Plan a 3-day trip using chained agent activities.""" + dest = yield ctx.call_activity(extract_destination, input=user_msg) + outline = yield ctx.call_activity(plan_outline, input=dest["content"]) + itinerary = yield ctx.call_activity(expand_itinerary, input=outline["content"]) + return itinerary["content"] + + +# ----------------------------------------------------------------------------- +# Activities (no explicit params, no prompts) +# ----------------------------------------------------------------------------- + + +@runtime.activity(name="extract_destination") +@agent_activity(agent=extractor) +def extract_destination(ctx) -> dict: + """Extract destination city.""" + pass + + +@runtime.activity(name="plan_outline") +@agent_activity(agent=planner) +def plan_outline(ctx) -> dict: + """Generate a 3-day outline for the destination.""" + pass + + +@runtime.activity(name="expand_itinerary") +@agent_activity(agent=expander) +def expand_itinerary(ctx) -> dict: + """Expand the outline into a full detailed itinerary.""" + pass + + +# ----------------------------------------------------------------------------- +# Entrypoint +# ----------------------------------------------------------------------------- + +if 
__name__ == "__main__": + runtime.start() + time.sleep(5) + + client = wf.DaprWorkflowClient() + user_input = "Plan a trip to Paris." + + logger.info("Starting workflow: %s", user_input) + instance_id = client.schedule_new_workflow( + workflow=chained_planner_workflow, + input=user_input, + ) + + logger.info("Workflow started: %s", instance_id) + state = client.wait_for_workflow_completion(instance_id) + + if not state: + logger.error("No state returned (instance may not exist).") + elif state.runtime_status.name == "COMPLETED": + logger.info("Trip Itinerary:\n%s", state.serialized_output) + else: + logger.error("Workflow ended with status: %s", state.runtime_status) + if state.failure_details: + fd = state.failure_details + logger.error("Failure type: %s", fd.error_type) + logger.error("Failure message: %s", fd.message) + logger.error("Stack trace:\n%s", fd.stack_trace) + else: + logger.error("Custom status: %s", state.serialized_custom_status) + + runtime.shutdown() diff --git a/quickstarts/04-agent-based-workflows/README.md b/quickstarts/04-agent-based-workflows/README.md new file mode 100644 index 00000000..646242b4 --- /dev/null +++ b/quickstarts/04-agent-based-workflows/README.md @@ -0,0 +1,170 @@ +# Agent-based Workflow Patterns + +This quickstart demonstrates how to orchestrate agentic tasks using Dapr Workflows and the `@agent_activity` decorator from Dapr Agents. You’ll learn how to compose multi-step workflows that call autonomous agents—each powered by LLMs—for reasoning, decision-making, and task execution. 
+ +## Prerequisites + +- Python 3.10 (recommended) +- pip package manager +- OpenAI API key +- Dapr CLI and Docker installed + +## Environment Setup + +```bash +# Create a virtual environment +python3.10 -m venv .venv + +# Activate the virtual environment +# On Windows: +.venv\Scripts\activate +# On macOS/Linux: +source .venv/bin/activate + +# Install dependencies +pip install -r requirements.txt +``` + +## Configuration + +The quickstart includes an OpenAI component configuration in the `components` directory. You have two options to configure your API key: + +### Option 1: Using Environment Variables (Recommended) + +1. Create a `.env` file in the project root and add your OpenAI API key: + +```env +OPENAI_API_KEY=your_api_key_here +``` + +2. When running the examples with Dapr, use the helper script to resolve environment variables: + +```bash +# Get the environment variables from the .env file: +export $(grep -v '^#' ../../.env | xargs) + +# Create a temporary resources folder with resolved environment variables +temp_resources_folder=$(../resolve_env_templates.py ./components) + +# Run your dapr command with the temporary resources +dapr run --app-id dapr-agent-wf --resources-path $temp_resources_folder -- python sequential_workflow.py + +# Clean up when done +rm -rf $temp_resources_folder +``` + +> The temporary resources folder will be automatically deleted when the Dapr sidecar is stopped or when the computer is restarted. + +### Option 2: Direct Component Configuration + +You can directly update the `key` in [components/openai.yaml](components/openai.yaml): +```yaml +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: openai +spec: + type: conversation.openai + metadata: + - name: key + value: "YOUR_OPENAI_API_KEY" +``` + +Replace `YOUR_OPENAI_API_KEY` with your actual OpenAI API key. + +> Many LLM providers are compatible with OpenAI's API (DeepSeek, Google AI, etc.) 
and can be used with this component by configuring the appropriate parameters. Dapr also has [native support](https://docs.dapr.io/reference/components-reference/supported-conversation/) for other providers like Google AI, Anthropic, Mistral, DeepSeek, etc. + +### Additional Components + +Make sure Dapr is initialized on your system: + +```bash +dapr init +``` + +The quickstart includes other necessary Dapr components in the `components` directory. For example, the workflow state store component: + +Look at the `workflowstate.yaml` file in the `components` directory: + +```yaml +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: workflowstatestore +spec: + type: state.redis + version: v1 + metadata: + - name: redisHost + value: localhost:6379 + - name: redisPassword + value: "" + - name: actorStateStore + value: "true" +``` + +## Examples + +### 1. Sequential Agent Chain (01_sequential_workflow.py) + +This example chains three autonomous agents in a Dapr Workflow. +Each agent performs one stage of the reasoning process: extraction, planning, and expansion. + +Workflow Overview + +| Step | Agent | Responsibility | +| --- | --- | --- | +| 1️⃣ | DestinationExtractor | Identify the destination city from the user message | +| 2️⃣ | PlannerAgent | Create a concise 3-day outline for the destination | +| 3️⃣ | ItineraryAgent | Expand the outline into a detailed itinerary | + +Run + +```bash +dapr run --app-id dapr-agent-planner --resources-path components/ -- python 01_sequential_workflow.py +``` + +How It Works + +* The workflow begins with the user message, e.g., `"Plan a trip to Paris."` +* `extract_destination` calls the `DestinationExtractor` agent to return the city name. +* `plan_outline` uses the `PlannerAgent` to generate a 3-day itinerary outline. +* `expand_itinerary` passes that outline to the `ItineraryAgent`, which expands it into a detailed plan. +* The final output is logged as a structured itinerary text. 
+ +#### Code Highlights + +* `@agent_activity` decorator: Wraps an activity function so that Dapr automatically delegates its implementation to an Agent. +The function body can remain empty (pass); execution is routed through the agent’s reasoning loop. +* Agents: Each agent defines: + * name, role, and instructions + * a shared llm client (DaprChatClient) + * internal memory, message history, and optional tools +* Workflow Orchestration: The `@runtime.workflow` function coordinates agent tasks: + +```python +dest = yield ctx.call_activity(extract_destination, input=user_msg) +outline = yield ctx.call_activity(plan_outline, input=dest["content"]) +itinerary = yield ctx.call_activity(expand_itinerary, input=outline["content"]) +return itinerary["content"] +``` + +## Integration with Dapr + +Dapr Agents workflows leverage Dapr's core capabilities: + +- **Durability**: Workflows survive process restarts or crashes +- **State Management**: Workflow state is persisted in a distributed state store +- **Actor Model**: Tasks run as reliable, stateful actors within the workflow +- **Event Handling**: Workflows can react to external events + +## Troubleshooting + +1. **Docker is Running**: Ensure Docker is running with `docker ps` and verify you have container instances with `daprio/dapr`, `openzipkin/zipkin`, and `redis` images running +2. **Redis Connection**: Ensure Redis is running (automatically installed by Dapr) +3. **Dapr Initialization**: If components aren't found, verify Dapr is initialized with `dapr init` +4. **API Key**: Check your OpenAI API key if authentication fails + +## Next Steps + +After completing this quickstart, move on to the [Multi-Agent Workflow quickstart](../05-multi-agent-workflows/README.md) to learn how to create distributed systems of collaborating agents. 
\ No newline at end of file diff --git a/quickstarts/04-agent-based-workflows/components/openai.yaml b/quickstarts/04-agent-based-workflows/components/openai.yaml new file mode 100644 index 00000000..61fcbf17 --- /dev/null +++ b/quickstarts/04-agent-based-workflows/components/openai.yaml @@ -0,0 +1,14 @@ +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: openai +spec: + type: conversation.openai + version: v1 + metadata: + - name: key + value: "{{OPENAI_API_KEY}}" + - name: model + value: gpt-4o-mini + - name: temperature + value: 1 diff --git a/quickstarts/04-agent-based-workflows/components/workflowstate.yaml b/quickstarts/04-agent-based-workflows/components/workflowstate.yaml new file mode 100644 index 00000000..2b09d93b --- /dev/null +++ b/quickstarts/04-agent-based-workflows/components/workflowstate.yaml @@ -0,0 +1,14 @@ +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: workflowstatestore +spec: + type: state.redis + version: v1 + metadata: + - name: redisHost + value: localhost:6379 + - name: redisPassword + value: "" + - name: actorStateStore + value: "true" \ No newline at end of file diff --git a/quickstarts/04-llm-based-workflows/01_single_activity_workflow.py b/quickstarts/04-llm-based-workflows/01_single_activity_workflow.py new file mode 100644 index 00000000..f098c900 --- /dev/null +++ b/quickstarts/04-llm-based-workflows/01_single_activity_workflow.py @@ -0,0 +1,60 @@ +import time + +import dapr.ext.workflow as wf +from dapr.ext.workflow import DaprWorkflowContext +from dotenv import load_dotenv + +from dapr_agents.llm.dapr import DaprChatClient +from dapr_agents.workflow.decorators import llm_activity + +# Load environment variables (e.g., API keys, secrets) +load_dotenv() + +# Initialize the Dapr workflow runtime and LLM client +runtime = wf.WorkflowRuntime() +llm = DaprChatClient(component_name="openai") + + +@runtime.workflow(name="single_task_workflow") +def single_task_workflow(ctx: DaprWorkflowContext, name: 
str): + """Ask the LLM about a single historical figure and return a short bio.""" + response = yield ctx.call_activity(describe_person, input={"name": name}) + return response + + +@runtime.activity(name="describe_person") +@llm_activity( + prompt="Who was {name}?", + llm=llm, +) +async def describe_person(ctx, name: str) -> str: + pass + + +if __name__ == "__main__": + runtime.start() + time.sleep(5) + + client = wf.DaprWorkflowClient() + instance_id = client.schedule_new_workflow( + workflow=single_task_workflow, + input="Grace Hopper", + ) + print(f"Workflow started: {instance_id}") + + state = client.wait_for_workflow_completion(instance_id) + if not state: + print("No state returned (instance may not exist).") + elif state.runtime_status.name == "COMPLETED": + print(f"Grace Hopper bio:\n{state.serialized_output}") + else: + print(f"Workflow ended with status: {state.runtime_status}") + if state.failure_details: + fd = state.failure_details + print("Failure type:", fd.error_type) + print("Failure message:", fd.message) + print("Stack trace:\n", fd.stack_trace) + else: + print("Custom status:", state.serialized_custom_status) + + runtime.shutdown() diff --git a/quickstarts/04-llm-based-workflows/02_single_structured_activity_workflow.py b/quickstarts/04-llm-based-workflows/02_single_structured_activity_workflow.py new file mode 100644 index 00000000..2f217be6 --- /dev/null +++ b/quickstarts/04-llm-based-workflows/02_single_structured_activity_workflow.py @@ -0,0 +1,76 @@ +import time + +import dapr.ext.workflow as wf +from dapr.ext.workflow import DaprWorkflowContext +from dotenv import load_dotenv +from pydantic import BaseModel + +from dapr_agents.llm.dapr import DaprChatClient +from dapr_agents.workflow.decorators import llm_activity + + +class Dog(BaseModel): + name: str + bio: str + breed: str + + +# Load environment variables (e.g., API keys, secrets) +load_dotenv() + +# Initialize the Dapr workflow runtime and LLM client +runtime = wf.WorkflowRuntime() 
+llm = DaprChatClient(component_name="openai") + + +@runtime.workflow(name="single_task_workflow_structured") +def single_task_workflow_structured(ctx: DaprWorkflowContext, name: str): + """Ask the LLM for structured data about a dog and return the result.""" + result = yield ctx.call_activity(describe_dog, input={"name": name}) + return result + + +@runtime.activity(name="describe_dog") +@llm_activity( + prompt=""" +You are a JSON-only API. Return a Dog object for the dog named {name}." +JSON schema (informal): +{{ + "name": string, // Dog\'s full name + "bio": string, // 1-3 sentence biography + "breed": string // Primary breed or mixed +}} +""", + llm=llm, +) +def describe_dog(ctx, name: str) -> Dog: + pass + + +if __name__ == "__main__": + runtime.start() + time.sleep(5) + + client = wf.DaprWorkflowClient() + instance_id = client.schedule_new_workflow( + workflow=single_task_workflow_structured, + input="Laika", + ) + print(f"Workflow started: {instance_id}") + + state = client.wait_for_workflow_completion(instance_id) + if not state: + print("No state returned (instance may not exist).") + elif state.runtime_status.name == "COMPLETED": + print(f"Dog Bio:\n{state.serialized_output}") + else: + print(f"Workflow ended with status: {state.runtime_status}") + if state.failure_details: + fd = state.failure_details + print("Failure type:", fd.error_type) + print("Failure message:", fd.message) + print("Stack trace:\n", fd.stack_trace) + else: + print("Custom status:", state.serialized_custom_status) + + runtime.shutdown() diff --git a/quickstarts/04-llm-based-workflows/03_sequential_workflow.py b/quickstarts/04-llm-based-workflows/03_sequential_workflow.py new file mode 100644 index 00000000..e4b74288 --- /dev/null +++ b/quickstarts/04-llm-based-workflows/03_sequential_workflow.py @@ -0,0 +1,83 @@ +import time + +import dapr.ext.workflow as wf +from dapr.ext.workflow import DaprWorkflowContext +from dotenv import load_dotenv + +from dapr_agents.llm.dapr import 
DaprChatClient +from dapr_agents.workflow.decorators import llm_activity + +# Load environment variables (e.g., API keys, secrets) +load_dotenv() + +# Initialize the Dapr workflow runtime and LLM client +runtime = wf.WorkflowRuntime() +llm = DaprChatClient(component_name="openai") + + +@runtime.workflow(name="task_chain_workflow") +def task_chain_workflow(ctx: DaprWorkflowContext): + """ + Chain two LLM-backed activities: + 1) Pick a random LOTR character (name only) + 2) Ask for a famous quote from that character + """ + character = yield ctx.call_activity(get_character) + line = yield ctx.call_activity(get_line, input={"character": character}) + return line + + +@runtime.activity(name="get_character") +@llm_activity( + prompt=""" +Pick a random character from The Lord of the Rings. +Respond with the character's name only. +""", + llm=llm, +) +def get_character(ctx) -> str: + # The llm_activity decorator handles the LLM call using the prompt above. + # Just declare the signature; the body can be empty or 'pass'. + pass + + +@runtime.activity(name="get_line") +@llm_activity( + prompt="What is a famous line by {character}?", + llm=llm, +) +def get_line(ctx, character: str) -> str: + # The llm_activity decorator will format the prompt with 'character'. 
+ pass + + +if __name__ == "__main__": + # Start the workflow runtime sidecar + runtime.start() + time.sleep(5) # small grace period for runtime to be ready + + # Kick off the workflow + client = wf.DaprWorkflowClient() + instance_id = client.schedule_new_workflow( + workflow=task_chain_workflow, + input=None, # no input expected for this workflow + ) + print(f"Workflow started: {instance_id}") + + # Wait for completion and print results + state = client.wait_for_workflow_completion(instance_id) + if not state: + print("No state returned (instance may not exist).") + elif state.runtime_status.name == "COMPLETED": + print(f"Famous Line:\n{state.serialized_output}") + else: + print(f"Workflow ended with status: {state.runtime_status}") + if state.failure_details: + fd = state.failure_details + print("Failure type:", fd.error_type) + print("Failure message:", fd.message) + print("Stack trace:\n", fd.stack_trace) + else: + print("Custom status:", state.serialized_custom_status) + + runtime.shutdown() diff --git a/quickstarts/04-llm-based-workflows/04_parallel_workflow.py b/quickstarts/04-llm-based-workflows/04_parallel_workflow.py new file mode 100644 index 00000000..9de81d75 --- /dev/null +++ b/quickstarts/04-llm-based-workflows/04_parallel_workflow.py @@ -0,0 +1,157 @@ +import logging +import time +from typing import List + +import dapr.ext.workflow as wf +from dapr.ext.workflow import DaprWorkflowContext +from dotenv import load_dotenv +from pydantic import BaseModel, Field + +from dapr_agents.llm.dapr import DaprChatClient +from dapr_agents.workflow.decorators import llm_activity + +# Load environment variables (API keys, etc.) 
+load_dotenv() + +# Configure logging +logging.basicConfig(level=logging.INFO) + +# Initialize the Dapr workflow runtime and LLM client +runtime = wf.WorkflowRuntime() +llm = DaprChatClient(component_name="openai") + + +# ----- Models ----- + + +class Question(BaseModel): + """Represents a single research question.""" + + text: str = Field(..., description="A research question related to the topic.") + + +class Questions(BaseModel): + """Encapsulates a list of research questions.""" + + questions: List[Question] = Field( + ..., description="A list of research questions generated for the topic." + ) + + +# ----- Workflow ----- + + +@runtime.workflow(name="research_workflow") +def research_workflow(ctx: DaprWorkflowContext, topic: str): + """Defines a Dapr workflow for researching a given topic.""" + # 1) Generate research questions + questions = yield ctx.call_activity(generate_questions, input={"topic": topic}) + + # Extract question texts from the dictionary structure + q_list = [q["text"] for q in questions["questions"]] + + # 2) Gather information for each question in parallel + parallel_tasks = [ + ctx.call_activity(gather_information, input={"question": q}) for q in q_list + ] + research_results: List[str] = yield wf.when_all(parallel_tasks) + + # 3) Synthesize final report + final_report: str = yield ctx.call_activity( + synthesize_results, input={"topic": topic, "research_results": research_results} + ) + + return final_report + + +# ----- Activities ----- + + +@runtime.activity(name="generate_questions") +@llm_activity( + prompt=""" +You are a research assistant. Generate exactly 3 focused research questions about the topic: {topic}. +Return ONLY a JSON object matching this schema (no prose): + +{{ + "questions": [ + {{ "text": "..." }}, + {{ "text": "..." }}, + {{ "text": "..." }} + ] +}} +""", + llm=llm, +) +def generate_questions(ctx, topic: str) -> Questions: + # Implemented by llm_activity via the prompt above. 
+ pass + + +@runtime.activity(name="gather_information") +@llm_activity( + prompt=""" +Research the following question and provide a detailed, well-cited answer (paragraphs + bullet points where helpful). +Question: {question} +""", + llm=llm, +) +def gather_information(ctx, question: str) -> str: + # Implemented by llm_activity via the prompt above. + pass + + +@runtime.activity(name="synthesize_results") +@llm_activity( + prompt=""" +Create a comprehensive research report on the topic "{topic}" using the following research findings: + +{research_results} + +Requirements: +- Clear executive summary (3-5 sentences) +- Key findings (bulleted) +- Risks/unknowns +- Short conclusion + +Return plain text (no JSON). +""", + llm=llm, +) +def synthesize_results(ctx, topic: str, research_results: List[str]) -> str: + # Implemented by llm_activity via the prompt above. + pass + + +# ----- Entrypoint ----- + +if __name__ == "__main__": + runtime.start() + time.sleep(5) # small grace period for runtime readiness + + client = wf.DaprWorkflowClient() + research_topic = "The environmental impact of quantum computing" + + logging.info(f"Starting research workflow on: {research_topic}") + instance_id = client.schedule_new_workflow( + workflow=research_workflow, + input=research_topic, + ) + logging.info(f"Workflow started: {instance_id}") + + state = client.wait_for_workflow_completion(instance_id) + if not state: + logging.error("No state returned (instance may not exist).") + elif state.runtime_status.name == "COMPLETED": + logging.info(f"\nResearch Report:\n{state.serialized_output}") + else: + logging.error(f"Workflow ended with status: {state.runtime_status}") + if state.failure_details: + fd = state.failure_details + logging.error("Failure type: %s", fd.error_type) + logging.error("Failure message: %s", fd.message) + logging.error("Stack trace:\n%s", fd.stack_trace) + else: + logging.error("Custom status: %s", state.serialized_custom_status) + + runtime.shutdown() diff --git 
a/quickstarts/04-llm-based-workflows/README.md b/quickstarts/04-llm-based-workflows/README.md index 76c03473..cb00d630 100644 --- a/quickstarts/04-llm-based-workflows/README.md +++ b/quickstarts/04-llm-based-workflows/README.md @@ -1,7 +1,7 @@ # LLM-based Workflow Patterns -This quickstart demonstrates how to orchestrate sequential and parallel tasks using Dapr Agents' workflow capabilities powered by Language Models (LLMs). You'll learn how to build resilient, stateful workflows that leverage LLMs for reasoning, decision-making, and automation. +This quickstart demonstrates how to orchestrate sequential and parallel tasks using Dapr Agents' workflow capabilities powered by Language Models (LLMs). You'll learn how to build resilient, stateful workflows that leverage LLMs for reasoning, structured output, and automation, all using the new `@llm_activity` decorator and native Dapr workflow runtime. ## Prerequisites @@ -33,11 +33,13 @@ The quickstart includes an OpenAI component configuration in the `components` di ### Option 1: Using Environment Variables (Recommended) 1. Create a `.env` file in the project root and add your OpenAI API key: + ```env OPENAI_API_KEY=your_api_key_here ``` 2. When running the examples with Dapr, use the helper script to resolve environment variables: + ```bash # Get the environment variables from the .env file: export $(grep -v '^#' ../../.env | xargs) @@ -52,7 +54,7 @@ dapr run --app-id dapr-agent-wf --resources-path $temp_resources_folder -- pytho rm -rf $temp_resources_folder ``` -Note: The temporary resources folder will be automatically deleted when the Dapr sidecar is stopped or when the computer is restarted. +> The temporary resources folder will be automatically deleted when the Dapr sidecar is stopped or when the computer is restarted. ### Option 2: Direct Component Configuration @@ -71,7 +73,7 @@ spec: Replace `YOUR_OPENAI_API_KEY` with your actual OpenAI API key. 
-Note: Many LLM providers are compatible with OpenAI's API (DeepSeek, Google AI, etc.) and can be used with this component by configuring the appropriate parameters. Dapr also has [native support](https://docs.dapr.io/reference/components-reference/supported-conversation/) for other providers like Google AI, Anthropic, Mistral, DeepSeek, etc.
+> Many LLM providers are compatible with OpenAI's API (DeepSeek, Google AI, etc.) and can be used with this component by configuring the appropriate parameters. Dapr also has [native support](https://docs.dapr.io/reference/components-reference/supported-conversation/) for other providers like Google AI, Anthropic, Mistral, DeepSeek, etc.
 
 ### Additional Components
 
@@ -120,7 +122,7 @@ timeout_seconds: 30
 output_match_mode: substring
 -->
 ```bash
-dapr run --app-id dapr-agent-wf --resources-path components/ -- python sequential_workflow.py
+dapr run --app-id dapr-agent-wf-sequence --resources-path components/ -- python 03_sequential_workflow.py
 ```
 
 
@@ -144,7 +146,7 @@ expected_stdout_lines:
 output_match_mode: substring
 -->
 ```bash
-dapr run --app-id dapr-agent-research --resources-path components/ -- python parallel_workflow.py
+dapr run --app-id dapr-agent-research --resources-path components/ -- python 04_parallel_workflow.py
 ```
 
 
@@ -173,4 +175,4 @@ Dapr Agents workflows leverage Dapr's core capabilities:
 
 ## Next Steps
 
-After completing this quickstart, move on to the [Multi-Agent Workflow quickstart](../05-multi-agent-workflows/README.md) to learn how to create distributed systems of collaborating agents.
\ No newline at end of file
+After completing this quickstart, move on to the [Agent Based Workflow Quickstart](../04-agent-based-workflows/README.md) to learn how to integrate the concept of an agent on specific activity steps.
\ No newline at end of file diff --git a/quickstarts/04-llm-based-workflows/parallel_workflow.py b/quickstarts/04-llm-based-workflows/parallel_workflow.py deleted file mode 100644 index 3de4220a..00000000 --- a/quickstarts/04-llm-based-workflows/parallel_workflow.py +++ /dev/null @@ -1,92 +0,0 @@ -import logging -from typing import List - -from dotenv import load_dotenv -from pydantic import BaseModel, Field - -from dapr_agents.workflow import WorkflowApp, workflow, task -from dapr.ext.workflow import DaprWorkflowContext - -# Load environment variables -load_dotenv() - -# Configure logging -logging.basicConfig(level=logging.INFO) - - -# Define a structured model for a single question -class Question(BaseModel): - """Represents a single research question.""" - - text: str = Field(..., description="A research question related to the topic.") - - -# Define a model that holds multiple questions -class Questions(BaseModel): - """Encapsulates a list of research questions.""" - - questions: List[Question] = Field( - ..., description="A list of research questions generated for the topic." 
- ) - - -# Define Workflow logic -@workflow(name="research_workflow") -def research_workflow(ctx: DaprWorkflowContext, topic: str): - """Defines a Dapr workflow for researching a given topic.""" - - # Generate research questions - questions: Questions = yield ctx.call_activity( - generate_questions, input={"topic": topic} - ) - - # Gather information for each question in parallel - parallel_tasks = [ - ctx.call_activity(gather_information, input={"question": q["text"]}) - for q in questions["questions"] - ] - research_results = yield wfapp.when_all( - parallel_tasks - ) # Ensure wfapp is initialized - - # Synthesize the results into a final report - final_report = yield ctx.call_activity( - synthesize_results, input={"topic": topic, "research_results": research_results} - ) - - return final_report - - -@task(description="Generate 3 focused research questions about {topic}.") -def generate_questions(topic: str) -> Questions: - """Generates three research questions related to the given topic.""" - pass - - -@task( - description="Research information to answer this question: {question}. Provide a detailed response." 
-) -def gather_information(question: str) -> str: - """Fetches relevant information based on the research question provided.""" - pass - - -@task( - description="Create a comprehensive research report on {topic} based on the following research: {research_results}" -) -def synthesize_results(topic: str, research_results: List[str]) -> str: - """Synthesizes the gathered research into a structured report.""" - pass - - -if __name__ == "__main__": - wfapp = WorkflowApp() - - research_topic = "The environmental impact of quantum computing" - - logging.info(f"Starting research workflow on: {research_topic}") - results = wfapp.run_and_monitor_workflow_sync( - research_workflow, input=research_topic - ) - if len(results) > 0: - logging.info(f"\nResearch Report:\n{results}") diff --git a/quickstarts/04-llm-based-workflows/sequential_workflow.py b/quickstarts/04-llm-based-workflows/sequential_workflow.py deleted file mode 100644 index f9dd0840..00000000 --- a/quickstarts/04-llm-based-workflows/sequential_workflow.py +++ /dev/null @@ -1,38 +0,0 @@ -from dapr_agents.workflow import WorkflowApp, workflow, task -from dapr.ext.workflow import DaprWorkflowContext -from dotenv import load_dotenv - -# Load environment variables -load_dotenv() - - -# Define Workflow logic -@workflow(name="task_chain_workflow") -def task_chain_workflow(ctx: DaprWorkflowContext): - result1 = yield ctx.call_activity(get_character) - result2 = yield ctx.call_activity(get_line, input={"character": result1}) - return result2 - - -@task( - description=""" - Pick a random character from The Lord of the Rings\n - and respond with the character's name only -""" -) -def get_character() -> str: - pass - - -@task( - description="What is a famous line by {character}", -) -def get_line(character: str) -> str: - pass - - -if __name__ == "__main__": - wfapp = WorkflowApp() - - results = wfapp.run_and_monitor_workflow_sync(task_chain_workflow) - print(f"Famous Line: {results}") diff --git 
a/quickstarts/04-llm-based-workflows/workflow_dapr.py b/quickstarts/04-llm-based-workflows/workflow_dapr.py deleted file mode 100644 index 1374848e..00000000 --- a/quickstarts/04-llm-based-workflows/workflow_dapr.py +++ /dev/null @@ -1,62 +0,0 @@ -import dapr.ext.workflow as wf -from dotenv import load_dotenv -from openai import OpenAI -from time import sleep - -# Load environment variables -load_dotenv() - -# Initialize Workflow Instance -wfr = wf.WorkflowRuntime() - - -# Define Workflow logic -@wfr.workflow(name="task_chain_workflow") -def task_chain_workflow(ctx: wf.DaprWorkflowContext): - result1 = yield ctx.call_activity(get_character) - result2 = yield ctx.call_activity(get_line, input=result1) - return result2 - - -# Activity 1 -@wfr.activity(name="step1") -def get_character(ctx): - client = OpenAI() - response = client.chat.completions.create( - messages=[ - { - "role": "user", - "content": "Pick a random character from The Lord of the Rings and respond with the character name only", - } - ], - model="gpt-4o", - ) - character = response.choices[0].message.content - print(f"Character: {character}") - return character - - -# Activity 2 -@wfr.activity(name="step2") -def get_line(ctx, character: str): - client = OpenAI() - response = client.chat.completions.create( - messages=[{"role": "user", "content": f"What is a famous line by {character}"}], - model="gpt-4o", - ) - line = response.choices[0].message.content - print(f"Line: {line}") - return line - - -if __name__ == "__main__": - wfr.start() - sleep(5) # wait for workflow runtime to start - - wf_client = wf.DaprWorkflowClient() - instance_id = wf_client.schedule_new_workflow(workflow=task_chain_workflow) - print(f"Workflow started. Instance ID: {instance_id}") - state = wf_client.wait_for_workflow_completion(instance_id) - print(f"Workflow completed! 
Status: {state.runtime_status}") - - wfr.shutdown() diff --git a/quickstarts/README.md b/quickstarts/README.md index ba1f311b..f2d26034 100644 --- a/quickstarts/README.md +++ b/quickstarts/README.md @@ -48,7 +48,6 @@ Learn how to interact with Language Models using Dapr Agents' `DaprChatClient`: - **Resilience**: Setting timeout, retry and circuit-breaking - **PII Obfuscation** – Automatically detect and mask sensitive user information. - This quickstart shows basic text generation using plain text prompts and templates. Using the `DaprChatClient` you can target different LLM providers without changing your agent's code. [Go to Dapr LLM Call](./02_llm_call_dapr) @@ -95,7 +94,7 @@ This quickstart demonstrates how to build a weather assistant with durable, work ### LLM-based Workflow Patterns -Learn to orchestrate stateful, resilient workflows powered by Language Models (LLMs) using Dapr Agents: +Learn to orchestrate stateful, resilient workflows powered by Language Models (LLMs) using `@llm_activity` decorator. - **LLM-powered Tasks**: Automate reasoning and decision-making in workflows - **Task Chaining**: Build multi-step processes with reliable state management @@ -105,6 +104,21 @@ This quickstart demonstrates how to design and run sequential and parallel workf [Go to LLM-based Workflow Patterns](./04-llm-based-workflows/) +### Agent-based Workflow Patterns + +Learn to orchestrate **autonomous, role-driven agents** inside Dapr Workflows using the `@agent_activity` decorator. +These patterns focus on chaining and coordinating specialized agents that reason, plan, and act within durable, stateful workflows. + +> Currently, this does not work with `DurableAgents`. 
+ +- **Agent-driven Tasks**: Execute workflow activities through autonomous agents with defined roles and instructions +- **Sequential & Composed Flows**: Chain multiple agents together, passing context and results between steps +- **Resilient Orchestration**: Combine agent reasoning with Dapr’s durable state, recovery, and execution guarantees + +This quickstart demonstrates how to design and run **agent-based workflows**, starting with a sequential chain of agents collaborating to complete a shared objective. + +[Go to Agent-based Workflow Patterns](./04-agent-based-workflows/) + ### Multi-Agent Workflows Advanced example of event-driven workflows with multiple autonomous agents: diff --git a/quickstarts/components/openai.yaml b/quickstarts/components/openai.yaml index e1c03b1d..ba2399f5 100644 --- a/quickstarts/components/openai.yaml +++ b/quickstarts/components/openai.yaml @@ -9,6 +9,6 @@ spec: - name: key value: "YOUR_OPENAI_API_KEY" - name: model - value: gpt-5-2025-08-07 + value: gpt-5-mini - name: temperature value: 1 diff --git a/tests/conftest.py b/tests/conftest.py index 08f19012..cc3fcb98 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,11 +1,9 @@ """Configuration for pytest.""" import pytest -import asyncio import sys import os import tempfile import shutil -from typing import Generator from unittest.mock import MagicMock # Add the project root to the Python path for imports @@ -147,14 +145,6 @@ class StreamInactiveError(Exception): # and provide shared fixtures across all tests. 
-@pytest.fixture(scope="session") -def event_loop() -> Generator[asyncio.AbstractEventLoop, None, None]: - """Create an instance of the default event loop for each test case.""" - loop = asyncio.get_event_loop_policy().new_event_loop() - yield loop - loop.close() - - @pytest.fixture(scope="session") def temp_dir(): """Create a temporary directory for tests.""" diff --git a/tests/workflow/orchestrators/test_random.py b/tests/workflow/orchestrators/test_random.py index af94d0b6..bf115cb3 100644 --- a/tests/workflow/orchestrators/test_random.py +++ b/tests/workflow/orchestrators/test_random.py @@ -16,8 +16,7 @@ def orchestrator_config(): } -@pytest.mark.asyncio -async def test_random_orchestrator_initialization(orchestrator_config): +def test_random_orchestrator_initialization(orchestrator_config): """Test that RandomOrchestrator can be initialized.""" with patch( "dapr_agents.workflow.orchestrators.random.OrchestratorWorkflowBase.model_post_init" @@ -49,8 +48,7 @@ async def test_process_input(orchestrator_config): assert result["content"] == task -@pytest.mark.asyncio -async def test_select_random_speaker(orchestrator_config): +def test_select_random_speaker(orchestrator_config): """Test the select_random_speaker task.""" with patch( "dapr_agents.workflow.orchestrators.random.OrchestratorWorkflowBase.model_post_init" diff --git a/tests/workflow/test_activity_decorators.py b/tests/workflow/test_activity_decorators.py new file mode 100644 index 00000000..01debea5 --- /dev/null +++ b/tests/workflow/test_activity_decorators.py @@ -0,0 +1,435 @@ +import pytest +from unittest.mock import MagicMock, AsyncMock, patch +from typing import List +from pydantic import BaseModel, Field + +from dapr_agents.workflow.decorators.activities import llm_activity, agent_activity +from dapr_agents.llm.chat import ChatClientBase +from dapr_agents.agents.base import AgentBase +from dapr_agents.types import LLMChatResponse, AssistantMessage + + +# Test Models +class Person(BaseModel): + 
"""Test model for structured responses.""" + + name: str = Field(..., description="Person's name") + age: int = Field(..., description="Person's age") + + +class QuestionList(BaseModel): + """Test model for list responses.""" + + questions: List[str] = Field(..., description="List of questions") + + +# Fixtures +@pytest.fixture +def mock_llm_client(): + """Mock LLM client that returns test responses.""" + mock_client = MagicMock(spec=ChatClientBase) + mock_client.generate = MagicMock(return_value="Test response from LLM") + return mock_client + + +@pytest.fixture +def mock_llm_client_async(): + """Mock async LLM client.""" + mock_client = MagicMock(spec=ChatClientBase) + mock_client.generate = AsyncMock(return_value="Async test response") + return mock_client + + +@pytest.fixture +def mock_llm_client_structured(): + """Mock LLM client that returns structured (LLMChatResponse) responses.""" + from dapr_agents.types import LLMChatCandidate + + mock_client = MagicMock(spec=ChatClientBase) + candidate = LLMChatCandidate( + message=AssistantMessage(content="Structured response"), + finish_reason="stop", + ) + response = LLMChatResponse( + results=[candidate], + metadata={"model": "test-model"}, + ) + mock_client.generate = MagicMock(return_value=response) + return mock_client + + +@pytest.fixture +def mock_agent(): + """Mock agent that returns test responses.""" + mock_agent = MagicMock(spec=AgentBase) + mock_agent.run = AsyncMock(return_value="Agent response") + return mock_agent + + +@pytest.fixture +def mock_workflow_context(): + """Mock WorkflowActivityContext.""" + ctx = MagicMock() + ctx.instance_id = "test-instance-123" + return ctx + + +# Tests for llm_activity decorator +def test_llm_activity_requires_prompt(): + """Test that llm_activity raises ValueError when prompt is empty.""" + mock_llm = MagicMock(spec=ChatClientBase) + + with pytest.raises(ValueError, match="@llm_activity requires a prompt template"): + llm_activity(prompt="", llm=mock_llm) + + +def 
test_llm_activity_requires_llm(): + """Test that llm_activity raises ValueError when llm is None.""" + with pytest.raises( + ValueError, match="@llm_activity requires an explicit `llm` client instance" + ): + llm_activity(prompt="Test prompt", llm=None) + + +def test_llm_activity_decorator_basic(mock_llm_client, mock_workflow_context): + """Test basic llm_activity decoration and execution.""" + + @llm_activity(prompt="Say hello to {name}", llm=mock_llm_client) + def greet(ctx, name: str) -> str: + pass + + # Check wrapper metadata + assert hasattr(greet, "_is_llm_activity") + assert greet._is_llm_activity is True + assert hasattr(greet, "_llm_activity_config") + assert greet._llm_activity_config["prompt"] == "Say hello to {name}" + + # Execute the decorated function + result = greet(mock_workflow_context, payload={"name": "Alice"}) + + # Verify the result + assert result == "Test response from LLM" + mock_llm_client.generate.assert_called_once() + + +def test_llm_activity_positional_args(mock_llm_client, mock_workflow_context): + """Test llm_activity with positional arguments.""" + + @llm_activity(prompt="Process {text}", llm=mock_llm_client) + def process_text(ctx, text: str) -> str: + pass + + result = process_text(mock_workflow_context, {"text": "Hello World"}) + assert result == "Test response from LLM" + + +def test_llm_activity_keyword_args(mock_llm_client, mock_workflow_context): + """Test llm_activity with keyword arguments.""" + + @llm_activity(prompt="Summarize {content}", llm=mock_llm_client) + def summarize(ctx, content: str) -> str: + pass + + result = summarize(ctx=mock_workflow_context, payload={"content": "Long text"}) + assert result == "Test response from LLM" + + +def test_llm_activity_multiple_params(mock_llm_client, mock_workflow_context): + """Test llm_activity with multiple parameters.""" + + @llm_activity(prompt="Write a {length} story about {topic}", llm=mock_llm_client) + def write_story(ctx, topic: str, length: str) -> str: + pass + + 
result = write_story( + mock_workflow_context, payload={"topic": "robots", "length": "short"} + ) + assert result == "Test response from LLM" + + +def test_llm_activity_no_payload(mock_llm_client, mock_workflow_context): + """Test llm_activity with no input payload.""" + + @llm_activity(prompt="Generate a random fact", llm=mock_llm_client) + def random_fact(ctx) -> str: + pass + + result = random_fact(mock_workflow_context) + assert result == "Test response from LLM" + + +def test_llm_activity_async_llm(mock_llm_client_async, mock_workflow_context): + """Test llm_activity with async LLM client.""" + + @llm_activity(prompt="Test prompt", llm=mock_llm_client_async) + def async_test(ctx) -> str: + pass + + result = async_test(mock_workflow_context) + assert result == "Async test response" + + +def test_llm_activity_structured_response( + mock_llm_client_structured, mock_workflow_context +): + """Test llm_activity with LLMChatResponse (structured response).""" + + @llm_activity(prompt="Get info", llm=mock_llm_client_structured) + def get_info(ctx) -> str: + pass + + result = get_info(mock_workflow_context) + # convert_result should extract the content from LLMChatResponse + assert result == "Structured response" + + +def test_llm_activity_structured_mode(mock_llm_client, mock_workflow_context): + """Test llm_activity with different structured modes.""" + + @llm_activity( + prompt="Get data", llm=mock_llm_client, structured_mode="function_call" + ) + def get_data(ctx) -> str: + pass + + assert get_data._llm_activity_config["structured_mode"] == "function_call" + + +def test_llm_activity_preserves_function_metadata(mock_llm_client): + """Test that llm_activity preserves function name and docstring.""" + + @llm_activity(prompt="Test", llm=mock_llm_client) + def my_function(ctx, param: str) -> str: + """This is a docstring.""" + pass + + assert my_function.__name__ == "my_function" + assert my_function.__doc__ == "This is a docstring." 


# Tests for agent_activity decorator


def test_agent_activity_requires_agent() -> None:
    """Test that agent_activity raises ValueError when agent is None."""
    with pytest.raises(ValueError, match="@agent_activity requires an AgentBase"):
        agent_activity(agent=None)


def test_agent_activity_decorator_basic(mock_agent, mock_workflow_context) -> None:
    """Test basic agent_activity decoration and execution."""

    # Body is intentionally `pass`: the decorator supplies the implementation.
    @agent_activity(agent=mock_agent)
    def run_task(ctx, task: str) -> str:
        pass

    # Check wrapper metadata
    assert hasattr(run_task, "_is_agent_activity")
    assert run_task._is_agent_activity is True
    assert hasattr(run_task, "_agent_activity_config")

    # Execute the decorated function
    result = run_task(mock_workflow_context, payload={"task": "analyze data"})

    # Verify the result
    assert result == "Agent response"
    mock_agent.run.assert_called_once()


def test_agent_activity_with_prompt(mock_agent, mock_workflow_context) -> None:
    """Test agent_activity with custom prompt template."""

    @agent_activity(agent=mock_agent, prompt="Analyze {data} and provide insights")
    def analyze(ctx, data: str) -> str:
        pass

    result = analyze(mock_workflow_context, payload={"data": "sales numbers"})
    assert result == "Agent response"

    # Verify the agent was called with formatted prompt
    call_args = mock_agent.run.call_args[0][0]
    assert "sales numbers" in call_args


def test_agent_activity_without_prompt(mock_agent, mock_workflow_context) -> None:
    """Test agent_activity without prompt (uses format_agent_input)."""

    @agent_activity(agent=mock_agent)
    def process(ctx, input_data: str) -> str:
        pass

    result = process(mock_workflow_context, payload={"input_data": "test data"})
    assert result == "Agent response"
    mock_agent.run.assert_called_once()


def test_agent_activity_multiple_params(mock_agent, mock_workflow_context) -> None:
    """Test agent_activity with multiple parameters."""

    @agent_activity(agent=mock_agent, prompt="Compare {item1} and {item2}")
    def compare(ctx, item1: str, item2: str) -> str:
        pass

    result = compare(
        mock_workflow_context, payload={"item1": "apple", "item2": "orange"}
    )
    assert result == "Agent response"


def test_agent_activity_preserves_function_metadata(mock_agent) -> None:
    """Test that agent_activity preserves function name and docstring."""

    @agent_activity(agent=mock_agent)
    def agent_function(ctx, task: str) -> str:
        """Agent function docstring."""
        pass

    # functools.wraps-style metadata preservation on the wrapper.
    assert agent_function.__name__ == "agent_function"
    assert agent_function.__doc__ == "Agent function docstring."


# Integration tests


def test_llm_activity_with_pydantic_return_type(mock_llm_client, mock_workflow_context) -> None:
    """Test llm_activity with Pydantic model return type annotation."""
    # Mock the LLM to return a dict that matches Person schema
    mock_llm_client.generate = MagicMock(return_value='{"name": "John Doe", "age": 30}')

    @llm_activity(prompt="Get person info for {person_id}", llm=mock_llm_client)
    def get_person(ctx, person_id: str) -> Person:
        pass

    with patch(
        "dapr_agents.workflow.decorators.activities.validate_result"
    ) as mock_validate:
        # Mock validate_result to return a Person instance
        # NOTE(review): AsyncMock(...)() produces a coroutine object; presumably
        # the decorator awaits validate_result's return — confirm, since an
        # unawaited coroutine would only surface as a RuntimeWarning here.
        mock_validate.return_value = AsyncMock(
            return_value=Person(name="John Doe", age=30)
        )()

        _ = get_person(mock_workflow_context, payload={"person_id": "123"})

        # Verify the decorator called the LLM correctly
        assert mock_llm_client.generate.called


def test_llm_activity_ctx_parameter_stripped(mock_llm_client, mock_workflow_context) -> None:
    """Test that ctx parameter is properly stripped from signature processing."""

    @llm_activity(prompt="Process {data}", llm=mock_llm_client)
    def process(ctx, data: str) -> str:
        pass

    # The decorator should handle ctx internally and not include it in prompt formatting
    result = process(mock_workflow_context, payload={"data": "test"})
    assert result == "Test response from LLM"

    # Verify that the LLM was called with proper parameters (not including ctx)
    call_args = mock_llm_client.generate.call_args
    assert call_args is not None


def test_agent_activity_ctx_parameter_stripped(mock_agent, mock_workflow_context) -> None:
    """Test that ctx parameter is properly stripped from signature processing."""

    @agent_activity(agent=mock_agent, prompt="Process {input_val}")
    def process(ctx, input_val: str) -> str:
        pass

    result = process(mock_workflow_context, payload={"input_val": "data"})
    assert result == "Agent response"


def test_llm_activity_scalar_input(mock_llm_client, mock_workflow_context) -> None:
    """Test llm_activity with scalar (non-dict) input."""

    @llm_activity(prompt="Analyze {text}", llm=mock_llm_client)
    def analyze(ctx, text: str) -> str:
        pass

    # Pass a scalar string instead of a dict
    result = analyze(mock_workflow_context, "Simple text input")
    assert result == "Test response from LLM"


def test_agent_activity_scalar_input(mock_agent, mock_workflow_context) -> None:
    """Test agent_activity with scalar (non-dict) input."""

    @agent_activity(agent=mock_agent)
    def process(ctx, data: str) -> str:
        pass

    result = process(mock_workflow_context, "scalar input")
    assert result == "Agent response"


def test_llm_activity_with_task_kwargs(mock_llm_client, mock_workflow_context) -> None:
    """Test llm_activity with additional task_kwargs."""

    # Extra keyword arguments are collected into the stored task_kwargs.
    @llm_activity(
        prompt="Test",
        llm=mock_llm_client,
        custom_param="custom_value",
        another_param=123,
    )
    def test_func(ctx) -> str:
        pass

    assert (
        test_func._llm_activity_config["task_kwargs"]["custom_param"] == "custom_value"
    )
    assert test_func._llm_activity_config["task_kwargs"]["another_param"] == 123


def test_agent_activity_with_task_kwargs(mock_agent, mock_workflow_context) -> None:
    """Test agent_activity with additional task_kwargs."""

    @agent_activity(agent=mock_agent, custom_setting="value", timeout=300)
    def test_func(ctx) -> str:
        pass

    assert test_func._agent_activity_config["task_kwargs"]["custom_setting"] == "value"
    assert test_func._agent_activity_config["task_kwargs"]["timeout"] == 300


# Error handling tests


def test_llm_activity_non_callable_raises_error(mock_llm_client) -> None:
    """Test that decorating a non-callable raises ValueError."""
    decorator = llm_activity(prompt="Test", llm=mock_llm_client)

    with pytest.raises(ValueError, match="must decorate a callable activity"):
        decorator("not a function")


def test_agent_activity_non_callable_raises_error(mock_agent) -> None:
    """Test that decorating a non-callable raises ValueError."""
    decorator = agent_activity(agent=mock_agent)

    with pytest.raises(ValueError, match="must decorate a callable activity"):
        decorator("not a function")


def test_llm_activity_missing_context_raises_error(mock_llm_client) -> None:
    """Test that calling without context raises ValueError."""

    @llm_activity(prompt="Test", llm=mock_llm_client)
    def test_func(ctx) -> str:
        pass

    # Call without any context - should raise error from extract_ctx_and_payload
    with pytest.raises(ValueError, match="Workflow context is required"):
        test_func()


def test_agent_activity_missing_context_raises_error(mock_agent) -> None:
    """Test that calling without context raises ValueError."""

    @agent_activity(agent=mock_agent)
    def test_func(ctx) -> str:
        pass

    with pytest.raises(ValueError, match="Workflow context is required"):
        test_func()