5 changes: 2 additions & 3 deletions connector_builder_agents/pyproject.toml
@@ -6,16 +6,15 @@ readme = "README.md"
 requires-python = ">=3.11"
 dependencies = [
     "python-dotenv>=1.1.1",
-    "openai-agents>=0.3.3",
-    "mcp-agent>=0.1.15", # Transitive dependency of openai-agents
+    "pydantic-ai>=0.0.14,<1.0",
+    "pydantic-ai-slim[openai,duckduckgo]>=0.0.14,<1.0",
Comment on lines +9 to +10
⚠️ Potential issue | 🔴 Critical

Remove the duplicate Pydantic AI distributions.

pydantic-ai and pydantic-ai-slim[...] both ship the same pydantic_ai package; installing both forces pip to pick one wheel, and the other is repeatedly reinstalled and overwritten during builds. Choose a single distribution (most likely the slim extra) to avoid installation failures.

     "python-dotenv>=1.1.1",
-    "pydantic-ai>=0.0.14,<1.0",
     "pydantic-ai-slim[openai,duckduckgo]>=0.0.14,<1.0",
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
-    "pydantic-ai>=0.0.14,<1.0",
-    "pydantic-ai-slim[openai,duckduckgo]>=0.0.14,<1.0",
+    "python-dotenv>=1.1.1",
+    "pydantic-ai-slim[openai,duckduckgo]>=0.0.14,<1.0",
🤖 Prompt for AI Agents
In connector_builder_agents/pyproject.toml around lines 9-10, the file lists
both "pydantic-ai" and "pydantic-ai-slim[openai,duckduckgo]" which are the same
package and conflict during installs; remove the duplicate "pydantic-ai" entry
and keep the slim distribution with the needed extras
(pydantic-ai-slim[openai,duckduckgo]>=0.0.14,<1.0), then update the lockfile
(poetry lock / pip-compile / reinstall deps) so the dependency graph reflects
the change.
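
As a quick local check, the reviewer's diagnosis can be verified with a small standard-library script (a sketch, not part of this PR) that flags any import package claimed by more than one installed distribution:

# Sketch: flag import packages shipped by more than one installed distribution.
from importlib.metadata import packages_distributions

# packages_distributions() maps top-level import names to the distributions
# that provide them, e.g. {"pydantic_ai": ["pydantic-ai", "pydantic-ai-slim"]}.
for package, dists in packages_distributions().items():
    if len(set(dists)) > 1:
        print(f"{package!r} is provided by: {sorted(set(dists))}")

If the diagnosis is correct, pydantic_ai will appear in this output until the duplicate entry is removed.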

"pydantic>=2.11.9,<3.0",
"pyyaml>=6.0.1",
"pandas>=2.0.0",
"arize-phoenix-client>=1.19.1",
"arize-phoenix-otel>=0.13.1",
"arize-phoenix-evals>=2.1.0",
"openinference-instrumentation-openai>=0.1.33",
"openinference-instrumentation-openai-agents>=1.3.0",
"emoji>=2.15.0,<3.0",
]

172 changes: 78 additions & 94 deletions connector_builder_agents/src/agents.py
@@ -1,16 +1,9 @@
 # Copyright (c) 2025 Airbyte, Inc., all rights reserved.
 """Agent implementations for the Airbyte connector builder."""

-from collections.abc import Callable
+from pydantic_ai import Agent, RunContext
+from pydantic_ai.common_tools.duckduckgo import duckduckgo_search_tool

-from agents import Agent as OpenAIAgent
-from agents import (
-    WebSearchTool,
-    handoff,
-)
-from pydantic.main import BaseModel
-
-# from agents import OpenAIConversationsSession
 from .guidance import get_default_developer_prompt, get_default_manager_prompt
 from .tools import (
     SessionState,
@@ -33,53 +26,49 @@ def create_developer_agent(
     additional_instructions: str,
     session_state: SessionState,
     mcp_servers: list,
-) -> OpenAIAgent:
+) -> Agent:
     """Create the developer agent that executes specific phases."""
-    return OpenAIAgent(
+    developer_agent = Agent(
+        model,
-        name="MCP Connector Developer",
-        instructions=get_default_developer_prompt(
+        deps_type=SessionState,
+        system_prompt=get_default_developer_prompt(
             api_name=api_name,
             instructions=additional_instructions,
             project_directory=session_state.workspace_dir.absolute(),
         ),
-        mcp_servers=mcp_servers,
-        model=model,
         tools=[
             create_log_progress_milestone_from_developer_tool(session_state),
             create_log_problem_encountered_by_developer_tool(session_state),
             create_log_tool_failure_tool(session_state),
-            WebSearchTool(),
+            duckduckgo_search_tool(),
         ],
     )
+
+    for mcp_server in mcp_servers:
+        developer_agent.toolsets.append(mcp_server)
+
+    return developer_agent


 def create_manager_agent(
-    developer_agent: OpenAIAgent,
+    developer_agent: Agent,
     model: str,
     api_name: str,
     additional_instructions: str,
     session_state: SessionState,
     mcp_servers: list,
-) -> OpenAIAgent:
+) -> Agent:
     """Create the manager agent that orchestrates the 3-phase workflow."""
-    return OpenAIAgent(
+    manager_agent = Agent(
+        model,
-        name="Connector Builder Manager",
-        instructions=get_default_manager_prompt(
+        deps_type=SessionState,
+        system_prompt=get_default_manager_prompt(
             api_name=api_name,
             instructions=additional_instructions,
             project_directory=session_state.workspace_dir.absolute(),
         ),
-        handoffs=[
-            handoff(
-                agent=developer_agent,
-                tool_name_override="delegate_to_developer",
-                tool_description_override="Delegating work to the developer agent",
-                input_type=DelegatedDeveloperTask,
-                on_handoff=create_on_developer_delegation(session_state),
-            ),
-        ],
-        mcp_servers=mcp_servers,
-        model=model,
         tools=[
             create_mark_job_success_tool(session_state),
             create_mark_job_failed_tool(session_state),
@@ -91,77 +80,72 @@ def create_manager_agent(
         ],
     )

-
-class DelegatedDeveloperTask(BaseModel):
-    """Input data for handoff from manager to developer."""
-
-    api_name: str
-    assignment_title: str
-    assignment_description: str
-
-
-class ManagerHandoffInput(BaseModel):
-    """Input data for handoff from developer back to manager."""
-
-    short_status: str
-    detailed_progress_update: str
-    is_full_success: bool
-    is_partial_success: bool
-    is_blocked: bool
-
-
-def create_on_developer_delegation(session_state: SessionState) -> Callable:
-    """Create an on_developer_delegation callback bound to a specific session state."""
-
-    async def on_developer_delegation(ctx, input_data: DelegatedDeveloperTask) -> None:
+    for mcp_server in mcp_servers:
+        manager_agent.toolsets.append(mcp_server)
+
+    @manager_agent.tool
+    async def delegate_to_developer(
+        ctx: RunContext[SessionState],
+        assignment_title: str,
+        assignment_description: str,
+    ) -> str:
+        """Delegate work to the developer agent.
+
+        Args:
+            assignment_title: Short title or key for this developer assignment.
+            assignment_description: Detailed description of the task assigned to the developer,
+                including all context and success criteria they need to complete it.
+        """
         update_progress_log(
             f"🤝 [MANAGER → DEVELOPER] Manager delegating task to developer agent."
-            f"\n Task Name: {input_data.assignment_title}"
-            f"\n Task Description: {input_data.assignment_description}",
-            session_state,
+            f"\n Task Name: {assignment_title}"
+            f"\n Task Description: {assignment_description}",
+            ctx.deps,
         )

-    return on_developer_delegation
+        result = await developer_agent.run(
+            assignment_description,
+            message_history=ctx.deps.message_history,
+            deps=ctx.deps,
+        )

-def create_on_manager_handback(session_state: SessionState):
-    """Create an on_manager_handback callback bound to a specific session state."""
+        update_progress_log(
+            f"🤝 [DEVELOPER → MANAGER] Developer completed task: {assignment_title}"
+            f"\n Result: {result.output}",
+            ctx.deps,
+        )

-    async def on_manager_handback(ctx, input_data: ManagerHandoffInput) -> None:
+        ctx.deps.message_history.extend(result.new_messages())
⚠️ Potential issue | 🔴 Critical

❓ Verification inconclusive

Monitor unbounded message history growth.

The message history is extended indefinitely without pruning or size limits. In long-running sessions with many delegations, this could lead to excessive memory consumption and slower performance as the context grows.

As noted in the PR objectives checklist, message history growth should be monitored. Consider implementing a mechanism to prune old messages or set a maximum history size.

Verify whether there's a history management mechanism elsewhere in the codebase:


🏁 Script executed:

#!/bin/bash
# Search for message_history pruning, cleanup, or size management
rg -n -C5 'message_history.*(?:prune|clear|trim|limit|max|size)' --type py

Length of output: 0


#!/bin/bash
# Search for all occurrences of message_history and any pruning/size-management logic
rg -n 'message_history' --type py
rg -nP '(?:message_history|history).*\b(prune|trim|limit|max|size)\b' --type py -C5

Limit message history growth.

In connector_builder_agents/src/agents.py:118, ctx.deps.message_history.extend(result.new_messages()) is unbounded. Add pruning or enforce a maximum history size to prevent unbounded memory usage.

🤖 Prompt for AI Agents
In connector_builder_agents/src/agents.py around line 118,
ctx.deps.message_history.extend(result.new_messages()) allows unbounded growth
of the message history. After extending, enforce a maximum history size (e.g., a
configurable MAX_HISTORY constant or setting) and prune older entries so only
the most recent N messages are kept, or replace the list with a
collections.deque(maxlen=...) that discards the oldest entries automatically.
Either way, add the new messages first and then trim to the maximum length so
memory usage stays bounded.
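
One possible shape for that bound, sketched under the assumption that ctx.deps.message_history is a plain list (MAX_HISTORY and extend_bounded are illustrative names, not existing code). Note that naive truncation can separate a tool call from its matching return message, so a real implementation may need to trim at message-pair boundaries:

# Sketch: append new messages, then keep only the most recent MAX_HISTORY entries.
MAX_HISTORY = 200  # hypothetical limit; tune to the model's context window

def extend_bounded(history: list, new_messages: list, max_len: int = MAX_HISTORY) -> None:
    """Extend history in place, then drop the oldest entries beyond max_len."""
    history.extend(new_messages)
    if len(history) > max_len:
        del history[: len(history) - max_len]

# The call site in delegate_to_developer would then become:
# extend_bounded(ctx.deps.message_history, result.new_messages())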


+        return str(result.output)
+
+    @developer_agent.tool
+    async def report_back_to_manager(
+        ctx: RunContext[SessionState],
+        short_status: str,
+        detailed_progress_update: str,
+        is_full_success: bool = False,
+        is_partial_success: bool = False,
+        is_blocked: bool = False,
+    ) -> str:
+        """Report progress or issues back to the manager agent.
+
+        Args:
+            short_status: One sentence summary of what was accomplished.
+            detailed_progress_update: A detailed update on progress and next steps.
+            is_full_success: True if the phase is fully completed.
+            is_partial_success: True if partially done.
+            is_blocked: True if encountered a blocker.
+        """
         update_progress_log(
             f"🤝 [DEVELOPER → MANAGER] Developer handing back control to manager."
-            f"\n Summary of status: {input_data.short_status}"
-            f"\n Partial success: {input_data.is_partial_success}"
-            f"\n Full success: {input_data.is_full_success}"
-            f"\n Blocked: {input_data.is_blocked}"
-            f"\n Detailed progress update: {input_data.detailed_progress_update}",
-            session_state,
+            f"\n Summary of status: {short_status}"
+            f"\n Partial success: {is_partial_success}"
+            f"\n Full success: {is_full_success}"
+            f"\n Blocked: {is_blocked}"
+            f"\n Detailed progress update: {detailed_progress_update}",
+            ctx.deps,
         )
+        return "Status reported to manager"

-    return on_manager_handback
-
-
-def add_handback_to_manager(
-    developer_agent: OpenAIAgent,
-    manager_agent: OpenAIAgent,
-    session_state: SessionState,
-) -> None:
-    """Add a handoff from the developer back to the manager to report progress."""
-    developer_agent.handoffs.extend(
-        [
-            handoff(
-                agent=manager_agent,
-                tool_name_override="report_back_to_manager",
-                tool_description_override="Report progress or issues back to the manager agent",
-                input_type=ManagerHandoffInput,
-                on_handoff=create_on_manager_handback(session_state),
-            ),
-            handoff(
-                agent=manager_agent,
-                tool_name_override="report_task_completion_to_manager",
-                tool_description_override="Report task completion to the manager agent",
-                input_type=ManagerHandoffInput,
-                on_handoff=create_on_manager_handback(session_state),
-            ),
-        ]
-    )
+    return manager_agent
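
Taken together, the diff replaces the SDK's handoff objects with ordinary pydantic-ai tools closed over the two Agent instances. A minimal wiring sketch follows; the SessionState construction, model string, empty MCP server list, and the keyword names for create_developer_agent's folded-away parameters are assumptions, not code from this PR:

import asyncio

async def main() -> None:
    session_state = SessionState()  # hypothetical construction; real arguments are project-specific
    developer = create_developer_agent(
        model="openai:gpt-4o",
        api_name="JSONPlaceholder API",
        additional_instructions="",
        session_state=session_state,
        mcp_servers=[],
    )
    manager = create_manager_agent(
        developer_agent=developer,
        model="openai:gpt-4o",
        api_name="JSONPlaceholder API",
        additional_instructions="",
        session_state=session_state,
        mcp_servers=[],
    )
    # The manager's delegate_to_developer tool runs the developer agent and
    # merges its new messages into the shared message history.
    result = await manager.run("Build the connector through all three phases.", deps=session_state)
    print(result.output)

asyncio.run(main())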
80 changes: 2 additions & 78 deletions connector_builder_agents/src/constants.py
@@ -2,95 +2,19 @@
 """Constants for the Airbyte connector builder agents."""

 import os
-import subprocess
 from pathlib import Path

-from agents import (
-    set_default_openai_api,
-    set_default_openai_client,
-    set_tracing_disabled,
-)
 from dotenv import load_dotenv
-from openai import AsyncOpenAI


 # Initialize env vars:
 load_dotenv()

-GH_MODELS_OPENAI_BASE_URL: str = "https://models.github.ai/inference"
-OPENAI_BASE_URL_ENV_VAR: str = "OPENAI_BASE_URL"
-OPENAI_API_KEY_ENV_VAR: str = "OPENAI_API_KEY"
-OPENAI_BASE_URL: str = "https://api.openai.com/v1"

 ROOT_PROMPT_FILE_PATH = Path(__file__).parent.parent / "prompts" / "root-prompt.md"
 ROOT_PROMPT_FILE_STR = ROOT_PROMPT_FILE_PATH.read_text(encoding="utf-8")
 MAX_CONNECTOR_BUILD_STEPS = 100
 DEFAULT_CONNECTOR_BUILD_API_NAME: str = "JSONPlaceholder API"
-DEFAULT_DEVELOPER_MODEL: str = (
-    "gpt-5" # "gpt-4.1" # "o4-mini", "gpt-4.1-turbo", "openai/gpt-4o-mini", "gpt-4o-mini"
-)
-DEFAULT_MANAGER_MODEL: str = (
-    "gpt-5" # "gpt-4.1" # "o4-mini", "gpt-4.1-turbo", "openai/gpt-4o-mini", "gpt-4o-mini"
-)
+DEFAULT_DEVELOPER_MODEL: str = "openai:gpt-4o"
+DEFAULT_MANAGER_MODEL: str = "openai:gpt-4o"
 AUTO_OPEN_TRACE_URL: bool = os.environ.get("AUTO_OPEN_TRACE_URL", "1").lower() in {"1", "true"}

 HEADLESS_BROWSER = True


-def initialize_models() -> None:
-    """Initialize LLM models based on environment variables."""
-    global OPENAI_BASE_URL
-
-    if OPENAI_BASE_URL_ENV_VAR in os.environ:
-        print("⚙️ Detected custom OpenAI API root in environment.")
-        OPENAI_BASE_URL = os.environ[OPENAI_BASE_URL_ENV_VAR]
-        if "github.ai" in OPENAI_BASE_URL and OPENAI_BASE_URL != GH_MODELS_OPENAI_BASE_URL:
-            print(
-                f"⚠️ Warning: Detected GitHub Models endpoint but non-standard API root: {OPENAI_BASE_URL}. "
-                f"Recommended root URL is: {GH_MODELS_OPENAI_BASE_URL}"
-            )
-
-        if OPENAI_BASE_URL.lower() in {"gh", "github", "github models"}:
-            print(
-                f"Found GitHub Models endpoint alias: {OPENAI_BASE_URL}. "
-                f"Applying recommended Github Models URL root: {GH_MODELS_OPENAI_BASE_URL}"
-            )
-            OPENAI_BASE_URL = GH_MODELS_OPENAI_BASE_URL
-
-        if "github.ai" in OPENAI_BASE_URL and "OPENAI_API_KEY" not in os.environ:
-            print(
-                "GitHub Models endpoint detected but not API Root is set. "
-                "Attempting to extract token using `gh auth token` CLI command."
-            )
-
-            _ = subprocess.check_output(["gh", "auth", "status"])
-            openai_api_key: str = (
-                subprocess.check_output(["gh", "auth", "token"]).decode("utf-8").strip()
-            )
-            print(
-                "✅ Successfully extracted GitHub token from `gh` CLI: "
-                f"({openai_api_key[:4]}...{openai_api_key[-4:]})"
-            )
-            if not openai_api_key.startswith("sk-"):
-                raise ValueError(
-                    "Extracted GitHub token does not appear to be valid. "
-                    "Please ensure you have the GitHub CLI installed and authenticated."
-                )
-            os.environ["OPENAI_API_KEY"] = openai_api_key
-
-        print(f"ℹ️ Using Custom OpenAI-Compatible LLM Endpoint: {OPENAI_BASE_URL}")
-        github_models_client = AsyncOpenAI(
-            base_url=OPENAI_BASE_URL,
-            api_key=os.environ.get("OPENAI_API_KEY", None),
-        )
-        set_default_openai_client(
-            github_models_client,
-            use_for_tracing=False,
-        )
-        set_default_openai_api(
-            "chat_completions"
-        ) # GH Models doesn't support 'responses' endpoint.
-        set_tracing_disabled(True) # Tracing not supported with GitHub Models endpoint.
-
-
-initialize_models()
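
With initialize_models() removed, endpoint selection moves into pydantic-ai's model strings. For a custom OpenAI-compatible endpoint such as GitHub Models, one hedged sketch, assuming a pydantic-ai release where OpenAIProvider accepts base_url and api_key (exact import paths vary across versions):

import os

from pydantic_ai import Agent
from pydantic_ai.models.openai import OpenAIModel
from pydantic_ai.providers.openai import OpenAIProvider

# Configure the endpoint on the model object itself, rather than patching a
# global OpenAI client the way initialize_models() used to.
model = OpenAIModel(
    "gpt-4o",
    provider=OpenAIProvider(
        base_url="https://models.github.ai/inference",
        api_key=os.environ["OPENAI_API_KEY"],
    ),
)
agent = Agent(model)

Alternatively, setting the standard OPENAI_BASE_URL and OPENAI_API_KEY environment variables should be picked up by the default "openai:..." model strings, since the underlying OpenAI client reads both.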
2 changes: 1 addition & 1 deletion connector_builder_agents/src/evals/task.py
@@ -51,7 +51,7 @@ async def run_connector_build_task(dataset_row: dict) -> dict:
     result = {
         "workspace_dir": str(workspace_dir.absolute()),
         "success": success,
-        "final_output": final_result.final_output if final_result else None,
+        "final_output": final_result.output if final_result else None,
         "num_turns": num_turns,
         "artifacts": {
             "readiness_report": readiness_report_content,
3 changes: 0 additions & 3 deletions connector_builder_agents/src/guidance.py
@@ -3,8 +3,6 @@

 from pathlib import Path

-from agents.extensions.handoff_prompt import RECOMMENDED_PROMPT_PREFIX
-
 from .constants import ROOT_PROMPT_FILE_STR


@@ -70,7 +68,6 @@ def get_default_manager_prompt(
             instructions=instructions,
         ),
         get_project_directory_prompt(project_directory),
-        RECOMMENDED_PROMPT_PREFIX,
         ROOT_PROMPT_FILE_STR,
     ]
 )