Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions airbyte/_util/api_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -2235,6 +2235,42 @@ def get_connection_catalog(
)


def get_refreshed_connection_catalog(
connection_id: str,
*,
api_root: str,
client_id: SecretString | None,
client_secret: SecretString | None,
bearer_token: SecretString | None,
) -> dict[str, Any]:
"""Get the configured catalog for a connection with a refreshed schema from the source.

Uses the Config API endpoint: POST /v1/web_backend/connections/get
with ``withRefreshedCatalog: true``, which triggers a discover operation
on the connection's source and returns the updated catalog.

This is equivalent to clicking "Refresh source schema" in the Airbyte UI.

Args:
connection_id: The connection ID to get catalog for.
api_root: The API root URL.
client_id: OAuth client ID.
client_secret: OAuth client secret.
bearer_token: Bearer token for authentication (alternative to client credentials).

Returns:
Dictionary containing the connection info with refreshed syncCatalog.
"""
return _make_config_api_request(
path="/web_backend/connections/get",
json={"connectionId": connection_id, "withRefreshedCatalog": True},
api_root=api_root,
client_id=client_id,
client_secret=client_secret,
bearer_token=bearer_token,
)


def replace_connection_catalog(
connection_id: str,
configured_catalog_dict: dict[str, Any],
Expand Down
164 changes: 164 additions & 0 deletions airbyte/cloud/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -648,6 +648,170 @@ def import_raw_catalog(self, catalog: dict[str, Any]) -> None:
bearer_token=self.workspace.bearer_token,
)

def refresh_catalog(self) -> dict[str, Any]:
"""Refresh the connection's catalog by re-discovering the source schema.

Triggers a discover operation on the connection's source connector, then
replaces the connection's catalog with the refreshed result. This is
equivalent to clicking "Refresh source schema" in the Airbyte UI.

This is useful after pinning a new connector version that advertises
new streams or updated sync mode support.

Returns:
The refreshed syncCatalog dict.

Raises:
PyAirbyteInputError: If the refreshed catalog is empty or missing.
"""
refreshed_response = api_util.get_refreshed_connection_catalog(
connection_id=self.connection_id,
api_root=self.workspace.api_root,
client_id=self.workspace.client_id,
client_secret=self.workspace.client_secret,
bearer_token=self.workspace.bearer_token,
)
refreshed_catalog = refreshed_response.get("syncCatalog")
if not refreshed_catalog:
raise PyAirbyteInputError(
message="Refreshed catalog is empty.",
context={"connection_id": self.connection_id},
)

api_util.replace_connection_catalog(
connection_id=self.connection_id,
configured_catalog_dict=refreshed_catalog,
api_root=self.workspace.api_root,
client_id=self.workspace.client_id,
client_secret=self.workspace.client_secret,
bearer_token=self.workspace.bearer_token,
)
Comment on lines +681 to +688
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good suggestion. import_raw_catalog does the same replace_connection_catalog call with the same auth params. Switching to self.import_raw_catalog(refreshed_catalog) would reduce duplication. Will apply if the human reviewer agrees.\n\n---\nDevin session

return refreshed_catalog

def set_stream_sync_mode(
self,
stream_name: str,
*,
sync_mode: str,
stream_namespace: str | None = None,
destination_sync_mode: str | None = None,
cursor_field: str | None = None,
) -> None:
"""Set the sync mode for a specific stream on this connection.

Safely modifies only the specified stream in the connection's syncCatalog.
Validates that the requested sync mode is supported by the stream before
applying the change.

Args:
stream_name: The name of the stream to modify.
sync_mode: The source sync mode to set (``"incremental"`` or ``"full_refresh"``).
stream_namespace: The namespace of the stream to modify. If not provided and
multiple streams share the same name, a ``PyAirbyteInputError`` is raised.
destination_sync_mode: The destination sync mode to set
(``"append"``, ``"overwrite"``, or ``"append_dedup"``). If not provided,
the existing destination sync mode is preserved.
cursor_field: The cursor field to use for incremental syncs. Required when
switching to incremental mode if the stream does not have a default
cursor. If not provided, the existing cursor field is preserved.

Raises:
PyAirbyteInputError: If the stream is not found in the catalog, the requested
sync mode is not in the stream's ``supportedSyncModes``, the stream name
is ambiguous (multiple matches without a namespace), or incremental mode
is requested without a usable cursor field.
"""
catalog = self.dump_raw_catalog()
if not catalog or "streams" not in catalog:
raise PyAirbyteInputError(
message="Connection catalog is empty or missing.",
context={"connection_id": self.connection_id},
)

# Find all matching streams by name (and optionally namespace)
matching_entries: list[dict[str, Any]] = []
for entry in catalog["streams"]:
stream_def = entry.get("stream", {})
if stream_def.get("name") != stream_name:
continue
if stream_namespace is not None and stream_def.get("namespace") != stream_namespace:
continue
matching_entries.append(entry)

if not matching_entries:
available_streams = [
f"{e.get('stream', {}).get('namespace', '')}.{e.get('stream', {}).get('name', '')}"
for e in catalog["streams"]
]
Comment on lines +742 to +745
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Valid observation — when namespace is None, this produces .users. However, this is a diagnostic context field on an error (not user-facing UI), so the dot-prefix still communicates "no namespace" clearly enough. Happy to clean this up if the human reviewer agrees it's worth addressing.\n\n---\nDevin session

raise PyAirbyteInputError(
message=f"Stream '{stream_name}' not found in connection catalog.",
context={
"connection_id": self.connection_id,
"available_streams": available_streams,
},
)

if len(matching_entries) > 1:
matching_namespaces = [e.get("stream", {}).get("namespace") for e in matching_entries]
raise PyAirbyteInputError(
message=(
f"Stream name '{stream_name}' is ambiguous "
f"({len(matching_entries)} matches found)."
),
context={
"connection_id": self.connection_id,
"stream_name": stream_name,
"matching_namespaces": matching_namespaces,
},
)

target_entry = matching_entries[0]

# Validate the sync mode is supported
stream_def = target_entry.get("stream", {})
supported_sync_modes: list[str] = stream_def.get("supportedSyncModes", [])
if sync_mode not in supported_sync_modes:
raise PyAirbyteInputError(
message=(f"Sync mode '{sync_mode}' is not supported by stream '{stream_name}'."),
context={
"stream_name": stream_name,
"requested_sync_mode": sync_mode,
"supported_sync_modes": supported_sync_modes,
},
)
Comment on lines +774 to +781
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair point on consistency — connection_id is included in the other error contexts in this method. Will add if the human reviewer considers it worth a follow-up commit.\n\n---\nDevin session


# Update the stream config
config = target_entry.get("config") or {}

# Guard: fail fast if switching to incremental without a usable cursor
if sync_mode == "incremental":
existing_cursor = config.get("cursorField") or stream_def.get("defaultCursorField")
source_defined_cursor = bool(stream_def.get("sourceDefinedCursor"))
if cursor_field is None and not existing_cursor and not source_defined_cursor:
raise PyAirbyteInputError(
message=(
f"Stream '{stream_name}' needs a cursor field before switching "
"to incremental sync mode."
),
context={
"connection_id": self.connection_id,
"stream_name": stream_name,
},
)

config["syncMode"] = sync_mode

if destination_sync_mode is not None:
config["destinationSyncMode"] = destination_sync_mode

if cursor_field is not None:
config["cursorField"] = [cursor_field]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is by design — allowing cursor_field to be set during full_refresh enables pre-configuring the cursor before a subsequent switch to incremental mode (a common two-step workflow). The docstring could be clearer about this, but silently ignoring the parameter would be surprising. Deferring to human reviewer on whether to add a note in the docstring.\n\n---\nDevin session


target_entry["config"] = config

# Save the updated catalog
self.import_raw_catalog(catalog)

def rename(self, name: str) -> CloudConnection:
"""Rename the connection.

Expand Down
136 changes: 136 additions & 0 deletions airbyte/mcp/cloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -2586,6 +2586,142 @@ def update_cloud_connection(
)


@mcp_tool(
destructive=True,
open_world=True,
extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def refresh_connection_catalog(
ctx: Context,
connection_id: Annotated[
str,
Field(description="The ID of the connection to refresh the catalog for."),
],
*,
workspace_id: Annotated[
str | None,
Field(
description=WORKSPACE_ID_TIP_TEXT,
default=None,
),
],
) -> str:
"""Refresh the catalog for a connection by re-discovering the source schema.

Triggers a discover operation on the connection's source connector and updates
the connection's catalog with the latest stream definitions and supported sync
modes. This is equivalent to clicking "Refresh source schema" in the Airbyte UI.

This is useful after pinning a new connector version that advertises new streams
or updated sync mode support.
"""
check_guid_created_in_session(connection_id)
workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
connection = workspace.get_connection(connection_id=connection_id)

refreshed_catalog = connection.refresh_catalog()
stream_count = len(refreshed_catalog.get("streams", []))

return (
f"Successfully refreshed catalog for connection '{connection_id}'. "
f"Catalog now contains {stream_count} stream(s). "
f"URL: {connection.connection_url}"
)


@mcp_tool(
destructive=True,
open_world=True,
extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def set_stream_sync_mode(
ctx: Context,
connection_id: Annotated[
str,
Field(description="The ID of the connection to modify."),
],
stream_name: Annotated[
str,
Field(description="The name of the stream to change the sync mode for."),
],
sync_mode: Annotated[
Literal["incremental", "full_refresh"],
Field(description="The source sync mode to set: 'incremental' or 'full_refresh'."),
],
*,
stream_namespace: Annotated[
str | None,
Field(
description=(
"The namespace of the stream to modify. Required when multiple streams "
"share the same name but differ by namespace."
),
default=None,
),
],
destination_sync_mode: Annotated[
Literal["append", "overwrite", "append_dedup"] | None,
Field(
description=(
"The destination sync mode to set: 'append', 'overwrite', or 'append_dedup'. "
"If not provided, the existing destination sync mode is preserved."
),
default=None,
),
],
cursor_field: Annotated[
str | None,
Field(
description=(
"The cursor field to use for incremental syncs. "
"Required when switching to incremental mode if the stream does not have "
"a default cursor. If not provided, the existing cursor field is preserved."
),
default=None,
),
],
workspace_id: Annotated[
str | None,
Field(
description=WORKSPACE_ID_TIP_TEXT,
default=None,
),
],
) -> str:
"""Set the sync mode for a specific stream on a connection.

Safely modifies only the specified stream in the connection's syncCatalog.
Validates that the requested sync mode is supported by the stream before
applying the change.

This is useful when switching a stream from full_refresh to incremental
(or vice versa) after a connector version upgrade that adds incremental support.
"""
check_guid_created_in_session(connection_id)
workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
connection = workspace.get_connection(connection_id=connection_id)

connection.set_stream_sync_mode(
stream_name=stream_name,
sync_mode=sync_mode,
stream_namespace=stream_namespace,
destination_sync_mode=destination_sync_mode,
cursor_field=cursor_field,
)

return (
f"Successfully set sync mode for stream '{stream_name}' "
f"on connection '{connection_id}' to '{sync_mode}'"
+ (
f" with destination sync mode '{destination_sync_mode}'"
if destination_sync_mode
else ""
)
+ (f" and cursor field '{cursor_field}'" if cursor_field else "")
+ f". URL: {connection.connection_url}"
)
Comment on lines +2712 to +2722
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reasonable observation. Including namespace in the success message when it was explicitly supplied would improve clarity for multi-namespace sources. Will add if the human reviewer agrees.\n\n---\nDevin session



@mcp_tool(
read_only=True,
idempotent=True,
Expand Down
Loading