-
Notifications
You must be signed in to change notification settings - Fork 71
Expand file tree
/
Copy pathsync_results.py
More file actions
506 lines (403 loc) · 17.2 KB
/
sync_results.py
File metadata and controls
506 lines (403 loc) · 17.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""Sync results for Airbyte Cloud workspaces.
## Examples
### Run a sync job and wait for completion
To get started, we'll need a `.CloudConnection` object. You can obtain this object by calling
`.CloudWorkspace.get_connection()`.
```python
import airbyte as ab
from airbyte import cloud
# Initialize an Airbyte Cloud workspace object
workspace = cloud.CloudWorkspace(
workspace_id="123",
api_key=ab.get_secret("AIRBYTE_CLOUD_API_KEY"),
)
# Get a connection object
connection = workspace.get_connection(connection_id="456")
```
Once we have a `.CloudConnection` object, we can simply call `run_sync()`
to start a sync job and wait for it to complete.
```python
# Run a sync job
sync_result: SyncResult = connection.run_sync()
```
### Run a sync job and return immediately
By default, `run_sync()` will wait for the job to complete and raise an
exception if the job fails. You can instead return immediately by setting
`wait=False`.
```python
# Start the sync job and return immediately
sync_result: SyncResult = connection.run_sync(wait=False)
while not sync_result.is_job_complete():
print("Job is still running...")
time.sleep(5)
print(f"Job is complete! Status: {sync_result.get_job_status()}")
```
### Examining the sync result
You can examine the sync result to get more information about the job:
```python
sync_result: SyncResult = connection.run_sync()
# Print the job details
print(
f'''
Job ID: {sync_result.job_id}
Job URL: {sync_result.job_url}
Start Time: {sync_result.start_time}
Records Synced: {sync_result.records_synced}
Bytes Synced: {sync_result.bytes_synced}
Job Status: {sync_result.get_job_status()}
List of Stream Names: {', '.join(sync_result.stream_names)}
'''
)
```
### Reading data from Airbyte Cloud sync result
**This feature is currently only available for specific SQL-based destinations**, such as
Snowflake and BigQuery. The full list of supported destinations can be found by inspecting
the constant `airbyte.cloud.constants.READABLE_DESTINATION_TYPES`.
If your destination is supported, you can read records directly from the SyncResult object.
```python
# Assuming we've already created a `connection` object...
sync_result = connection.get_sync_result()
# Print a list of available stream names
print(sync_result.stream_names)
# Get a dataset from the sync result
dataset: CachedDataset = sync_result.get_dataset("users")
# Get the SQLAlchemy table to use in SQL queries...
users_table = dataset.to_sql_table()
print(f"Table name: {users_table.name}")
# Or iterate over the dataset directly
for record in dataset:
print(record)
```
------
"""
from __future__ import annotations
import time
from collections.abc import Iterator, Mapping
from dataclasses import asdict, dataclass
from typing import TYPE_CHECKING, Any
from typing_extensions import final
from airbyte_cdk.utils.datetime_helpers import ab_datetime_parse
from airbyte._util import api_util
from airbyte.cloud.constants import FAILED_STATUSES, FINAL_STATUSES
from airbyte.datasets import CachedDataset
from airbyte.destinations._translate_dest_to_cache import destination_to_cache
from airbyte.exceptions import AirbyteConnectionSyncError, AirbyteConnectionSyncTimeoutError
# Used as the default value of the `wait_timeout` argument of
# `SyncResult.wait_for_completion()`.
DEFAULT_SYNC_TIMEOUT_SECONDS = 30 * 60 # 30 minutes
"""The default timeout for waiting for a sync job to complete, in seconds."""
if TYPE_CHECKING:
from datetime import datetime
import sqlalchemy
from airbyte._util.api_imports import ConnectionResponse, JobResponse, JobStatusEnum
from airbyte.caches.base import CacheBase
from airbyte.cloud.connections import CloudConnection
from airbyte.cloud.workspaces import CloudWorkspace
@dataclass
class SyncAttempt:
    """Represents a single attempt of a sync job.

    **This class is not meant to be instantiated directly.** Instead, obtain a `SyncAttempt` by
    calling `.SyncResult.get_attempts()`.

    The `_attempt_data` payload is read as a mapping with an `"attempt"` entry (the attempt
    metadata used by the properties below) and an optional `"logs"` entry (used by
    `get_full_log_text()`).
    """

    workspace: CloudWorkspace
    connection: CloudConnection
    job_id: int
    attempt_number: int
    _attempt_data: dict[str, Any] | None = None

    @property
    def attempt_id(self) -> int:
        """Return the attempt ID."""
        return self._get_attempt_data()["id"]

    @property
    def status(self) -> str:
        """Return the attempt status."""
        return self._get_attempt_data()["status"]

    @property
    def bytes_synced(self) -> int:
        """Return the number of bytes synced in this attempt.

        Returns `0` when the value is missing or null in the attempt data.
        """
        # `or 0` (rather than a `.get()` default) also covers an explicit null value.
        return self._get_attempt_data().get("bytesSynced") or 0

    @property
    def records_synced(self) -> int:
        """Return the number of records synced in this attempt.

        Returns `0` when the value is missing or null in the attempt data.
        """
        return self._get_attempt_data().get("recordsSynced") or 0

    @property
    def created_at(self) -> datetime:
        """Return the creation time of the attempt."""
        timestamp = self._get_attempt_data()["createdAt"]
        return ab_datetime_parse(timestamp)

    def _get_attempt_data(self) -> dict[str, Any]:
        """Return the `"attempt"` entry of the provided attempt data.

        Raises:
            ValueError: If no attempt data was provided at construction time.
        """
        if self._attempt_data is None:
            raise ValueError(
                "Attempt data not provided. SyncAttempt should be created via "
                "SyncResult.get_attempts()."
            )
        return self._attempt_data["attempt"]

    def get_full_log_text(self) -> str:
        """Return the complete log text for this attempt.

        Structured log events (the `"events"` entry) are preferred and rendered as
        `[timestamp] LEVEL: message`. If there are no events, the plain `"logLines"` entry
        is used instead.

        Returns:
            String containing all log text for this attempt, with lines separated by newlines.
            An empty string is returned if no attempt data or no logs are available.
        """
        if self._attempt_data is None:
            return ""

        logs_data = self._attempt_data.get("logs")
        if not logs_data:
            return ""

        log_events = logs_data.get("events")
        if log_events:
            formatted_events: list[str] = []
            for event in log_events:
                timestamp = event.get("timestamp", "")
                level = event.get("level", "INFO")
                message = event.get("message", "")
                formatted_events.append(f"[{timestamp}] {level}: {message}")
            return "\n".join(formatted_events)

        # Fall back to plain log lines, including when "events" is present but empty.
        plain_lines = logs_data.get("logLines")
        if plain_lines:
            return "\n".join(plain_lines)

        return ""
@dataclass
class SyncResult:
    """The result of a sync operation.

    **This class is not meant to be instantiated directly.** Instead, obtain a `SyncResult` by
    interacting with the `.CloudWorkspace` and `.CloudConnection` objects.
    """

    workspace: CloudWorkspace
    connection: CloudConnection
    job_id: int
    table_name_prefix: str = ""
    table_name_suffix: str = ""
    # Lazily populated caches for API responses and the derived SQL cache object.
    _latest_job_info: JobResponse | None = None
    _connection_response: ConnectionResponse | None = None
    _cache: CacheBase | None = None
    _job_with_attempts_info: dict[str, Any] | None = None

    @property
    def job_url(self) -> str:
        """Return the URL of the sync job.

        Note: This currently returns the connection's job history URL, as there is no direct URL
        to a specific job in the Airbyte Cloud web app.

        TODO: Implement a direct job logs URL on top of the event-id of the specific attempt number.
              E.g. {self.connection.job_history_url}?eventId={event-guid}&openLogs=true
        """
        return f"{self.connection.job_history_url}"

    def _get_connection_info(self, *, force_refresh: bool = False) -> ConnectionResponse:
        """Return connection info for the sync job.

        The response is cached on the instance; pass `force_refresh=True` to fetch it again.
        """
        if self._connection_response and not force_refresh:
            return self._connection_response

        self._connection_response = api_util.get_connection(
            workspace_id=self.workspace.workspace_id,
            api_root=self.workspace.api_root,
            connection_id=self.connection.connection_id,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
            bearer_token=self.workspace.bearer_token,
        )
        return self._connection_response

    def _get_destination_configuration(self, *, force_refresh: bool = False) -> dict[str, Any]:
        """Return the destination configuration for the sync job.

        `force_refresh` is forwarded to the connection info lookup; the destination itself is
        fetched on every call.
        """
        connection_info: ConnectionResponse = self._get_connection_info(force_refresh=force_refresh)
        destination_response = api_util.get_destination(
            destination_id=connection_info.destination_id,
            api_root=self.workspace.api_root,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
            bearer_token=self.workspace.bearer_token,
        )
        return asdict(destination_response.configuration)

    def is_job_complete(self) -> bool:
        """Check if the sync job is complete (i.e. its status is one of `FINAL_STATUSES`)."""
        return self.get_job_status() in FINAL_STATUSES

    def get_job_status(self) -> JobStatusEnum:
        """Return the latest known status of the sync job."""
        return self._fetch_latest_job_info().status

    def _fetch_latest_job_info(self) -> JobResponse:
        """Return the job info for the sync job.

        Once a final status has been observed, the cached job info is returned without
        another API call. Otherwise the job info is fetched again on every call.
        """
        if self._latest_job_info and self._latest_job_info.status in FINAL_STATUSES:
            return self._latest_job_info

        self._latest_job_info = api_util.get_job_info(
            job_id=self.job_id,
            api_root=self.workspace.api_root,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
            bearer_token=self.workspace.bearer_token,
        )
        return self._latest_job_info

    @property
    def bytes_synced(self) -> int:
        """Return the number of bytes synced (`0` if not reported)."""
        return self._fetch_latest_job_info().bytes_synced or 0

    @property
    def records_synced(self) -> int:
        """Return the number of records synced (`0` if not reported)."""
        return self._fetch_latest_job_info().rows_synced or 0

    @property
    def start_time(self) -> datetime:
        """Return the start time of the sync job in UTC.

        If the start time from the job info cannot be parsed as an ISO-format string, the raw
        job payload is requested from the Config API and its `startTime` value is parsed
        instead.

        Raises:
            ValueError: If the start time cannot be parsed and no fallback value is available.
            TypeError: If the start time cannot be parsed and no fallback value is available.
        """
        try:
            return ab_datetime_parse(self._fetch_latest_job_info().start_time)
        except (ValueError, TypeError) as e:
            if "Invalid isoformat string" in str(e):
                job_info_raw = api_util._make_config_api_request(  # noqa: SLF001
                    api_root=self.workspace.api_root,
                    path="/jobs/get",
                    json={"id": self.job_id},
                    client_id=self.workspace.client_id,
                    client_secret=self.workspace.client_secret,
                    bearer_token=self.workspace.bearer_token,
                )
                raw_start_time = job_info_raw.get("startTime")
                if raw_start_time:
                    return ab_datetime_parse(raw_start_time)

            # No usable fallback: surface the original parsing error.
            raise

    def _fetch_job_with_attempts(self) -> dict[str, Any]:
        """Fetch job info with attempts from Config API using lazy loading pattern.

        NOTE(review): the response is cached on first call regardless of the job's status, so
        attempts created afterwards will not be visible on this instance - confirm this is
        intended.
        """
        if self._job_with_attempts_info is not None:
            return self._job_with_attempts_info

        self._job_with_attempts_info = api_util._make_config_api_request(  # noqa: SLF001  # Config API helper
            api_root=self.workspace.api_root,
            path="/jobs/get",
            json={
                "id": self.job_id,
            },
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
            bearer_token=self.workspace.bearer_token,
        )
        return self._job_with_attempts_info

    def get_attempts(self) -> list[SyncAttempt]:
        """Return a list of attempts for this sync job.

        Attempts are numbered from `0` in the order they appear in the API response.
        """
        job_with_attempts = self._fetch_job_with_attempts()
        attempts_data = job_with_attempts.get("attempts", [])

        return [
            SyncAttempt(
                workspace=self.workspace,
                connection=self.connection,
                job_id=self.job_id,
                attempt_number=i,
                _attempt_data=attempt_data,
            )
            for i, attempt_data in enumerate(attempts_data, start=0)
        ]

    def raise_failure_status(
        self,
        *,
        refresh_status: bool = False,
    ) -> None:
        """Raise an exception if the sync job failed.

        By default, this method will use the latest status available. If you want to refresh the
        status before checking for failure, set `refresh_status=True`. If the job has failed, this
        method will raise a `AirbyteConnectionSyncError`.

        Otherwise, do nothing.

        Raises:
            AirbyteConnectionSyncError: If the job status is one of `FAILED_STATUSES`.
        """
        if not refresh_status and self._latest_job_info:
            latest_status = self._latest_job_info.status
        else:
            latest_status = self.get_job_status()

        if latest_status in FAILED_STATUSES:
            raise AirbyteConnectionSyncError(
                workspace=self.workspace,
                connection_id=self.connection.connection_id,
                job_id=self.job_id,
                # Report the status that was actually evaluated above rather than
                # querying it a second time.
                job_status=latest_status,
            )

    def wait_for_completion(
        self,
        *,
        wait_timeout: int = DEFAULT_SYNC_TIMEOUT_SECONDS,
        raise_timeout: bool = True,
        raise_failure: bool = False,
    ) -> JobStatusEnum:
        """Wait for a job to finish running.

        The job status is polled every `api_util.JOB_WAIT_INTERVAL_SECS` seconds.

        Args:
            wait_timeout: Maximum number of seconds to wait for a final status.
            raise_timeout: If True, raise when the timeout elapses; otherwise return the
                latest (non-final) status.
            raise_failure: If True, raise when the job finishes with a failed status.

        Returns:
            The last observed job status.

        Raises:
            AirbyteConnectionSyncTimeoutError: If the timeout elapses and `raise_timeout` is set.
            AirbyteConnectionSyncError: If the job failed and `raise_failure` is set.
        """
        # A monotonic clock keeps the elapsed-time check immune to system clock adjustments.
        started_at = time.monotonic()
        while True:
            latest_status = self.get_job_status()
            if latest_status in FINAL_STATUSES:
                if raise_failure:
                    # No-op if the job succeeded or is still running:
                    self.raise_failure_status()

                return latest_status

            if time.monotonic() - started_at > wait_timeout:
                if raise_timeout:
                    raise AirbyteConnectionSyncTimeoutError(
                        workspace=self.workspace,
                        connection_id=self.connection.connection_id,
                        job_id=self.job_id,
                        job_status=latest_status,
                        timeout=wait_timeout,
                    )

                return latest_status  # This will be a non-final status

            time.sleep(api_util.JOB_WAIT_INTERVAL_SECS)

    def get_sql_cache(self) -> CacheBase:
        """Return a SQL Cache object for working with the data in a SQL-based destination.

        The cache object is built from the destination configuration on first use and then
        reused for subsequent calls.
        """
        if self._cache:
            return self._cache

        destination_configuration = self._get_destination_configuration()
        self._cache = destination_to_cache(destination_configuration=destination_configuration)
        return self._cache

    def get_sql_engine(self) -> sqlalchemy.engine.Engine:
        """Return a SQL Engine for querying a SQL-based destination."""
        return self.get_sql_cache().get_sql_engine()

    def get_sql_table_name(self, stream_name: str) -> str:
        """Return the SQL table name of the named stream."""
        return self.get_sql_cache().processor.get_sql_table_name(stream_name=stream_name)

    def get_sql_table(
        self,
        stream_name: str,
    ) -> sqlalchemy.Table:
        """Return a SQLAlchemy table object for the named stream."""
        return self.get_sql_cache().processor.get_sql_table(stream_name)

    def get_dataset(self, stream_name: str) -> CachedDataset:
        """Retrieve an `airbyte.datasets.CachedDataset` object for a given stream name.

        This can be used to read and analyze the data in a SQL-based destination.

        TODO: In a future iteration, we can consider providing stream configuration information
              (catalog information) to the `CachedDataset` object via the "Get stream properties"
              API: https://reference.airbyte.com/reference/getstreamproperties
        """
        return CachedDataset(
            self.get_sql_cache(),
            stream_name=stream_name,
            stream_configuration=False,  # Don't look for stream configuration in cache.
        )

    def get_sql_database_name(self) -> str:
        """Return the SQL database name."""
        cache = self.get_sql_cache()
        return cache.get_database_name()

    def get_sql_schema_name(self) -> str:
        """Return the SQL schema name."""
        cache = self.get_sql_cache()
        return cache.schema_name

    @property
    def stream_names(self) -> list[str]:
        """Return the list of stream names, as reported by the connection."""
        return self.connection.stream_names

    @final
    @property
    def streams(
        self,
    ) -> _SyncResultStreams:  # pyrefly: ignore[unknown-name]
        """Return a mapping of stream names to `airbyte.CachedDataset` objects.

        This is a convenience wrapper around the `stream_names`
        property and `get_dataset()` method.
        """
        return self._SyncResultStreams(self)

    class _SyncResultStreams(Mapping[str, CachedDataset]):
        """A mapping of stream names to cached datasets.

        Keys come from the parent's `stream_names`; values are created on access via the
        parent's `get_dataset()`.
        """

        def __init__(
            self,
            parent: SyncResult,
            /,
        ) -> None:
            self.parent: SyncResult = parent

        def __getitem__(self, key: str) -> CachedDataset:
            return self.parent.get_dataset(stream_name=key)

        def __iter__(self) -> Iterator[str]:
            return iter(self.parent.stream_names)

        def __len__(self) -> int:
            return len(self.parent.stream_names)
# Public names exported by this module.
__all__ = [
"SyncResult",
"SyncAttempt",
]