Skip to content

Commit 441105e

Browse files
feat(cloud): Add CloudConnection methods: get_state_artifacts() and get_catalog_artifact() (#906)
Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
1 parent a7c9ccf commit 441105e

File tree

3 files changed

+154
-2
lines changed

3 files changed

+154
-2
lines changed

airbyte/_util/api_util.py

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1775,3 +1775,64 @@ def get_workspace_organization_info(
17751775
client_id=client_id,
17761776
client_secret=client_secret,
17771777
)
1778+
1779+
1780+
def get_connection_state(
1781+
connection_id: str,
1782+
*,
1783+
api_root: str,
1784+
client_id: SecretString,
1785+
client_secret: SecretString,
1786+
) -> dict[str, Any]:
1787+
"""Get the state for a connection.
1788+
1789+
Uses the Config API endpoint: POST /v1/state/get
1790+
1791+
Args:
1792+
connection_id: The connection ID to get state for
1793+
api_root: The API root URL
1794+
client_id: OAuth client ID
1795+
client_secret: OAuth client secret
1796+
1797+
Returns:
1798+
Dictionary containing the connection state.
1799+
"""
1800+
return _make_config_api_request(
1801+
path="/state/get",
1802+
json={"connectionId": connection_id},
1803+
api_root=api_root,
1804+
client_id=client_id,
1805+
client_secret=client_secret,
1806+
)
1807+
1808+
1809+
def get_connection_catalog(
1810+
connection_id: str,
1811+
*,
1812+
api_root: str,
1813+
client_id: SecretString,
1814+
client_secret: SecretString,
1815+
) -> dict[str, Any]:
1816+
"""Get the configured catalog for a connection.
1817+
1818+
Uses the Config API endpoint: POST /v1/web_backend/connections/get
1819+
1820+
This returns the full connection info including the syncCatalog field,
1821+
which contains the configured catalog with full stream schemas.
1822+
1823+
Args:
1824+
connection_id: The connection ID to get catalog for
1825+
api_root: The API root URL
1826+
client_id: OAuth client ID
1827+
client_secret: OAuth client secret
1828+
1829+
Returns:
1830+
Dictionary containing the connection info with syncCatalog.
1831+
"""
1832+
return _make_config_api_request(
1833+
path="/web_backend/connections/get",
1834+
json={"connectionId": connection_id, "withRefreshedCatalog": False},
1835+
api_root=api_root,
1836+
client_id=client_id,
1837+
client_secret=client_secret,
1838+
)

airbyte/cloud/connections.py

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
from __future__ import annotations
55

6-
from typing import TYPE_CHECKING
6+
from typing import TYPE_CHECKING, Any
77

88
from airbyte._util import api_util
99
from airbyte.cloud.connectors import CloudDestination, CloudSource
@@ -280,6 +280,48 @@ def get_sync_result(
280280
job_id=job_id,
281281
)
282282

283+
# Artifacts
284+
285+
def get_state_artifacts(self) -> list[dict[str, Any]] | None:
286+
"""Get the connection state artifacts.
287+
288+
Returns the persisted state for this connection, which can be used
289+
when debugging incremental syncs.
290+
291+
Uses the Config API endpoint: POST /v1/state/get
292+
293+
Returns:
294+
List of state objects for each stream, or None if no state is set.
295+
"""
296+
state_response = api_util.get_connection_state(
297+
connection_id=self.connection_id,
298+
api_root=self.workspace.api_root,
299+
client_id=self.workspace.client_id,
300+
client_secret=self.workspace.client_secret,
301+
)
302+
if state_response.get("stateType") == "not_set":
303+
return None
304+
return state_response.get("streamState", [])
305+
306+
def get_catalog_artifact(self) -> dict[str, Any] | None:
307+
"""Get the configured catalog for this connection.
308+
309+
Returns the full configured catalog (syncCatalog) for this connection,
310+
including stream schemas, sync modes, cursor fields, and primary keys.
311+
312+
Uses the Config API endpoint: POST /v1/web_backend/connections/get
313+
314+
Returns:
315+
Dictionary containing the configured catalog, or `None` if not found.
316+
"""
317+
connection_response = api_util.get_connection_catalog(
318+
connection_id=self.connection_id,
319+
api_root=self.workspace.api_root,
320+
client_id=self.workspace.client_id,
321+
client_secret=self.workspace.client_secret,
322+
)
323+
return connection_response.get("syncCatalog")
324+
283325
def rename(self, name: str) -> CloudConnection:
284326
"""Rename the connection.
285327

airbyte/mcp/cloud_ops.py

Lines changed: 50 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
"""Airbyte Cloud MCP operations."""
33

44
from pathlib import Path
5-
from typing import Annotated, Any, cast
5+
from typing import Annotated, Any, Literal, cast
66

77
from fastmcp import FastMCP
88
from pydantic import BaseModel, Field
@@ -2284,6 +2284,55 @@ def set_cloud_connection_selected_streams(
22842284
)
22852285

22862286

2287+
@mcp_tool(
2288+
domain="cloud",
2289+
read_only=True,
2290+
idempotent=True,
2291+
open_world=True,
2292+
extra_help_text=CLOUD_AUTH_TIP_TEXT,
2293+
)
2294+
def get_connection_artifact(
2295+
connection_id: Annotated[
2296+
str,
2297+
Field(description="The ID of the Airbyte Cloud connection."),
2298+
],
2299+
artifact_type: Annotated[
2300+
Literal["state", "catalog"],
2301+
Field(description="The type of artifact to retrieve: 'state' or 'catalog'."),
2302+
],
2303+
*,
2304+
workspace_id: Annotated[
2305+
str | None,
2306+
Field(
2307+
description=WORKSPACE_ID_TIP_TEXT,
2308+
default=None,
2309+
),
2310+
],
2311+
) -> dict[str, Any] | list[dict[str, Any]]:
2312+
"""Get a connection artifact (state or catalog) from Airbyte Cloud.
2313+
2314+
Retrieves the specified artifact for a connection:
2315+
- 'state': Returns the persisted state for incremental syncs as a list of
2316+
stream state objects, or {"ERROR": "..."} if no state is set.
2317+
- 'catalog': Returns the configured catalog (syncCatalog) as a dict,
2318+
or {"ERROR": "..."} if not found.
2319+
"""
2320+
workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
2321+
connection = workspace.get_connection(connection_id=connection_id)
2322+
2323+
if artifact_type == "state":
2324+
result = connection.get_state_artifacts()
2325+
if result is None:
2326+
return {"ERROR": "No state is set for this connection (stateType: not_set)"}
2327+
return result
2328+
2329+
# artifact_type == "catalog"
2330+
result = connection.get_catalog_artifact()
2331+
if result is None:
2332+
return {"ERROR": "No catalog found for this connection"}
2333+
return result
2334+
2335+
22872336
def register_cloud_ops_tools(app: FastMCP) -> None:
22882337
"""@private Register tools with the FastMCP app.
22892338

0 commit comments

Comments
 (0)