Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions connector_builder_agents/prompts/phase-1-stream-read.md
Original file line number Diff line number Diff line change
@@ -1,21 +1,26 @@
# Phase 1: First Successful Stream Read

You are working on Phase 1 of connector development. Your goal is to establish basic connectivity and successfully read records from one stream.
You are managing the Phase 1 of connector development. The goal is to establish basic connectivity and successfully read records from one stream.

## Objectives
- Research the target API and understand its structure
- Create initial manifest using the scaffold tool
- Determine and specify the primary key field for the stream, if one exists
- Identify and define the correct path within the API response that contains the list of records to be extracted
- Set up proper authentication (request secrets from user if needed)
- Configure one stream without pagination initially
- Validate that you can read records from this stream

## Key MCP Tools for This Phase
- `create_connector_manifest_scaffold` - Generate initial manifest structure
- `populate_dotenv_missing_secrets_stubs` - Set up authentication secrets
- `get_connector_builder_docs` - Get relevant documentation if needed, (e.g. `RecordSelector`, `DpathExtractor`, etc)
- `find_connectors_by_class_name` - Find example usage of components (e.g. `DpathExtractor`)
- `execute_stream_test_read` - Test reading from the stream
- `validate_manifest` - Ensure manifest structure is correct

## Success Criteria
- Primary key is defined, if applicable
- Authentication is working correctly
- Can read at least a few records from one stream
- No pagination configured yet (that's Phase 2)
Expand All @@ -30,6 +35,6 @@ You are working on Phase 1 of connector development. Your goal is to establish b
6. Update checklist.md with progress

## Next Phase
Once you can successfully read records from one stream, the manager will delegate Phase 2 to add pagination support.
Once the developer can successfully read records from one stream, you will begin Phase 2 to add pagination support.

Remember to update your checklist.md file as you complete each step.
7 changes: 4 additions & 3 deletions connector_builder_agents/prompts/phase-2-pagination.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Phase 2: Working Pagination

You are working on Phase 2 of connector development. Your goal is to implement and validate pagination for the working stream from Phase 1.
You are managing the Phase 2 of connector development. The goal is to implement and validate pagination for the working stream from Phase 1.

## Objectives
- Add pagination configuration to the manifest
Expand All @@ -12,7 +12,8 @@ You are working on Phase 2 of connector development. Your goal is to implement a
## Key MCP Tools for This Phase
- `execute_stream_test_read` - Test reading with pagination
- `validate_manifest` - Ensure manifest structure is correct
- `get_connector_builder_docs` - Get pagination documentation if needed
- `get_connector_builder_docs` - Get pagination documentation
- `find_connectors_by_class_name` - Find example usage of components (e.g. `DefaultPaginator`)

## Success Criteria
- Pagination works correctly and can read multiple pages
Expand All @@ -28,6 +29,6 @@ You are working on Phase 2 of connector development. Your goal is to implement a
5. Update checklist.md with progress

## Next Phase
Once pagination is working correctly, the manager will delegate Phase 3 to add all remaining streams.
Once pagination is working correctly, the you will begin Phase 3 to add all remaining streams.

Remember to disable records and raw responses when testing high record counts to avoid overloading the context window.
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Phase 3: Add Remaining Streams

You are working on Phase 3 of connector development. Your goal is to add and validate all remaining streams for the connector.
You are managing Phase 3 of connector development. Your goal is to add and validate all remaining streams for the connector.

## Objectives
- Identify all available streams from API documentation
Expand Down
3 changes: 2 additions & 1 deletion connector_builder_agents/prompts/root-prompt.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ You will use your web-search MCP tool (or another method if you have one) in ord
- `manifest.yaml` (start with an empty file until you know the expected structure)
- `checklist.md` (mentioned above)
- If any of the above files already exist, please delete them before you begin.
- Many of your tools will except either a manifest.yaml path or a yaml string. You should prefer to send a path and not the string, in order to speed up the process and to reduce context and token usage.
- Be sure to create files that have the exact names specified above.
- Many of your tools will except either a `manifest.yaml` path or a yaml string. You should prefer to send a path and not the string, in order to speed up the process and to reduce context and token usage.

3. **Process**
- After you have created these files, use your checklist, the checklist tool, and other provided documentation tools for an overview of the steps needed.
Expand Down
103 changes: 69 additions & 34 deletions connector_builder_agents/src/agents.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
"""Agent implementations for the Airbyte connector builder."""

from pydantic import BaseModel, Field
from pydantic_ai import Agent, RunContext
from pydantic_ai.common_tools.duckduckgo import duckduckgo_search_tool

from .constants import PHASE_1_PROMPT_FILE_PATH, PHASE_2_PROMPT_FILE_PATH, PHASE_3_PROMPT_FILE_PATH
from .guidance import get_default_developer_prompt, get_default_manager_prompt
from .tools import (
SessionState,
Expand All @@ -20,6 +22,41 @@
)


class DelegatedDeveloperTask(BaseModel):
"""Input data for handoff from manager to developer."""

api_name: str = Field(description="The name of the API to build a connector for.")
assignment_title: str = Field(description="The title of the task assigned to the developer.")
assignment_description: str = Field(
description="The description of the task assigned to the developer."
)


class ManagerHandoffInput(BaseModel):
"""Input data for handoff from developer back to manager."""

short_status: str = Field(description="A short status message for the build.")
detailed_progress_update: str = Field(description="A detailed progress update for the build.")
is_full_success: bool = Field(
default=False, description="Whether the current task was successful."
)
is_partial_success: bool = Field(
default=False, description="Whether the current task was partially successful."
)
is_blocked: bool = Field(default=False, description="Whether the current task is blocked.")


class ManagerTaskOutput(BaseModel):
"""Output data for a manager iteration."""

short_status: str = Field(description="A short status message for the build.")
detailed_progress_update: str = Field(description="A detailed progress update for the build.")
phase_1_completed: bool = Field(default=False, description="Whether phase 1 was completed.")
phase_2_completed: bool = Field(default=False, description="Whether phase 2 was completed.")
phase_3_completed: bool = Field(default=False, description="Whether phase 3 was completed.")
is_blocked: bool = Field(default=False, description="Whether the build is blocked.")


def create_developer_agent(
model: str,
api_name: str,
Expand All @@ -28,7 +65,7 @@ def create_developer_agent(
mcp_servers: list,
) -> Agent:
"""Create the developer agent that executes specific phases."""
return Agent(
developer_agent = Agent(
model,
name="MCP Connector Developer",
deps_type=SessionState,
Expand All @@ -44,8 +81,11 @@ def create_developer_agent(
duckduckgo_search_tool(),
],
toolsets=mcp_servers,
output_type=ManagerHandoffInput,
)

return developer_agent


def create_manager_agent(
developer_agent: Agent,
Expand Down Expand Up @@ -75,14 +115,14 @@ def create_manager_agent(
create_get_progress_log_text_tool(session_state),
],
toolsets=mcp_servers,
output_type=ManagerTaskOutput,
)

@manager_agent.tool
async def delegate_to_developer(
ctx: RunContext[SessionState],
assignment_title: str,
assignment_description: str,
) -> str:
delegated_developer_task: DelegatedDeveloperTask,
) -> DelegatedDeveloperTask:
"""Delegate work to the developer agent.

Args:
Expand All @@ -92,54 +132,49 @@ async def delegate_to_developer(
"""
update_progress_log(
f"🤝 [MANAGER → DEVELOPER] Manager delegating task to developer agent."
f"\n Task Name: {assignment_title}"
f"\n Task Description: {assignment_description}",
f"\n Task Name: {delegated_developer_task.assignment_title}"
f"\n Task Description: {delegated_developer_task.assignment_description}",
ctx.deps,
)

result = await developer_agent.run(
assignment_description,
delegated_developer_task.assignment_description,
message_history=ctx.deps.message_history,
deps=ctx.deps,
)

update_progress_log(
f"🤝 [DEVELOPER → MANAGER] Developer completed task: {assignment_title}"
f"🤝 [DEVELOPER → MANAGER] Developer completed task: {delegated_developer_task.assignment_title}"
f"\n Result: {result.output}",
ctx.deps,
)

ctx.deps.message_history.extend(result.new_messages())

return str(result.output)
return result.output

@developer_agent.tool
async def report_back_to_manager(
@manager_agent.tool
async def start_phase_1(
ctx: RunContext[SessionState],
short_status: str,
detailed_progress_update: str,
is_full_success: bool = False,
is_partial_success: bool = False,
is_blocked: bool = False,
) -> str:
"""Report progress or issues back to the manager agent.
"""Start phase 1 of the connector build. Returns the prompt for phase 1."""
update_progress_log("🔧 [Manager] MCP Tool call: start_phase_1", ctx.deps)
return PHASE_1_PROMPT_FILE_PATH.read_text(encoding="utf-8")

Args:
short_status: One sentence summary of what was accomplished.
detailed_progress_update: A detailed update on progress and next steps.
is_full_success: True if the phase is fully completed.
is_partial_success: True if partially done.
is_blocked: True if encountered a blocker.
"""
update_progress_log(
f"🤝 [DEVELOPER → MANAGER] Developer handing back control to manager."
f"\n Summary of status: {short_status}"
f"\n Partial success: {is_partial_success}"
f"\n Full success: {is_full_success}"
f"\n Blocked: {is_blocked}"
f"\n Detailed progress update: {detailed_progress_update}",
ctx.deps,
)
return "Status reported to manager"
@manager_agent.tool
async def start_phase_2(
ctx: RunContext[SessionState],
) -> str:
"""Start phase 2 of the connector build. Returns the prompt for phase 2."""
update_progress_log("🔧 [Manager] MCP Tool call: start_phase_2", ctx.deps)
return PHASE_2_PROMPT_FILE_PATH.read_text(encoding="utf-8")

@manager_agent.tool
async def start_phase_3(
ctx: RunContext[SessionState],
) -> str:
"""Start phase 3 of the connector build. Returns the prompt for phase 3."""
update_progress_log("🔧 [Manager] MCP Tool call: start_phase_3", ctx.deps)
return PHASE_3_PROMPT_FILE_PATH.read_text(encoding="utf-8")

return manager_agent
3 changes: 3 additions & 0 deletions connector_builder_agents/src/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@
load_dotenv()

ROOT_PROMPT_FILE_PATH = Path(__file__).parent.parent / "prompts" / "root-prompt.md"
PHASE_1_PROMPT_FILE_PATH = Path(__file__).parent.parent / "prompts" / "phase-1-stream-read.md"
PHASE_2_PROMPT_FILE_PATH = Path(__file__).parent.parent / "prompts" / "phase-2-pagination.md"
PHASE_3_PROMPT_FILE_PATH = Path(__file__).parent.parent / "prompts" / "phase-3-remaining-streams.md"
ROOT_PROMPT_FILE_STR = ROOT_PROMPT_FILE_PATH.read_text(encoding="utf-8")
MAX_CONNECTOR_BUILD_STEPS = 100
DEFAULT_CONNECTOR_BUILD_API_NAME: str = "JSONPlaceholder API"
Expand Down
Loading
Loading