Skip to content

Commit 65e8e87

Browse files
feat(sql): Prioritize source_defined_primary_key and return None when unset
- Reverse priority order to favor source_defined_primary_key over primary_key
- Return None when neither primary key field is set
- Update all callers to handle None gracefully with 'pks or []' coalescing
- Update unit tests to reflect new behavior and priority order
- Addresses @aaronsteers feedback on primary key fallback logic

Co-Authored-By: AJ Steers <[email protected]>
1 parent b4aa7df commit 65e8e87

File tree

3 files changed

+26
-15
lines changed

3 files changed

+26
-15
lines changed

airbyte_cdk/sql/shared/catalog_providers.py

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -119,18 +119,16 @@ def get_stream_properties(
119119
def get_primary_keys(
120120
self,
121121
stream_name: str,
122-
) -> list[str]:
122+
) -> list[str] | None:
123123
"""Return the primary keys for the given stream.
124124
125-
We will use `primary_key` if it is set explicitly in the configured catalog,
126-
otherwise we will fall back to `source_defined_primary_key`, if set.
125+
We return `source_defined_primary_key` if set, or `primary_key` otherwise. If both are set, we assume they should not differ, since Airbyte data integrity constraints do not permit overruling a source's pre-defined primary keys. If neither is set, we return `None`.
127126
"""
128127
configured_stream = self.get_configured_stream_info(stream_name)
129-
pks = (
130-
configured_stream.primary_key
131-
or configured_stream.stream.source_defined_primary_key
132-
or []
133-
)
128+
pks = configured_stream.stream.source_defined_primary_key or configured_stream.primary_key
129+
130+
if not pks:
131+
return None
134132

135133
normalized_pks: list[list[str]] = [
136134
[LowerCaseNormalizer.normalize(c) for c in pk] for pk in pks

airbyte_cdk/sql/shared/sql_processor.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -667,7 +667,8 @@ def _merge_temp_table_to_final_table(
667667
nl = "\n"
668668
columns = {self._quote_identifier(c) for c in self._get_sql_column_definitions(stream_name)}
669669
pk_columns = {
670-
self._quote_identifier(c) for c in self.catalog_provider.get_primary_keys(stream_name)
670+
self._quote_identifier(c)
671+
for c in (self.catalog_provider.get_primary_keys(stream_name) or [])
671672
}
672673
non_pk_columns = columns - pk_columns
673674
join_clause = f"{nl} AND ".join(f"tmp.{pk_col} = final.{pk_col}" for pk_col in pk_columns)
@@ -724,7 +725,7 @@ def _emulated_merge_temp_table_to_final_table(
724725
"""
725726
final_table = self._get_table_by_name(final_table_name)
726727
temp_table = self._get_table_by_name(temp_table_name)
727-
pk_columns = self.catalog_provider.get_primary_keys(stream_name)
728+
pk_columns = self.catalog_provider.get_primary_keys(stream_name) or []
728729

729730
columns_to_update: set[str] = self._get_sql_column_definitions(
730731
stream_name=stream_name

unit_tests/sql/shared/test_catalog_providers.py

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,23 @@ class TestCatalogProvider:
1212
@pytest.mark.parametrize(
1313
"configured_primary_key,source_defined_primary_key,expected_result,test_description",
1414
[
15-
(["configured_id"], ["source_id"], ["configured_id"], "uses configured when both set"),
16-
([], ["source_id"], ["source_id"], "falls back to source when configured empty"),
17-
(None, ["source_id"], ["source_id"], "falls back to source when configured None"),
18-
([], [], [], "returns empty when both empty"),
19-
(None, None, [], "returns empty when both None"),
15+
(["configured_id"], ["source_id"], ["source_id"], "prioritizes source when both set"),
16+
([], ["source_id"], ["source_id"], "uses source when configured empty"),
17+
(None, ["source_id"], ["source_id"], "uses source when configured None"),
18+
(
19+
["configured_id"],
20+
[],
21+
["configured_id"],
22+
"falls back to configured when source empty",
23+
),
24+
(
25+
["configured_id"],
26+
None,
27+
["configured_id"],
28+
"falls back to configured when source None",
29+
),
30+
([], [], None, "returns None when both empty"),
31+
(None, None, None, "returns None when both None"),
2032
([], ["id1", "id2"], ["id1", "id2"], "handles composite keys from source"),
2133
],
2234
)

0 commit comments

Comments
 (0)