diff --git a/packages/sample-app/sample_app/experiment/made_by_traceloop/travel_agent_exp.py b/packages/sample-app/sample_app/experiment/made_by_traceloop/travel_agent_exp.py
new file mode 100644
index 0000000000..2aa25dfb8a
--- /dev/null
+++ b/packages/sample-app/sample_app/experiment/made_by_traceloop/travel_agent_exp.py
@@ -0,0 +1,143 @@
+"""
+Travel Agent Evaluators Experiment
+
+This example demonstrates running the travel agent and collecting the prompt/completion
+trajectory through the run for evaluation with Traceloop's agent evaluators:
+- Agent Goal Accuracy: Validates agent goal achievement
+- Agent Tool Error Detector: Detects errors or failures during tool execution
+- Agent Flow Quality: Validates agent trajectories against user-defined natural language tests
+- Agent Efficiency: Evaluates agent efficiency by checking for redundant calls and optimal paths
+- Agent Goal Completeness: Measures whether the agent successfully accomplished all user goals
+
+The key feature is extracting the full prompt/completion trajectory from OpenTelemetry spans
+for detailed analysis and evaluation.
+"""
+
+import asyncio
+import sys
+from pathlib import Path
+
+from traceloop.sdk import Traceloop
+from traceloop.sdk.evaluator import EvaluatorMadeByTraceloop
+from traceloop.sdk.experiment.utils import run_with_span_capture
+
+# Add the agents directory to sys.path for imports
+agents_dir = Path(__file__).parent.parent.parent / "agents"
+if str(agents_dir) not in sys.path:
+    sys.path.insert(0, str(agents_dir))
+
+from travel_agent_example import run_travel_query  # noqa: E402
+
+# Initialize Traceloop client (will be reinitialized per task with in-memory exporter)
+client = Traceloop.init()
+
+
+async def travel_agent_task(row):
+    """
+    Unified task function for travel agent evaluators.
+
+    This task:
+    1. Initializes Traceloop with InMemorySpanExporter
+    2. Runs the travel agent with the query from the dataset
+    3. Captures all OpenTelemetry spans
+    4. Extracts prompt/completion trajectory from spans
+    5. Returns data in format compatible with agent evaluators
+
+    Required fields for agent evaluators:
+    - question (or prompt): The input question or goal
+    - completion (or answer, response, text): The agent's final response
+    - trajectory_prompts (or prompts): The agent's prompt trajectory
+    - trajectory_completions (or completions): The agent's completion trajectory
+    - tool_calls: List of tools called during execution
+    """
+    # Get query from row
+    query = row.get("query", "Plan a 5-day trip to Paris")
+
+    # Run the travel agent with span capture
+    trajectory_prompts, trajectory_completions, final_completion = await run_with_span_capture(
+        run_travel_query,  # This is the function that calls the Agent
+        query  # This is the agent's input
+    )
+
+    return {
+        "prompt": query,
+        "answer": final_completion if final_completion else query,
+        "context": f"The agent should create a complete travel itinerary for: {query}",
+        "trajectory_prompts": trajectory_prompts,
+        "trajectory_completions": trajectory_completions,
+    }
+
+
+async def run_travel_agent_experiment():
+    """
+    Run experiment with travel agent and all 5 agent evaluators.
+
+    This experiment will evaluate the travel agent's performance across:
+    1. Agent Goal Accuracy - Did the agent achieve the stated goal?
+    2. Agent Tool Error Detector - Were there any tool execution errors?
+    3. Agent Flow Quality - Did the agent follow the expected trajectory?
+    4. Agent Efficiency - Was the agent efficient (no redundant calls)?
+    5. Agent Goal Completeness - Did the agent fully accomplish all goals?
+    """
+
+    print("\n" + "="*80)
+    print("TRAVEL AGENT EVALUATORS EXPERIMENT")
+    print("="*80 + "\n")
+
+    print("This experiment will test the travel agent with five agent-specific evaluators:\n")
+    print("1. Agent Goal Accuracy - Validates goal achievement")
+    print("2. Agent Tool Error Detector - Detects tool execution errors")
+    print("3. Agent Flow Quality - Validates expected trajectories")
+    print("4. Agent Efficiency - Checks for optimal execution paths")
+    print("5. Agent Goal Completeness - Measures full goal accomplishment")
+    print("\n" + "-"*80 + "\n")
+
+    # Configure agent evaluators
+    evaluators = [
+        EvaluatorMadeByTraceloop.agent_goal_accuracy(),
+        EvaluatorMadeByTraceloop.agent_flow_quality(
+            threshold=0.7,
+            conditions=["create_itinerary tool should be called last"],
+        ),
+        EvaluatorMadeByTraceloop.agent_efficiency(),
+        EvaluatorMadeByTraceloop.agent_goal_completeness(),
+    ]
+
+    print("Running experiment with evaluators:")
+    for evaluator in evaluators:
+        print(f" - {evaluator.slug}")
+
+    print("\n" + "-"*80 + "\n")
+
+    # Run the experiment
+    # Note: You'll need to create a dataset with travel queries in the Traceloop platform
+    results, errors = await client.experiment.run(
+        dataset_slug="travel-queries",  # Dataset slug that should exist in the Traceloop platform
+        dataset_version="v1",
+        task=travel_agent_task,
+        evaluators=evaluators,
+        experiment_slug="travel-agent-exp",
+        stop_on_error=False,
+        wait_for_results=True,
+    )
+
+    print("\n" + "="*80)
+    print("Travel agent evaluators experiment completed!")
+    print("="*80 + "\n")
+
+    print("Results summary:")
+    print(f" - Total rows processed: {len(results) if results else 0}")
+    print(f" - Errors encountered: {len(errors) if errors else 0}")
+
+    if errors:
+        print("\nErrors:")
+        for error in errors:
+            print(f" - {error}")
+
+if __name__ == "__main__":
+    print("\nTravel Agent Evaluators Experiment\n")
+    print("This experiment captures the full prompt/completion trajectory")
+    print("from the travel agent's execution and evaluates it against")
+    print("Traceloop's agent evaluators.\n")
+
+    asyncio.run(run_travel_agent_experiment())
diff --git a/packages/traceloop-sdk/pyproject.toml b/packages/traceloop-sdk/pyproject.toml
index db7aabd906..80abfeb29c 100644
--- a/packages/traceloop-sdk/pyproject.toml
+++ b/packages/traceloop-sdk/pyproject.toml
@@ -121,6 +121,7 @@ exclude = [
     "traceloop/sdk/tracing",
     "traceloop/sdk/utils",
     "traceloop/sdk/__init__.py",
+    "traceloop/sdk/experiment/utils.py",
     "tests/",
 ]
diff --git a/packages/traceloop-sdk/traceloop/sdk/evaluator/evaluators_made_by_traceloop.py b/packages/traceloop-sdk/traceloop/sdk/evaluator/evaluators_made_by_traceloop.py
index 2b52bd4c21..8887bf86be 100644
--- a/packages/traceloop-sdk/traceloop/sdk/evaluator/evaluators_made_by_traceloop.py
+++ b/packages/traceloop-sdk/traceloop/sdk/evaluator/evaluators_made_by_traceloop.py
@@ -1,4 +1,4 @@
-from typing import Optional, Dict, Any
+from typing import Optional, Dict, Any, List
 
 from .config import EvaluatorDetails
 
@@ -567,6 +568,8 @@ def agent_tool_error_detector(
 
     @staticmethod
    def agent_flow_quality(
+        threshold: float = 0.5,
+        conditions: List[str] = [],
     ) -> EvaluatorDetails:
         """
         Agent flow quality evaluator - validates agent trajectories against user-defined natural language tests.
@@ -574,11 +576,17 @@ def agent_flow_quality(
         Required task output fields:
         - trajectory_prompts: The prompts extracted from the span attributes (llm.prompts.*)
         - trajectory_completions: The completions extracted from the span attributes (llm.completions.*)
+        Args:
+            threshold: Score threshold (0.0-1.0) used when evaluating agent flow quality
+            conditions: List of conditions in natural language to evaluate the agent flow quality against
 
         Returns:
             EvaluatorDetails configured for agent flow quality evaluation
         """
-        config: Dict[str, Any] = {}
+        config: Dict[str, Any] = {
+            "threshold": threshold,
+            "conditions": conditions,
+        }
 
         return EvaluatorDetails(
             slug="agent-flow-quality",
diff --git a/packages/traceloop-sdk/traceloop/sdk/experiment/utils.py b/packages/traceloop-sdk/traceloop/sdk/experiment/utils.py
new file mode 100644
index 0000000000..1476daf6f0
--- /dev/null
+++ b/packages/traceloop-sdk/traceloop/sdk/experiment/utils.py
@@ -0,0 +1,164 @@
+"""
+Shared utilities for running experiments with OpenTelemetry span capture.
+"""
+
+import json
+from traceloop.sdk import Traceloop
+from traceloop.sdk.utils.in_memory_span_exporter import InMemorySpanExporter
+from traceloop.sdk.tracing.tracing import TracerWrapper
+
+
+def extract_trajectory_from_spans(spans):
+    """
+    Extract prompt and completion trajectory from OpenTelemetry spans.
+    Collects the gen_ai.prompt.* and gen_ai.completion.* span attributes into prompt/completion trajectories.
+
+    Args:
+        spans: List of ReadableSpan objects from InMemorySpanExporter
+
+    Returns:
+        dict with trajectory_prompts, trajectory_completions, and tool_calls
+    """
+    # Collect all gen_ai.prompt.* / gen_ai.completion.* attributes from the captured spans
+    trajectory_prompts_dict = {}
+    trajectory_completions_dict = {}
+    tool_calls = []
+    tool_inputs = []
+    tool_outputs = []
+
+    for span in spans:
+        if not hasattr(span, 'attributes'):
+            continue
+
+        attributes = span.attributes or {}
+
+        for key, value in attributes.items():
+            if key.startswith("gen_ai.prompt."):
+                trajectory_prompts_dict[key] = value
+            elif key.startswith("gen_ai.completion."):
+                trajectory_completions_dict[key] = value
+
+        # Extract tool calls for summary
+        if "gen_ai.tool.name" in attributes:
+            tool_name = attributes["gen_ai.tool.name"]
+            if tool_name:
+                tool_calls.append(tool_name)
+
+                # Extract tool input
+                tool_input = attributes.get("gen_ai.completion.tool.arguments", "")
+                if not tool_input:
+                    tool_input = attributes.get("gen_ai.tool.input", "")
+                tool_inputs.append(tool_input)
+
+                # Extract tool output
+                tool_output = attributes.get("gen_ai.tool.output", "")
+                if not tool_output:
+                    tool_output = attributes.get("gen_ai.completion.tool.result", "")
+                tool_outputs.append(tool_output)
+
+    return {
+        "trajectory_prompts": trajectory_prompts_dict,
+        "trajectory_completions": trajectory_completions_dict,
+        "tool_calls": tool_calls,
+        "tool_inputs": tool_inputs,
+        "tool_outputs": tool_outputs
+    }
+
+
+async def run_with_span_capture(task_callable, *args, **kwargs):
+    """
+    Run a task with OpenTelemetry span capture and extract trajectory data.
+
+    This function:
+    1. Initializes Traceloop with InMemorySpanExporter
+    2. Runs the provided async task callable
+    3. Captures all OpenTelemetry spans
+    4. Extracts prompt/completion trajectory from spans
+    5. Returns trajectory data in JSON format
+
+    Args:
+        task_callable: Async callable to execute (e.g., run_travel_query)
+        *args: Positional arguments to pass to the task callable
+        **kwargs: Keyword arguments to pass to the task callable
+
+    Returns:
+        Tuple of (trajectory_prompts, trajectory_completions, final_completion)
+        - trajectory_prompts: JSON string of the prompt trajectory
+        - trajectory_completions: JSON string of the completion trajectory
+        - final_completion: The final completion content string
+    """
+    # Clear the singleton if it exists so it can be reinitialized with the in-memory exporter
+    if hasattr(TracerWrapper, "instance"):
+        del TracerWrapper.instance
+
+    # Create in-memory exporter to capture spans
+    exporter = InMemorySpanExporter()
+
+    # Initialize Traceloop with in-memory exporter
+    Traceloop.init(
+        app_name="internal-experiment-exporter",
+        disable_batch=True,
+        exporter=exporter,
+    )
+
+    try:
+        # Run the task callable
+        print(f"\n{'='*80}")
+        print(f"Running task: {task_callable.__name__}")
+        print(f"{'='*80}\n")
+
+        tool_calls_made = await task_callable(*args, **kwargs)
+
+        # Get all captured spans
+        spans = exporter.get_finished_spans()
+
+        print(f"\n{'='*80}")
+        print(f"Captured {len(spans)} spans from execution")
+        print(f"{'='*80}\n")
+
+        # Extract trajectory from spans
+        trajectory_data = extract_trajectory_from_spans(spans)
+
+        # Get the final completion from the captured completion attributes
+        completions_dict = trajectory_data["trajectory_completions"]
+        final_completion = ""
+        if completions_dict:
+            # Find the highest-index completion content
+            max_idx = -1
+            for key in completions_dict.keys():
+                if ".content" in key:
+                    try:
+                        parts = key.split(".")
+                        idx = int(parts[2])
+                        if idx > max_idx:
+                            max_idx = idx
+                            final_completion = completions_dict[key]
+                    except (ValueError, IndexError):
+                        pass
+
+        # trajectory_prompts and trajectory_completions are dicts keyed by the
+        # gen_ai.prompt.* / gen_ai.completion.* span attributes
+        trajectory_prompts = trajectory_data["trajectory_prompts"]
+        trajectory_completions = trajectory_data["trajectory_completions"]
+
+        # Fall back to empty lists when nothing was captured (avoids double JSON-encoding below)
+        if not trajectory_prompts:
+            trajectory_prompts = []
+        if not trajectory_completions:
+            trajectory_completions = []
+
+        print("📊 Trajectory Summary:")
+        print(f" - Prompt attributes captured: {len(trajectory_prompts)}")
+        print(f" - Completion attributes captured: {len(trajectory_completions)}")
+        tools_called = ', '.join(trajectory_data['tool_calls']) if trajectory_data['tool_calls'] else 'None'
+        print(f" - Tools called: {tools_called}")
+        if tool_calls_made:
+            print(f" - Tools from run: {', '.join(tool_calls_made) if tool_calls_made else 'None'}\n")
+
+        json_trajectory_prompts = json.dumps(trajectory_prompts)
+        json_trajectory_completions = json.dumps(trajectory_completions)
+
+        return json_trajectory_prompts, json_trajectory_completions, final_completion
+
+    except Exception as e:
+        raise e
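
For reference, a minimal sketch of how run_with_span_capture and the new agent_flow_quality parameters could be wired into a custom experiment task outside the sample app. Only the SDK calls that appear in this patch (Traceloop.init, run_with_span_capture, EvaluatorMadeByTraceloop.agent_flow_quality, client.experiment.run) are relied on; the my_agent coroutine, the "support-queries" dataset slug, and the condition text are hypothetical placeholders.

import asyncio

from traceloop.sdk import Traceloop
from traceloop.sdk.evaluator import EvaluatorMadeByTraceloop
from traceloop.sdk.experiment.utils import run_with_span_capture

client = Traceloop.init()


async def my_agent(question: str):
    # Hypothetical agent entry point -- replace with your own agent coroutine.
    ...


async def my_task(row):
    question = row.get("question", "")
    # Capture the prompt/completion trajectory while the agent runs
    prompts, completions, answer = await run_with_span_capture(my_agent, question)
    return {
        "prompt": question,
        "answer": answer,
        "trajectory_prompts": prompts,
        "trajectory_completions": completions,
    }


async def main():
    results, errors = await client.experiment.run(
        dataset_slug="support-queries",  # hypothetical dataset slug
        dataset_version="v1",
        task=my_task,
        evaluators=[
            EvaluatorMadeByTraceloop.agent_flow_quality(
                threshold=0.7,
                conditions=["the final answer should reference at least one tool result"],
            ),
        ],
        experiment_slug="support-agent-exp",  # hypothetical experiment slug
        stop_on_error=False,
        wait_for_results=True,
    )
    print(f"{len(results or [])} rows evaluated, {len(errors or [])} errors")


if __name__ == "__main__":
    asyncio.run(main())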
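
To make the extracted trajectory shape concrete, here is an illustrative sketch of extract_trajectory_from_spans applied to stand-in span objects. SimpleNamespace only mimics the part of the ReadableSpan interface the function uses (an attributes mapping), and the gen_ai.* attribute values are made-up examples of the key convention the function parses.

from types import SimpleNamespace

from traceloop.sdk.experiment.utils import extract_trajectory_from_spans

spans = [
    SimpleNamespace(attributes={
        "gen_ai.prompt.0.role": "user",
        "gen_ai.prompt.0.content": "Plan a 5-day trip to Paris",
        "gen_ai.completion.0.content": "Day 1: Louvre ...",
    }),
    SimpleNamespace(attributes={
        "gen_ai.tool.name": "create_itinerary",
        "gen_ai.tool.input": '{"city": "Paris", "days": 5}',
        "gen_ai.tool.output": '{"status": "ok"}',
    }),
]

trajectory = extract_trajectory_from_spans(spans)
print(sorted(trajectory["trajectory_prompts"]))      # gen_ai.prompt.* keys
print(sorted(trajectory["trajectory_completions"]))  # gen_ai.completion.* keys
print(trajectory["tool_calls"])                      # ['create_itinerary']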