Skip to content
113 changes: 105 additions & 8 deletions airbyte/mcp/cloud_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,23 @@ class CloudWorkspaceResult(BaseModel):
"""ID of the organization this workspace belongs to."""


class LogReadResult(BaseModel):
"""Result of reading sync logs with pagination support."""

job_id: int
"""The job ID the logs belong to."""
attempt_number: int
"""The attempt number the logs belong to."""
log_text: str
"""The string containing the log text we are returning."""
log_text_start_line: int
"""1-based line index of the first line returned."""
log_text_line_count: int
"""Count of lines we are returning."""
total_log_lines_available: int
"""Total number of log lines available, shows if any lines were missed due to the limit."""


def _get_cloud_workspace(workspace_id: str | None = None) -> CloudWorkspace:
"""Get an authenticated CloudWorkspace.

Expand Down Expand Up @@ -802,20 +819,67 @@ def get_cloud_sync_logs(
default=None,
),
],
) -> str:
max_lines: Annotated[
int,
Field(
description=(
"Maximum number of lines to return. "
"Defaults to 4000 if not specified. "
"If '0' is provided, no limit is applied."
),
default=4000,
),
],
from_tail: Annotated[
bool | None,
Field(
description=(
"Pull from the end of the log text if total lines is greater than 'max_lines'. "
"Defaults to True if `line_offset` is not specified. "
"Cannot combine `from_tail=True` with `line_offset`."
),
default=None,
),
],
line_offset: Annotated[
int | None,
Field(
description=(
"Number of lines to skip from the beginning of the logs. "
"Cannot be combined with `from_tail=True`."
),
default=None,
),
],
) -> LogReadResult:
"""Get the logs from a sync job attempt on Airbyte Cloud."""
# Validate that line_offset and from_tail are not both set
if line_offset is not None and from_tail:
raise PyAirbyteInputError(
message="Cannot specify both 'line_offset' and 'from_tail' parameters.",
context={"line_offset": line_offset, "from_tail": from_tail},
)

if from_tail is None and line_offset is None:
from_tail = True
workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
connection = workspace.get_connection(connection_id=connection_id)

sync_result: cloud.SyncResult | None = connection.get_sync_result(job_id=job_id)

if not sync_result:
return f"No sync job found for connection '{connection_id}'"
raise AirbyteMissingResourceError(
resource_type="sync job",
resource_name_or_id=connection_id,
)

attempts = sync_result.get_attempts()

if not attempts:
return f"No attempts found for job '{sync_result.job_id}'"
raise AirbyteMissingResourceError(
resource_type="sync attempt",
resource_name_or_id=str(sync_result.job_id),
)

if attempt_number is not None:
target_attempt = None
Expand All @@ -825,19 +889,52 @@ def get_cloud_sync_logs(
break

if target_attempt is None:
return f"Attempt number {attempt_number} not found for job '{sync_result.job_id}'"
raise AirbyteMissingResourceError(
resource_type="sync attempt",
resource_name_or_id=f"job {sync_result.job_id}, attempt {attempt_number}",
)
else:
target_attempt = max(attempts, key=lambda a: a.attempt_number)

logs = target_attempt.get_full_log_text()

if not logs:
return (
f"No logs available for job '{sync_result.job_id}', "
f"attempt {target_attempt.attempt_number}"
# Return empty result with zero lines
return LogReadResult(
log_text=(
f"[No logs available for job '{sync_result.job_id}', "
f"attempt {target_attempt.attempt_number}.]"
),
log_text_start_line=1,
log_text_line_count=0,
total_log_lines_available=0,
job_id=sync_result.job_id,
attempt_number=target_attempt.attempt_number,
)

return logs
# Apply line limiting
log_lines = logs.splitlines()
total_lines = len(log_lines)

# Determine effective max_lines (0 means no limit)
effective_max = total_lines if max_lines == 0 else max_lines

# Calculate start_index and slice based on from_tail or line_offset
if from_tail:
start_index = max(0, total_lines - effective_max)
selected_lines = log_lines[start_index:][:effective_max]
else:
start_index = line_offset or 0
selected_lines = log_lines[start_index : start_index + effective_max]

return LogReadResult(
log_text="\n".join(selected_lines),
log_text_start_line=start_index + 1, # Convert to 1-based index
log_text_line_count=len(selected_lines),
total_log_lines_available=total_lines,
job_id=sync_result.job_id,
attempt_number=target_attempt.attempt_number,
)


@mcp_tool(
Expand Down