diff --git a/connector_builder_agents/src/evals/evaluators.py b/connector_builder_agents/src/evals/evaluators.py
index 9a4fc82..cf52ab7 100644
--- a/connector_builder_agents/src/evals/evaluators.py
+++ b/connector_builder_agents/src/evals/evaluators.py
@@ -1,11 +1,19 @@
 # Copyright (c) 2025 Airbyte, Inc., all rights reserved.
-"""Evaluators for connector builder agents."""
+"""Unified LLM-based evaluator for connector builder agents.
+
+This module provides a single LLM-based evaluator that assesses connector quality
+across multiple criteria (readiness and stream presence) using GPT-4o. It replaces
+the previous separate programmatic and LLM evaluators with one unified,
+prompt-based evaluation.
+
+The evaluator is designed for use with the Phoenix evaluation framework and returns
+structured scores that can be aggregated and reported in evaluation summaries.
+"""
 
 import json
 import logging
 
 import pandas as pd
-import yaml
 from dotenv import load_dotenv
 from opentelemetry.trace import get_current_span
 from phoenix.evals import OpenAIModel, llm_classify
@@ -15,8 +23,21 @@
 
 logger = logging.getLogger(__name__)
 
-READINESS_EVAL_MODEL = "gpt-4o"
-READINESS_EVAL_TEMPLATE = """You are evaluating whether a connector readiness test passed or failed.
+UNIFIED_EVAL_MODEL = "gpt-4o"
+"""Model used for unified LLM-based evaluation."""
+
+UNIFIED_EVAL_TEMPLATE = """You are evaluating the quality of a generated Airbyte connector.
+
+You will evaluate the connector against two criteria and return a score for each.
+
+**Artifacts Provided:**
+1. **Readiness Report**: Markdown report showing test results for the connector
+2. **Manifest**: YAML defining the connector's streams and configuration
+3. **Expected Streams**: List of stream names that should be present in the manifest
+
+**Evaluation Criteria:**
+
+Evaluate whether the connector readiness test passed or failed.
 
 A passing report should have all of the following:
 - All streams tested successfully (marked with ✅)
@@ -30,75 +51,161 @@
 - Zero records extracted from streams
 - Error messages indicating failure
 
-Based on the connector readiness report below, classify whether the test PASSED or FAILED. Your answer should be a single word, either "PASSED" or "FAILED".
+**Score: 1.0 if PASSED, 0.0 if FAILED**
 
-{readiness_report}
-"""
+Evaluate what percentage of expected streams are present in the manifest.
 
+Instructions:
+- Extract all stream names from the manifest YAML (look for `streams:` section, each with a `name:` field)
+- Compare against the expected streams list
+- Count only exact name matches (case-sensitive)
+- Calculate: (number of expected streams found) / (total expected streams)
 
-def readiness_eval(output: dict) -> int:
-    """Create Phoenix LLM classifier for readiness evaluation. Return 1 if PASSED, 0 if FAILED."""
+Example:
+- Expected: ["posts", "users", "comments"]
+- Found in manifest: ["posts", "comments", "albums"]
+- Matched: ["posts", "comments"]
+- Score: 2/3 = 0.67
 
-    if output is None:
-        logger.warning("Output is None, cannot evaluate readiness")
-        return 0
+**Score: float between 0.0 and 1.0**
 
-    readiness_report = output.get("artifacts", {}).get("readiness_report", None)
-    if readiness_report is None:
-        logger.warning("No readiness report found")
-        return 0
+---
 
-    rails = ["PASSED", "FAILED"]
+**Input Data:**
 
-    eval_df = llm_classify(
-        model=OpenAIModel(model=READINESS_EVAL_MODEL),
-        data=pd.DataFrame([{"readiness_report": readiness_report}]),
-        template=READINESS_EVAL_TEMPLATE,
-        rails=rails,
-        provide_explanation=True,
-    )
+Readiness Report:
+```
+{readiness_report}
+```
 
-    logger.info(f"Readiness evaluation result: {eval_df}")
+Manifest:
+```
+{manifest}
+```
 
-    label = eval_df["label"][0]
-    score = 1 if label.upper() == "PASSED" else 0
+Expected Streams: {expected_streams}
 
-    return score
+---
+
+**Instructions:**
+Carefully analyze the artifacts above and classify the readiness as either "PASSED" or "FAILED", and calculate the streams percentage.
 
-def streams_eval(expected: dict, output: dict) -> float:
-    """Evaluate if all expected streams were built. Return the percentage of expected streams that are present in available streams."""
+Your response must be in this exact format (one word for readiness, one number for streams):
+READINESS: <PASSED or FAILED>
+STREAMS: <score between 0.0 and 1.0>
+"""
+
+def unified_eval(expected: dict, output: dict) -> dict:
+    """Unified LLM-based evaluator for all connector quality criteria.
+
+    This evaluator replaces the previous hybrid approach (readiness_eval + streams_eval)
+    with a single LLM-based evaluation using GPT-4o. It evaluates both readiness
+    (pass/fail based on test results) and stream presence (percentage match against
+    expected streams) in a single LLM call.
+
+    The evaluator uses a structured prompt template that instructs the LLM to analyze
+    connector artifacts (readiness report and manifest) and return scores in a
+    standardized format that can be parsed programmatically.
+
+    Args:
+        expected: Dict containing expected evaluation criteria. Should include an
+            'expected' key whose value is a JSON string with an 'expected_streams' list.
+        output: Dict containing task output; its 'artifacts' key should contain:
+            - 'readiness_report': Markdown report of connector test results
+            - 'manifest': YAML manifest defining connector streams and config
+
+    Returns:
+        Dict with two float scores:
+        - 'readiness': 0.0 (failed) or 1.0 (passed) based on test results
+        - 'streams': 0.0-1.0 representing percentage of expected streams found
+
+    Example:
+        >>> expected = {"expected": '{"expected_streams": ["users", "posts"]}'}
+        >>> output = {"artifacts": {"readiness_report": "...", "manifest": "..."}}
+        >>> scores = unified_eval(expected, output)
+        >>> scores
+        {'readiness': 1.0, 'streams': 1.0}
+    """
     if output is None:
-        logger.warning("Output is None, cannot evaluate streams")
-        return 0.0
+        logger.warning("Output is None, cannot evaluate")
+        return {"readiness": 0.0, "streams": 0.0}
+
+    readiness_report = output.get("artifacts", {}).get("readiness_report", "Not available")
+    manifest = output.get("artifacts", {}).get("manifest", "Not available")
+
+    if readiness_report == "Not available":
+        logger.warning("No readiness report found")
 
-    manifest_str = output.get("artifacts", {}).get("manifest", None)
-    if manifest_str is None:
+    if manifest == "Not available":
         logger.warning("No manifest found")
-        return 0
 
-    manifest = yaml.safe_load(manifest_str)
-    available_streams = manifest.get("streams", [])
-    available_stream_names = [stream.get("name", "") for stream in available_streams]
-    logger.info(f"Available stream names: {available_stream_names}")
+    try:
+        expected_obj = json.loads(expected.get("expected", "{}"))
+    except json.JSONDecodeError as e:
+        logger.error(f"Failed to parse expected JSON: {e}", exc_info=True)
+        return {"readiness": 0.0, "streams": 0.0}
+    expected_streams = expected_obj.get("expected_streams", [])
 
-    expected_obj = json.loads(expected.get("expected", "{}"))
-    expected_stream_names = expected_obj.get("expected_streams", [])
-    logger.info(f"Expected stream names: {expected_stream_names}")
+    logger.info(f"Expected streams: {expected_streams}")
 
     # Set attributes on span for visibility
     span = get_current_span()
-    span.set_attribute("available_stream_names", available_stream_names)
-    span.set_attribute("expected_stream_names", expected_stream_names)
-
-    if not expected_stream_names:
-        logger.warning("No expected streams found")
-        return 0.0
-
-    # Calculate the percentage of expected streams that are present in available streams
-    matched_streams = set(available_stream_names) & set(expected_stream_names)
-    logger.info(f"Matched streams: {matched_streams}")
-    percent_matched = len(matched_streams) / len(expected_stream_names)
-    logger.info(f"Percent matched: {percent_matched}")
-    return float(percent_matched)
+    span.set_attribute("expected_streams", expected_streams)
+
+    if not expected_streams:
+        logger.warning("No expected streams provided")
+
+    try:
+        eval_df = llm_classify(
+            model=OpenAIModel(model=UNIFIED_EVAL_MODEL),
+            data=pd.DataFrame(
+                [
+                    {
+                        "readiness_report": readiness_report,
+                        "manifest": manifest,
+                        "expected_streams": json.dumps(expected_streams),
+                    }
+                ]
+            ),
+            template=UNIFIED_EVAL_TEMPLATE,
+            rails=None,
+            provide_explanation=True,
+        )
+
+        logger.info(f"Unified evaluation result: {eval_df}")
+
+        response_text = eval_df["label"][0]
+
+        if response_text is None:
+            logger.error("LLM returned None response")
+            return {"readiness": 0.0, "streams": 0.0}
+
+        readiness_score = 0.0
+        streams_score = 0.0
+
+        for line in response_text.strip().split("\n"):
+            line = line.strip()
+            if line.startswith("READINESS:"):
+                readiness_value = line.split(":", 1)[1].strip().upper()
+                readiness_score = 1.0 if readiness_value == "PASSED" else 0.0
+            elif line.startswith("STREAMS:"):
+                streams_value = line.split(":", 1)[1].strip()
+                try:
+                    streams_score = float(streams_value)
+                    streams_score = max(0.0, min(1.0, streams_score))
+                except ValueError:
+                    logger.warning(f"Could not parse streams score from: {streams_value}")
+                    streams_score = 0.0
+
+        logger.info(f"Parsed readiness score: {readiness_score}")
+        logger.info(f"Parsed streams score: {streams_score}")
+
+        span.set_attribute("readiness_score", readiness_score)
+        span.set_attribute("streams_score", streams_score)
+
+        return {"readiness": readiness_score, "streams": streams_score}
+
+    except Exception as e:
+        logger.error(f"Error during unified evaluation: {e}", exc_info=True)
+        return {"readiness": 0.0, "streams": 0.0}
diff --git a/connector_builder_agents/src/evals/phoenix_run.py b/connector_builder_agents/src/evals/phoenix_run.py
index 020c307..3f64c1d 100644
--- a/connector_builder_agents/src/evals/phoenix_run.py
+++ b/connector_builder_agents/src/evals/phoenix_run.py
@@ -24,7 +24,7 @@ from phoenix.otel import register
 
 from .dataset import get_or_create_phoenix_dataset
-from .evaluators import READINESS_EVAL_MODEL, readiness_eval, streams_eval
+from .evaluators import UNIFIED_EVAL_MODEL, unified_eval
 from .summary import generate_markdown_summary
 from .task import EVAL_DEVELOPER_MODEL, EVAL_MANAGER_MODEL, run_connector_build_task
 
@@ -51,7 +51,7 @@ async def main(connectors: list[str] | None = None, *, dataset_prefix: str):
     experiment_id = str(uuid.uuid4())[:5]
     experiment_name = f"builder-evals-{experiment_id}"
 
-    evaluators = [readiness_eval, streams_eval]
+    evaluators = [unified_eval]
 
     logger.info(f"Using evaluators: {[eval.__name__ for eval in evaluators]}")
 
@@ -66,7 +66,7 @@ async def main(connectors: list[str] | None = None, *, dataset_prefix: str):
         experiment_metadata={
             "developer_model": EVAL_DEVELOPER_MODEL,
             "manager_model": EVAL_MANAGER_MODEL,
-            "readiness_eval_model": READINESS_EVAL_MODEL,
+            "unified_eval_model": UNIFIED_EVAL_MODEL,
         },
         timeout=1800,
     )
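Not part of the diff, but a quick way to sanity-check the new evaluator locally. This is a sketch: the import path mirrors this PR's layout, the artifact strings are invented, and a working `OPENAI_API_KEY` is required because `unified_eval` drives a real GPT-4o call through `phoenix.evals.llm_classify`.

```python
# Hypothetical smoke test for unified_eval (illustration only, not part of the PR).
# Requires OPENAI_API_KEY; the readiness report and manifest below are made up.
import json

from connector_builder_agents.src.evals.evaluators import unified_eval

expected = {"expected": json.dumps({"expected_streams": ["posts", "users", "comments"]})}
output = {
    "artifacts": {
        "readiness_report": (
            "# Connector Readiness Report\n\n"
            "- posts ✅ 120 records extracted\n"
            "- users ✅ 10 records extracted\n"
            "- comments ✅ 45 records extracted\n"
        ),
        "manifest": "streams:\n  - name: posts\n  - name: users\n  - name: comments\n",
    }
}

scores = unified_eval(expected, output)
print(scores)  # expected to look like {"readiness": 1.0, "streams": 1.0}
```

With all expected streams present and every stream passing, the returned dict should come back as all 1.0s, matching the docstring example.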
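For reference, the response-format contract shared by `UNIFIED_EVAL_TEMPLATE` and the parsing loop can also be exercised offline, with no LLM call. The helper below is illustrative (not part of this diff); it simply mirrors the parsing that `unified_eval` applies to the model's `READINESS:` / `STREAMS:` reply.

```python
# Standalone sketch of the READINESS/STREAMS parsing contract (hypothetical helper,
# not part of the PR); mirrors the loop inside unified_eval.
def parse_unified_response(response_text: str) -> dict[str, float]:
    readiness_score = 0.0
    streams_score = 0.0
    for line in response_text.strip().split("\n"):
        line = line.strip()
        if line.startswith("READINESS:"):
            # Only a PASSED verdict (case-insensitive) earns 1.0.
            readiness_score = 1.0 if line.split(":", 1)[1].strip().upper() == "PASSED" else 0.0
        elif line.startswith("STREAMS:"):
            try:
                # Clamp to [0.0, 1.0], as the evaluator does.
                streams_score = max(0.0, min(1.0, float(line.split(":", 1)[1].strip())))
            except ValueError:
                streams_score = 0.0
    return {"readiness": readiness_score, "streams": streams_score}


assert parse_unified_response("READINESS: PASSED\nSTREAMS: 0.67") == {"readiness": 1.0, "streams": 0.67}
assert parse_unified_response("READINESS: FAILED\nSTREAMS: 2") == {"readiness": 0.0, "streams": 1.0}
```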