diff --git a/connector_builder_agents/src/run.py b/connector_builder_agents/src/run.py index d415e60..d39baf6 100644 --- a/connector_builder_agents/src/run.py +++ b/connector_builder_agents/src/run.py @@ -1,12 +1,14 @@ # Copyright (c) 2025 Airbyte, Inc., all rights reserved. """Functions to run connector builder agents in different modalities.""" +import os import sys import time from pathlib import Path from agents import ( Agent, + OpenAIConversationsSession, Runner, SQLiteSession, gen_trace_id, @@ -14,7 +16,6 @@ ) from agents.result import RunResult -# from agents import OpenAIConversationsSession from ._util import get_secrets_dotenv, open_if_browser_available from .agents import ( add_handback_to_manager, @@ -46,6 +47,27 @@ def get_workspace_dir(session_id: str) -> Path: return workspace_dir +def create_session(session_id: str): + """Create a session based on OPENAI_SESSION_BACKEND environment variable. + + Args: + session_id: The session identifier + + Returns: + A session instance (either OpenAIConversationsSession or SQLiteSession) + """ + backend = os.getenv("OPENAI_SESSION_BACKEND", "openai").lower() + + if backend == "sqlite": + return SQLiteSession(session_id=session_id) + elif backend == "openai": + return OpenAIConversationsSession(conversation_id=session_id) + else: + raise ValueError( + f"Invalid OPENAI_SESSION_BACKEND value: '{backend}'. Must be 'openai' or 'sqlite'" + ) + + async def run_connector_build( api_name: str | None = None, instructions: str | None = None, @@ -125,7 +147,7 @@ async def run_interactive_build( workspace_dir = get_workspace_dir(session_id) session_state = create_session_state(workspace_dir) - session = SQLiteSession(session_id=session_id) + session = create_session(session_id) all_mcp_servers, _, _ = create_session_mcp_servers(session_state) agent = Agent( name="MCP Connector Builder", @@ -143,6 +165,13 @@ async def run_interactive_build( with trace(workflow_name="Interactive Connector Builder Session", trace_id=trace_id): trace_url = f"https://platform.openai.com/traces/trace?trace_id={trace_id}" + session_url = None + if isinstance(session, OpenAIConversationsSession): + conversation_id = await session._get_session_id() + session_url = f"https://platform.openai.com/logs/{conversation_id}" + update_progress_log(f"🔗 Session URL: {session_url}", session_state) + open_if_browser_available(session_url) + input_prompt: str = prompt while True: update_progress_log("\n⚙️ AI Agent is working...", session_state) @@ -177,6 +206,8 @@ async def run_interactive_build( input_prompt = input("\n👤 You: ") if input_prompt.lower() in {"exit", "quit"}: update_progress_log("☑️ Ending conversation...", session_state) + if session_url: + update_progress_log(f"🪵 Review session at: {session_url}", session_state) update_progress_log(f"🪵 Review trace logs at: {trace_url}", session_state) break @@ -184,6 +215,8 @@ async def run_interactive_build( update_progress_log( "\n🛑 Conversation terminated (ctrl+c input received).", session_state ) + if session_url: + update_progress_log(f"🪵 Review session at: {session_url}", session_state) update_progress_log(f"🪵 Review trace logs at: {trace_url}", session_state) sys.exit(0) finally: @@ -205,7 +238,7 @@ async def run_manager_developer_build( if session_id is None: session_id = generate_session_id() - session = SQLiteSession(session_id=session_id) + session = create_session(session_id) # Create workspace directory and session state workspace_dir = get_workspace_dir(session_id) @@ -244,6 +277,13 @@ async def run_manager_developer_build( with trace(workflow_name="Manager-Developer Connector Build", trace_id=trace_id): trace_url = f"https://platform.openai.com/traces/trace?trace_id={trace_id}" + session_url = None + if isinstance(session, OpenAIConversationsSession): + conversation_id = await session._get_session_id() + session_url = f"https://platform.openai.com/logs/{conversation_id}" + update_progress_log(f"🔗 Session URL: {session_url}", session_state) + open_if_browser_available(session_url) + run_prompt = ( f"You are working on a connector build task for the API: '{api_name or 'N/A'}'. " "Your goal is to ensure the successful completion of all objectives as instructed." @@ -291,9 +331,13 @@ async def run_manager_developer_build( except KeyboardInterrupt: update_progress_log("\n🛑 Build terminated (ctrl+c input received).", session_state) + if session_url: + update_progress_log(f"🪵 Review session at: {session_url}", session_state) update_progress_log(f"🪵 Review trace logs at: {trace_url}", session_state) sys.exit(0) except Exception as ex: update_progress_log(f"\n❌ Unexpected error during build: {ex}", session_state) + if session_url: + update_progress_log(f"🪵 Review session at: {session_url}", session_state) update_progress_log(f"🪵 Review trace logs at: {trace_url}", session_state) raise ex