Skip to content
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 85 additions & 14 deletions airbyte/mcp/cloud_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,20 @@ class CloudConnectionResult(BaseModel):
"""ID of the source used by this connection."""
destination_id: str
"""ID of the destination used by this connection."""
last_job_status: str | None = None
"""Status of the most recent completed sync job (e.g., 'succeeded', 'failed', 'cancelled').
Only populated when with_connection_status=True."""
last_job_id: int | None = None
"""Job ID of the most recent completed sync. Only populated when with_connection_status=True."""
last_job_time: str | None = None
"""ISO 8601 timestamp of the most recent completed sync.
Only populated when with_connection_status=True."""
currently_running_job_id: int | None = None
"""Job ID of a currently running sync, if any.
Only populated when with_connection_status=True."""
currently_running_job_start_time: str | None = None
"""ISO 8601 timestamp of when the currently running sync started.
Only populated when with_connection_status=True."""


class CloudSourceDetails(BaseModel):
Expand Down Expand Up @@ -825,8 +839,27 @@ def list_deployed_cloud_connections(
int | None,
"Optional maximum number of items to return (default: no limit)",
] = None,
with_connection_status: Annotated[
bool | None,
"If True, include status info for each connection's most recent sync job",
default=False,
],
failing_connections_only: Annotated[
bool | None,
"If True, only return connections where the most recent completed sync failed",
default=False,
],
) -> list[CloudConnectionResult]:
"""List all deployed connections in the Airbyte Cloud workspace."""
"""List all deployed connections in the Airbyte Cloud workspace.

When with_connection_status is True, each connection result will include
information about the most recent sync job status, skipping over any
currently in-progress syncs to find the last completed job.

When failing_connections_only is True, only connections where the most
recent completed sync job failed will be returned. This implicitly enables
with_connection_status.
"""
workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
connections = workspace.list_connections()

Expand All @@ -835,21 +868,59 @@ def list_deployed_cloud_connections(
needle = name_contains.lower()
connections = [c for c in connections if c.name is not None and needle in c.name.lower()]

# Apply limit if requested
if max_items_limit is not None:
connections = connections[:max_items_limit]
# If failing_connections_only is True, implicitly enable with_connection_status
if failing_connections_only:
with_connection_status = True

# Note: name and url are guaranteed non-null from list API responses
return [
CloudConnectionResult(
id=connection.connection_id,
name=cast(str, connection.name),
url=cast(str, connection.connection_url),
source_id=connection.source_id,
destination_id=connection.destination_id,
results: list[CloudConnectionResult] = []

for connection in connections:
last_job_status: str | None = None
last_job_id: int | None = None
last_job_time: str | None = None
currently_running_job_id: int | None = None
currently_running_job_start_time: str | None = None

if with_connection_status:
sync_logs = connection.get_previous_sync_logs(limit=5)

for sync_result in sync_logs:
job_status = sync_result.get_job_status()

if not sync_result.is_job_complete():
currently_running_job_id = sync_result.job_id
currently_running_job_start_time = sync_result.start_time.isoformat()
continue

last_job_status = str(job_status.value) if job_status else None
last_job_id = sync_result.job_id
last_job_time = sync_result.start_time.isoformat()
break

if failing_connections_only and (
last_job_status is None or last_job_status not in {"failed", "cancelled"}
):
continue

results.append(
CloudConnectionResult(
id=connection.connection_id,
name=cast(str, connection.name),
url=cast(str, connection.connection_url),
source_id=connection.source_id,
destination_id=connection.destination_id,
last_job_status=last_job_status,
last_job_id=last_job_id,
last_job_time=last_job_time,
currently_running_job_id=currently_running_job_id,
currently_running_job_start_time=currently_running_job_start_time,
)
)
for connection in connections
]

if max_items_limit is not None and len(results) >= max_items_limit:
break

return results


def _get_custom_source_definition_description(
Expand Down
Loading