Skip to content

Commit 065e737

Browse files
fix(sql): Add fallback to source_defined_primary_key in CatalogProvider
- Fix CatalogProvider.get_primary_keys() to fall back to stream.source_defined_primary_key when primary_key is empty/None - Add comprehensive unit tests covering all fallback scenarios - Resolves bug where destinations ignore source-defined primary keys when configured primary key is not set - Affects all destinations using CDK SQL processor base classes Co-Authored-By: AJ Steers <[email protected]>
1 parent 4629a12 commit 065e737

File tree

2 files changed

+166
-2
lines changed

2 files changed

+166
-2
lines changed

airbyte_cdk/sql/shared/catalog_providers.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -121,9 +121,13 @@ def get_primary_keys(
121121
stream_name: str,
122122
) -> list[str]:
123123
"""Return the primary keys for the given stream."""
124-
pks = self.get_configured_stream_info(stream_name).primary_key
124+
configured_stream = self.get_configured_stream_info(stream_name)
125+
pks = configured_stream.primary_key
126+
125127
if not pks:
126-
return []
128+
pks = configured_stream.stream.source_defined_primary_key
129+
if not pks:
130+
return []
127131

128132
normalized_pks: list[list[str]] = [
129133
[LowerCaseNormalizer.normalize(c) for c in pk] for pk in pks
Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
from unittest.mock import Mock
2+
3+
import pytest
4+
5+
from airbyte_cdk.models import AirbyteStream, ConfiguredAirbyteCatalog, ConfiguredAirbyteStream
6+
from airbyte_cdk.sql.shared.catalog_providers import CatalogProvider
7+
8+
9+
class TestCatalogProvider:
10+
"""Test cases for CatalogProvider.get_primary_keys() method."""
11+
12+
def test_get_primary_keys_uses_configured_primary_key_when_set(self):
13+
"""Test that configured primary_key is used when set."""
14+
stream = AirbyteStream(
15+
name="test_stream",
16+
json_schema={"type": "object", "properties": {"id": {"type": "string"}}},
17+
supported_sync_modes=["full_refresh"],
18+
source_defined_primary_key=[["source_id"]],
19+
)
20+
configured_stream = ConfiguredAirbyteStream(
21+
stream=stream,
22+
sync_mode="full_refresh",
23+
destination_sync_mode="overwrite",
24+
primary_key=[["configured_id"]],
25+
)
26+
catalog = ConfiguredAirbyteCatalog(streams=[configured_stream])
27+
28+
provider = CatalogProvider(catalog)
29+
result = provider.get_primary_keys("test_stream")
30+
31+
assert result == ["configured_id"]
32+
33+
def test_get_primary_keys_falls_back_to_source_defined_when_configured_empty(self):
34+
"""Test that source_defined_primary_key is used when primary_key is empty."""
35+
stream = AirbyteStream(
36+
name="test_stream",
37+
json_schema={"type": "object", "properties": {"id": {"type": "string"}}},
38+
supported_sync_modes=["full_refresh"],
39+
source_defined_primary_key=[["source_id"]],
40+
)
41+
configured_stream = ConfiguredAirbyteStream(
42+
stream=stream,
43+
sync_mode="full_refresh",
44+
destination_sync_mode="overwrite",
45+
primary_key=[], # Empty configured primary key
46+
)
47+
catalog = ConfiguredAirbyteCatalog(streams=[configured_stream])
48+
49+
provider = CatalogProvider(catalog)
50+
result = provider.get_primary_keys("test_stream")
51+
52+
assert result == ["source_id"]
53+
54+
def test_get_primary_keys_falls_back_to_source_defined_when_configured_none(self):
55+
"""Test that source_defined_primary_key is used when primary_key is None."""
56+
stream = AirbyteStream(
57+
name="test_stream",
58+
json_schema={"type": "object", "properties": {"id": {"type": "string"}}},
59+
supported_sync_modes=["full_refresh"],
60+
source_defined_primary_key=[["source_id"]],
61+
)
62+
configured_stream = ConfiguredAirbyteStream(
63+
stream=stream,
64+
sync_mode="full_refresh",
65+
destination_sync_mode="overwrite",
66+
primary_key=None, # None configured primary key
67+
)
68+
catalog = ConfiguredAirbyteCatalog(streams=[configured_stream])
69+
70+
provider = CatalogProvider(catalog)
71+
result = provider.get_primary_keys("test_stream")
72+
73+
assert result == ["source_id"]
74+
75+
def test_get_primary_keys_returns_empty_when_both_empty(self):
76+
"""Test that empty list is returned when both primary keys are empty."""
77+
stream = AirbyteStream(
78+
name="test_stream",
79+
json_schema={"type": "object", "properties": {"id": {"type": "string"}}},
80+
supported_sync_modes=["full_refresh"],
81+
source_defined_primary_key=[], # Empty source-defined primary key
82+
)
83+
configured_stream = ConfiguredAirbyteStream(
84+
stream=stream,
85+
sync_mode="full_refresh",
86+
destination_sync_mode="overwrite",
87+
primary_key=[], # Empty configured primary key
88+
)
89+
catalog = ConfiguredAirbyteCatalog(streams=[configured_stream])
90+
91+
provider = CatalogProvider(catalog)
92+
result = provider.get_primary_keys("test_stream")
93+
94+
assert result == []
95+
96+
def test_get_primary_keys_returns_empty_when_both_none(self):
97+
"""Test that empty list is returned when both primary keys are None."""
98+
stream = AirbyteStream(
99+
name="test_stream",
100+
json_schema={"type": "object", "properties": {"id": {"type": "string"}}},
101+
supported_sync_modes=["full_refresh"],
102+
source_defined_primary_key=None, # None source-defined primary key
103+
)
104+
configured_stream = ConfiguredAirbyteStream(
105+
stream=stream,
106+
sync_mode="full_refresh",
107+
destination_sync_mode="overwrite",
108+
primary_key=None, # None configured primary key
109+
)
110+
catalog = ConfiguredAirbyteCatalog(streams=[configured_stream])
111+
112+
provider = CatalogProvider(catalog)
113+
result = provider.get_primary_keys("test_stream")
114+
115+
assert result == []
116+
117+
def test_get_primary_keys_handles_composite_keys_from_source_defined(self):
118+
"""Test that composite primary keys work correctly with source-defined fallback."""
119+
stream = AirbyteStream(
120+
name="test_stream",
121+
json_schema={
122+
"type": "object",
123+
"properties": {"id1": {"type": "string"}, "id2": {"type": "string"}},
124+
},
125+
supported_sync_modes=["full_refresh"],
126+
source_defined_primary_key=[["id1"], ["id2"]], # Composite primary key
127+
)
128+
configured_stream = ConfiguredAirbyteStream(
129+
stream=stream,
130+
sync_mode="full_refresh",
131+
destination_sync_mode="overwrite",
132+
primary_key=[], # Empty configured primary key
133+
)
134+
catalog = ConfiguredAirbyteCatalog(streams=[configured_stream])
135+
136+
provider = CatalogProvider(catalog)
137+
result = provider.get_primary_keys("test_stream")
138+
139+
assert result == ["id1", "id2"]
140+
141+
def test_get_primary_keys_normalizes_case_for_source_defined(self):
142+
"""Test that primary keys from source-defined are normalized to lowercase."""
143+
stream = AirbyteStream(
144+
name="test_stream",
145+
json_schema={"type": "object", "properties": {"ID": {"type": "string"}}},
146+
supported_sync_modes=["full_refresh"],
147+
source_defined_primary_key=[["ID"]], # Uppercase primary key
148+
)
149+
configured_stream = ConfiguredAirbyteStream(
150+
stream=stream,
151+
sync_mode="full_refresh",
152+
destination_sync_mode="overwrite",
153+
primary_key=[], # Empty configured primary key
154+
)
155+
catalog = ConfiguredAirbyteCatalog(streams=[configured_stream])
156+
157+
provider = CatalogProvider(catalog)
158+
result = provider.get_primary_keys("test_stream")
159+
160+
assert result == ["id"]

0 commit comments

Comments
 (0)