Skip to content

Commit be8d806

Browse files
feat(sql): Add guard statements for primary key validation in merge operations
- Replace 'or []' coalescing with explicit guard statements in merge methods
- Raise AirbyteInternalError when no primary keys are available for merge operations
- Address @aaronsteers' feedback on code clarity and error handling
- Ensure merge operations fail fast when primary keys are missing

Co-Authored-By: AJ Steers <[email protected]>
1 parent 65e8e87 commit be8d806

File tree

1 file changed

+13
-5
lines changed

1 file changed

+13
-5
lines changed

airbyte_cdk/sql/shared/sql_processor.py

Lines changed: 13 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -666,10 +666,13 @@ def _merge_temp_table_to_final_table(
666666
"""
667667
nl = "\n"
668668
columns = {self._quote_identifier(c) for c in self._get_sql_column_definitions(stream_name)}
669-
pk_columns = {
670-
self._quote_identifier(c)
671-
for c in (self.catalog_provider.get_primary_keys(stream_name) or [])
672-
}
669+
primary_keys = self.catalog_provider.get_primary_keys(stream_name)
670+
if not primary_keys:
671+
raise exc.AirbyteInternalError(
672+
message="Cannot merge tables without primary keys. Primary keys are required for merge operations.",
673+
context={"stream_name": stream_name},
674+
)
675+
pk_columns = {self._quote_identifier(c) for c in primary_keys}
673676
non_pk_columns = columns - pk_columns
674677
join_clause = f"{nl} AND ".join(f"tmp.{pk_col} = final.{pk_col}" for pk_col in pk_columns)
675678
set_clause = f"{nl} , ".join(f"{col} = tmp.{col}" for col in non_pk_columns)
@@ -725,7 +728,12 @@ def _emulated_merge_temp_table_to_final_table(
725728
"""
726729
final_table = self._get_table_by_name(final_table_name)
727730
temp_table = self._get_table_by_name(temp_table_name)
728-
pk_columns = self.catalog_provider.get_primary_keys(stream_name) or []
731+
pk_columns = self.catalog_provider.get_primary_keys(stream_name)
732+
if not pk_columns:
733+
raise exc.AirbyteInternalError(
734+
message="Cannot merge tables without primary keys. Primary keys are required for merge operations.",
735+
context={"stream_name": stream_name},
736+
)
729737

730738
columns_to_update: set[str] = self._get_sql_column_definitions(
731739
stream_name=stream_name

0 commit comments

Comments
 (0)