Skip to content
61 changes: 61 additions & 0 deletions airbyte/_util/api_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -1775,3 +1775,64 @@ def get_workspace_organization_info(
client_id=client_id,
client_secret=client_secret,
)


def get_connection_state(
connection_id: str,
*,
api_root: str,
client_id: SecretString,
client_secret: SecretString,
) -> dict[str, Any]:
"""Get the state for a connection.

Uses the Config API endpoint: POST /v1/state/get

Args:
connection_id: The connection ID to get state for
api_root: The API root URL
client_id: OAuth client ID
client_secret: OAuth client secret

Returns:
Dictionary containing the connection state.
"""
return _make_config_api_request(
path="/state/get",
json={"connectionId": connection_id},
api_root=api_root,
client_id=client_id,
client_secret=client_secret,
)


def get_connection_catalog(
connection_id: str,
*,
api_root: str,
client_id: SecretString,
client_secret: SecretString,
) -> dict[str, Any]:
"""Get the configured catalog for a connection.

Uses the Config API endpoint: POST /v1/web_backend/connections/get

This returns the full connection info including the syncCatalog field,
which contains the configured catalog with full stream schemas.

Args:
connection_id: The connection ID to get catalog for
api_root: The API root URL
client_id: OAuth client ID
client_secret: OAuth client secret

Returns:
Dictionary containing the connection info with syncCatalog.
"""
return _make_config_api_request(
path="/web_backend/connections/get",
json={"connectionId": connection_id, "withRefreshedCatalog": False},
api_root=api_root,
client_id=client_id,
client_secret=client_secret,
)
44 changes: 43 additions & 1 deletion airbyte/cloud/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

from __future__ import annotations

from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Any

from airbyte._util import api_util
from airbyte.cloud.connectors import CloudDestination, CloudSource
Expand Down Expand Up @@ -280,6 +280,48 @@ def get_sync_result(
job_id=job_id,
)

# Artifacts

def get_state_artifacts(self) -> list[dict[str, Any]] | None:
"""Get the connection state artifacts.

Returns the persisted state for this connection, which can be used
when debugging incremental syncs.

Uses the Config API endpoint: POST /v1/state/get

Returns:
List of state objects for each stream, or None if no state is set.
"""
state_response = api_util.get_connection_state(
connection_id=self.connection_id,
api_root=self.workspace.api_root,
client_id=self.workspace.client_id,
client_secret=self.workspace.client_secret,
)
if state_response.get("stateType") == "not_set":
return None
return state_response.get("streamState", [])

def get_catalog_artifact(self) -> dict[str, Any] | None:
"""Get the configured catalog for this connection.

Returns the full configured catalog (syncCatalog) for this connection,
including stream schemas, sync modes, cursor fields, and primary keys.

Uses the Config API endpoint: POST /v1/web_backend/connections/get

Returns:
Dictionary containing the configured catalog, or `None` if not found.
"""
connection_response = api_util.get_connection_catalog(
connection_id=self.connection_id,
api_root=self.workspace.api_root,
client_id=self.workspace.client_id,
client_secret=self.workspace.client_secret,
)
return connection_response.get("syncCatalog")

def rename(self, name: str) -> CloudConnection:
"""Rename the connection.

Expand Down
51 changes: 50 additions & 1 deletion airbyte/mcp/cloud_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"""Airbyte Cloud MCP operations."""

from pathlib import Path
from typing import Annotated, Any, cast
from typing import Annotated, Any, Literal, cast

from fastmcp import FastMCP
from pydantic import BaseModel, Field
Expand Down Expand Up @@ -2284,6 +2284,55 @@ def set_cloud_connection_selected_streams(
)


@mcp_tool(
domain="cloud",
read_only=True,
idempotent=True,
open_world=True,
extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def get_connection_artifact(
connection_id: Annotated[
str,
Field(description="The ID of the Airbyte Cloud connection."),
],
artifact_type: Annotated[
Literal["state", "catalog"],
Field(description="The type of artifact to retrieve: 'state' or 'catalog'."),
],
*,
workspace_id: Annotated[
str | None,
Field(
description=WORKSPACE_ID_TIP_TEXT,
default=None,
),
],
) -> dict[str, Any] | list[dict[str, Any]]:
"""Get a connection artifact (state or catalog) from Airbyte Cloud.

Retrieves the specified artifact for a connection:
- 'state': Returns the persisted state for incremental syncs as a list of
stream state objects, or {"ERROR": "..."} if no state is set.
- 'catalog': Returns the configured catalog (syncCatalog) as a dict,
or {"ERROR": "..."} if not found.
"""
workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
connection = workspace.get_connection(connection_id=connection_id)

if artifact_type == "state":
result = connection.get_state_artifacts()
if result is None:
return {"ERROR": "No state is set for this connection (stateType: not_set)"}
return result

# artifact_type == "catalog"
result = connection.get_catalog_artifact()
if result is None:
return {"ERROR": "No catalog found for this connection"}
return result


def register_cloud_ops_tools(app: FastMCP) -> None:
"""@private Register tools with the FastMCP app.

Expand Down