Skip to content

Commit 02ee440

Browse files
aaronsteersdevin-ai-integration[bot]octavia-squidington-iii
authored
feat: add pagination support and name attribute for cloud objects: CloudConnection, CloudSource, and CloudDestination (#782)
Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Co-authored-by: octavia-squidington-iii <contact@airbyte.com>
1 parent 6009b69 commit 02ee440

File tree

5 files changed

+216
-77
lines changed

5 files changed

+216
-77
lines changed

airbyte/_util/api_util.py

Lines changed: 104 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -150,25 +150,34 @@ def list_connections(
150150
client_secret=client_secret,
151151
api_root=api_root,
152152
)
153-
response = airbyte_instance.connections.list_connections(
154-
api.ListConnectionsRequest(
155-
workspace_ids=[workspace_id],
156-
),
157-
)
158-
159-
if not status_ok(response.status_code) and response.connections_response:
160-
raise AirbyteError(
161-
context={
162-
"workspace_id": workspace_id,
163-
"response": response,
164-
}
153+
result: list[models.ConnectionResponse] = []
154+
has_more = True
155+
offset, page_size = 0, 100
156+
while has_more:
157+
response = airbyte_instance.connections.list_connections(
158+
api.ListConnectionsRequest(
159+
workspace_ids=[workspace_id],
160+
offset=offset,
161+
limit=page_size,
162+
),
165163
)
166-
assert response.connections_response is not None
167-
return [
168-
connection
169-
for connection in response.connections_response.data
170-
if name_filter(connection.name)
171-
]
164+
has_more = bool(response.connections_response and response.connections_response.next)
165+
offset += page_size
166+
167+
if not status_ok(response.status_code) and response.connections_response:
168+
raise AirbyteError(
169+
context={
170+
"workspace_id": workspace_id,
171+
"response": response,
172+
}
173+
)
174+
assert response.connections_response is not None
175+
result += [
176+
connection
177+
for connection in response.connections_response.data
178+
if name_filter(connection.name)
179+
]
180+
return result
172181

173182

174183
def list_workspaces(
@@ -192,24 +201,32 @@ def list_workspaces(
192201
client_secret=client_secret,
193202
api_root=api_root,
194203
)
204+
result: list[models.WorkspaceResponse] = []
205+
has_more = True
206+
offset, page_size = 0, 100
207+
while has_more:
208+
response: api.ListWorkspacesResponse = airbyte_instance.workspaces.list_workspaces(
209+
api.ListWorkspacesRequest(workspace_ids=[workspace_id], offset=offset, limit=page_size),
210+
)
211+
has_more = bool(response.workspaces_response and response.workspaces_response.next)
212+
offset += page_size
195213

196-
response: api.ListWorkspacesResponse = airbyte_instance.workspaces.list_workspaces(
197-
api.ListWorkspacesRequest(
198-
workspace_ids=[workspace_id],
199-
),
200-
)
214+
if not status_ok(response.status_code) and response.workspaces_response:
215+
raise AirbyteError(
216+
context={
217+
"workspace_id": workspace_id,
218+
"response": response,
219+
}
220+
)
201221

202-
if not status_ok(response.status_code) and response.workspaces_response:
203-
raise AirbyteError(
204-
context={
205-
"workspace_id": workspace_id,
206-
"response": response,
207-
}
208-
)
209-
assert response.workspaces_response is not None
210-
return [
211-
workspace for workspace in response.workspaces_response.data if name_filter(workspace.name)
212-
]
222+
assert response.workspaces_response is not None
223+
result += [
224+
workspace
225+
for workspace in response.workspaces_response.data
226+
if name_filter(workspace.name)
227+
]
228+
229+
return result
213230

214231

215232
def list_sources(
@@ -233,21 +250,31 @@ def list_sources(
233250
client_secret=client_secret,
234251
api_root=api_root,
235252
)
236-
response: api.ListSourcesResponse = airbyte_instance.sources.list_sources(
237-
api.ListSourcesRequest(
238-
workspace_ids=[workspace_id],
239-
),
240-
)
241-
242-
if not status_ok(response.status_code) and response.sources_response:
243-
raise AirbyteError(
244-
context={
245-
"workspace_id": workspace_id,
246-
"response": response,
247-
}
253+
result: list[models.SourceResponse] = []
254+
has_more = True
255+
offset, page_size = 0, 100
256+
while has_more:
257+
response: api.ListSourcesResponse = airbyte_instance.sources.list_sources(
258+
api.ListSourcesRequest(
259+
workspace_ids=[workspace_id],
260+
offset=offset,
261+
limit=page_size,
262+
),
248263
)
249-
assert response.sources_response is not None
250-
return [source for source in response.sources_response.data if name_filter(source.name)]
264+
has_more = bool(response.sources_response and response.sources_response.next)
265+
offset += page_size
266+
267+
if not status_ok(response.status_code) and response.sources_response:
268+
raise AirbyteError(
269+
context={
270+
"workspace_id": workspace_id,
271+
"response": response,
272+
}
273+
)
274+
assert response.sources_response is not None
275+
result += [source for source in response.sources_response.data if name_filter(source.name)]
276+
277+
return result
251278

252279

253280
def list_destinations(
@@ -271,25 +298,35 @@ def list_destinations(
271298
client_secret=client_secret,
272299
api_root=api_root,
273300
)
274-
response = airbyte_instance.destinations.list_destinations(
275-
api.ListDestinationsRequest(
276-
workspace_ids=[workspace_id],
277-
),
278-
)
279-
280-
if not status_ok(response.status_code) and response.destinations_response:
281-
raise AirbyteError(
282-
context={
283-
"workspace_id": workspace_id,
284-
"response": response,
285-
}
301+
result: list[models.DestinationResponse] = []
302+
has_more = True
303+
offset, page_size = 0, 100
304+
while has_more:
305+
response = airbyte_instance.destinations.list_destinations(
306+
api.ListDestinationsRequest(
307+
workspace_ids=[workspace_id],
308+
offset=offset,
309+
limit=page_size,
310+
),
286311
)
287-
assert response.destinations_response is not None
288-
return [
289-
destination
290-
for destination in response.destinations_response.data
291-
if name_filter(destination.name)
292-
]
312+
has_more = bool(response.destinations_response and response.destinations_response.next)
313+
offset += page_size
314+
315+
if not status_ok(response.status_code) and response.destinations_response:
316+
raise AirbyteError(
317+
context={
318+
"workspace_id": workspace_id,
319+
"response": response,
320+
}
321+
)
322+
assert response.destinations_response is not None
323+
result += [
324+
destination
325+
for destination in response.destinations_response.data
326+
if name_filter(destination.name)
327+
]
328+
329+
return result
293330

294331

295332
# Get and run connections
@@ -369,7 +406,7 @@ def run_connection(
369406
def get_job_logs(
370407
workspace_id: str,
371408
connection_id: str,
372-
limit: int = 20,
409+
limit: int = 100,
373410
*,
374411
api_root: str,
375412
client_id: SecretString,

airbyte/cloud/connections.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,8 +64,35 @@ def _fetch_connection_info(self) -> ConnectionResponse:
6464
client_secret=self.workspace.client_secret,
6565
)
6666

67+
@classmethod
68+
def _from_connection_response(
69+
cls,
70+
workspace: CloudWorkspace,
71+
connection_response: ConnectionResponse,
72+
) -> CloudConnection:
73+
"""Create a CloudConnection from a ConnectionResponse."""
74+
result = cls(
75+
workspace=workspace,
76+
connection_id=connection_response.connection_id,
77+
source=connection_response.source_id,
78+
destination=connection_response.destination_id,
79+
)
80+
result._connection_info = connection_response # noqa: SLF001 # Accessing Non-Public API
81+
return result
82+
6783
# Properties
6884

85+
@property
86+
def name(self) -> str | None:
87+
"""Get the display name of the connection, if available.
88+
89+
E.g. "My Postgres to Snowflake", not the connection ID.
90+
"""
91+
if not self._connection_info:
92+
self._connection_info = self._fetch_connection_info()
93+
94+
return self._connection_info.name
95+
6996
@property
7097
def source_id(self) -> str:
7198
"""The ID of the source."""

airbyte/cloud/connectors.py

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@
4343
from dataclasses import dataclass
4444
from typing import TYPE_CHECKING, ClassVar, Literal
4545

46+
from airbyte_api import models as api_models # noqa: TC002
47+
4648
from airbyte._util import api_util
4749

4850

@@ -99,6 +101,27 @@ def __init__(
99101
self.connector_id = connector_id
100102
"""The ID of the connector."""
101103

104+
self._connector_info: api_models.SourceResponse | api_models.DestinationResponse | None = (
105+
None
106+
)
107+
"""The connection info object. (Cached.)"""
108+
109+
@property
110+
def name(self) -> str | None:
111+
"""Get the display name of the connector, if available.
112+
113+
E.g. "My Postgres Source", not the canonical connector name ("source-postgres").
114+
"""
115+
if not self._connector_info:
116+
self._connector_info = self._fetch_connector_info()
117+
118+
return self._connector_info.name
119+
120+
@abc.abstractmethod
121+
def _fetch_connector_info(self) -> api_models.SourceResponse | api_models.DestinationResponse:
122+
"""Populate the connector with data from the API."""
123+
...
124+
102125
@property
103126
def connector_url(self) -> str:
104127
"""Get the web URL of the source connector."""
@@ -164,6 +187,32 @@ def source_id(self) -> str:
164187
"""
165188
return self.connector_id
166189

190+
def _fetch_connector_info(self) -> api_models.SourceResponse:
191+
"""Populate the source with data from the API."""
192+
return api_util.get_source(
193+
source_id=self.connector_id,
194+
api_root=self.workspace.api_root,
195+
client_id=self.workspace.client_id,
196+
client_secret=self.workspace.client_secret,
197+
)
198+
199+
@classmethod
200+
def _from_source_response(
201+
cls,
202+
workspace: CloudWorkspace,
203+
source_response: api_models.SourceResponse,
204+
) -> CloudSource:
205+
"""Internal factory method.
206+
207+
Creates a CloudSource object from a REST API SourceResponse object.
208+
"""
209+
result = cls(
210+
workspace=workspace,
211+
connector_id=source_response.source_id,
212+
)
213+
result._connector_info = source_response # noqa: SLF001 # Accessing Non-Public API
214+
return result
215+
167216

168217
class CloudDestination(CloudConnector):
169218
"""A cloud destination is a destination that is deployed on Airbyte Cloud."""
@@ -178,3 +227,29 @@ def destination_id(self) -> str:
178227
This is an alias for `connector_id`.
179228
"""
180229
return self.connector_id
230+
231+
def _fetch_connector_info(self) -> api_models.DestinationResponse:
232+
"""Populate the destination with data from the API."""
233+
return api_util.get_destination(
234+
destination_id=self.connector_id,
235+
api_root=self.workspace.api_root,
236+
client_id=self.workspace.client_id,
237+
client_secret=self.workspace.client_secret,
238+
)
239+
240+
@classmethod
241+
def _from_destination_response(
242+
cls,
243+
workspace: CloudWorkspace,
244+
destination_response: api_models.DestinationResponse,
245+
) -> CloudDestination:
246+
"""Internal factory method.
247+
248+
Creates a CloudDestination object from a REST API DestinationResponse object.
249+
"""
250+
result = cls(
251+
workspace=workspace,
252+
connector_id=destination_response.destination_id,
253+
)
254+
result._connector_info = destination_response # noqa: SLF001 # Accessing Non-Public API
255+
return result

airbyte/cloud/workspaces.py

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -389,11 +389,9 @@ def list_connections(
389389
client_secret=self.client_secret,
390390
)
391391
return [
392-
CloudConnection(
392+
CloudConnection._from_connection_response( # noqa: SLF001 (non-public API)
393393
workspace=self,
394-
connection_id=connection.connection_id,
395-
source=None,
396-
destination=None,
394+
connection_response=connection,
397395
)
398396
for connection in connections
399397
if name is None or connection.name == name
@@ -418,9 +416,9 @@ def list_sources(
418416
client_secret=self.client_secret,
419417
)
420418
return [
421-
CloudSource(
419+
CloudSource._from_source_response( # noqa: SLF001 (non-public API)
422420
workspace=self,
423-
connector_id=source.source_id,
421+
source_response=source,
424422
)
425423
for source in sources
426424
if name is None or source.name == name
@@ -445,9 +443,9 @@ def list_destinations(
445443
client_secret=self.client_secret,
446444
)
447445
return [
448-
CloudDestination(
446+
CloudDestination._from_destination_response( # noqa: SLF001 (non-public API)
449447
workspace=self,
450-
connector_id=destination.destination_id,
448+
destination_response=destination,
451449
)
452450
for destination in destinations
453451
if name is None or destination.name == name

0 commit comments

Comments
 (0)