Skip to content

Commit 7b3406e

Browse files
feat(cloud): Add ability to view Cloud sync logs, including SyncAttempt abstraction for fetching job attempts (#781)
Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
1 parent 02ee440 commit 7b3406e

File tree

5 files changed

+534
-71
lines changed

5 files changed

+534
-71
lines changed

airbyte/_util/api_util.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
from __future__ import annotations
1515

1616
import json
17+
from http import HTTPStatus
1718
from typing import TYPE_CHECKING, Any, Literal
1819

1920
import airbyte_api
@@ -860,18 +861,27 @@ def _make_config_api_request(
860861
"Authorization": f"Bearer {bearer_token}",
861862
"User-Agent": "PyAirbyte Client",
862863
}
864+
full_url = config_api_root + path
863865
response = requests.request(
864866
method="POST",
865-
url=config_api_root + path,
867+
url=full_url,
866868
headers=headers,
867869
json=json,
868870
)
869871
if not status_ok(response.status_code):
870872
try:
871873
response.raise_for_status()
872874
except requests.HTTPError as ex:
875+
error_message = f"API request failed with status {response.status_code}"
876+
if response.status_code == HTTPStatus.FORBIDDEN: # 403 error
877+
error_message += f" (Forbidden) when accessing: {full_url}"
873878
raise AirbyteError(
879+
message=error_message,
874880
context={
881+
"full_url": full_url,
882+
"config_api_root": config_api_root,
883+
"path": path,
884+
"status_code": response.status_code,
875885
"url": response.request.url,
876886
"body": response.request.body,
877887
"response": response.__dict__,

airbyte/cloud/sync_results.py

Lines changed: 138 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -103,8 +103,11 @@
103103
import time
104104
from collections.abc import Iterator, Mapping
105105
from dataclasses import asdict, dataclass
106-
from datetime import datetime
107-
from typing import TYPE_CHECKING, Any, final
106+
from typing import TYPE_CHECKING, Any
107+
108+
from typing_extensions import final
109+
110+
from airbyte_cdk.utils.datetime_helpers import ab_datetime_parse
108111

109112
from airbyte._util import api_util
110113
from airbyte.cloud.constants import FAILED_STATUSES, FINAL_STATUSES
@@ -117,6 +120,8 @@
117120
"""The default timeout for waiting for a sync job to complete, in seconds."""
118121

119122
if TYPE_CHECKING:
123+
from datetime import datetime
124+
120125
import sqlalchemy
121126

122127
from airbyte._util.api_imports import ConnectionResponse, JobResponse, JobStatusEnum
@@ -125,6 +130,88 @@
125130
from airbyte.cloud.workspaces import CloudWorkspace
126131

127132

133+
@dataclass
class SyncAttempt:
    """Represents a single attempt of a sync job.

    **This class is not meant to be instantiated directly.** Instead, obtain a `SyncAttempt` by
    calling `.SyncResult.get_attempts()`.

    The raw payload for the attempt is held in `_attempt_data`. Attempt fields are read from its
    `"attempt"` entry and log content is read from its `"logs"` entry.
    """

    workspace: CloudWorkspace
    connection: CloudConnection
    job_id: int
    attempt_number: int
    _attempt_data: dict[str, Any] | None = None

    @property
    def attempt_id(self) -> int:
        """Return the attempt ID.

        Raises:
            ValueError: If no attempt data was provided.
        """
        return self._get_attempt_data()["id"]

    @property
    def status(self) -> str:
        """Return the attempt status.

        Raises:
            ValueError: If no attempt data was provided.
        """
        return self._get_attempt_data()["status"]

    @property
    def bytes_synced(self) -> int:
        """Return the number of bytes synced in this attempt.

        Returns 0 when the payload omits the value or reports it as null.
        """
        # `or 0` (rather than a `.get()` default) also covers an explicit null in the payload,
        # which would otherwise leak `None` out of an `int`-typed property.
        return self._get_attempt_data().get("bytesSynced") or 0

    @property
    def records_synced(self) -> int:
        """Return the number of records synced in this attempt.

        Returns 0 when the payload omits the value or reports it as null.
        """
        return self._get_attempt_data().get("recordsSynced") or 0

    @property
    def created_at(self) -> datetime:
        """Return the creation time of the attempt."""
        timestamp = self._get_attempt_data()["createdAt"]
        return ab_datetime_parse(timestamp)

    def _get_attempt_data(self) -> dict[str, Any]:
        """Return the `"attempt"` entry of the payload this object was created with.

        Raises:
            ValueError: If the object was created without attempt data.
        """
        if self._attempt_data is None:
            raise ValueError(
                "Attempt data not provided. SyncAttempt should be created via "
                "SyncResult.get_attempts()."
            )
        return self._attempt_data["attempt"]

    @staticmethod
    def _format_log_event(event: dict[str, Any]) -> str:
        """Render one structured log event as a `[timestamp] LEVEL: message` line."""
        timestamp = event.get("timestamp", "")
        level = event.get("level", "INFO")
        message = event.get("message", "")
        return f"[{timestamp}] {level}: {message}"

    def get_full_log_text(self) -> str:
        """Return the complete log text for this attempt.

        Structured log events (`"events"`) are preferred and rendered one per line. When there
        are no events, the pre-formatted `"logLines"` are used instead.

        Returns:
            String containing all log text for this attempt, with lines separated by newlines.
            An empty string is returned when no attempt data or no log content is available.
        """
        if self._attempt_data is None:
            return ""

        logs_data = self._attempt_data.get("logs")
        if not logs_data:
            return ""

        log_events = logs_data.get("events")
        if log_events:
            return "\n".join(self._format_log_event(event) for event in log_events)

        # Fall back to plain lines even when an (empty) "events" key is present, so that
        # available log text is never dropped just because the structured list is empty.
        return "\n".join(logs_data.get("logLines") or [])
213+
214+
128215
@dataclass
129216
class SyncResult:
130217
"""The result of a sync operation.
@@ -141,6 +228,7 @@ class SyncResult:
141228
_latest_job_info: JobResponse | None = None
142229
_connection_response: ConnectionResponse | None = None
143230
_cache: CacheBase | None = None
231+
_job_with_attempts_info: dict[str, Any] | None = None
144232

145233
@property
146234
def job_url(self) -> str:
@@ -213,8 +301,53 @@ def records_synced(self) -> int:
213301
@property
def start_time(self) -> datetime:
    """Return the start time of the sync job in UTC.

    The start time reported by the latest job info is parsed first. If parsing fails with an
    "Invalid isoformat string" error, the raw `/jobs/get` payload is consulted and its
    `startTime` value is parsed instead.

    Raises:
        ValueError: If the start time cannot be parsed and no usable fallback value exists.
        TypeError: If the start time has an unsupported type and no usable fallback exists.
    """
    try:
        return ab_datetime_parse(self._fetch_latest_job_info().start_time)
    except (ValueError, TypeError) as e:
        # NOTE(review): this message is the one `datetime.fromisoformat` produces; confirm
        # that `ab_datetime_parse` raises with the same text, otherwise this branch is dead.
        if "Invalid isoformat string" in str(e):
            # Reuse the cached Config API helper: it issues the same `/jobs/get` request, so
            # the call is neither duplicated here nor repeated later by `get_attempts()`.
            raw_start_time = self._fetch_job_with_attempts().get("startTime")
            if raw_start_time:
                return ab_datetime_parse(raw_start_time)
        raise
319+
320+
def _fetch_job_with_attempts(self) -> dict[str, Any]:
    """Return the Config API job payload (including attempts), fetching it at most once.

    The first call requests `/jobs/get` for this job and stores the response on the instance.
    Later calls return the stored response without issuing another request.
    """
    if self._job_with_attempts_info is None:
        self._job_with_attempts_info = api_util._make_config_api_request(  # noqa: SLF001  # Config API helper
            api_root=self.workspace.api_root,
            path="/jobs/get",
            json={"id": self.job_id},
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
        )

    return self._job_with_attempts_info
335+
336+
def get_attempts(self) -> list[SyncAttempt]:
    """Return a list of attempts for this sync job.

    One `SyncAttempt` is built per entry in the job payload's `"attempts"` list, in payload
    order. Each attempt's `attempt_number` is its zero-based position in that list.
    """
    raw_attempts = self._fetch_job_with_attempts().get("attempts", [])

    return [
        SyncAttempt(
            workspace=self.workspace,
            connection=self.connection,
            job_id=self.job_id,
            attempt_number=position,
            _attempt_data=raw_attempt,
        )
        for position, raw_attempt in enumerate(raw_attempts)
    ]
218351

219352
def raise_failure_status(
220353
self,
@@ -362,4 +495,5 @@ def __len__(self) -> int:
362495

363496
__all__ = [
364497
"SyncResult",
498+
"SyncAttempt",
365499
]

airbyte/mcp/_cloud_ops.py

Lines changed: 116 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,12 @@
11
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
22
"""Airbyte Cloud MCP operations."""
33

4-
from typing import Annotated
4+
from typing import Annotated, Any
55

66
from fastmcp import FastMCP
77
from pydantic import Field
88

99
from airbyte import cloud, get_destination, get_source
10-
from airbyte._util.api_imports import JobStatusEnum
1110
from airbyte.cloud.auth import (
1211
resolve_cloud_api_url,
1312
resolve_cloud_client_id,
@@ -343,19 +342,64 @@ def get_cloud_sync_status(
343342
default=None,
344343
),
345344
],
346-
) -> JobStatusEnum | None:
345+
*,
346+
include_attempts: Annotated[
347+
bool,
348+
Field(
349+
description="Whether to include detailed attempts information.",
350+
default=False,
351+
),
352+
],
353+
) -> dict[str, Any]:
347354
"""Get the status of a sync job from the Airbyte Cloud.
348355
349356
By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
350357
and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
351358
Airbyte Cloud API.
352359
"""
353-
workspace: CloudWorkspace = _get_cloud_workspace()
354-
connection = workspace.get_connection(connection_id=connection_id)
360+
try:
361+
workspace: CloudWorkspace = _get_cloud_workspace()
362+
connection = workspace.get_connection(connection_id=connection_id)
355363

356-
# If a job ID is provided, get the job by ID.
357-
sync_result: cloud.SyncResult | None = connection.get_sync_result(job_id=job_id)
358-
return sync_result.get_job_status() if sync_result else None
364+
# If a job ID is provided, get the job by ID.
365+
sync_result: cloud.SyncResult | None = connection.get_sync_result(job_id=job_id)
366+
367+
if not sync_result:
368+
return {"status": None, "job_id": None, "attempts": []}
369+
370+
result = {
371+
"status": sync_result.get_job_status(),
372+
"job_id": sync_result.job_id,
373+
"bytes_synced": sync_result.bytes_synced,
374+
"records_synced": sync_result.records_synced,
375+
"start_time": sync_result.start_time.isoformat(),
376+
"job_url": sync_result.job_url,
377+
"attempts": [],
378+
}
379+
380+
if include_attempts:
381+
attempts = sync_result.get_attempts()
382+
result["attempts"] = [
383+
{
384+
"attempt_number": attempt.attempt_number,
385+
"attempt_id": attempt.attempt_id,
386+
"status": attempt.status,
387+
"bytes_synced": attempt.bytes_synced,
388+
"records_synced": attempt.records_synced,
389+
"created_at": attempt.created_at.isoformat(),
390+
}
391+
for attempt in attempts
392+
]
393+
394+
return result # noqa: TRY300
395+
396+
except Exception as ex:
397+
return {
398+
"status": None,
399+
"job_id": job_id,
400+
"error": f"Failed to get sync status for connection '{connection_id}': {ex}",
401+
"attempts": [],
402+
}
359403

360404

361405
# @app.tool() # << deferred
@@ -382,6 +426,69 @@ def list_deployed_cloud_destination_connectors() -> list[CloudDestination]:
382426
return workspace.list_destinations()
383427

384428

429+
# @app.tool() # << deferred
430+
def get_cloud_sync_logs(
    connection_id: Annotated[
        str,
        Field(description="The ID of the Airbyte Cloud connection."),
    ],
    job_id: Annotated[
        int | None,
        Field(description="Optional job ID. If not provided, the latest job will be used."),
    ] = None,
    attempt_number: Annotated[
        int | None,
        Field(
            description="Optional attempt number. If not provided, the latest attempt will be used."
        ),
    ] = None,
) -> str:
    """Get the logs from a sync job attempt on Airbyte Cloud.

    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
    Airbyte Cloud API.
    """
    # Every outcome, including failures, is reported as a plain string so the tool
    # always returns text to its caller instead of raising.
    try:
        workspace: CloudWorkspace = _get_cloud_workspace()
        connection = workspace.get_connection(connection_id=connection_id)
        sync_result: cloud.SyncResult | None = connection.get_sync_result(job_id=job_id)
        if not sync_result:
            return f"No sync job found for connection '{connection_id}'"

        attempts = sync_result.get_attempts()
        if not attempts:
            return f"No attempts found for job '{sync_result.job_id}'"

        if attempt_number is None:
            # No explicit request: pick the attempt with the highest attempt number.
            target_attempt = max(attempts, key=lambda a: a.attempt_number)
        else:
            # First attempt whose number matches the request, if any.
            target_attempt = next(
                (item for item in attempts if item.attempt_number == attempt_number),
                None,
            )
            if target_attempt is None:
                return f"Attempt number {attempt_number} not found for job '{sync_result.job_id}'"

        logs = target_attempt.get_full_log_text()
        if not logs:
            return (
                f"No logs available for job '{sync_result.job_id}', "
                f"attempt {target_attempt.attempt_number}"
            )

        return logs  # noqa: TRY300

    except Exception as ex:
        return f"Failed to get logs for connection '{connection_id}': {ex}"
491+
385492
# @app.tool() # << deferred
386493
def list_deployed_cloud_connections() -> list[CloudConnection]:
387494
"""List all deployed connections in the Airbyte Cloud workspace.
@@ -403,6 +510,7 @@ def register_cloud_ops_tools(app: FastMCP) -> None:
403510
app.tool(create_connection_on_cloud)
404511
app.tool(run_cloud_sync)
405512
app.tool(get_cloud_sync_status)
513+
app.tool(get_cloud_sync_logs)
406514
app.tool(list_deployed_cloud_source_connectors)
407515
app.tool(list_deployed_cloud_destination_connectors)
408516
app.tool(list_deployed_cloud_connections)

bin/test_mcp_tool.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,10 @@
1515
1616
poe mcp-tool-test check_airbyte_cloud_workspace '{}'
1717
poe mcp-tool-test list_deployed_cloud_connections '{}'
18+
poe mcp-tool-test get_cloud_sync_status \
19+
'{"connection_id": "0791e193-811b-4fcf-91c3-f8c5963e74a0", "include_attempts": true}'
20+
poe mcp-tool-test get_cloud_sync_logs \
21+
'{"connection_id": "0791e193-811b-4fcf-91c3-f8c5963e74a0"}'
1822
"""
1923

2024
import asyncio

0 commit comments

Comments
 (0)