Skip to content
24 changes: 23 additions & 1 deletion airbyte/_util/api_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@
JOB_WAIT_INTERVAL_SECS = 2.0
JOB_WAIT_TIMEOUT_SECS_DEFAULT = 60 * 60 # 1 hour

# Job ordering constants for list_jobs API
JOB_ORDER_BY_CREATED_AT_DESC = "createdAt|DESC"
JOB_ORDER_BY_CREATED_AT_ASC = "createdAt|ASC"


def status_ok(status_code: int) -> bool:
"""Check if a status code is OK."""
Expand Down Expand Up @@ -412,8 +416,24 @@ def get_job_logs(
api_root: str,
client_id: SecretString,
client_secret: SecretString,
offset: int | None = None,
order_by: str | None = None,
) -> list[models.JobResponse]:
"""Get a job's logs."""
"""Get a list of jobs for a connection.

Args:
workspace_id: The workspace ID.
connection_id: The connection ID.
limit: Maximum number of jobs to return. Defaults to 100.
api_root: The API root URL.
client_id: The client ID for authentication.
client_secret: The client secret for authentication.
offset: Number of jobs to skip from the beginning. Defaults to None (0).
order_by: Field and direction to order by (e.g., "createdAt|DESC"). Defaults to None.

Returns:
A list of JobResponse objects.
"""
airbyte_instance = get_airbyte_server_instance(
client_id=client_id,
client_secret=client_secret,
Expand All @@ -424,6 +444,8 @@ def get_job_logs(
workspace_ids=[workspace_id],
connection_id=connection_id,
limit=limit,
offset=offset,
order_by=order_by,
),
)
if status_ok(response.status_code) and response.jobs_response:
Expand Down
28 changes: 26 additions & 2 deletions airbyte/cloud/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,14 +208,38 @@ def __repr__(self) -> str:
def get_previous_sync_logs(
self,
*,
limit: int = 10,
limit: int = 20,
offset: int | None = None,
from_tail: bool = True,
) -> list[SyncResult]:
"""Get the previous sync logs for a connection."""
"""Get previous sync jobs for a connection with pagination support.

Returns SyncResult objects containing job metadata (job_id, status, bytes_synced,
rows_synced, start_time). Full log text can be fetched lazily via
`SyncResult.get_full_log_text()`.

Args:
limit: Maximum number of jobs to return. Defaults to 20.
offset: Number of jobs to skip from the beginning. Defaults to None (0).
from_tail: If True, returns jobs ordered newest-first (createdAt DESC).
If False, returns jobs ordered oldest-first (createdAt ASC).
Defaults to True.

Returns:
A list of SyncResult objects representing the sync jobs.
"""
order_by = (
api_util.JOB_ORDER_BY_CREATED_AT_DESC
if from_tail
else api_util.JOB_ORDER_BY_CREATED_AT_ASC
)
sync_logs: list[JobResponse] = api_util.get_job_logs(
connection_id=self.connection_id,
api_root=self.workspace.api_root,
workspace_id=self.workspace.workspace_id,
limit=limit,
offset=offset,
order_by=order_by,
client_id=self.workspace.client_id,
client_secret=self.workspace.client_secret,
)
Expand Down
135 changes: 135 additions & 0 deletions airbyte/mcp/cloud_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,36 @@ class LogReadResult(BaseModel):
"""Total number of log lines available, shows if any lines were missed due to the limit."""


class SyncJobResult(BaseModel):
"""Information about a sync job."""

job_id: int
"""The job ID."""
status: str
"""The job status (e.g., 'succeeded', 'failed', 'running', 'pending')."""
bytes_synced: int
"""Number of bytes synced in this job."""
records_synced: int
"""Number of records synced in this job."""
start_time: str
"""ISO 8601 timestamp of when the job started."""
job_url: str
"""URL to view the job in Airbyte Cloud."""


class SyncJobListResult(BaseModel):
"""Result of listing sync jobs with pagination support."""

jobs: list[SyncJobResult]
"""List of sync jobs."""
jobs_count: int
"""Number of jobs returned in this response."""
jobs_offset: int
"""Offset used for this request (0 if not specified)."""
from_tail: bool
"""Whether jobs are ordered newest-first (True) or oldest-first (False)."""


def _get_cloud_workspace(workspace_id: str | None = None) -> CloudWorkspace:
"""Get an authenticated CloudWorkspace.

Expand Down Expand Up @@ -601,6 +631,111 @@ def get_cloud_sync_status(
return result


@mcp_tool(
domain="cloud",
read_only=True,
idempotent=True,
open_world=True,
extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def list_cloud_sync_jobs(
connection_id: Annotated[
str,
Field(description="The ID of the Airbyte Cloud connection."),
],
*,
workspace_id: Annotated[
str | None,
Field(
description=WORKSPACE_ID_TIP_TEXT,
default=None,
),
],
max_jobs: Annotated[
int,
Field(
description=(
"Maximum number of jobs to return. "
"Defaults to 20 if not specified. "
"If '0' is provided, no limit is applied (uses API default)."
),
default=20,
),
],
from_tail: Annotated[
bool | None,
Field(
description=(
"When True, jobs are ordered newest-first (createdAt DESC). "
"When False, jobs are ordered oldest-first (createdAt ASC). "
"Defaults to True if `jobs_offset` is not specified. "
"Cannot combine `from_tail=True` with `jobs_offset`."
),
default=None,
),
],
jobs_offset: Annotated[
int | None,
Field(
description=(
"Number of jobs to skip from the beginning. "
"Cannot be combined with `from_tail=True`."
),
default=None,
),
],
) -> SyncJobListResult:
"""List sync jobs for a connection with pagination support.

This tool allows you to retrieve a list of sync jobs for a connection,
with control over ordering and pagination. By default, jobs are returned
newest-first (from_tail=True).
"""
# Validate that jobs_offset and from_tail are not both set
if jobs_offset is not None and from_tail is True:
raise PyAirbyteInputError(
message="Cannot specify both 'jobs_offset' and 'from_tail=True' parameters.",
context={"jobs_offset": jobs_offset, "from_tail": from_tail},
)

# Default to from_tail=True if neither is specified
if from_tail is None and jobs_offset is None:
from_tail = True
elif from_tail is None:
from_tail = False

workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
connection = workspace.get_connection(connection_id=connection_id)

# Use 0 as "no limit" - pass a high number to the API
effective_limit = max_jobs if max_jobs > 0 else 1000

sync_results = connection.get_previous_sync_logs(
limit=effective_limit,
offset=jobs_offset,
from_tail=from_tail,
)

jobs = [
SyncJobResult(
job_id=sync_result.job_id,
status=str(sync_result.get_job_status()),
bytes_synced=sync_result.bytes_synced,
records_synced=sync_result.records_synced,
start_time=sync_result.start_time.isoformat(),
job_url=sync_result.job_url,
)
for sync_result in sync_results
]

return SyncJobListResult(
jobs=jobs,
jobs_count=len(jobs),
jobs_offset=jobs_offset or 0,
from_tail=from_tail,
)


@mcp_tool(
domain="cloud",
read_only=True,
Expand Down