|
9 | 9 | class TestCatalogProvider: |
10 | 10 | """Test cases for CatalogProvider.get_primary_keys() method.""" |
11 | 11 |
|
12 | | - def test_get_primary_keys_uses_configured_primary_key_when_set(self): |
13 | | - """Test that configured primary_key is used when set.""" |
14 | | - stream = AirbyteStream( |
15 | | - name="test_stream", |
16 | | - json_schema={"type": "object", "properties": {"id": {"type": "string"}}}, |
17 | | - supported_sync_modes=["full_refresh"], |
18 | | - source_defined_primary_key=[["source_id"]], |
19 | | - ) |
20 | | - configured_stream = ConfiguredAirbyteStream( |
21 | | - stream=stream, |
22 | | - sync_mode="full_refresh", |
23 | | - destination_sync_mode="overwrite", |
24 | | - primary_key=[["configured_id"]], |
25 | | - ) |
26 | | - catalog = ConfiguredAirbyteCatalog(streams=[configured_stream]) |
27 | | - |
28 | | - provider = CatalogProvider(catalog) |
29 | | - result = provider.get_primary_keys("test_stream") |
30 | | - |
31 | | - assert result == ["configured_id"] |
32 | | - |
33 | | - def test_get_primary_keys_falls_back_to_source_defined_when_configured_empty(self): |
34 | | - """Test that source_defined_primary_key is used when primary_key is empty.""" |
35 | | - stream = AirbyteStream( |
36 | | - name="test_stream", |
37 | | - json_schema={"type": "object", "properties": {"id": {"type": "string"}}}, |
38 | | - supported_sync_modes=["full_refresh"], |
39 | | - source_defined_primary_key=[["source_id"]], |
40 | | - ) |
41 | | - configured_stream = ConfiguredAirbyteStream( |
42 | | - stream=stream, |
43 | | - sync_mode="full_refresh", |
44 | | - destination_sync_mode="overwrite", |
45 | | - primary_key=[], # Empty configured primary key |
46 | | - ) |
47 | | - catalog = ConfiguredAirbyteCatalog(streams=[configured_stream]) |
48 | | - |
49 | | - provider = CatalogProvider(catalog) |
50 | | - result = provider.get_primary_keys("test_stream") |
51 | | - |
52 | | - assert result == ["source_id"] |
53 | | - |
54 | | - def test_get_primary_keys_falls_back_to_source_defined_when_configured_none(self): |
55 | | - """Test that source_defined_primary_key is used when primary_key is None.""" |
56 | | - stream = AirbyteStream( |
57 | | - name="test_stream", |
58 | | - json_schema={"type": "object", "properties": {"id": {"type": "string"}}}, |
59 | | - supported_sync_modes=["full_refresh"], |
60 | | - source_defined_primary_key=[["source_id"]], |
| 12 | + @pytest.mark.parametrize( |
| 13 | + "configured_primary_key,source_defined_primary_key,expected_result,test_description", |
| 14 | + [ |
| 15 | + (["configured_id"], ["source_id"], ["configured_id"], "uses configured when both set"), |
| 16 | + ([], ["source_id"], ["source_id"], "falls back to source when configured empty"), |
| 17 | + (None, ["source_id"], ["source_id"], "falls back to source when configured None"), |
| 18 | + ([], [], [], "returns empty when both empty"), |
| 19 | + (None, None, [], "returns empty when both None"), |
| 20 | + ([], ["id1", "id2"], ["id1", "id2"], "handles composite keys from source"), |
| 21 | + ], |
| 22 | + ) |
| 23 | + def test_get_primary_keys_parametrized( |
| 24 | + self, configured_primary_key, source_defined_primary_key, expected_result, test_description |
| 25 | + ): |
| 26 | + """Test primary key fallback logic with various input combinations.""" |
| 27 | + configured_pk_wrapped = ( |
| 28 | + None |
| 29 | + if configured_primary_key is None |
| 30 | + else [[pk] for pk in configured_primary_key] |
| 31 | + if configured_primary_key |
| 32 | + else [] |
61 | 33 | ) |
62 | | - configured_stream = ConfiguredAirbyteStream( |
63 | | - stream=stream, |
64 | | - sync_mode="full_refresh", |
65 | | - destination_sync_mode="overwrite", |
66 | | - primary_key=None, # None configured primary key |
| 34 | + source_pk_wrapped = ( |
| 35 | + None |
| 36 | + if source_defined_primary_key is None |
| 37 | + else [[pk] for pk in source_defined_primary_key] |
| 38 | + if source_defined_primary_key |
| 39 | + else [] |
67 | 40 | ) |
68 | | - catalog = ConfiguredAirbyteCatalog(streams=[configured_stream]) |
69 | 41 |
|
70 | | - provider = CatalogProvider(catalog) |
71 | | - result = provider.get_primary_keys("test_stream") |
72 | | - |
73 | | - assert result == ["source_id"] |
74 | | - |
75 | | - def test_get_primary_keys_returns_empty_when_both_empty(self): |
76 | | - """Test that empty list is returned when both primary keys are empty.""" |
77 | | - stream = AirbyteStream( |
78 | | - name="test_stream", |
79 | | - json_schema={"type": "object", "properties": {"id": {"type": "string"}}}, |
80 | | - supported_sync_modes=["full_refresh"], |
81 | | - source_defined_primary_key=[], # Empty source-defined primary key |
82 | | - ) |
83 | | - configured_stream = ConfiguredAirbyteStream( |
84 | | - stream=stream, |
85 | | - sync_mode="full_refresh", |
86 | | - destination_sync_mode="overwrite", |
87 | | - primary_key=[], # Empty configured primary key |
88 | | - ) |
89 | | - catalog = ConfiguredAirbyteCatalog(streams=[configured_stream]) |
90 | | - |
91 | | - provider = CatalogProvider(catalog) |
92 | | - result = provider.get_primary_keys("test_stream") |
93 | | - |
94 | | - assert result == [] |
95 | | - |
96 | | - def test_get_primary_keys_returns_empty_when_both_none(self): |
97 | | - """Test that empty list is returned when both primary keys are None.""" |
98 | | - stream = AirbyteStream( |
99 | | - name="test_stream", |
100 | | - json_schema={"type": "object", "properties": {"id": {"type": "string"}}}, |
101 | | - supported_sync_modes=["full_refresh"], |
102 | | - source_defined_primary_key=None, # None source-defined primary key |
103 | | - ) |
104 | | - configured_stream = ConfiguredAirbyteStream( |
105 | | - stream=stream, |
106 | | - sync_mode="full_refresh", |
107 | | - destination_sync_mode="overwrite", |
108 | | - primary_key=None, # None configured primary key |
109 | | - ) |
110 | | - catalog = ConfiguredAirbyteCatalog(streams=[configured_stream]) |
111 | | - |
112 | | - provider = CatalogProvider(catalog) |
113 | | - result = provider.get_primary_keys("test_stream") |
114 | | - |
115 | | - assert result == [] |
116 | | - |
117 | | - def test_get_primary_keys_handles_composite_keys_from_source_defined(self): |
118 | | - """Test that composite primary keys work correctly with source-defined fallback.""" |
119 | 42 | stream = AirbyteStream( |
120 | 43 | name="test_stream", |
121 | 44 | json_schema={ |
122 | 45 | "type": "object", |
123 | | - "properties": {"id1": {"type": "string"}, "id2": {"type": "string"}}, |
| 46 | + "properties": { |
| 47 | + "id": {"type": "string"}, |
| 48 | + "id1": {"type": "string"}, |
| 49 | + "id2": {"type": "string"}, |
| 50 | + }, |
124 | 51 | }, |
125 | 52 | supported_sync_modes=["full_refresh"], |
126 | | - source_defined_primary_key=[["id1"], ["id2"]], # Composite primary key |
127 | | - ) |
128 | | - configured_stream = ConfiguredAirbyteStream( |
129 | | - stream=stream, |
130 | | - sync_mode="full_refresh", |
131 | | - destination_sync_mode="overwrite", |
132 | | - primary_key=[], # Empty configured primary key |
133 | | - ) |
134 | | - catalog = ConfiguredAirbyteCatalog(streams=[configured_stream]) |
135 | | - |
136 | | - provider = CatalogProvider(catalog) |
137 | | - result = provider.get_primary_keys("test_stream") |
138 | | - |
139 | | - assert result == ["id1", "id2"] |
140 | | - |
141 | | - def test_get_primary_keys_normalizes_case_for_source_defined(self): |
142 | | - """Test that primary keys from source-defined are normalized to lowercase.""" |
143 | | - stream = AirbyteStream( |
144 | | - name="test_stream", |
145 | | - json_schema={"type": "object", "properties": {"ID": {"type": "string"}}}, |
146 | | - supported_sync_modes=["full_refresh"], |
147 | | - source_defined_primary_key=[["ID"]], # Uppercase primary key |
| 53 | + source_defined_primary_key=source_pk_wrapped, |
148 | 54 | ) |
149 | 55 | configured_stream = ConfiguredAirbyteStream( |
150 | 56 | stream=stream, |
151 | 57 | sync_mode="full_refresh", |
152 | 58 | destination_sync_mode="overwrite", |
153 | | - primary_key=[], # Empty configured primary key |
| 59 | + primary_key=configured_pk_wrapped, |
154 | 60 | ) |
155 | 61 | catalog = ConfiguredAirbyteCatalog(streams=[configured_stream]) |
156 | 62 |
|
157 | 63 | provider = CatalogProvider(catalog) |
158 | 64 | result = provider.get_primary_keys("test_stream") |
159 | 65 |
|
160 | | - assert result == ["id"] |
| 66 | + assert result == expected_result |
0 commit comments