Skip to content

Commit eda647b

Browse files
feat: Add interfaces for editing cloud source, destination, and connection configs (#841)
Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
1 parent cbc9437 commit eda647b

File tree

4 files changed

+575
-9
lines changed

4 files changed

+575
-9
lines changed

airbyte/_util/api_util.py

Lines changed: 183 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -564,6 +564,57 @@ def delete_source(
564564
)
565565

566566

567+
def patch_source(
568+
source_id: str,
569+
*,
570+
api_root: str,
571+
client_id: SecretString,
572+
client_secret: SecretString,
573+
name: str | None = None,
574+
config: models.SourceConfiguration | dict[str, Any] | None = None,
575+
) -> models.SourceResponse:
576+
"""Update/patch a source configuration.
577+
578+
This is a destructive operation that can break existing connections if the
579+
configuration is changed incorrectly.
580+
581+
Args:
582+
source_id: The ID of the source to update
583+
api_root: The API root URL
584+
client_id: Client ID for authentication
585+
client_secret: Client secret for authentication
586+
name: Optional new name for the source
587+
config: Optional new configuration for the source
588+
589+
Returns:
590+
Updated SourceResponse object
591+
"""
592+
airbyte_instance = get_airbyte_server_instance(
593+
client_id=client_id,
594+
client_secret=client_secret,
595+
api_root=api_root,
596+
)
597+
response = airbyte_instance.sources.patch_source(
598+
api.PatchSourceRequest(
599+
source_id=source_id,
600+
source_patch_request=models.SourcePatchRequest(
601+
name=name,
602+
configuration=config,
603+
),
604+
),
605+
)
606+
if status_ok(response.status_code) and response.source_response:
607+
return response.source_response
608+
609+
raise AirbyteError(
610+
message="Could not update source.",
611+
context={
612+
"source_id": source_id,
613+
},
614+
response=response,
615+
)
616+
617+
567618
# Utility function
568619

569620

@@ -701,9 +752,80 @@ def delete_destination(
701752
)
702753

703754

755+
def patch_destination(
756+
destination_id: str,
757+
*,
758+
api_root: str,
759+
client_id: SecretString,
760+
client_secret: SecretString,
761+
name: str | None = None,
762+
config: DestinationConfiguration | dict[str, Any] | None = None,
763+
) -> models.DestinationResponse:
764+
"""Update/patch a destination configuration.
765+
766+
This is a destructive operation that can break existing connections if the
767+
configuration is changed incorrectly.
768+
769+
Args:
770+
destination_id: The ID of the destination to update
771+
api_root: The API root URL
772+
client_id: Client ID for authentication
773+
client_secret: Client secret for authentication
774+
name: Optional new name for the destination
775+
config: Optional new configuration for the destination
776+
777+
Returns:
778+
Updated DestinationResponse object
779+
"""
780+
airbyte_instance = get_airbyte_server_instance(
781+
client_id=client_id,
782+
client_secret=client_secret,
783+
api_root=api_root,
784+
)
785+
response = airbyte_instance.destinations.patch_destination(
786+
api.PatchDestinationRequest(
787+
destination_id=destination_id,
788+
destination_patch_request=models.DestinationPatchRequest(
789+
name=name,
790+
configuration=config,
791+
),
792+
),
793+
)
794+
if status_ok(response.status_code) and response.destination_response:
795+
return response.destination_response
796+
797+
raise AirbyteError(
798+
message="Could not update destination.",
799+
context={
800+
"destination_id": destination_id,
801+
},
802+
response=response,
803+
)
804+
805+
704806
# Create and delete connections
705807

706808

809+
def build_stream_configurations(
810+
stream_names: list[str],
811+
) -> models.StreamConfigurations:
812+
"""Build a StreamConfigurations object from a list of stream names.
813+
814+
This helper creates the proper API model structure for stream configurations.
815+
Used by both connection creation and updates.
816+
817+
Args:
818+
stream_names: List of stream names to include in the configuration
819+
820+
Returns:
821+
StreamConfigurations object ready for API submission
822+
"""
823+
stream_configurations = [
824+
models.StreamConfiguration(name=stream_name) for stream_name in stream_names
825+
]
826+
return models.StreamConfigurations(streams=stream_configurations)
827+
828+
707829
def create_connection( # noqa: PLR0913 # Too many arguments
708830
name: str,
709831
*,
@@ -722,15 +844,7 @@ def create_connection( # noqa: PLR0913 # Too many arguments
722844
client_secret=client_secret,
723845
api_root=api_root,
724846
)
725-
stream_configurations: list[models.StreamConfiguration] = []
726-
if selected_stream_names:
727-
for stream_name in selected_stream_names:
728-
stream_configuration = models.StreamConfiguration(
729-
name=stream_name,
730-
)
731-
stream_configurations.append(stream_configuration)
732-
733-
stream_configurations_obj = models.StreamConfigurations(stream_configurations)
847+
stream_configurations_obj = build_stream_configurations(selected_stream_names)
734848
response = airbyte_instance.connections.create_connection(
735849
models.ConnectionCreateRequest(
736850
name=name,
@@ -815,6 +929,66 @@ def delete_connection(
815929
)
816930

817931

932+
def patch_connection( # noqa: PLR0913 # Too many arguments
933+
connection_id: str,
934+
*,
935+
api_root: str,
936+
client_id: SecretString,
937+
client_secret: SecretString,
938+
name: str | None = None,
939+
configurations: models.StreamConfigurationsInput | None = None,
940+
schedule: models.AirbyteAPIConnectionSchedule | None = None,
941+
prefix: str | None = None,
942+
status: models.ConnectionStatusEnum | None = None,
943+
) -> models.ConnectionResponse:
944+
"""Update/patch a connection configuration.
945+
946+
This is a destructive operation that can break existing connections if the
947+
configuration is changed incorrectly.
948+
949+
Args:
950+
connection_id: The ID of the connection to update
951+
api_root: The API root URL
952+
client_id: Client ID for authentication
953+
client_secret: Client secret for authentication
954+
name: Optional new name for the connection
955+
configurations: Optional new stream configurations
956+
schedule: Optional new sync schedule
957+
prefix: Optional new table prefix
958+
status: Optional new connection status
959+
960+
Returns:
961+
Updated ConnectionResponse object
962+
"""
963+
airbyte_instance = get_airbyte_server_instance(
964+
client_id=client_id,
965+
client_secret=client_secret,
966+
api_root=api_root,
967+
)
968+
response = airbyte_instance.connections.patch_connection(
969+
api.PatchConnectionRequest(
970+
connection_id=connection_id,
971+
connection_patch_request=models.ConnectionPatchRequest(
972+
name=name,
973+
configurations=configurations,
974+
schedule=schedule,
975+
prefix=prefix,
976+
status=status,
977+
),
978+
),
979+
)
980+
if status_ok(response.status_code) and response.connection_response:
981+
return response.connection_response
982+
983+
raise AirbyteError(
984+
message="Could not update connection.",
985+
context={
986+
"connection_id": connection_id,
987+
},
988+
response=response,
989+
)
990+
991+
818992
# Functions for leveraging the Airbyte Config API (may not be supported or stable)
819993

820994

airbyte/cloud/connections.py

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -256,6 +256,68 @@ def get_sync_result(
256256
job_id=job_id,
257257
)
258258

259+
def rename(self, name: str) -> CloudConnection:
260+
"""Rename the connection.
261+
262+
Args:
263+
name: New name for the connection
264+
265+
Returns:
266+
Updated CloudConnection object with refreshed info
267+
"""
268+
updated_response = api_util.patch_connection(
269+
connection_id=self.connection_id,
270+
api_root=self.workspace.api_root,
271+
client_id=self.workspace.client_id,
272+
client_secret=self.workspace.client_secret,
273+
name=name,
274+
)
275+
self._connection_info = updated_response
276+
return self
277+
278+
def set_table_prefix(self, prefix: str) -> CloudConnection:
279+
"""Set the table prefix for the connection.
280+
281+
Args:
282+
prefix: New table prefix to use when syncing to the destination
283+
284+
Returns:
285+
Updated CloudConnection object with refreshed info
286+
"""
287+
updated_response = api_util.patch_connection(
288+
connection_id=self.connection_id,
289+
api_root=self.workspace.api_root,
290+
client_id=self.workspace.client_id,
291+
client_secret=self.workspace.client_secret,
292+
prefix=prefix,
293+
)
294+
self._connection_info = updated_response
295+
return self
296+
297+
def set_selected_streams(self, stream_names: list[str]) -> CloudConnection:
298+
"""Set the selected streams for the connection.
299+
300+
This is a destructive operation that can break existing connections if the
301+
stream selection is changed incorrectly. Use with caution.
302+
303+
Args:
304+
stream_names: List of stream names to sync
305+
306+
Returns:
307+
Updated CloudConnection object with refreshed info
308+
"""
309+
configurations = api_util.build_stream_configurations(stream_names)
310+
311+
updated_response = api_util.patch_connection(
312+
connection_id=self.connection_id,
313+
api_root=self.workspace.api_root,
314+
client_id=self.workspace.client_id,
315+
client_secret=self.workspace.client_secret,
316+
configurations=configurations,
317+
)
318+
self._connection_info = updated_response
319+
return self
320+
259321
# Deletions
260322

261323
def permanently_delete(

airbyte/cloud/connectors.py

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,47 @@ def _fetch_connector_info(self) -> api_models.SourceResponse:
199199
client_secret=self.workspace.client_secret,
200200
)
201201

202+
def rename(self, name: str) -> CloudSource:
203+
"""Rename the source.
204+
205+
Args:
206+
name: New name for the source
207+
208+
Returns:
209+
Updated CloudSource object with refreshed info
210+
"""
211+
updated_response = api_util.patch_source(
212+
source_id=self.connector_id,
213+
api_root=self.workspace.api_root,
214+
client_id=self.workspace.client_id,
215+
client_secret=self.workspace.client_secret,
216+
name=name,
217+
)
218+
self._connector_info = updated_response
219+
return self
220+
221+
def update_config(self, config: dict[str, Any]) -> CloudSource:
222+
"""Update the source configuration.
223+
224+
This is a destructive operation that can break existing connections if the
225+
configuration is changed incorrectly. Use with caution.
226+
227+
Args:
228+
config: New configuration for the source
229+
230+
Returns:
231+
Updated CloudSource object with refreshed info
232+
"""
233+
updated_response = api_util.patch_source(
234+
source_id=self.connector_id,
235+
api_root=self.workspace.api_root,
236+
client_id=self.workspace.client_id,
237+
client_secret=self.workspace.client_secret,
238+
config=config,
239+
)
240+
self._connector_info = updated_response
241+
return self
242+
202243
@classmethod
203244
def _from_source_response(
204245
cls,
@@ -240,6 +281,47 @@ def _fetch_connector_info(self) -> api_models.DestinationResponse:
240281
client_secret=self.workspace.client_secret,
241282
)
242283

284+
def rename(self, name: str) -> CloudDestination:
285+
"""Rename the destination.
286+
287+
Args:
288+
name: New name for the destination
289+
290+
Returns:
291+
Updated CloudDestination object with refreshed info
292+
"""
293+
updated_response = api_util.patch_destination(
294+
destination_id=self.connector_id,
295+
api_root=self.workspace.api_root,
296+
client_id=self.workspace.client_id,
297+
client_secret=self.workspace.client_secret,
298+
name=name,
299+
)
300+
self._connector_info = updated_response
301+
return self
302+
303+
def update_config(self, config: dict[str, Any]) -> CloudDestination:
304+
"""Update the destination configuration.
305+
306+
This is a destructive operation that can break existing connections if the
307+
configuration is changed incorrectly. Use with caution.
308+
309+
Args:
310+
config: New configuration for the destination
311+
312+
Returns:
313+
Updated CloudDestination object with refreshed info
314+
"""
315+
updated_response = api_util.patch_destination(
316+
destination_id=self.connector_id,
317+
api_root=self.workspace.api_root,
318+
client_id=self.workspace.client_id,
319+
client_secret=self.workspace.client_secret,
320+
config=config,
321+
)
322+
self._connector_info = updated_response
323+
return self
324+
243325
@classmethod
244326
def _from_destination_response(
245327
cls,

0 commit comments

Comments
 (0)