Skip to content
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 47 additions & 3 deletions connector_builder_agents/src/run.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,21 @@
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
"""Functions to run connector builder agents in different modalities."""

import os
import sys
import time
from pathlib import Path

from agents import (
Agent,
OpenAIConversationsSession,
Runner,
SQLiteSession,
gen_trace_id,
trace,
)
from agents.result import RunResult

# from agents import OpenAIConversationsSession
from ._util import get_secrets_dotenv, open_if_browser_available
from .agents import (
add_handback_to_manager,
Expand Down Expand Up @@ -46,6 +47,27 @@ def get_workspace_dir(session_id: str) -> Path:
return workspace_dir


def create_session(session_id: str):
"""Create a session based on OPENAI_SESSION_BACKEND environment variable.

Args:
session_id: The session identifier

Returns:
A session instance (either OpenAIConversationsSession or SQLiteSession)
"""
backend = os.getenv("OPENAI_SESSION_BACKEND", "openai").lower()

if backend == "sqlite":
return SQLiteSession(session_id=session_id)
elif backend == "openai":
return OpenAIConversationsSession()
else:
raise ValueError(
f"Invalid OPENAI_SESSION_BACKEND value: '{backend}'. Must be 'openai' or 'sqlite'"
)


async def run_connector_build(
api_name: str | None = None,
instructions: str | None = None,
Expand Down Expand Up @@ -125,7 +147,7 @@ async def run_interactive_build(
workspace_dir = get_workspace_dir(session_id)
session_state = create_session_state(workspace_dir)

session = SQLiteSession(session_id=session_id)
session = create_session(session_id)
all_mcp_servers, _, _ = create_session_mcp_servers(session_state)
agent = Agent(
name="MCP Connector Builder",
Expand All @@ -143,6 +165,13 @@ async def run_interactive_build(
with trace(workflow_name="Interactive Connector Builder Session", trace_id=trace_id):
trace_url = f"https://platform.openai.com/traces/trace?trace_id={trace_id}"

session_url = None
if isinstance(session, OpenAIConversationsSession):
conversation_id = await session._get_session_id()
session_url = f"https://platform.openai.com/conversations/{conversation_id}"
update_progress_log(f"🔗 Session URL: {session_url}", session_state)
open_if_browser_available(session_url)

input_prompt: str = prompt
while True:
update_progress_log("\n⚙️ AI Agent is working...", session_state)
Expand Down Expand Up @@ -177,13 +206,17 @@ async def run_interactive_build(
input_prompt = input("\n👤 You: ")
if input_prompt.lower() in {"exit", "quit"}:
update_progress_log("☑️ Ending conversation...", session_state)
if session_url:
update_progress_log(f"🪵 Review session at: {session_url}", session_state)
update_progress_log(f"🪵 Review trace logs at: {trace_url}", session_state)
break

except KeyboardInterrupt:
update_progress_log(
"\n🛑 Conversation terminated (ctrl+c input received).", session_state
)
if session_url:
update_progress_log(f"🪵 Review session at: {session_url}", session_state)
update_progress_log(f"🪵 Review trace logs at: {trace_url}", session_state)
sys.exit(0)
finally:
Expand All @@ -205,7 +238,7 @@ async def run_manager_developer_build(
if session_id is None:
session_id = generate_session_id()

session = SQLiteSession(session_id=session_id)
session = create_session(session_id)

# Create workspace directory and session state
workspace_dir = get_workspace_dir(session_id)
Expand Down Expand Up @@ -244,6 +277,13 @@ async def run_manager_developer_build(
with trace(workflow_name="Manager-Developer Connector Build", trace_id=trace_id):
trace_url = f"https://platform.openai.com/traces/trace?trace_id={trace_id}"

session_url = None
if isinstance(session, OpenAIConversationsSession):
conversation_id = await session._get_session_id()
session_url = f"https://platform.openai.com/conversations/{conversation_id}"
update_progress_log(f"🔗 Session URL: {session_url}", session_state)
open_if_browser_available(session_url)

Comment on lines 280 to 286
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Apply the same resilience when fetching the manager session URL.

Same concern as above: wrap _get_session_id() so a logging failure doesn’t cancel the entire manager/developer build run.

-        if isinstance(session, OpenAIConversationsSession):
-            conversation_id = await session._get_session_id()
-            session_url = f"https://platform.openai.com/conversations/{conversation_id}"
-            update_progress_log(f"🔗 Session URL: {session_url}", session_state)
-            open_if_browser_available(session_url)
+        if isinstance(session, OpenAIConversationsSession):
+            try:
+                conversation_id = await session._get_session_id()
+            except Exception as exc:
+                update_progress_log(
+                    f"⚠️ Unable to retrieve session URL: {exc}",
+                    session_state,
+                )
+            else:
+                session_url = f"https://platform.openai.com/conversations/{conversation_id}"
+                update_progress_log(f"🔗 Session URL: {session_url}", session_state)
+                open_if_browser_available(session_url)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
session_url = None
if isinstance(session, OpenAIConversationsSession):
conversation_id = await session._get_session_id()
session_url = f"https://platform.openai.com/conversations/{conversation_id}"
update_progress_log(f"🔗 Session URL: {session_url}", session_state)
open_if_browser_available(session_url)
session_url = None
if isinstance(session, OpenAIConversationsSession):
try:
conversation_id = await session._get_session_id()
except Exception as exc:
update_progress_log(
f"⚠️ Unable to retrieve session URL: {exc}",
session_state,
)
else:
session_url = f"https://platform.openai.com/conversations/{conversation_id}"
update_progress_log(f"🔗 Session URL: {session_url}", session_state)
open_if_browser_available(session_url)

run_prompt = (
f"You are working on a connector build task for the API: '{api_name or 'N/A'}'. "
"Your goal is to ensure the successful completion of all objectives as instructed."
Expand Down Expand Up @@ -291,9 +331,13 @@ async def run_manager_developer_build(

except KeyboardInterrupt:
update_progress_log("\n🛑 Build terminated (ctrl+c input received).", session_state)
if session_url:
update_progress_log(f"🪵 Review session at: {session_url}", session_state)
update_progress_log(f"🪵 Review trace logs at: {trace_url}", session_state)
sys.exit(0)
except Exception as ex:
update_progress_log(f"\n❌ Unexpected error during build: {ex}", session_state)
if session_url:
update_progress_log(f"🪵 Review session at: {session_url}", session_state)
update_progress_log(f"🪵 Review trace logs at: {trace_url}", session_state)
raise ex
Loading