-
Notifications
You must be signed in to change notification settings - Fork 71
Expand file tree
/
Copy path__init__.py
More file actions
83 lines (62 loc) · 2.13 KB
/
__init__.py
File metadata and controls
83 lines (62 loc) · 2.13 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""PyAirbyte classes and methods for interacting with the Airbyte Cloud API.
You can use this module to interact with Airbyte Cloud, OSS, and Enterprise.
## Examples
### Basic Sync Example:
```python
import airbyte as ab
from airbyte import cloud
# Initialize an Airbyte Cloud workspace object
workspace = cloud.CloudWorkspace(
    workspace_id="123",
    api_key=ab.get_secret("AIRBYTE_CLOUD_API_KEY"),
)
# Run a sync job on Airbyte Cloud
connection = workspace.get_connection(connection_id="456")
sync_result = connection.run_sync()
print(sync_result.get_job_status())
```
### Example Read From Cloud Destination:
If your destination is supported, you can read records directly from the
`SyncResult` object. Currently, this is supported for Snowflake and BigQuery destinations only.
```python
# Assuming we've already created a `connection` object...
# Get the latest job result and print the stream names
sync_result = connection.get_sync_result()
print(sync_result.stream_names)
# Get a dataset from the sync result
dataset: CachedDataset = sync_result.get_dataset("users")
# Get a SQLAlchemy table to use in SQL queries...
users_table = dataset.to_sql_table()
print(f"Table name: {users_table.name}")
# Or iterate over the dataset directly
for record in dataset:
    print(record)
```
"""
from __future__ import annotations
from typing import TYPE_CHECKING
from airbyte.cloud.connections import CloudConnection
from airbyte.cloud.constants import JobStatusEnum
from airbyte.cloud.credentials import CloudCredentials
from airbyte.cloud.sync_results import SyncResult
from airbyte.cloud.workspaces import CloudWorkspace
# Submodules imported here for documentation reasons: https://github.com/mitmproxy/pdoc/issues/757
# The import sits behind `TYPE_CHECKING`, which is False at runtime, so this
# branch is only seen by static tooling and is never executed on import.
if TYPE_CHECKING:
    # The directive below silences ruff rule TC004 for these guarded imports.
    # ruff: noqa: TC004
    from airbyte.cloud import connections, constants, credentials, sync_results, workspaces
# Public names of this package, i.e. what `from airbyte.cloud import *` exports.
# Each entry matches a name imported earlier in this module: the submodules are
# imported by name only under `TYPE_CHECKING`, the classes and the enum at runtime.
__all__ = [
    # Submodules
    "workspaces",
    "connections",
    "constants",
    "credentials",
    "sync_results",
    # Classes
    "CloudWorkspace",
    "CloudConnection",
    "CloudCredentials",
    "SyncResult",
    # Enums
    "JobStatusEnum",
]