Skip to content
104 changes: 96 additions & 8 deletions airbyte/mcp/cloud_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,19 @@ class CloudWorkspaceResult(BaseModel):
"""ID of the organization this workspace belongs to."""


class LogReadResult(BaseModel):
    """Result of reading sync logs with pagination support."""

    # The notes below describe how `get_cloud_sync_logs` in this file fills
    # each field, so callers can page through a long log with follow-up calls.

    # Selected log lines joined with "\n". When the attempt has no logs, this
    # holds a bracketed "[No logs available ...]" placeholder message instead.
    log_text: str
    """The string containing the log text we are returning."""
    # Computed as the 0-based start of the returned slice plus one. Set to 1
    # in the no-logs case.
    log_text_start_line: int
    """1-based line index of the first line returned."""
    # Number of lines in the returned slice after offset/limit handling.
    # Set to 0 in the no-logs case, even though `log_text` is then non-empty.
    log_text_line_count: int
    """Count of lines we are returning."""
    # Line count of the full log text (via `splitlines()`) before any slicing;
    # compare with `log_text_line_count` to tell whether output was truncated.
    total_log_lines_available: int
    """Total number of log lines available, shows if any lines were missed due to the limit."""


def _get_cloud_workspace(workspace_id: str | None = None) -> CloudWorkspace:
"""Get an authenticated CloudWorkspace.

Expand Down Expand Up @@ -802,20 +815,65 @@ def get_cloud_sync_logs(
default=None,
),
],
) -> str:
max_lines: Annotated[
int,
Field(
description=(
"Maximum number of lines to return. "
"Defaults to 4000 if not specified. "
"If '0' is provided, no limit is applied."
),
default=4000,
),
],
line_offset: Annotated[
int | None,
Field(
description=(
"Number of lines to skip from the beginning of the logs. "
"Cannot be combined with `from_tail=True`."
),
default=None,
),
] = None,
from_tail: Annotated[
bool | None,
Field(
description=(
"If True, return lines from the end of the logs instead of the beginning. "
"Cannot be combined with `line_offset`."
),
default=None,
),
] = None,
) -> LogReadResult:
"""Get the logs from a sync job attempt on Airbyte Cloud."""
# Validate that line_offset and from_tail are not both set
if line_offset is not None and from_tail:
raise PyAirbyteInputError(
message="Cannot specify both 'line_offset' and 'from_tail' parameters.",
context={"line_offset": line_offset, "from_tail": from_tail},
)

max_lines = None if max_lines = 0 else max_lines
workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
connection = workspace.get_connection(connection_id=connection_id)

sync_result: cloud.SyncResult | None = connection.get_sync_result(job_id=job_id)

if not sync_result:
return f"No sync job found for connection '{connection_id}'"
raise AirbyteMissingResourceError(
resource_type="sync job",
resource_name_or_id=connection_id,
)

attempts = sync_result.get_attempts()

if not attempts:
return f"No attempts found for job '{sync_result.job_id}'"
raise AirbyteMissingResourceError(
resource_type="sync attempt",
resource_name_or_id=str(sync_result.job_id),
)

if attempt_number is not None:
target_attempt = None
Expand All @@ -825,19 +883,49 @@ def get_cloud_sync_logs(
break

if target_attempt is None:
return f"Attempt number {attempt_number} not found for job '{sync_result.job_id}'"
raise AirbyteMissingResourceError(
resource_type="sync attempt",
resource_name_or_id=f"job {sync_result.job_id}, attempt {attempt_number}",
)
else:
target_attempt = max(attempts, key=lambda a: a.attempt_number)

logs = target_attempt.get_full_log_text()

if not logs:
return (
f"No logs available for job '{sync_result.job_id}', "
f"attempt {target_attempt.attempt_number}"
# Return empty result with zero lines
return LogReadResult(
log_text=(
f"[No logs available for job '{sync_result.job_id}', "
f"attempt {target_attempt.attempt_number}.]"
),
log_text_start_line=1,
log_text_line_count=0,
total_log_lines_available=0,
)

return logs
# Apply line limiting
lines = logs.splitlines()
total_lines = len(lines)

# If max_lines is 0 or None, we return all lines.
effective_max_lines = max_lines or total_lines

if from_tail:
# Return lines from the end
start_index = max(0, total_lines - effective_max_lines)
lines = lines[-effective_max_lines:]
else:
# Return lines from the beginning, with optional offset
start_index = line_offset if line_offset is not None else 0
lines = lines[start_index : start_index + effective_max_lines]

return LogReadResult(
log_text="\n".join(lines),
log_text_start_line=start_index + 1, # Convert to 1-based index
log_text_line_count=len(lines),
total_log_lines_available=total_lines,
)


@mcp_tool(
Expand Down
Loading