diff --git a/packages/sample-app/sample_app/agents/travel_agent_example.py b/packages/sample-app/sample_app/agents/travel_agent_example.py new file mode 100644 index 0000000000..df3c8f8688 --- /dev/null +++ b/packages/sample-app/sample_app/agents/travel_agent_example.py @@ -0,0 +1,922 @@ +import asyncio +import random +import argparse +import time +from typing import Dict, List +from dataclasses import dataclass +from pydantic import BaseModel +import requests +from dotenv import load_dotenv +from traceloop.sdk import Traceloop + +from agents import Agent, function_tool, RunContextWrapper, Runner, ToolCallOutputItem +from openai.types.responses import ( + ResponseTextDeltaEvent, + ResponseOutputItemAddedEvent, + ResponseFunctionToolCall, + ResponseOutputText, + ResponseOutputRefusal, +) + +load_dotenv() + +Traceloop.init( + app_name="travel-agent-demo", + disable_batch=False, +) + + +class CountryInfo(BaseModel): + name: str + capital: str + region: str + subregion: str + population: int + currencies: List[str] + languages: List[str] + timezones: List[str] + + +class DestinationSearchResponse(BaseModel): + status: str + message: str + countries: List[CountryInfo] | None = None + count: int = 0 + + +class DailyForecast(BaseModel): + date: str + temp_max: float + temp_min: float + precipitation: float + + +class WeatherForecast(BaseModel): + location: str + latitude: float + longitude: float + current_temperature: float + current_conditions: str + forecast_days: List[DailyForecast] + + +class WeatherResponse(BaseModel): + status: str + message: str + forecast: WeatherForecast | None = None + + +class LocationCoordinates(BaseModel): + location_name: str + latitude: float + longitude: float + country: str + display_name: str + + +class CoordinatesResponse(BaseModel): + status: str + message: str + coordinates: LocationCoordinates | None = None + + +class DestinationInfo(BaseModel): + title: str + summary: str + extract: str + + +class DestinationInfoResponse(BaseModel): + status: str + message: str + info: DestinationInfo | None = None + + +class TravelDistance(BaseModel): + from_location: str + to_location: str + distance_km: float + flight_time_hours: float + + +class DistanceResponse(BaseModel): + status: str + message: str + distance_info: TravelDistance | None = None + + +class DayActivity(BaseModel): + time: str + activity: str + location: str + notes: str + + +class DayPlan(BaseModel): + day_number: int + date: str + title: str + activities: List[DayActivity] + meals: List[str] + accommodation: str + + +class TravelItinerary(BaseModel): + trip_title: str + destination: str + duration_days: int + total_budget_estimate: str + daily_plans: List[DayPlan] + travel_tips: List[str] + packing_suggestions: List[str] + + +class ItineraryResponse(BaseModel): + status: str + message: str + itinerary: TravelItinerary | None = None + + +@dataclass +class TravelContext: + """Context for the travel agent application.""" + conversation_history: List[Dict[str, str]] = None + + def __post_init__(self): + if self.conversation_history is None: + self.conversation_history = [] + + +@function_tool +async def search_destinations( + cw: RunContextWrapper[TravelContext], + region: str = "", + subregion: str = "" +) -> DestinationSearchResponse: + """ + Search for travel destinations by region or subregion using REST Countries API. 
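+    Results are filtered by subregion when one is provided and truncated to the
+    first 10 matches to keep the tool output small.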
+ + Args: + region: Region to search (e.g., "Europe", "Asia", "Americas") + subregion: Subregion to search (e.g., "Southern Europe", "Southeast Asia") + + Returns: + List of countries matching the search criteria + """ + print(f"Searching destinations for region: '{region}', subregion: '{subregion}'") + + try: + # Add small delay to respect rate limits + await asyncio.sleep(0.5) + + if region: + url = f"https://restcountries.com/v3.1/region/{region}" + else: + url = "https://restcountries.com/v3.1/all" + + response = requests.get(url, timeout=10) + response.raise_for_status() + + countries_data = response.json() + + # Filter by subregion if provided + if subregion: + countries_data = [c for c in countries_data + if c.get("subregion", "").lower() == subregion.lower()] + + # Limit to 10 countries to avoid too much data + countries_data = countries_data[:10] + + countries = [] + for country in countries_data: + country_info = CountryInfo( + name=country.get("name", {}).get("common", "Unknown"), + capital=", ".join(country.get("capital", ["Unknown"])), + region=country.get("region", "Unknown"), + subregion=country.get("subregion", "Unknown"), + population=country.get("population", 0), + currencies=list(country.get("currencies", {}).keys()), + languages=list(country.get("languages", {}).values()), + timezones=country.get("timezones", []) + ) + countries.append(country_info) + + return DestinationSearchResponse( + status="success", + message=f"Found {len(countries)} destinations in {region or 'all regions'}", + countries=countries, + count=len(countries) + ) + + except requests.RequestException as e: + print(f"Error searching destinations: {str(e)}") + return DestinationSearchResponse( + status="error", + message=f"Failed to search destinations: {str(e)}" + ) + + +@function_tool +async def get_weather_forecast( + cw: RunContextWrapper[TravelContext], + location_name: str, + latitude: float, + longitude: float +) -> WeatherResponse: + """ + Get current weather and 7-day forecast using Open-Meteo API (no API key required). 
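+    Current conditions are derived from the numeric weather codes returned by
+    Open-Meteo; only a simplified subset of codes is mapped to text labels here.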
+ + Args: + location_name: Name of the location + latitude: Latitude coordinate + longitude: Longitude coordinate + + Returns: + Weather forecast information + """ + print(f"Getting weather forecast for {location_name} ({latitude}, {longitude})") + + try: + # Add small delay to respect rate limits + await asyncio.sleep(0.5) + + url = "https://api.open-meteo.com/v1/forecast" + params = { + "latitude": latitude, + "longitude": longitude, + "current": "temperature_2m,relative_humidity_2m,wind_speed_10m,weather_code", + "daily": "temperature_2m_max,temperature_2m_min,precipitation_sum", + "timezone": "auto", + "forecast_days": 7 + } + + response = requests.get(url, params=params, timeout=10) + response.raise_for_status() + + data = response.json() + current = data.get("current", {}) + daily = data.get("daily", {}) + + # Map weather codes to conditions (simplified) + weather_code = current.get("weather_code", 0) + conditions = { + 0: "Clear sky", + 1: "Mainly clear", + 2: "Partly cloudy", + 3: "Overcast", + 45: "Foggy", + 61: "Light rain", + 63: "Moderate rain", + 65: "Heavy rain", + 71: "Light snow", + 95: "Thunderstorm" + } + current_conditions = conditions.get(weather_code, "Unknown") + + forecast_days = [] + times = daily.get("time", []) + temp_max = daily.get("temperature_2m_max", []) + temp_min = daily.get("temperature_2m_min", []) + precip = daily.get("precipitation_sum", []) + + for i in range(min(len(times), len(temp_max), len(temp_min), len(precip))): + daily_forecast = DailyForecast( + date=times[i] if i < len(times) else "", + temp_max=float(temp_max[i]) if i < len(temp_max) and temp_max[i] is not None else 0.0, + temp_min=float(temp_min[i]) if i < len(temp_min) and temp_min[i] is not None else 0.0, + precipitation=float(precip[i]) if i < len(precip) and precip[i] is not None else 0.0 + ) + forecast_days.append(daily_forecast) + + forecast = WeatherForecast( + location=location_name, + latitude=float(latitude), + longitude=float(longitude), + current_temperature=float( + current.get("temperature_2m", 0.0) + ) if current.get("temperature_2m") is not None else 0.0, + current_conditions=current_conditions, + forecast_days=forecast_days + ) + + return WeatherResponse( + status="success", + message=f"Weather forecast retrieved for {location_name}", + forecast=forecast + ) + + except requests.RequestException as e: + print(f"Error getting weather forecast (RequestException): {str(e)}") + return WeatherResponse( + status="error", + message=f"Failed to get weather forecast: {str(e)}" + ) + except Exception as e: + print(f"Error getting weather forecast (Exception): {type(e).__name__}: {str(e)}") + import traceback + traceback.print_exc() + return WeatherResponse( + status="error", + message=f"Failed to get weather forecast: {type(e).__name__}: {str(e)}" + ) + + +@function_tool +async def get_location_coordinates( + cw: RunContextWrapper[TravelContext], + location_name: str +) -> CoordinatesResponse: + """ + Get coordinates for a location using Nominatim (OpenStreetMap) API. 
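+    Nominatim's usage policy allows roughly one request per second, so a short
+    delay is added before each call and a descriptive User-Agent header is sent.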
+ + Args: + location_name: Name of the city or location + + Returns: + Latitude and longitude coordinates + """ + print(f"Getting coordinates for location: {location_name}") + + try: + # Add small delay to respect rate limits (Nominatim requires max 1 req/sec) + await asyncio.sleep(1.1) + + url = "https://nominatim.openstreetmap.org/search" + params = { + "q": location_name, + "format": "json", + "limit": 1 + } + headers = { + "User-Agent": "TravelAgentDemo/1.0 (OpenTelemetry Sample App)" + } + + response = requests.get(url, params=params, headers=headers, timeout=10) + response.raise_for_status() + + data = response.json() + + if not data: + return CoordinatesResponse( + status="error", + message=f"Location not found: {location_name}" + ) + + location_data = data[0] + coordinates = LocationCoordinates( + location_name=location_name, + latitude=float(location_data.get("lat", 0.0)), + longitude=float(location_data.get("lon", 0.0)), + country=location_data.get("display_name", "").split(",")[-1].strip(), + display_name=location_data.get("display_name", "") + ) + + return CoordinatesResponse( + status="success", + message=f"Coordinates found for {location_name}", + coordinates=coordinates + ) + + except requests.RequestException as e: + print(f"Error getting coordinates: {str(e)}") + return CoordinatesResponse( + status="error", + message=f"Failed to get coordinates: {str(e)}" + ) + + +@function_tool +async def get_destination_info( + cw: RunContextWrapper[TravelContext], + destination_name: str +) -> DestinationInfoResponse: + """ + Get information about a destination from Wikipedia API. + + Args: + destination_name: Name of the destination + + Returns: + Summary and information about the destination + """ + print(f"Getting destination info for: {destination_name}") + + try: + # Add small delay to respect rate limits + await asyncio.sleep(0.5) + + url = "https://en.wikipedia.org/api/rest_v1/page/summary/" + destination_name + headers = { + "User-Agent": "TravelAgentDemo/1.0 (OpenTelemetry Sample App)" + } + + response = requests.get(url, headers=headers, timeout=10) + response.raise_for_status() + + data = response.json() + + info = DestinationInfo( + title=data.get("title", destination_name), + summary=data.get("description", "No description available"), + extract=data.get("extract", "No information available") + ) + + return DestinationInfoResponse( + status="success", + message=f"Retrieved information for {destination_name}", + info=info + ) + + except requests.RequestException as e: + print(f"Error getting destination info: {str(e)}") + return DestinationInfoResponse( + status="error", + message=f"Failed to get destination info: {str(e)}" + ) + + +@function_tool +async def calculate_travel_distance( + cw: RunContextWrapper[TravelContext], + from_location: str, + to_location: str, + from_lat: float, + from_lon: float, + to_lat: float, + to_lon: float +) -> DistanceResponse: + """ + Calculate distance and estimated flight time between two locations. + Uses Haversine formula for distance calculation. 
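+    Haversine: a = sin^2(dlat/2) + cos(lat1) * cos(lat2) * sin^2(dlon/2),
+    distance = 2 * R * asin(sqrt(a)) with R = 6371 km. Flight time assumes an
+    average speed of ~800 km/h including takeoff and landing.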
+ + Args: + from_location: Starting location name + to_location: Destination location name + from_lat: Starting latitude + from_lon: Starting longitude + to_lat: Destination latitude + to_lon: Destination longitude + + Returns: + Distance and flight time information + """ + print(f"Calculating distance from {from_location} to {to_location}") + + try: + import math + + # Haversine formula + R = 6371 # Earth's radius in kilometers + + lat1, lon1 = math.radians(from_lat), math.radians(from_lon) + lat2, lon2 = math.radians(to_lat), math.radians(to_lon) + + dlat = lat2 - lat1 + dlon = lon2 - lon1 + + a = math.sin(dlat/2)**2 + math.cos(lat1) * math.cos(lat2) * math.sin(dlon/2)**2 + c = 2 * math.asin(math.sqrt(a)) + distance_km = R * c + + # Estimate flight time (average speed ~800 km/h including takeoff/landing) + flight_time_hours = distance_km / 800.0 + + distance_info = TravelDistance( + from_location=from_location, + to_location=to_location, + distance_km=round(distance_km, 2), + flight_time_hours=round(flight_time_hours, 2) + ) + + return DistanceResponse( + status="success", + message=f"Distance calculated: {distance_km:.2f} km, ~{flight_time_hours:.2f} hours flight", + distance_info=distance_info + ) + + except Exception as e: + print(f"Error calculating distance: {str(e)}") + return DistanceResponse( + status="error", + message=f"Failed to calculate distance: {str(e)}" + ) + + +@function_tool +async def create_itinerary( + cw: RunContextWrapper[TravelContext], + destination: str, + duration_days: int, + budget: str, + interests: str, + weather_info: str = "", + destination_details: str = "" +) -> ItineraryResponse: + """ + Create a detailed day-by-day travel itinerary using AI. + + Args: + destination: Main destination for the trip + duration_days: Number of days for the trip + budget: Budget level (budget, moderate, luxury) + interests: Traveler interests (e.g., food, history, nature) + weather_info: Weather forecast information (optional) + destination_details: Additional destination details (optional) + + Returns: + Detailed travel itinerary + """ + print(f"Creating {duration_days}-day itinerary for {destination} ({budget} budget)") + + try: + import openai + + # Add small delay + await asyncio.sleep(0.5) + + client = openai.OpenAI() + + itinerary_prompt = f""" +You are an expert travel planner. Create a detailed {duration_days}-day itinerary for {destination}. + +Trip Details: +- Destination: {destination} +- Duration: {duration_days} days +- Budget: {budget} +- Interests: {interests} +{f"- Weather: {weather_info}" if weather_info else ""} +{f"- Destination Info: {destination_details}" if destination_details else ""} + +Create a comprehensive itinerary that includes: +1. A catchy trip title +2. Day-by-day plans with specific activities and timings +3. Meal recommendations for each day +4. Accommodation suggestions +5. Travel tips specific to this destination +6. Packing suggestions based on the weather and activities + +Make the itinerary practical, engaging, and tailored to the {budget} budget level and {interests} interests. +Each day should have 3-5 activities with specific times, locations, and helpful notes. +""" + + response = client.beta.chat.completions.parse( + model="gpt-4o-mini", + messages=[ + { + "role": "system", + "content": "You are an expert travel planner who creates detailed, practical itineraries." 
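+                    # This request uses client.beta.chat.completions.parse with
+                    # response_format=TravelItinerary, so the reply comes back already
+                    # parsed into the Pydantic model (response.choices[0].message.parsed).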
+ }, + {"role": "user", "content": itinerary_prompt} + ], + temperature=0.7, + max_tokens=3000, + response_format=TravelItinerary + ) + + itinerary = response.choices[0].message.parsed + + return ItineraryResponse( + status="success", + message=f"Created {duration_days}-day itinerary for {destination}", + itinerary=itinerary + ) + + except Exception as e: + print(f"Error creating itinerary: {type(e).__name__}: {str(e)}") + import traceback + traceback.print_exc() + return ItineraryResponse( + status="error", + message=f"Failed to create itinerary: {str(e)}" + ) + + +class TravelPlannerAgent(Agent[TravelContext]): + """Specialized agent for travel planning with 6 tools, always creating itineraries.""" + + def __init__(self, model: str = "gpt-4o"): + super().__init__( + name="Travel Planner Agent", + instructions=""" + You are an expert travel planning assistant. Your PRIMARY GOAL is to ALWAYS create a detailed + travel itinerary for the user, no matter how broad or specific their request is. + + Your workflow: + 1. Gather information based on the user's request (use research tools as needed) + 2. Make reasonable assumptions for missing details (budget, duration, interests) + 3. ALWAYS end by creating a complete itinerary using the create_itinerary tool + + Your 6 tools: + 1. search_destinations - Find destinations by region + 2. get_location_coordinates - Get lat/long for locations + 3. get_weather_forecast - Check weather forecasts + 4. get_destination_info - Get details about places + 5. calculate_travel_distance - Calculate distances between locations + 6. create_itinerary - CREATE THE FINAL ITINERARY (REQUIRED!) + + Response patterns based on request specificity: + + SPECIFIC REQUESTS (destination, duration, budget mentioned): + - Gather targeted information (weather, details) + - Immediately create itinerary + + BROAD REQUESTS (just region or vague preferences): + - Search for destinations in the region + - Pick 1-2 promising destinations + - Get coordinates and weather + - Make reasonable assumptions for duration (default 5-7 days) and budget (default moderate) + - Create itinerary with your recommendations + + VERY VAGUE REQUESTS (no clear destination): + - Search popular destinations + - Recommend based on weather/season + - Assume moderate budget, 5-7 days + - Create itinerary + + CRITICAL: Every response must end with a complete itinerary. Never skip the create_itinerary step. + If information is missing, make sensible assumptions and explain them in the itinerary. + + When creating itineraries, use information from your research tools to make them relevant and practical. 
+ """, + model=model, + tools=[ + search_destinations, + get_location_coordinates, + get_weather_forecast, + get_destination_info, + calculate_travel_distance, + create_itinerary + ], + ) + + +async def handle_runner_stream(runner: "Runner"): + """Process runner events and display output.""" + + tool_calls_made = [] + + async for event in runner.stream_events(): + if event.type == "raw_response_event": + if isinstance(event.data, ResponseTextDeltaEvent): + print(event.data.delta, end="", flush=True) + elif isinstance(event.data, ResponseOutputItemAddedEvent): + if isinstance(event.data.item, ResponseFunctionToolCall): + tool_name = event.data.item.name + tool_calls_made.append(tool_name) + print(f"\n[Calling tool: {tool_name}]\n") + elif event.type == "run_item_stream_event": + if event.name == "tool_output" and isinstance( + event.item, ToolCallOutputItem + ): + raw_item = event.item.raw_item + content = ( + raw_item.get("content") + if isinstance(raw_item, dict) + else getattr(raw_item, "content", "") + ) + if content: + print(f"\n[Tool output: {content[:200]}...]\n", end="", flush=True) + + elif event.name == "message_output_created": + raw_item = event.item.raw_item + role = getattr(raw_item, "role", None) + if role is None and isinstance(raw_item, dict): + role = raw_item.get("role") + + if role == "assistant": + content_parts = [] + for part in getattr(raw_item, "content", []): + if isinstance(part, ResponseOutputText): + content_parts.append(part.text) + elif isinstance(part, ResponseOutputRefusal): + content_parts.append(part.refusal) + if content_parts: + print("".join(content_parts), end="", flush=True) + + print() + return tool_calls_made + + +async def run_travel_query(query: str): + """Run a single travel planning query.""" + + print("=" * 80) + print(f"Query: {query}") + print("=" * 80) + + travel_agent = TravelPlannerAgent() + + print("\nAgent Response: ", end="", flush=True) + + messages = [{"role": "user", "content": query}] + runner = Runner().run_streamed(starting_agent=travel_agent, input=messages) + tool_calls = await handle_runner_stream(runner) + + print(f"\n{'='*80}") + print(f"✅ Query completed! Tools used: {', '.join(tool_calls) if tool_calls else 'None'}") + print(f"{'='*80}\n") + + return tool_calls + + +def generate_travel_queries(n: int = 10) -> List[str]: + """Generate diverse travel planning queries with varying specificity, all leading to itinerary creation.""" + + # Mix of SPECIFIC, BROAD, and VAGUE templates that all result in itinerary creation + templates = [ + # === VERY SPECIFIC REQUESTS (most details provided) === + # Agent should: get weather + destination info + create itinerary (3-4 tools) + ("Plan a {duration}-day {budget} trip to {city} for {travelers} interested in " + "{interest}. Create a complete itinerary."), + + ("I want to visit {city} for {duration} days with a {budget} budget. I love " + "{interest}. Create me an itinerary."), + + ("Create a {duration}-day itinerary for {city}. Budget: {budget}, interests: " + "{interest} and {interest2}."), + + # === MODERATELY SPECIFIC REQUESTS (some details, need destination selection) === + # Agent should: search destinations + coordinates + weather + info + create itinerary (5-6 tools) + ("I want a {duration}-day {budget} {season} vacation in {region}. I'm " + "interested in {interest}. Plan my trip."), + + ("Plan a {adjective} trip to {region} for {travelers}. Budget is {budget}, " + "duration {duration} days. 
Interested in {interest}."), + + ("I need a {duration}-day itinerary for {region} focusing on {interest} and " + "{interest2}. Budget: {budget}."), + + # === BROAD REQUESTS (region only, need destination research) === + # Agent should: search destinations + coordinates + weather + info + create itinerary (5-7 tools) + ("I want to explore {region} in {season}. Find good destinations and create an " + "itinerary for me."), + + ("Plan a {budget} trip to {region}. I love {interest}. Find the best place and " + "create an itinerary."), + + ("Help me plan a vacation in {region} for {travelers}. I'm interested in " + "{interest}."), + + ("I want to visit {region}. Create a travel plan for me focusing on {interest} " + "and {interest2}."), + + # === COMPARISON REQUESTS (compare then decide and create itinerary) === + # Agent should: coordinates + weather + info for both + create itinerary for winner (6-8 tools) + ("Should I visit {city1} or {city2} for a {duration}-day {season} trip? " + "Compare them and create an itinerary for the better option."), + + ("I'm deciding between {city1} and {city2}. Check weather, compare them, and " + "create a {duration}-day itinerary for your recommendation."), + + # === VAGUE/OPEN-ENDED REQUESTS (minimal details) === + # Agent should: search region + pick destination + weather + info + create itinerary (5-7 tools) + "I need a vacation. I like {interest}. Plan something for me.", + + "Plan a {season} getaway for {travelers}. Surprise me with a good destination.", + + "I want to go somewhere {adjective} for {interest}. Create a trip for me.", + + "Find me a great {budget} destination and plan my trip.", + + # === RESEARCH-HEAVY REQUESTS (lots of comparison before itinerary) === + # Agent should: search + coordinates (multiple) + weather (multiple) + info + create itinerary (7-10 tools) + ("Find the best {season} destinations in {region}. Check weather for top 3, " + "then create an itinerary for the best one."), + + ("I want a {budget} {interest} trip. Search {region}, compare weather in " + "several places, and create an itinerary for the top pick."), + + ("Show me good {adjective} destinations in {region}. Compare a few, then plan " + "a {duration}-day trip to your favorite."), + + # === MULTI-CITY REQUESTS === + # Agent should: coordinates (multiple) + distances + weather (multiple) + # + create multi-city itinerary (7-9 tools) + ("Plan a {duration}-day multi-city trip visiting {city1}, {city2}, and " + "{city3}. Create a complete itinerary."), + + ("I want to visit multiple cities in {region} over {duration} days. 
Find the " + "best route and create an itinerary."), + ] + + regions = ["Europe", "Asia", "Americas", "Africa", "Oceania"] + budgets = ["budget", "moderate", "luxury"] + durations = ["3", "5", "7", "10", "14"] + travelers = ["solo travelers", "couples", "families", "groups"] + seasons = ["spring", "summer", "fall", "winter"] + interests = ["food", "history", "nature", "beaches", "museums", "adventure", "culture", "nightlife"] + adjectives = ["quick", "relaxing", "adventurous", "cultural", "romantic", "family-friendly", "exciting", "peaceful"] + cities = [ + "Paris", "Tokyo", "New York", "London", "Barcelona", "Rome", "Bangkok", + "Dubai", "Singapore", "Amsterdam", "Berlin", "Sydney", "Istanbul", "Prague", + "Vienna", "Lisbon", "Cairo", "Mumbai", "Toronto", "Buenos Aires" + ] + + queries = [] + for _ in range(n): + template = random.choice(templates) + + # Pick two different interests + interest1 = random.choice(interests) + interest2 = random.choice([i for i in interests if i != interest1]) + + # Pick three different cities + city_choices = random.sample(cities, 3) + + query = template.format( + region=random.choice(regions), + budget=random.choice(budgets), + duration=random.choice(durations), + travelers=random.choice(travelers), + season=random.choice(seasons), + interest=interest1, + interest2=interest2, + adjective=random.choice(adjectives), + city=random.choice(cities), + city1=city_choices[0], + city2=city_choices[1], + city3=city_choices[2] + ) + queries.append(query) + + return queries + + +async def main(): + """Main entry point for the travel agent application.""" + + parser = argparse.ArgumentParser(description="Travel Planning Agent Demo") + parser.add_argument( + "--count", + type=int, + default=1, + help="Number of queries to run (default: 3)" + ) + parser.add_argument( + "--delay", + type=float, + default=2.0, + help="Delay between queries in seconds (default: 2.0)" + ) + + args = parser.parse_args() + + print("=" * 80) + print("Travel Planning Agent with OpenAI Agents SDK") + print("=" * 80) + print(f"Running {args.count} travel planning queries...") + print("Goal: Create complete itineraries with varying research depth") + print("Using real APIs: REST Countries, Open-Meteo, Nominatim, Wikipedia, OpenAI") + print("6 Tools: search, coordinates, weather, info, distance, itinerary") + print("=" * 80) + print() + + queries = generate_travel_queries(args.count) + + all_tool_calls = [] + for i, query in enumerate(queries, 1): + print(f"\n\n{'#'*80}") + print(f"# Query {i} of {args.count}") + print(f"{'#'*80}\n") + + tool_calls = await run_travel_query(query) + all_tool_calls.append({ + "query": query, + "tools_used": tool_calls, + "tool_count": len(tool_calls) + }) + + if i < args.count: + print(f"\nWaiting {args.delay} seconds before next query...") + time.sleep(args.delay) + + # Summary + print("\n\n" + "=" * 80) + print("EXECUTION SUMMARY") + print("=" * 80) + print(f"Total queries executed: {len(all_tool_calls)}") + + tool_usage = {} + for result in all_tool_calls: + for tool in result["tools_used"]: + tool_usage[tool] = tool_usage.get(tool, 0) + 1 + + print("\nTool usage statistics:") + for tool, count in sorted(tool_usage.items(), key=lambda x: x[1], reverse=True): + print(f" - {tool}: {count} times") + + print("\nTrajectory variation:") + unique_trajectories = len(set(tuple(r["tools_used"]) for r in all_tool_calls)) + print(f" - Unique tool call sequences: {unique_trajectories}/{len(all_tool_calls)}") + + avg_tools = sum(r["tool_count"] for r in all_tool_calls) / 
len(all_tool_calls) + print(f" - Average tools per query: {avg_tools:.2f}") + + print("\n" + "=" * 80) + print("✅ Travel Agent demo completed successfully!") + print("🔍 All spans captured by OpenTelemetry instrumentation") + print("=" * 80) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/packages/sample-app/sample_app/experiment/made_by_traceloop/agents_exp.py b/packages/sample-app/sample_app/experiment/made_by_traceloop/agents_exp.py new file mode 100644 index 0000000000..5ba601c5e1 --- /dev/null +++ b/packages/sample-app/sample_app/experiment/made_by_traceloop/agents_exp.py @@ -0,0 +1,179 @@ +""" +Agent Evaluators Experiment + +This example demonstrates Traceloop's agent evaluators: +- Agent Goal Accuracy: Validates agent goal achievement +- Agent Tool Error Detector: Detects errors or failures during tool execution +- Agent Flow Quality: Validates agent trajectories against user-defined natural language tests +- Agent Efficiency: Evaluates agent efficiency by checking for redundant calls and optimal paths +- Agent Goal Completeness: Measures whether the agent successfully accomplished all user goals + +These evaluators help ensure your AI agents perform optimally and achieve their objectives. +""" + +import asyncio +import os +from openai import AsyncOpenAI +from traceloop.sdk import Traceloop +from traceloop.sdk.evaluator import EvaluatorMadeByTraceloop + +# Initialize Traceloop +client = Traceloop.init() + + +async def generate_agent_trace(task_description: str) -> dict: + """ + Simulate an agent execution and generate trace data. + In a real scenario, this would come from your actual agent framework. + """ + openai_client = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY")) + + # Simulate agent executing a task + response = await openai_client.chat.completions.create( + model="gpt-3.5-turbo", + messages=[ + {"role": "system", "content": "You are a helpful agent that completes tasks step by step."}, + {"role": "user", "content": task_description} + ], + temperature=0.7, + max_tokens=300, + ) + + completion = response.choices[0].message.content + + # Return trace data (simplified for demo) + # In production, this would be actual trace/span data from your agent + return { + "task": task_description, + "completion": completion, + } + + +async def agent_evaluators_task(row): + """ + Unified task function for all 5 agent evaluators. + + IMPORTANT: Thanks to field synonym mapping, you can use flexible field names! + For example: + - "answer", "response", "text" → all map to "completion" + - "prompt", "instructions" → map to "question" + - "context", "ground_truth" → map to "reference" + - "prompts" → maps to "trajectory_prompts" + - "completions" → maps to "trajectory_completions" + + This makes it easier to write tasks without worrying about exact field names. 
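+    A minimal sketch of the mapping (field values are illustrative only): a task
+    that returns
+        {"prompt": "Book a flight", "answer": "Booked", "context": "NYC to Paris"}
+    is normalized to
+        {"question": "Book a flight", "completion": "Booked", "reference": "NYC to Paris"}
+    before the evaluators run.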
+ + Required fields for the 5 agent evaluators: + - question (or prompt): The input question or goal (for agent_goal_accuracy) + - completion (or answer, response, text): The agent's completion (for agent_goal_accuracy) + - reference (or ground_truth, context): The reference answer (for agent_goal_accuracy) + - tool_input: The input to tools (for agent_tool_error_detector) + - tool_output: The output from tools (for agent_tool_error_detector) + - trajectory_prompts (or prompts): The agent's prompt trajectory + (for agent_flow_quality, agent_efficiency, agent_goal_completeness) + - trajectory_completions (or completions): The agent's completion trajectory + (for agent_flow_quality, agent_efficiency, agent_goal_completeness) + """ + # Get data from row or use defaults + question = row.get("question", "Book a flight from New York to Paris") + reference = row.get( + "reference", + "Successfully booked flight NYC to Paris, departure 2024-12-15, return 2024-12-22" + ) + tool_input = row.get("tool_input", "New York to Paris") + tool_output = row.get( + "tool_output", + "Successfully booked flight NYC to Paris, departure 2024-12-15, return 2024-12-22" + ) + trajectory_prompts = row.get("trajectory_prompts", "New York to Paris") + trajectory_completions = row.get( + "trajectory_completions", + "Successfully booked flight NYC to Paris, departure 2024-12-15, return 2024-12-22" + ) + + # Generate agent trace + trace_data = await generate_agent_trace(question) + + # You can use synonyms! These will automatically map to the required fields: + # - Using "answer" instead of "completion" ✓ + # - Using "prompt" instead of "question" ✓ + # - Using "context" instead of "reference" ✓ + return { + "prompt": question, # Maps to "question" + "answer": trace_data["completion"], # Maps to "completion" + "context": reference, # Maps to "reference" + "tool_input": tool_input, + "tool_output": tool_output, + "prompts": trajectory_prompts, # Maps to "trajectory_prompts" + "completions": trajectory_completions, # Maps to "trajectory_completions" + } + + +async def run_agents_experiment(): + """ + Run experiment with all 5 agent evaluators. + + This experiment will evaluate agent performance across: + 1. Agent Goal Accuracy - Did the agent achieve the stated goal? + 2. Agent Tool Error Detector - Were there any tool execution errors? + 3. Agent Flow Quality - Did the agent follow the expected trajectory? + 4. Agent Efficiency - Was the agent efficient (no redundant calls)? + 5. Agent Goal Completeness - Did the agent fully accomplish all goals? + """ + + print("\n" + "="*80) + print("AGENT EVALUATORS EXPERIMENT") + print("="*80 + "\n") + + print("This experiment will test five agent-specific evaluators:\n") + print("1. Agent Goal Accuracy - Validates goal achievement") + print("2. Agent Tool Error Detector - Detects tool execution errors") + print("3. Agent Flow Quality - Validates expected trajectories") + print("4. Agent Efficiency - Checks for optimal execution paths") + print("5. 
Agent Goal Completeness - Measures full goal accomplishment") + print("\n" + "-"*80 + "\n") + + # Configure agent evaluators + evaluators = [ + EvaluatorMadeByTraceloop.agent_goal_accuracy(), + EvaluatorMadeByTraceloop.agent_tool_error_detector(), + EvaluatorMadeByTraceloop.agent_flow_quality(), + EvaluatorMadeByTraceloop.agent_efficiency(), + EvaluatorMadeByTraceloop.agent_goal_completeness(), + ] + + print("Running experiment with evaluators:") + for evaluator in evaluators: + print(f" - {evaluator.slug}") + + print("\n" + "-"*80 + "\n") + + # Run the experiment + # Note: You'll need to create a dataset with appropriate test cases for agents + results, errors = await client.experiment.run( + dataset_slug="agents", # Set a dataset slug that exists in the traceloop platform + dataset_version="v1", + task=agent_evaluators_task, + evaluators=evaluators, + experiment_slug="agents-evaluators-exp", + stop_on_error=False, + wait_for_results=True, + ) + + print("\n" + "="*80) + print("Agent evaluators experiment completed!") + print("="*80 + "\n") + + print("Results summary:") + print(f" - Total rows processed: {len(results) if results else 0}") + print(f" - Errors encountered: {len(errors) if errors else 0}") + + if errors: + print("\nErrors:") + for error in errors: + print(f" - {error}") + +if __name__ == "__main__": + print("\nAgent Evaluators Experiment\n") + + asyncio.run(run_agents_experiment()) diff --git a/packages/traceloop-sdk/tests/evaluator/test_evaluator.py b/packages/traceloop-sdk/tests/evaluator/test_evaluator.py index fd07d16e77..9da8862ffd 100644 --- a/packages/traceloop-sdk/tests/evaluator/test_evaluator.py +++ b/packages/traceloop-sdk/tests/evaluator/test_evaluator.py @@ -1,5 +1,5 @@ import pytest -from traceloop.sdk.evaluator.evaluator import validate_task_output +from traceloop.sdk.evaluator.evaluator import validate_and_normalize_task_output from traceloop.sdk.evaluator.config import EvaluatorDetails @@ -12,7 +12,7 @@ def test_validate_task_output_with_no_evaluators(self): evaluators = [] # Should not raise any exception - validate_task_output(task_output, evaluators) + validate_and_normalize_task_output(task_output, evaluators) def test_validate_task_output_with_evaluators_no_required_fields(self): """Test that validation passes when evaluators have no required fields""" @@ -23,7 +23,7 @@ def test_validate_task_output_with_evaluators_no_required_fields(self): ] # Should not raise any exception - validate_task_output(task_output, evaluators) + validate_and_normalize_task_output(task_output, evaluators) def test_validate_task_output_with_valid_output(self): """Test that validation passes when all required fields are present""" @@ -37,7 +37,7 @@ def test_validate_task_output_with_valid_output(self): ] # Should not raise any exception - validate_task_output(task_output, evaluators) + validate_and_normalize_task_output(task_output, evaluators) def test_validate_task_output_missing_single_field(self): """Test that validation fails when a single required field is missing""" @@ -47,11 +47,12 @@ def test_validate_task_output_missing_single_field(self): ] with pytest.raises(ValueError) as exc_info: - validate_task_output(task_output, evaluators) + validate_and_normalize_task_output(task_output, evaluators) error_message = str(exc_info.value) assert "Task output missing required fields for evaluators:" in error_message - assert "pii-detector requires: ['text']" in error_message + assert "pii-detector requires:" in error_message + assert "'text'" in error_message assert "Task output 
contains: ['prompt']" in error_message assert ( "Hint: Update your task function to return a dictionary " @@ -69,7 +70,7 @@ def test_validate_task_output_missing_multiple_fields_single_evaluator(self): ] with pytest.raises(ValueError) as exc_info: - validate_task_output(task_output, evaluators) + validate_and_normalize_task_output(task_output, evaluators) error_message = str(exc_info.value) assert "relevance-checker requires:" in error_message @@ -90,7 +91,7 @@ def test_validate_task_output_missing_fields_multiple_evaluators(self): ] with pytest.raises(ValueError) as exc_info: - validate_task_output(task_output, evaluators) + validate_and_normalize_task_output(task_output, evaluators) error_message = str(exc_info.value) assert "pii-detector requires:" in error_message @@ -112,7 +113,7 @@ def test_validate_task_output_partial_match(self): ] with pytest.raises(ValueError) as exc_info: - validate_task_output(task_output, evaluators) + validate_and_normalize_task_output(task_output, evaluators) error_message = str(exc_info.value) # Should only mention the failing evaluator @@ -127,7 +128,7 @@ def test_validate_task_output_empty_task_output(self): ] with pytest.raises(ValueError) as exc_info: - validate_task_output(task_output, evaluators) + validate_and_normalize_task_output(task_output, evaluators) error_message = str(exc_info.value) assert "Task output contains: []" in error_message @@ -146,7 +147,7 @@ def test_validate_task_output_with_extra_fields(self): ] # Should not raise any exception - extra fields are allowed - validate_task_output(task_output, evaluators) + validate_and_normalize_task_output(task_output, evaluators) def test_validate_task_output_case_sensitive_field_names(self): """Test that field name matching is case-sensitive""" @@ -156,10 +157,11 @@ def test_validate_task_output_case_sensitive_field_names(self): ] with pytest.raises(ValueError) as exc_info: - validate_task_output(task_output, evaluators) + validate_and_normalize_task_output(task_output, evaluators) error_message = str(exc_info.value) - assert "pii-detector requires: ['text']" in error_message + assert "pii-detector requires:" in error_message + assert "'text'" in error_message assert "Task output contains: ['Text']" in error_message def test_validate_task_output_with_evaluator_config(self): @@ -175,7 +177,7 @@ def test_validate_task_output_with_evaluator_config(self): ] # Should not raise any exception - config shouldn't affect validation - validate_task_output(task_output, evaluators) + validate_and_normalize_task_output(task_output, evaluators) def test_validate_task_output_mixed_evaluators(self): """Test validation with a mix of evaluators with and without required fields""" @@ -191,11 +193,12 @@ def test_validate_task_output_mixed_evaluators(self): ] with pytest.raises(ValueError) as exc_info: - validate_task_output(task_output, evaluators) + validate_and_normalize_task_output(task_output, evaluators) error_message = str(exc_info.value) # Should only mention failing evaluator - assert "relevance-checker requires: ['prompt']" in error_message + assert "relevance-checker requires:" in error_message + assert "'prompt'" in error_message assert "evaluator-no-requirements" not in error_message assert "pii-detector" not in error_message or "pii-detector requires:" not in error_message @@ -212,7 +215,7 @@ def test_validate_task_output_duplicate_required_fields(self): ] with pytest.raises(ValueError) as exc_info: - validate_task_output(task_output, evaluators) + validate_and_normalize_task_output(task_output, 
evaluators) error_message = str(exc_info.value) assert "pii-detector requires:" in error_message diff --git a/packages/traceloop-sdk/tests/evaluator/test_field_mapping.py b/packages/traceloop-sdk/tests/evaluator/test_field_mapping.py new file mode 100644 index 0000000000..962e9b3949 --- /dev/null +++ b/packages/traceloop-sdk/tests/evaluator/test_field_mapping.py @@ -0,0 +1,412 @@ +import pytest +from traceloop.sdk.evaluator.field_mapping import ( + get_synonyms, + normalize_task_output, + get_field_suggestions, + format_field_help, +) + + +class TestGetSynonyms: + """Tests for get_synonyms function""" + + def test_get_synonyms_for_text(self): + """Test getting synonyms for 'text' field""" + synonyms = get_synonyms("text") + assert "text" in synonyms + assert "completion" in synonyms + assert "answer" in synonyms + assert "response" in synonyms + assert len(synonyms) == 4 + + def test_get_synonyms_for_question(self): + """Test getting synonyms for 'question' field""" + synonyms = get_synonyms("question") + assert "question" in synonyms + assert "prompt" in synonyms + assert "instructions" in synonyms + assert "query" in synonyms + assert len(synonyms) == 4 + + def test_get_synonyms_for_reference(self): + """Test getting synonyms for 'reference' field""" + synonyms = get_synonyms("reference") + assert "reference" in synonyms + assert "ground_truth" in synonyms + assert "context" in synonyms + assert len(synonyms) == 3 + + def test_get_synonyms_for_trajectory_prompts(self): + """Test getting synonyms for 'trajectory_prompts' field""" + synonyms = get_synonyms("trajectory_prompts") + assert "trajectory_prompts" in synonyms + assert "prompts" in synonyms + assert len(synonyms) == 2 + + def test_get_synonyms_for_trajectory_completions(self): + """Test getting synonyms for 'trajectory_completions' field""" + synonyms = get_synonyms("trajectory_completions") + assert "trajectory_completions" in synonyms + assert "completions" in synonyms + assert len(synonyms) == 2 + + def test_get_synonyms_for_non_synonym_field(self): + """Test getting synonyms for a field without synonyms""" + synonyms = get_synonyms("tool_input") + assert synonyms == {"tool_input"} + assert len(synonyms) == 1 + + def test_get_synonyms_for_unknown_field(self): + """Test getting synonyms for an unknown field""" + synonyms = get_synonyms("unknown_field") + assert synonyms == {"unknown_field"} + assert len(synonyms) == 1 + + def test_get_synonyms_symmetry(self): + """Test that synonym relationships are symmetric""" + # All synonyms in a group should map to the same set + text_synonyms = get_synonyms("text") + completion_synonyms = get_synonyms("completion") + answer_synonyms = get_synonyms("answer") + response_synonyms = get_synonyms("response") + + assert text_synonyms == completion_synonyms + assert text_synonyms == answer_synonyms + assert text_synonyms == response_synonyms + + +class TestNormalizeTaskOutput: + """Tests for normalize_task_output function""" + + def test_normalize_text_to_completion(self): + """Test normalizing 'text' to 'completion'""" + task_output = {"text": "hello world"} + required = ["completion"] + normalized = normalize_task_output(task_output, required) + + assert "completion" in normalized + assert normalized["completion"] == "hello world" + assert "text" not in normalized # Original field removed after mapping + + def test_normalize_answer_to_completion(self): + """Test normalizing 'answer' to 'completion'""" + task_output = {"answer": "Paris"} + required = ["completion"] + normalized = 
normalize_task_output(task_output, required) + + assert "completion" in normalized + assert normalized["completion"] == "Paris" + + def test_normalize_prompt_to_question(self): + """Test normalizing 'prompt' to 'question'""" + task_output = {"prompt": "What is the capital?"} + required = ["question"] + normalized = normalize_task_output(task_output, required) + + assert "question" in normalized + assert normalized["question"] == "What is the capital?" + + def test_normalize_context_to_reference(self): + """Test normalizing 'context' to 'reference'""" + task_output = {"context": "The sky is blue"} + required = ["reference"] + normalized = normalize_task_output(task_output, required) + + assert "reference" in normalized + assert normalized["reference"] == "The sky is blue" + + def test_normalize_ground_truth_to_reference(self): + """Test normalizing 'ground_truth' to 'reference'""" + task_output = {"ground_truth": "Correct answer"} + required = ["reference"] + normalized = normalize_task_output(task_output, required) + + assert "reference" in normalized + assert normalized["reference"] == "Correct answer" + + def test_normalize_prompts_to_trajectory_prompts(self): + """Test normalizing 'prompts' to 'trajectory_prompts'""" + task_output = {"prompts": "prompt1, prompt2"} + required = ["trajectory_prompts"] + normalized = normalize_task_output(task_output, required) + + assert "trajectory_prompts" in normalized + assert normalized["trajectory_prompts"] == "prompt1, prompt2" + + def test_normalize_completions_to_trajectory_completions(self): + """Test normalizing 'completions' to 'trajectory_completions'""" + task_output = {"completions": "comp1, comp2"} + required = ["trajectory_completions"] + normalized = normalize_task_output(task_output, required) + + assert "trajectory_completions" in normalized + assert normalized["trajectory_completions"] == "comp1, comp2" + + def test_normalize_multiple_fields(self): + """Test normalizing multiple fields at once""" + task_output = { + "answer": "Paris", + "prompt": "What is the capital?", + "context": "France", + } + required = ["completion", "question", "reference"] + normalized = normalize_task_output(task_output, required) + + assert normalized["completion"] == "Paris" + assert normalized["question"] == "What is the capital?" 
+ assert normalized["reference"] == "France" + + def test_normalize_with_no_mapping_needed(self): + """Test when field names already match required fields""" + task_output = {"completion": "hello", "question": "greet"} + required = ["completion", "question"] + normalized = normalize_task_output(task_output, required) + + assert normalized["completion"] == "hello" + assert normalized["question"] == "greet" + + def test_normalize_preserves_extra_fields(self): + """Test that extra fields not in required list are preserved""" + task_output = { + "answer": "Paris", + "extra_field": "value", + "another": 123, + } + required = ["completion"] + normalized = normalize_task_output(task_output, required) + + assert normalized["completion"] == "Paris" + assert "extra_field" in normalized + assert "another" in normalized + assert normalized["extra_field"] == "value" + assert normalized["another"] == 123 + + def test_normalize_empty_task_output(self): + """Test normalizing with empty task output""" + task_output = {} + required = ["completion"] + normalized = normalize_task_output(task_output, required) + + assert "completion" not in normalized + assert len(normalized) == 0 + + def test_normalize_empty_required_fields(self): + """Test normalizing with empty required fields""" + task_output = {"text": "hello", "extra": "value"} + required = [] + normalized = normalize_task_output(task_output, required) + + # Should preserve all fields when no mapping needed + assert "text" in normalized + assert "extra" in normalized + + def test_normalize_prioritizes_exact_match(self): + """Test that exact field match is preferred over synonyms""" + # If both synonym and exact match exist, exact match should be used + task_output = { + "completion": "exact", + "answer": "synonym", + } + required = ["completion"] + normalized = normalize_task_output(task_output, required) + + # Should use the exact match "completion" + assert normalized["completion"] == "exact" + + def test_normalize_with_non_synonym_fields(self): + """Test normalizing fields that have no synonyms""" + task_output = { + "tool_input": "input_value", + "tool_output": "output_value", + } + required = ["tool_input", "tool_output"] + normalized = normalize_task_output(task_output, required) + + assert normalized["tool_input"] == "input_value" + assert normalized["tool_output"] == "output_value" + + def test_normalize_mixed_synonyms_and_non_synonyms(self): + """Test normalizing a mix of synonym and non-synonym fields""" + task_output = { + "answer": "Paris", + "tool_input": "search", + "prompt": "What is the capital?", + } + required = ["completion", "tool_input", "question"] + normalized = normalize_task_output(task_output, required) + + assert normalized["completion"] == "Paris" + assert normalized["tool_input"] == "search" + assert normalized["question"] == "What is the capital?" 
+ + +class TestGetFieldSuggestions: + """Tests for get_field_suggestions function""" + + def test_suggest_synonym_for_missing_completion(self): + """Test suggesting synonyms for missing 'completion' field""" + missing = "completion" + available = ["answer", "question"] + suggestions = get_field_suggestions(missing, available) + + assert "answer" in suggestions + assert "question" not in suggestions + + def test_suggest_synonym_for_missing_question(self): + """Test suggesting synonyms for missing 'question' field""" + missing = "question" + available = ["prompt", "text"] + suggestions = get_field_suggestions(missing, available) + + assert "prompt" in suggestions + assert "text" not in suggestions + + def test_suggest_synonym_for_missing_reference(self): + """Test suggesting synonyms for missing 'reference' field""" + missing = "reference" + available = ["context", "completion"] + suggestions = get_field_suggestions(missing, available) + + assert "context" in suggestions + assert "completion" not in suggestions + + def test_no_suggestions_when_no_synonyms_available(self): + """Test no suggestions when available fields have no synonyms""" + missing = "completion" + available = ["tool_input", "tool_output"] + suggestions = get_field_suggestions(missing, available) + + assert len(suggestions) == 0 + + def test_multiple_synonyms_available(self): + """Test suggesting when multiple synonyms are available""" + missing = "completion" + available = ["answer", "text", "response"] + suggestions = get_field_suggestions(missing, available) + + assert "answer" in suggestions + assert "text" in suggestions + assert "response" in suggestions + + def test_empty_available_fields(self): + """Test with empty available fields""" + missing = "completion" + available = [] + suggestions = get_field_suggestions(missing, available) + + assert len(suggestions) == 0 + + +class TestFormatFieldHelp: + """Tests for format_field_help function""" + + def test_format_field_with_synonyms(self): + """Test formatting help for field with synonyms""" + help_text = format_field_help("completion") + assert "completion" in help_text + assert "answer" in help_text + assert "response" in help_text + assert "text" in help_text + assert "synonym" in help_text.lower() + + def test_format_field_without_synonyms(self): + """Test formatting help for field without synonyms""" + help_text = format_field_help("tool_input") + assert help_text == "'tool_input'" + assert "synonym" not in help_text.lower() + + def test_format_multiple_fields(self): + """Test formatting help for multiple fields""" + fields = ["completion", "question", "reference"] + help_texts = [format_field_help(field) for field in fields] + + assert len(help_texts) == 3 + assert all("synonym" in text.lower() for text in help_texts) + + +class TestIntegrationWithValidateTaskOutput: + """Integration tests with validate_and_normalize_task_output""" + + def test_validate_with_synonym_mapping(self): + """Test that validate_and_normalize_task_output uses synonym mapping""" + from traceloop.sdk.evaluator.evaluator import validate_and_normalize_task_output + from traceloop.sdk.evaluator.config import EvaluatorDetails + + # User returns "answer" but evaluator needs "completion" + task_output = {"answer": "Paris", "prompt": "What is the capital?"} + evaluators = [ + EvaluatorDetails( + slug="test-evaluator", + required_input_fields=["completion", "question"], + ) + ] + + # Should not raise - synonyms should be mapped + normalized = validate_and_normalize_task_output(task_output, 
evaluators) + assert "completion" in normalized + assert "question" in normalized + assert normalized["completion"] == "Paris" + assert normalized["question"] == "What is the capital?" + + def test_validate_fails_with_helpful_message(self): + """Test that validation failure includes synonym suggestions""" + from traceloop.sdk.evaluator.evaluator import validate_and_normalize_task_output + from traceloop.sdk.evaluator.config import EvaluatorDetails + + task_output = {"wrong_field": "value"} + evaluators = [ + EvaluatorDetails( + slug="test-evaluator", + required_input_fields=["completion"], + ) + ] + + with pytest.raises(ValueError) as exc_info: + validate_and_normalize_task_output(task_output, evaluators) + + error_message = str(exc_info.value) + assert "test-evaluator requires:" in error_message + assert "completion" in error_message + assert "synonym" in error_message.lower() + + def test_validate_with_context_to_reference_mapping(self): + """Test specific case of context mapping to reference""" + from traceloop.sdk.evaluator.evaluator import validate_and_normalize_task_output + from traceloop.sdk.evaluator.config import EvaluatorDetails + + task_output = { + "answer": "Yes", + "question": "Is it true?", + "context": "The sky is blue", + } + evaluators = [ + EvaluatorDetails( + slug="faithfulness", + required_input_fields=["completion", "question", "reference"], + ) + ] + + normalized = validate_and_normalize_task_output(task_output, evaluators) + assert normalized["completion"] == "Yes" + assert normalized["question"] == "Is it true?" + assert normalized["reference"] == "The sky is blue" + + def test_validate_with_trajectory_fields(self): + """Test mapping for trajectory fields used in agent evaluators""" + from traceloop.sdk.evaluator.evaluator import validate_and_normalize_task_output + from traceloop.sdk.evaluator.config import EvaluatorDetails + + task_output = { + "prompts": "prompt1, prompt2", + "completions": "comp1, comp2", + } + evaluators = [ + EvaluatorDetails( + slug="agent-efficiency", + required_input_fields=["trajectory_prompts", "trajectory_completions"], + ) + ] + + normalized = validate_and_normalize_task_output(task_output, evaluators) + assert normalized["trajectory_prompts"] == "prompt1, prompt2" + assert normalized["trajectory_completions"] == "comp1, comp2" diff --git a/packages/traceloop-sdk/tests/experiment/test_experiment.py b/packages/traceloop-sdk/tests/experiment/test_experiment.py index 2e2ccd2aeb..1e85323301 100644 --- a/packages/traceloop-sdk/tests/experiment/test_experiment.py +++ b/packages/traceloop-sdk/tests/experiment/test_experiment.py @@ -165,7 +165,8 @@ async def task_missing_fields(row): # Should have at least one error and should have broken early assert len(errors) >= 1 assert "Task output missing required fields for evaluators:" in errors[0] - assert "pii-detector requires: ['text']" in errors[0] + assert "pii-detector requires:" in errors[0] + assert "'text'" in errors[0] # With stop_on_error=True, it should break and not process all 3 rows assert len(results) + len(errors) <= 3 @@ -211,7 +212,8 @@ async def task_missing_fields(row): assert len(errors) == 1 assert len(results) == 0 assert "Task output missing required fields for evaluators:" in errors[0] - assert "pii-detector requires: ['text']" in errors[0] + assert "pii-detector requires:" in errors[0] + assert "'text'" in errors[0] @pytest.mark.anyio(backends=["asyncio"]) async def test_run_locally_succeeds_with_valid_output(self): @@ -310,11 +312,12 @@ async def task_partial_output(row): 
) # Should have error with validation message + # Note: 'text' in task output maps to 'response' via synonym mapping, + # so only 'prompt' is missing assert len(errors) >= 1 error_message = errors[0] assert "relevance-checker requires:" in error_message assert "'prompt'" in error_message - assert "'response'" in error_message @pytest.mark.anyio(backends=["asyncio"]) async def test_run_locally_no_validation_for_string_evaluators(self): diff --git a/packages/traceloop-sdk/traceloop/sdk/evaluator/evaluator.py b/packages/traceloop-sdk/traceloop/sdk/evaluator/evaluator.py index 799f519c72..8d44ab4d95 100644 --- a/packages/traceloop-sdk/traceloop/sdk/evaluator/evaluator.py +++ b/packages/traceloop-sdk/traceloop/sdk/evaluator/evaluator.py @@ -1,5 +1,6 @@ import httpx from typing import Dict, Optional, Any, List +from .field_mapping import normalize_task_output, get_field_suggestions, format_field_help from .model import ( InputExtractor, @@ -145,24 +146,37 @@ async def trigger_experiment_evaluator( return execute_response.execution_id -def validate_task_output( +def validate_and_normalize_task_output( task_output: Dict[str, Any], evaluators: List[EvaluatorDetails], -) -> None: +) -> Dict[str, Any]: """ Validate that task output contains all required fields for the given evaluators. + Automatically normalizes field names using synonym mappings. Args: task_output: The dictionary returned by the task function evaluators: List of EvaluatorDetails to validate against + Returns: + Normalized task output with field names mapped to evaluator requirements + Raises: - ValueError: If task output is missing required fields for any evaluator + ValueError: If task output is missing required fields for any evaluator (even after synonym mapping) """ if not evaluators: - return + return task_output + + # Collect all required fields across all evaluators + all_required_fields = [] + for evaluator in evaluators: + if evaluator.required_input_fields: + all_required_fields.extend(evaluator.required_input_fields) + + # Normalize task output to match required fields using synonyms + normalized_output = normalize_task_output(task_output, all_required_fields) - # Collect all validation errors + # Now validate against normalized output missing_fields_by_evaluator: Dict[str, set[str]] = {} for evaluator in evaluators: @@ -171,7 +185,7 @@ def validate_task_output( missing_fields = [ field for field in evaluator.required_input_fields - if field not in task_output + if field not in normalized_output ] if missing_fields: @@ -180,19 +194,29 @@ def validate_task_output( missing_fields_by_evaluator[evaluator.slug] = set() missing_fields_by_evaluator[evaluator.slug].update(missing_fields) - # If there are any missing fields, raise a detailed error + # If there are any missing fields, raise a detailed error with suggestions if missing_fields_by_evaluator: error_lines = ["Task output missing required fields for evaluators:"] for slug, fields in missing_fields_by_evaluator.items(): - error_lines.append(f" - {slug} requires: {sorted(fields)}") + error_lines.append(f" - {slug} requires:") + for field in sorted(fields): + suggestions = get_field_suggestions(field, list(task_output.keys())) + field_help = format_field_help(field) + if suggestions: + error_lines.append(f" {field_help} - Did you mean: {suggestions}?") + else: + error_lines.append(f" {field_help}") error_lines.append(f"\nTask output contains: {list(task_output.keys())}") error_lines.append("\nHint: Update your task function to return a dictionary with the required fields.") 
+ error_lines.append("You can use any of the accepted synonyms shown above.") raise ValueError("\n".join(error_lines)) + return normalized_output + def _extract_error_from_response(response: httpx.Response) -> str: """ diff --git a/packages/traceloop-sdk/traceloop/sdk/evaluator/evaluators_made_by_traceloop.py b/packages/traceloop-sdk/traceloop/sdk/evaluator/evaluators_made_by_traceloop.py index 5ad610eec0..2b52bd4c21 100644 --- a/packages/traceloop-sdk/traceloop/sdk/evaluator/evaluators_made_by_traceloop.py +++ b/packages/traceloop-sdk/traceloop/sdk/evaluator/evaluators_made_by_traceloop.py @@ -299,6 +299,28 @@ def faithfulness( required_input_fields=["question", "completion", "context"], ) + @staticmethod + def context_relevance( + ) -> EvaluatorDetails: + """ + Context relevance evaluator - validates context relevance. + + Required task output fields: + - query: The user's query or question + - context: The retrieved context to evaluate for relevance + + Returns: + EvaluatorDetails configured for context relevance evaluation + """ + config: Dict[str, Any] = {} + + return EvaluatorDetails( + slug="context-relevance", + version=None, + config=config, + required_input_fields=["query", "context"], + ) + @staticmethod def profanity_detector() -> EvaluatorDetails: """ @@ -453,3 +475,247 @@ def perplexity( config=config, required_input_fields=["prompt"], ) + + @staticmethod + def answer_completeness( + ) -> EvaluatorDetails: + """ + Answer completeness evaluator - measures how completely responses use relevant context. + + Required task output fields: + - question: The input question + - completion: The completion to evaluate + - context: The context to evaluate against + + Returns: + EvaluatorDetails configured for answer completeness evaluation + """ + config: Dict[str, Any] = {} + + return EvaluatorDetails( + slug="answer-completeness", + version=None, + config=config, + required_input_fields=["question", "completion", "context"], + ) + + @staticmethod + def answer_correctness( + ) -> EvaluatorDetails: + """ + Answer correctness evaluator - evaluates factual accuracy by comparing answers against ground truth. + + Required task output fields: + - question: The input question + - completion: The completion to evaluate + - ground_truth: The ground truth answer + + Returns: + EvaluatorDetails configured for answer correctness evaluation + """ + config: Dict[str, Any] = {} + + return EvaluatorDetails( + slug="answer-correctness", + version=None, + config=config, + required_input_fields=["question", "completion", "ground_truth"], + ) + + @staticmethod + def uncertainty_detector( + ) -> EvaluatorDetails: + """ + Uncertainty detector evaluator - generates responses and measures model uncertainty from logprobs. + + Required task output fields: + - prompt: The prompt to evaluate uncertainty for + + Returns: + EvaluatorDetails configured for uncertainty detection + """ + config: Dict[str, Any] = {} + + return EvaluatorDetails( + slug="uncertainty-detector", + version=None, + config=config, + required_input_fields=["prompt"], + ) + + @staticmethod + def agent_tool_error_detector( + ) -> EvaluatorDetails: + """ + Agent tool error detector evaluator - detects errors or failures during tool execution. 
+ + Required task output fields: + - tool_input: The input parameters passed to the tool + - tool_output: The output or response from the tool execution + + Returns: + EvaluatorDetails configured for agent tool error detection + """ + config: Dict[str, Any] = {} + + return EvaluatorDetails( + slug="agent-tool-error-detector", + version=None, + config=config, + required_input_fields=["tool_input", "tool_output"], + ) + + @staticmethod + def agent_flow_quality( + ) -> EvaluatorDetails: + """ + Agent flow quality evaluator - validates agent trajectories against user-defined natural language tests. + + Required task output fields: + - trajectory_prompts: The prompts extracted from the span attributes (llm.prompts.*) + - trajectory_completions: The completions extracted from the span attributes (llm.completions.*) + + Returns: + EvaluatorDetails configured for agent flow quality evaluation + """ + config: Dict[str, Any] = {} + + return EvaluatorDetails( + slug="agent-flow-quality", + version=None, + config=config, + required_input_fields=["trajectory_prompts", "trajectory_completions"], + ) + + @staticmethod + def agent_efficiency( + ) -> EvaluatorDetails: + """ + Agent efficiency evaluator - evaluates agent efficiency by checking for redundant calls and optimal paths. + + Required task output fields: + - trajectory_prompts: The prompts extracted from the span attributes (llm.prompts.*) + - trajectory_completions: The completions extracted from the span attributes (llm.completions.*) + + Returns: + EvaluatorDetails configured for agent efficiency evaluation + """ + config: Dict[str, Any] = {} + + return EvaluatorDetails( + slug="agent-efficiency", + version=None, + config=config, + required_input_fields=["trajectory_prompts", "trajectory_completions"], + ) + + @staticmethod + def agent_goal_completeness( + ) -> EvaluatorDetails: + """ + Agent goal completeness evaluator - measures whether the agent successfully accomplished all user goals. + + Required task output fields: + - trajectory_prompts: The prompts extracted from the span attributes (llm.prompts.*) + - trajectory_completions: The completions extracted from the span attributes (llm.completions.*) + + Returns: + EvaluatorDetails configured for agent goal completeness evaluation + """ + config: Dict[str, Any] = {} + + return EvaluatorDetails( + slug="agent-goal-completeness", + version=None, + config=config, + required_input_fields=["trajectory_prompts", "trajectory_completions"], + ) + + @staticmethod + def instruction_adherence( + ) -> EvaluatorDetails: + """ + Instruction adherence evaluator - measures how well the LLM response follows given instructions. + + Required task output fields: + - instructions: The instructions to evaluate against + - response: The response to evaluate + + Returns: + EvaluatorDetails configured for instruction adherence evaluation + """ + config: Dict[str, Any] = {} + + return EvaluatorDetails( + slug="instruction-adherence", + version=None, + config=config, + required_input_fields=["instructions", "response"], + ) + + @staticmethod + def conversation_quality( + ) -> EvaluatorDetails: + """ + Conversation quality evaluator - evaluates conversation quality based on tone, + clarity, flow, responsiveness, and transparency. 
+ + Required task output fields: + - prompts: The conversation prompts (flattened dict with llm.prompts.X.content/role) + - completions: The conversation completions (flattened dict with llm.completions.X.content/role) + + Returns: + EvaluatorDetails configured for conversation quality evaluation + """ + config: Dict[str, Any] = {} + + return EvaluatorDetails( + slug="conversation-quality", + version=None, + config=config, + required_input_fields=["prompts", "completions"], + ) + + @staticmethod + def intent_change( + ) -> EvaluatorDetails: + """ + Intent change evaluator - detects whether the user's primary intent or workflow + changed significantly during a conversation. + + Required task output fields: + - prompts: The conversation prompts (flattened dict with llm.prompts.X.content/role) + - completions: The conversation completions (flattened dict with llm.completions.X.content/role) + + Returns: + EvaluatorDetails configured for intent change detection + """ + config: Dict[str, Any] = {} + + return EvaluatorDetails( + slug="intent-change", + version=None, + config=config, + required_input_fields=["prompts", "completions"], + ) + + @staticmethod + def tone_detection( + ) -> EvaluatorDetails: + """ + Tone detection evaluator - classifies emotional tone of responses (joy, anger, sadness, etc.). + + Required task output fields: + - text: The text to analyze for tone + + Returns: + EvaluatorDetails configured for tone detection + """ + config: Dict[str, Any] = {} + + return EvaluatorDetails( + slug="tone-detection", + version=None, + config=config, + required_input_fields=["text"], + ) diff --git a/packages/traceloop-sdk/traceloop/sdk/evaluator/field_mapping.py b/packages/traceloop-sdk/traceloop/sdk/evaluator/field_mapping.py new file mode 100644 index 0000000000..03ca4227d9 --- /dev/null +++ b/packages/traceloop-sdk/traceloop/sdk/evaluator/field_mapping.py @@ -0,0 +1,167 @@ +""" +Field mapping utilities for evaluator input normalization. + +This module provides synonym mapping to allow users to use flexible field names +in their task outputs, which are automatically normalized to match evaluator requirements. +""" + +from typing import Dict, Any, Set, List + + +# Synonym groups - each group contains fields that can be used interchangeably +# The first field in each group is the "canonical" form that gets preserved +SYNONYM_GROUPS = [ + # Output/Response fields - the most common synonyms + {"text", "completion", "answer", "response"}, + + # Reference/Truth/Context fields - source of truth for comparison + {"reference", "ground_truth", "context"}, + + # Input/Question fields - user input variations + {"question", "prompt", "instructions", "query"}, + + # Conversation/Trajectory prompts + {"prompts", "trajectory_prompts"}, + + # Conversation/Trajectory completions + {"completions", "trajectory_completions"}, +] + + +def _build_synonym_map() -> Dict[str, Set[str]]: + """ + Build a mapping from each field to all its synonyms (including itself). + + Returns: + Dict mapping each field name to the set of all its synonyms + """ + synonym_map: Dict[str, Set[str]] = {} + + for group in SYNONYM_GROUPS: + # Each field in the group maps to all fields in the group + for field in group: + synonym_map[field] = group + + return synonym_map + + +# Pre-built synonym map for efficient lookup +_SYNONYM_MAP = _build_synonym_map() + + +def get_synonyms(field: str) -> Set[str]: + """ + Get all synonyms for a given field name. 
+ + Args: + field: The field name to get synonyms for + + Returns: + Set of all synonym field names (including the field itself) + If field has no synonyms, returns a set containing only the field + """ + return _SYNONYM_MAP.get(field, {field}) + + +def normalize_task_output( + task_output: Dict[str, Any], + required_fields: List[str], +) -> Dict[str, Any]: + """ + Normalize task output field names to match required evaluator fields. + + This function maps user-provided field names to the evaluator's required field names + using synonym groups. Original fields that are mapped to required fields are removed, + while fields that don't participate in mapping are preserved. + + For example, if a user returns {"answer": "..."} but the evaluator needs "completion", + this function will create {"completion": "..."} and remove "answer". + + Args: + task_output: The dictionary returned by the task function + required_fields: List of field names required by the evaluator + + Returns: + A new dictionary with normalized field names that match required_fields. + Original synonym fields are removed, but unmapped fields are preserved. + + Example: + >>> task_output = {"answer": "Paris", "prompt": "What is the capital?"} + >>> required = ["completion", "question"] + >>> normalize_task_output(task_output, required) + {"completion": "Paris", "question": "What is the capital?"} + """ + normalized = {} + mapped_keys: Set[str] = set() + + # First pass: map required fields from task output + for required_field in required_fields: + # If the exact required field already exists, use it (prioritize exact match) + if required_field in task_output: + normalized[required_field] = task_output[required_field] + mapped_keys.add(required_field) + continue + + # Get all possible synonyms for this required field + synonyms = get_synonyms(required_field) + + # Find which synonym (if any) exists in the task output + found_key = None + for synonym in synonyms: + if synonym in task_output: + found_key = synonym + break + + if found_key: + # Map the found field to the required field name + normalized[required_field] = task_output[found_key] + mapped_keys.add(found_key) + + # Second pass: preserve fields that weren't mapped + # (they might be needed by other evaluators or for debugging) + for key, value in task_output.items(): + if key not in mapped_keys and key not in normalized: + normalized[key] = value + + return normalized + + +def get_field_suggestions(missing_field: str, available_fields: List[str]) -> List[str]: + """ + Get suggestions for a missing field based on available fields and synonyms. + + Args: + missing_field: The required field that is missing + available_fields: List of field names available in task output + + Returns: + List of suggested field names that could satisfy the requirement + """ + suggestions = [] + synonyms = get_synonyms(missing_field) + + # Check if any available field is a synonym of the missing field + for available in available_fields: + available_synonyms = get_synonyms(available) + if synonyms & available_synonyms: # If there's any overlap + suggestions.append(available) + + return suggestions + + +def format_field_help(field: str) -> str: + """ + Format help text showing all accepted synonyms for a field. 
+ + Args: + field: The field name to get help for + + Returns: + Formatted string showing the field and its synonyms + """ + synonyms = get_synonyms(field) + if len(synonyms) == 1: + return f"'{field}'" + + synonym_list = sorted(synonyms) + return f"'{field}' (or synonyms: {', '.join(repr(s) for s in synonym_list if s != field)})" diff --git a/packages/traceloop-sdk/traceloop/sdk/experiment/experiment.py b/packages/traceloop-sdk/traceloop/sdk/experiment/experiment.py index 23b88c758b..8c6d5540ff 100644 --- a/packages/traceloop-sdk/traceloop/sdk/experiment/experiment.py +++ b/packages/traceloop-sdk/traceloop/sdk/experiment/experiment.py @@ -5,7 +5,7 @@ from typing import Any, List, Callable, Optional, Tuple, Dict, Awaitable, Union from traceloop.sdk.client.http import HTTPClient from traceloop.sdk.datasets.datasets import Datasets -from traceloop.sdk.evaluator.evaluator import Evaluator, validate_task_output +from traceloop.sdk.evaluator.evaluator import Evaluator, validate_and_normalize_task_output from traceloop.sdk.experiment.model import ( InitExperimentRequest, ExperimentInitResponse, @@ -166,9 +166,9 @@ async def run_single_row(row: Optional[Dict[str, Any]]) -> TaskResponse: try: task_result = await task(row) - # Validate task output with EvaluatorDetails with required_input_fields from evaluators list + # Validate task output with EvaluatorDetails and normalize field names using synonyms if evaluators_to_validate: - validate_task_output(task_result, evaluators_to_validate) + task_result = validate_and_normalize_task_output(task_result, evaluators_to_validate) task_id = self._create_task( experiment_slug=experiment_slug, @@ -481,10 +481,10 @@ async def run_single_row(row: Optional[Dict[str, Any]]) -> TaskResult: try: task_output = await task(row) - # Validate task output schema on first execution + # Validate task output schema and normalize field names using synonyms if evaluators_to_validate: try: - validate_task_output(task_output, evaluators_to_validate) + task_output = validate_and_normalize_task_output(task_output, evaluators_to_validate) except ValueError as validation_error: print(f"\033[91m❌ Task validation failed: {str(validation_error)}\033[0m") raise ValueError(str(validation_error))
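A minimal usage sketch of the synonym normalization introduced in this diff, shown here only to illustrate the intended behavior. The module paths follow the package layout above; EvaluatorDetails is assumed to be importable from traceloop.sdk.evaluator.model, and all field values are illustrative.

# Sketch: normalizing a task output whose keys are synonyms of the
# evaluator's required_input_fields, as implemented in field_mapping.py.
from traceloop.sdk.evaluator.evaluator import validate_and_normalize_task_output
from traceloop.sdk.evaluator.field_mapping import get_field_suggestions
from traceloop.sdk.evaluator.model import EvaluatorDetails  # assumed import path

# An evaluator declaring the canonical field names it needs.
answer_correctness = EvaluatorDetails(
    slug="answer-correctness",
    version=None,
    config={},
    required_input_fields=["question", "completion", "ground_truth"],
)

# A task output that uses synonyms instead of the required names:
# "query" -> "question", "answer" -> "completion", "reference" -> "ground_truth".
task_output = {
    "query": "What is the capital of France?",
    "answer": "Paris",
    "reference": "Paris",
    "latency_ms": 120,  # not a synonym of any required field; preserved as-is
}

normalized = validate_and_normalize_task_output(task_output, [answer_correctness])
# normalized == {
#     "question": "What is the capital of France?",
#     "completion": "Paris",
#     "ground_truth": "Paris",
#     "latency_ms": 120,
# }

# get_field_suggestions() powers the "Did you mean: ...?" hints in the
# ValueError raised when a required field is still missing after mapping.
print(get_field_suggestions("completion", ["answer", "latency_ms"]))  # ['answer']

If no synonym of a required field is present, validate_and_normalize_task_output raises the detailed ValueError built in evaluator.py, listing each missing field via format_field_help together with the keys the task output actually contains.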