From 065e7379ead8c13409d57130b6f38f0e3095ecff Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Fri, 27 Jun 2025 22:43:01 +0000 Subject: [PATCH 01/25] fix(sql): Add fallback to source_defined_primary_key in CatalogProvider - Fix CatalogProvider.get_primary_keys() to fall back to stream.source_defined_primary_key when primary_key is empty/None - Add comprehensive unit tests covering all fallback scenarios - Resolves bug where destinations ignore source-defined primary keys when configured primary key is not set - Affects all destinations using CDK SQL processor base classes Co-Authored-By: AJ Steers --- airbyte_cdk/sql/shared/catalog_providers.py | 8 +- .../sql/shared/test_catalog_providers.py | 160 ++++++++++++++++++ 2 files changed, 166 insertions(+), 2 deletions(-) create mode 100644 unit_tests/sql/shared/test_catalog_providers.py diff --git a/airbyte_cdk/sql/shared/catalog_providers.py b/airbyte_cdk/sql/shared/catalog_providers.py index 80713a35a..b13f36b4c 100644 --- a/airbyte_cdk/sql/shared/catalog_providers.py +++ b/airbyte_cdk/sql/shared/catalog_providers.py @@ -121,9 +121,13 @@ def get_primary_keys( stream_name: str, ) -> list[str]: """Return the primary keys for the given stream.""" - pks = self.get_configured_stream_info(stream_name).primary_key + configured_stream = self.get_configured_stream_info(stream_name) + pks = configured_stream.primary_key + if not pks: - return [] + pks = configured_stream.stream.source_defined_primary_key + if not pks: + return [] normalized_pks: list[list[str]] = [ [LowerCaseNormalizer.normalize(c) for c in pk] for pk in pks diff --git a/unit_tests/sql/shared/test_catalog_providers.py b/unit_tests/sql/shared/test_catalog_providers.py new file mode 100644 index 000000000..f353c85fa --- /dev/null +++ b/unit_tests/sql/shared/test_catalog_providers.py @@ -0,0 +1,160 @@ +from unittest.mock import Mock + +import pytest + +from airbyte_cdk.models import AirbyteStream, ConfiguredAirbyteCatalog, ConfiguredAirbyteStream +from airbyte_cdk.sql.shared.catalog_providers import CatalogProvider + + +class TestCatalogProvider: + """Test cases for CatalogProvider.get_primary_keys() method.""" + + def test_get_primary_keys_uses_configured_primary_key_when_set(self): + """Test that configured primary_key is used when set.""" + stream = AirbyteStream( + name="test_stream", + json_schema={"type": "object", "properties": {"id": {"type": "string"}}}, + supported_sync_modes=["full_refresh"], + source_defined_primary_key=[["source_id"]], + ) + configured_stream = ConfiguredAirbyteStream( + stream=stream, + sync_mode="full_refresh", + destination_sync_mode="overwrite", + primary_key=[["configured_id"]], + ) + catalog = ConfiguredAirbyteCatalog(streams=[configured_stream]) + + provider = CatalogProvider(catalog) + result = provider.get_primary_keys("test_stream") + + assert result == ["configured_id"] + + def test_get_primary_keys_falls_back_to_source_defined_when_configured_empty(self): + """Test that source_defined_primary_key is used when primary_key is empty.""" + stream = AirbyteStream( + name="test_stream", + json_schema={"type": "object", "properties": {"id": {"type": "string"}}}, + supported_sync_modes=["full_refresh"], + source_defined_primary_key=[["source_id"]], + ) + configured_stream = ConfiguredAirbyteStream( + stream=stream, + sync_mode="full_refresh", + destination_sync_mode="overwrite", + primary_key=[], # Empty configured primary key + ) + catalog = ConfiguredAirbyteCatalog(streams=[configured_stream]) 
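+        # NOTE (illustrative): `primary_key=[]` above forces the fallback path,
+        # so the provider should surface the stream's `source_defined_primary_key`.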
+ + provider = CatalogProvider(catalog) + result = provider.get_primary_keys("test_stream") + + assert result == ["source_id"] + + def test_get_primary_keys_falls_back_to_source_defined_when_configured_none(self): + """Test that source_defined_primary_key is used when primary_key is None.""" + stream = AirbyteStream( + name="test_stream", + json_schema={"type": "object", "properties": {"id": {"type": "string"}}}, + supported_sync_modes=["full_refresh"], + source_defined_primary_key=[["source_id"]], + ) + configured_stream = ConfiguredAirbyteStream( + stream=stream, + sync_mode="full_refresh", + destination_sync_mode="overwrite", + primary_key=None, # None configured primary key + ) + catalog = ConfiguredAirbyteCatalog(streams=[configured_stream]) + + provider = CatalogProvider(catalog) + result = provider.get_primary_keys("test_stream") + + assert result == ["source_id"] + + def test_get_primary_keys_returns_empty_when_both_empty(self): + """Test that empty list is returned when both primary keys are empty.""" + stream = AirbyteStream( + name="test_stream", + json_schema={"type": "object", "properties": {"id": {"type": "string"}}}, + supported_sync_modes=["full_refresh"], + source_defined_primary_key=[], # Empty source-defined primary key + ) + configured_stream = ConfiguredAirbyteStream( + stream=stream, + sync_mode="full_refresh", + destination_sync_mode="overwrite", + primary_key=[], # Empty configured primary key + ) + catalog = ConfiguredAirbyteCatalog(streams=[configured_stream]) + + provider = CatalogProvider(catalog) + result = provider.get_primary_keys("test_stream") + + assert result == [] + + def test_get_primary_keys_returns_empty_when_both_none(self): + """Test that empty list is returned when both primary keys are None.""" + stream = AirbyteStream( + name="test_stream", + json_schema={"type": "object", "properties": {"id": {"type": "string"}}}, + supported_sync_modes=["full_refresh"], + source_defined_primary_key=None, # None source-defined primary key + ) + configured_stream = ConfiguredAirbyteStream( + stream=stream, + sync_mode="full_refresh", + destination_sync_mode="overwrite", + primary_key=None, # None configured primary key + ) + catalog = ConfiguredAirbyteCatalog(streams=[configured_stream]) + + provider = CatalogProvider(catalog) + result = provider.get_primary_keys("test_stream") + + assert result == [] + + def test_get_primary_keys_handles_composite_keys_from_source_defined(self): + """Test that composite primary keys work correctly with source-defined fallback.""" + stream = AirbyteStream( + name="test_stream", + json_schema={ + "type": "object", + "properties": {"id1": {"type": "string"}, "id2": {"type": "string"}}, + }, + supported_sync_modes=["full_refresh"], + source_defined_primary_key=[["id1"], ["id2"]], # Composite primary key + ) + configured_stream = ConfiguredAirbyteStream( + stream=stream, + sync_mode="full_refresh", + destination_sync_mode="overwrite", + primary_key=[], # Empty configured primary key + ) + catalog = ConfiguredAirbyteCatalog(streams=[configured_stream]) + + provider = CatalogProvider(catalog) + result = provider.get_primary_keys("test_stream") + + assert result == ["id1", "id2"] + + def test_get_primary_keys_normalizes_case_for_source_defined(self): + """Test that primary keys from source-defined are normalized to lowercase.""" + stream = AirbyteStream( + name="test_stream", + json_schema={"type": "object", "properties": {"ID": {"type": "string"}}}, + supported_sync_modes=["full_refresh"], + source_defined_primary_key=[["ID"]], # 
Uppercase primary key + ) + configured_stream = ConfiguredAirbyteStream( + stream=stream, + sync_mode="full_refresh", + destination_sync_mode="overwrite", + primary_key=[], # Empty configured primary key + ) + catalog = ConfiguredAirbyteCatalog(streams=[configured_stream]) + + provider = CatalogProvider(catalog) + result = provider.get_primary_keys("test_stream") + + assert result == ["id"] From 821848b47ab68320227254f804a9d05062a136c7 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Sat, 28 Jun 2025 05:22:15 +0000 Subject: [PATCH 02/25] refactor(sql): Simplify primary key fallback logic with one-liner Address @aaronsteers feedback to use more concise approach: pks = configured_stream.primary_key or configured_stream.stream.source_defined_primary_key or [] This maintains exact same functionality while being more readable. Co-Authored-By: AJ Steers --- airbyte_cdk/sql/shared/catalog_providers.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/airbyte_cdk/sql/shared/catalog_providers.py b/airbyte_cdk/sql/shared/catalog_providers.py index b13f36b4c..f1894fa59 100644 --- a/airbyte_cdk/sql/shared/catalog_providers.py +++ b/airbyte_cdk/sql/shared/catalog_providers.py @@ -122,12 +122,11 @@ def get_primary_keys( ) -> list[str]: """Return the primary keys for the given stream.""" configured_stream = self.get_configured_stream_info(stream_name) - pks = configured_stream.primary_key - - if not pks: - pks = configured_stream.stream.source_defined_primary_key - if not pks: - return [] + pks = ( + configured_stream.primary_key + or configured_stream.stream.source_defined_primary_key + or [] + ) normalized_pks: list[list[str]] = [ [LowerCaseNormalizer.normalize(c) for c in pk] for pk in pks From 565aa6403c994f6e8c64bca81751247abc9b9105 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Sat, 28 Jun 2025 22:46:21 +0000 Subject: [PATCH 03/25] refactor(test): Parametrize catalog provider tests to reduce duplication - Consolidate 7 individual test methods into 1 parametrized test with 6 scenarios - Use list[str] format for parameters with wrapping logic in test - Remove case normalization test since CatalogProvider doesn't handle normalization - Reduce code from ~150 lines to ~50 lines while maintaining full coverage Co-Authored-By: AJ Steers --- .../sql/shared/test_catalog_providers.py | 164 ++++-------------- 1 file changed, 35 insertions(+), 129 deletions(-) diff --git a/unit_tests/sql/shared/test_catalog_providers.py b/unit_tests/sql/shared/test_catalog_providers.py index f353c85fa..ae2ec6245 100644 --- a/unit_tests/sql/shared/test_catalog_providers.py +++ b/unit_tests/sql/shared/test_catalog_providers.py @@ -9,152 +9,58 @@ class TestCatalogProvider: """Test cases for CatalogProvider.get_primary_keys() method.""" - def test_get_primary_keys_uses_configured_primary_key_when_set(self): - """Test that configured primary_key is used when set.""" - stream = AirbyteStream( - name="test_stream", - json_schema={"type": "object", "properties": {"id": {"type": "string"}}}, - supported_sync_modes=["full_refresh"], - source_defined_primary_key=[["source_id"]], - ) - configured_stream = ConfiguredAirbyteStream( - stream=stream, - sync_mode="full_refresh", - destination_sync_mode="overwrite", - primary_key=[["configured_id"]], - ) - catalog = ConfiguredAirbyteCatalog(streams=[configured_stream]) - - provider = CatalogProvider(catalog) - result = 
provider.get_primary_keys("test_stream") - - assert result == ["configured_id"] - - def test_get_primary_keys_falls_back_to_source_defined_when_configured_empty(self): - """Test that source_defined_primary_key is used when primary_key is empty.""" - stream = AirbyteStream( - name="test_stream", - json_schema={"type": "object", "properties": {"id": {"type": "string"}}}, - supported_sync_modes=["full_refresh"], - source_defined_primary_key=[["source_id"]], - ) - configured_stream = ConfiguredAirbyteStream( - stream=stream, - sync_mode="full_refresh", - destination_sync_mode="overwrite", - primary_key=[], # Empty configured primary key - ) - catalog = ConfiguredAirbyteCatalog(streams=[configured_stream]) - - provider = CatalogProvider(catalog) - result = provider.get_primary_keys("test_stream") - - assert result == ["source_id"] - - def test_get_primary_keys_falls_back_to_source_defined_when_configured_none(self): - """Test that source_defined_primary_key is used when primary_key is None.""" - stream = AirbyteStream( - name="test_stream", - json_schema={"type": "object", "properties": {"id": {"type": "string"}}}, - supported_sync_modes=["full_refresh"], - source_defined_primary_key=[["source_id"]], + @pytest.mark.parametrize( + "configured_primary_key,source_defined_primary_key,expected_result,test_description", + [ + (["configured_id"], ["source_id"], ["configured_id"], "uses configured when both set"), + ([], ["source_id"], ["source_id"], "falls back to source when configured empty"), + (None, ["source_id"], ["source_id"], "falls back to source when configured None"), + ([], [], [], "returns empty when both empty"), + (None, None, [], "returns empty when both None"), + ([], ["id1", "id2"], ["id1", "id2"], "handles composite keys from source"), + ], + ) + def test_get_primary_keys_parametrized( + self, configured_primary_key, source_defined_primary_key, expected_result, test_description + ): + """Test primary key fallback logic with various input combinations.""" + configured_pk_wrapped = ( + None + if configured_primary_key is None + else [[pk] for pk in configured_primary_key] + if configured_primary_key + else [] ) - configured_stream = ConfiguredAirbyteStream( - stream=stream, - sync_mode="full_refresh", - destination_sync_mode="overwrite", - primary_key=None, # None configured primary key + source_pk_wrapped = ( + None + if source_defined_primary_key is None + else [[pk] for pk in source_defined_primary_key] + if source_defined_primary_key + else [] ) - catalog = ConfiguredAirbyteCatalog(streams=[configured_stream]) - provider = CatalogProvider(catalog) - result = provider.get_primary_keys("test_stream") - - assert result == ["source_id"] - - def test_get_primary_keys_returns_empty_when_both_empty(self): - """Test that empty list is returned when both primary keys are empty.""" - stream = AirbyteStream( - name="test_stream", - json_schema={"type": "object", "properties": {"id": {"type": "string"}}}, - supported_sync_modes=["full_refresh"], - source_defined_primary_key=[], # Empty source-defined primary key - ) - configured_stream = ConfiguredAirbyteStream( - stream=stream, - sync_mode="full_refresh", - destination_sync_mode="overwrite", - primary_key=[], # Empty configured primary key - ) - catalog = ConfiguredAirbyteCatalog(streams=[configured_stream]) - - provider = CatalogProvider(catalog) - result = provider.get_primary_keys("test_stream") - - assert result == [] - - def test_get_primary_keys_returns_empty_when_both_none(self): - """Test that empty list is returned when both primary 
keys are None.""" - stream = AirbyteStream( - name="test_stream", - json_schema={"type": "object", "properties": {"id": {"type": "string"}}}, - supported_sync_modes=["full_refresh"], - source_defined_primary_key=None, # None source-defined primary key - ) - configured_stream = ConfiguredAirbyteStream( - stream=stream, - sync_mode="full_refresh", - destination_sync_mode="overwrite", - primary_key=None, # None configured primary key - ) - catalog = ConfiguredAirbyteCatalog(streams=[configured_stream]) - - provider = CatalogProvider(catalog) - result = provider.get_primary_keys("test_stream") - - assert result == [] - - def test_get_primary_keys_handles_composite_keys_from_source_defined(self): - """Test that composite primary keys work correctly with source-defined fallback.""" stream = AirbyteStream( name="test_stream", json_schema={ "type": "object", - "properties": {"id1": {"type": "string"}, "id2": {"type": "string"}}, + "properties": { + "id": {"type": "string"}, + "id1": {"type": "string"}, + "id2": {"type": "string"}, + }, }, supported_sync_modes=["full_refresh"], - source_defined_primary_key=[["id1"], ["id2"]], # Composite primary key - ) - configured_stream = ConfiguredAirbyteStream( - stream=stream, - sync_mode="full_refresh", - destination_sync_mode="overwrite", - primary_key=[], # Empty configured primary key - ) - catalog = ConfiguredAirbyteCatalog(streams=[configured_stream]) - - provider = CatalogProvider(catalog) - result = provider.get_primary_keys("test_stream") - - assert result == ["id1", "id2"] - - def test_get_primary_keys_normalizes_case_for_source_defined(self): - """Test that primary keys from source-defined are normalized to lowercase.""" - stream = AirbyteStream( - name="test_stream", - json_schema={"type": "object", "properties": {"ID": {"type": "string"}}}, - supported_sync_modes=["full_refresh"], - source_defined_primary_key=[["ID"]], # Uppercase primary key + source_defined_primary_key=source_pk_wrapped, ) configured_stream = ConfiguredAirbyteStream( stream=stream, sync_mode="full_refresh", destination_sync_mode="overwrite", - primary_key=[], # Empty configured primary key + primary_key=configured_pk_wrapped, ) catalog = ConfiguredAirbyteCatalog(streams=[configured_stream]) provider = CatalogProvider(catalog) result = provider.get_primary_keys("test_stream") - assert result == ["id"] + assert result == expected_result From 07de856fedb8d564b8a2d83a57d9893acad22b33 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Sat, 28 Jun 2025 22:54:39 +0000 Subject: [PATCH 04/25] docs(sql): Expand docstring for get_primary_keys to explain fallback behavior - Add explanation that method uses primary_key if set explicitly in configured catalog - Otherwise falls back to source_defined_primary_key if set - Addresses GitHub comment from @aaronsteers Co-Authored-By: AJ Steers --- airbyte_cdk/sql/shared/catalog_providers.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/airbyte_cdk/sql/shared/catalog_providers.py b/airbyte_cdk/sql/shared/catalog_providers.py index f1894fa59..e2b46e015 100644 --- a/airbyte_cdk/sql/shared/catalog_providers.py +++ b/airbyte_cdk/sql/shared/catalog_providers.py @@ -120,7 +120,11 @@ def get_primary_keys( self, stream_name: str, ) -> list[str]: - """Return the primary keys for the given stream.""" + """Return the primary keys for the given stream. 
+
+        We will use `primary_key` if it is set explicitly in the configured catalog,
+        otherwise we will fall back to `source_defined_primary_key`, if set.
+        """
         configured_stream = self.get_configured_stream_info(stream_name)
         pks = (
             configured_stream.primary_key
From b4aa7df9285963bf719a8bc72374b856acee7328 Mon Sep 17 00:00:00 2001
From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
Date: Sat, 28 Jun 2025 23:05:28 +0000
Subject: [PATCH 05/25] fix(format): Apply ruff formatting to docstring changes

- Fix Ruff Format Check CI failure
- Apply proper formatting to expanded docstring in get_primary_keys method

Co-Authored-By: AJ Steers
---
 airbyte_cdk/sql/shared/catalog_providers.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/airbyte_cdk/sql/shared/catalog_providers.py b/airbyte_cdk/sql/shared/catalog_providers.py
index e2b46e015..d2d2a44dd 100644
--- a/airbyte_cdk/sql/shared/catalog_providers.py
+++ b/airbyte_cdk/sql/shared/catalog_providers.py
@@ -121,8 +121,8 @@ def get_primary_keys(
         stream_name: str,
     ) -> list[str]:
         """Return the primary keys for the given stream.
-
-        We will use `primary_key` if it is set explicitly in the configured catalog,
+
+        We will use `primary_key` if it is set explicitly in the configured catalog,
         otherwise we will fall back to `source_defined_primary_key`, if set.
         """
         configured_stream = self.get_configured_stream_info(stream_name)
From 65e8e8767eab4ea9d560b950ab82814a66a0f921 Mon Sep 17 00:00:00 2001
From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
Date: Sat, 28 Jun 2025 23:44:27 +0000
Subject: [PATCH 06/25] feat(sql): Prioritize source_defined_primary_key and return None when unset

- Reverse priority order to favor source_defined_primary_key over primary_key
- Return None when neither primary key field is set
- Update all callers to handle None gracefully with 'pks or []' coalescing
- Update unit tests to reflect new behavior and priority order
- Addresses @aaronsteers feedback on primary key fallback logic

Co-Authored-By: AJ Steers
---
 airbyte_cdk/sql/shared/catalog_providers.py | 14 +++++-------
 airbyte_cdk/sql/shared/sql_processor.py     |  5 +++--
 .../sql/shared/test_catalog_providers.py    | 22 ++++++++++++++-----
 3 files changed, 26 insertions(+), 15 deletions(-)

diff --git a/airbyte_cdk/sql/shared/catalog_providers.py b/airbyte_cdk/sql/shared/catalog_providers.py
index d2d2a44dd..ac7dce7e7 100644
--- a/airbyte_cdk/sql/shared/catalog_providers.py
+++ b/airbyte_cdk/sql/shared/catalog_providers.py
@@ -119,18 +119,16 @@ def get_stream_properties(
     def get_primary_keys(
         self,
         stream_name: str,
-    ) -> list[str]:
+    ) -> list[str] | None:
         """Return the primary keys for the given stream.
 
-        We will use `primary_key` if it is set explicitly in the configured catalog,
-        otherwise we will fall back to `source_defined_primary_key`, if set.
+        We return `source_defined_primary_key` if set, or `primary_key` otherwise. If both are set, we assume they should not differ, since Airbyte data integrity constraints do not permit overruling a source's pre-defined primary keys. If neither is set, we return `None`.
""" configured_stream = self.get_configured_stream_info(stream_name) - pks = ( - configured_stream.primary_key - or configured_stream.stream.source_defined_primary_key - or [] - ) + pks = configured_stream.stream.source_defined_primary_key or configured_stream.primary_key + + if not pks: + return None normalized_pks: list[list[str]] = [ [LowerCaseNormalizer.normalize(c) for c in pk] for pk in pks diff --git a/airbyte_cdk/sql/shared/sql_processor.py b/airbyte_cdk/sql/shared/sql_processor.py index a53925206..86a0ac13f 100644 --- a/airbyte_cdk/sql/shared/sql_processor.py +++ b/airbyte_cdk/sql/shared/sql_processor.py @@ -667,7 +667,8 @@ def _merge_temp_table_to_final_table( nl = "\n" columns = {self._quote_identifier(c) for c in self._get_sql_column_definitions(stream_name)} pk_columns = { - self._quote_identifier(c) for c in self.catalog_provider.get_primary_keys(stream_name) + self._quote_identifier(c) + for c in (self.catalog_provider.get_primary_keys(stream_name) or []) } non_pk_columns = columns - pk_columns join_clause = f"{nl} AND ".join(f"tmp.{pk_col} = final.{pk_col}" for pk_col in pk_columns) @@ -724,7 +725,7 @@ def _emulated_merge_temp_table_to_final_table( """ final_table = self._get_table_by_name(final_table_name) temp_table = self._get_table_by_name(temp_table_name) - pk_columns = self.catalog_provider.get_primary_keys(stream_name) + pk_columns = self.catalog_provider.get_primary_keys(stream_name) or [] columns_to_update: set[str] = self._get_sql_column_definitions( stream_name=stream_name diff --git a/unit_tests/sql/shared/test_catalog_providers.py b/unit_tests/sql/shared/test_catalog_providers.py index ae2ec6245..af66ba974 100644 --- a/unit_tests/sql/shared/test_catalog_providers.py +++ b/unit_tests/sql/shared/test_catalog_providers.py @@ -12,11 +12,23 @@ class TestCatalogProvider: @pytest.mark.parametrize( "configured_primary_key,source_defined_primary_key,expected_result,test_description", [ - (["configured_id"], ["source_id"], ["configured_id"], "uses configured when both set"), - ([], ["source_id"], ["source_id"], "falls back to source when configured empty"), - (None, ["source_id"], ["source_id"], "falls back to source when configured None"), - ([], [], [], "returns empty when both empty"), - (None, None, [], "returns empty when both None"), + (["configured_id"], ["source_id"], ["source_id"], "prioritizes source when both set"), + ([], ["source_id"], ["source_id"], "uses source when configured empty"), + (None, ["source_id"], ["source_id"], "uses source when configured None"), + ( + ["configured_id"], + [], + ["configured_id"], + "falls back to configured when source empty", + ), + ( + ["configured_id"], + None, + ["configured_id"], + "falls back to configured when source None", + ), + ([], [], None, "returns None when both empty"), + (None, None, None, "returns None when both None"), ([], ["id1", "id2"], ["id1", "id2"], "handles composite keys from source"), ], ) From be8d8066e99621c0aa7c132f8a374d64170369bb Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Sun, 29 Jun 2025 00:03:19 +0000 Subject: [PATCH 07/25] feat(sql): Add guard statements for primary key validation in merge operations - Replace 'or []' coalescing with explicit guard statements in merge methods - Raise AirbyteInternalError when no primary keys available for merge operations - Addresses @aaronsteers feedback on code clarity and error handling - Ensures merge operations fail fast when primary keys are missing Co-Authored-By: AJ Steers --- 
 airbyte_cdk/sql/shared/sql_processor.py | 18 +++++++++++++-----
 1 file changed, 13 insertions(+), 5 deletions(-)

diff --git a/airbyte_cdk/sql/shared/sql_processor.py b/airbyte_cdk/sql/shared/sql_processor.py
index 86a0ac13f..238ff6c69 100644
--- a/airbyte_cdk/sql/shared/sql_processor.py
+++ b/airbyte_cdk/sql/shared/sql_processor.py
@@ -666,10 +666,13 @@ def _merge_temp_table_to_final_table(
         """
         nl = "\n"
         columns = {self._quote_identifier(c) for c in self._get_sql_column_definitions(stream_name)}
-        pk_columns = {
-            self._quote_identifier(c)
-            for c in (self.catalog_provider.get_primary_keys(stream_name) or [])
-        }
+        primary_keys = self.catalog_provider.get_primary_keys(stream_name)
+        if not primary_keys:
+            raise exc.AirbyteInternalError(
+                message="Cannot merge tables without primary keys. Primary keys are required for merge operations.",
+                context={"stream_name": stream_name},
+            )
+        pk_columns = {self._quote_identifier(c) for c in primary_keys}
         non_pk_columns = columns - pk_columns
         join_clause = f"{nl} AND ".join(f"tmp.{pk_col} = final.{pk_col}" for pk_col in pk_columns)
         set_clause = f"{nl} , ".join(f"{col} = tmp.{col}" for col in non_pk_columns)
@@ -725,7 +728,12 @@ def _emulated_merge_temp_table_to_final_table(
         """
         final_table = self._get_table_by_name(final_table_name)
         temp_table = self._get_table_by_name(temp_table_name)
-        pk_columns = self.catalog_provider.get_primary_keys(stream_name) or []
+        pk_columns = self.catalog_provider.get_primary_keys(stream_name)
+        if not pk_columns:
+            raise exc.AirbyteInternalError(
+                message="Cannot merge tables without primary keys. Primary keys are required for merge operations.",
+                context={"stream_name": stream_name},
+            )
 
         columns_to_update: set[str] = self._get_sql_column_definitions(
             stream_name=stream_name
From 8d1ecaea1a323ab5dd97f0d54011d238def8634c Mon Sep 17 00:00:00 2001
From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
Date: Sun, 29 Jun 2025 00:05:20 +0000
Subject: [PATCH 08/25] fix(format): Break long docstring line to meet line length requirements

- Split long docstring line in get_primary_keys method across multiple lines
- Addresses @aaronsteers feedback on line length issue in PR comment
- Maintains readability while complying with formatting standards

Co-Authored-By: AJ Steers
---
 airbyte_cdk/sql/shared/catalog_providers.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/airbyte_cdk/sql/shared/catalog_providers.py b/airbyte_cdk/sql/shared/catalog_providers.py
index ac7dce7e7..6014d5f49 100644
--- a/airbyte_cdk/sql/shared/catalog_providers.py
+++ b/airbyte_cdk/sql/shared/catalog_providers.py
@@ -122,7 +122,9 @@ def get_primary_keys(
     ) -> list[str] | None:
         """Return the primary keys for the given stream.
 
-        We return `source_defined_primary_key` if set, or `primary_key` otherwise. If both are set, we assume they should not differ, since Airbyte data integrity constraints do not permit overruling a source's pre-defined primary keys. If neither is set, we return `None`.
+        We return `source_defined_primary_key` if set, or `primary_key` otherwise. If both are set,
+        we assume they should not differ, since Airbyte data integrity constraints do not
+        permit overruling a source's pre-defined primary keys. If neither is set, we return `None`.
""" configured_stream = self.get_configured_stream_info(stream_name) pks = configured_stream.stream.source_defined_primary_key or configured_stream.primary_key From 625cd1ea4661ad5c60f40ed3bac24cc3eb98b7af Mon Sep 17 00:00:00 2001 From: Aaron Steers Date: Thu, 3 Jul 2025 12:40:19 -0700 Subject: [PATCH 09/25] fix(cherry-pick me): improve messaging for 'could not import module' error --- airbyte_cdk/test/standard_tests/connector_base.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/airbyte_cdk/test/standard_tests/connector_base.py b/airbyte_cdk/test/standard_tests/connector_base.py index 588b7d0bd..b945f1572 100644 --- a/airbyte_cdk/test/standard_tests/connector_base.py +++ b/airbyte_cdk/test/standard_tests/connector_base.py @@ -59,7 +59,16 @@ def connector(cls) -> type[IConnector] | Callable[[], IConnector] | None: try: module = importlib.import_module(expected_module_name) except ModuleNotFoundError as e: - raise ImportError(f"Could not import module '{expected_module_name}'.") from e + raise ImportError( + f"Could not import module '{expected_module_name}'. " + "Please ensure you are running from within the connector's virtual environment, " + "for instance by running `poetry run airbyte-cdk connector test` from the " + "connector directory. If the issue persists, check that the connector " + f"module matches the expected module name '{expected_module_name}' and that the " + f"connector class matches the expected class name '{expected_class_name}'. " + "Alternatively, you can run `airbyte-cdk image test` to run a subset of tests " + "against the connector's image." + ) from e finally: # Change back to the original working directory os.chdir(cwd_snapshot) From 74240ab0c72f89653dc1ea229a50992c1e92ceb3 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Thu, 3 Jul 2025 21:14:27 +0000 Subject: [PATCH 10/25] docs(sql): Clarify that get_primary_keys returns column names, not values - Updated docstring to specify 'primary key column names' instead of just 'primary keys' - Added Returns section explaining the method returns column names or None - Addresses @dbgold17's clarifying question about terminology in GitHub comment Co-Authored-By: AJ Steers --- airbyte_cdk/sql/shared/catalog_providers.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/airbyte_cdk/sql/shared/catalog_providers.py b/airbyte_cdk/sql/shared/catalog_providers.py index 6014d5f49..5b145f649 100644 --- a/airbyte_cdk/sql/shared/catalog_providers.py +++ b/airbyte_cdk/sql/shared/catalog_providers.py @@ -120,11 +120,14 @@ def get_primary_keys( self, stream_name: str, ) -> list[str] | None: - """Return the primary keys for the given stream. + """Return the primary key column names for the given stream. We return `source_defined_primary_key` if set, or `primary_key` otherwise. If both are set, we assume they should not should differ, since Airbyte data integrity constraints do not permit overruling a source's pre-defined primary keys. If neither is set, we return `None`. + + Returns: + A list of column names that constitute the primary key, or None if no primary key is defined. 
""" configured_stream = self.get_configured_stream_info(stream_name) pks = configured_stream.stream.source_defined_primary_key or configured_stream.primary_key From 820d9e84ff894910f957e6e0fa0f3eb62a17d476 Mon Sep 17 00:00:00 2001 From: Aaron Steers Date: Thu, 3 Jul 2025 15:44:06 -0700 Subject: [PATCH 11/25] tests: improve fast test outputs, skip 'read' tests for destinations --- airbyte_cdk/test/entrypoint_wrapper.py | 51 ++++++++++ .../test/standard_tests/_job_runner.py | 29 +----- .../test/standard_tests/connector_base.py | 3 +- .../test/standard_tests/docker_base.py | 95 ++++++++----------- airbyte_cdk/utils/docker.py | 95 +++++++++++++------ 5 files changed, 162 insertions(+), 111 deletions(-) diff --git a/airbyte_cdk/test/entrypoint_wrapper.py b/airbyte_cdk/test/entrypoint_wrapper.py index 09a3fc75e..9eb21763d 100644 --- a/airbyte_cdk/test/entrypoint_wrapper.py +++ b/airbyte_cdk/test/entrypoint_wrapper.py @@ -50,6 +50,21 @@ from airbyte_cdk.test.models.scenario import ExpectedOutcome +@dataclass +class AirbyteEntrypointException(Exception): + """Exception raised for errors in the AirbyteEntrypoint execution. + + Used to provide details of an Airbyte connector execution failure in the output + captured in an `EntrypointOutput` object. Use `EntrypointOutput.as_exception()` to + convert it to an exception. + + Example Usage: + output = EntrypointOutput(...) + if output.errors: + raise output.as_exception("An error occurred during the connector execution.") + """ + + class EntrypointOutput: """A class to encapsulate the output of an Airbyte connector's execution. @@ -67,6 +82,7 @@ def __init__( messages: list[str] | None = None, uncaught_exception: Optional[BaseException] = None, *, + command: list[str] | None = None, message_file: Path | None = None, ) -> None: if messages is None and message_file is None: @@ -74,6 +90,7 @@ def __init__( if messages is not None and message_file is not None: raise ValueError("Only one of messages or message_file can be provided") + self._command = command self._messages: list[AirbyteMessage] | None = None self._message_file: Path | None = message_file if messages: @@ -182,6 +199,40 @@ def analytics_messages(self) -> List[AirbyteMessage]: def errors(self) -> List[AirbyteMessage]: return self._get_trace_message_by_trace_type(TraceType.ERROR) + def get_formatted_error_message(self) -> str: + """Returns a human-readable error message with the contents. + + If there are no errors, returns an empty string. + """ + errors = self.errors + if not errors: + # If there are no errors, return an empty string. + return "" + + result = "Failed to run airbyte command" + result += ": " + " ".join(self._command) if self._command else "." + result += "\n" + "\n".join( + [str(error.trace.error).replace("\\n", "\n") for error in errors if error.trace], + ) + return result + + def as_exception(self) -> AirbyteEntrypointException: + """Convert the output to an exception with a custom message. + + This is useful for raising an exception in tests or other scenarios where you want to + provide a specific error message. 
+ """ + return AirbyteEntrypointException(self.get_formatted_error_message()) + + def raise_if_errors( + self, + ) -> None: + """Raise an exception if there are errors in the output.""" + if not self.errors: + return None + + raise self.as_exception() + @property def catalog(self) -> AirbyteMessage: catalog = self.get_message_by_types([Type.CATALOG]) diff --git a/airbyte_cdk/test/standard_tests/_job_runner.py b/airbyte_cdk/test/standard_tests/_job_runner.py index 8f4174b1a..ddd293ffa 100644 --- a/airbyte_cdk/test/standard_tests/_job_runner.py +++ b/airbyte_cdk/test/standard_tests/_job_runner.py @@ -21,26 +21,6 @@ ) -def _errors_to_str( - entrypoint_output: entrypoint_wrapper.EntrypointOutput, -) -> str: - """Convert errors from entrypoint output to a string.""" - if not entrypoint_output.errors: - # If there are no errors, return an empty string. - return "" - - return "\n" + "\n".join( - [ - str(error.trace.error).replace( - "\\n", - "\n", - ) - for error in entrypoint_output.errors - if error.trace - ], - ) - - @runtime_checkable class IConnector(Protocol): """A connector that can be run in a test scenario. @@ -125,9 +105,7 @@ def run_test_job( expected_outcome=test_scenario.expected_outcome, ) if result.errors and test_scenario.expected_outcome.expect_success(): - raise AssertionError( - f"Expected no errors but got {len(result.errors)}: \n" + _errors_to_str(result) - ) + raise result.as_exception() if verb == "check": # Check is expected to fail gracefully without an exception. @@ -137,7 +115,7 @@ def run_test_job( "Expected exactly one CONNECTION_STATUS message. Got " f"{len(result.connection_status_messages)}:\n" + "\n".join([str(msg) for msg in result.connection_status_messages]) - + _errors_to_str(result) + + result.get_formatted_error_message() ) if test_scenario.expected_outcome.expect_exception(): conn_status = result.connection_status_messages[0].connectionStatus @@ -161,7 +139,8 @@ def run_test_job( if test_scenario.expected_outcome.expect_success(): assert not result.errors, ( - f"Expected no errors but got {len(result.errors)}: \n" + _errors_to_str(result) + f"Test job failed with {len(result.errors)} error(s): \n" + + result.get_formatted_error_message() ) return result diff --git a/airbyte_cdk/test/standard_tests/connector_base.py b/airbyte_cdk/test/standard_tests/connector_base.py index b945f1572..376b7c7fe 100644 --- a/airbyte_cdk/test/standard_tests/connector_base.py +++ b/airbyte_cdk/test/standard_tests/connector_base.py @@ -44,8 +44,7 @@ def connector(cls) -> type[IConnector] | Callable[[], IConnector] | None: This assumes a python connector and should be overridden by subclasses to provide the specific connector class to be tested. 
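 
         Resolution is purely name-based: for example (illustrative), a connector
         directory named `source-foo` is expected to expose a module named
         `source_foo` defining a class named `SourceFoo`.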
""" - connector_root = cls.get_connector_root_dir() - connector_name = connector_root.absolute().name + connector_name = cls.connector_name expected_module_name = connector_name.replace("-", "_").lower() expected_class_name = connector_name.replace("-", "_").title().replace("_", "") diff --git a/airbyte_cdk/test/standard_tests/docker_base.py b/airbyte_cdk/test/standard_tests/docker_base.py index c3ee1f060..de8c9df71 100644 --- a/airbyte_cdk/test/standard_tests/docker_base.py +++ b/airbyte_cdk/test/standard_tests/docker_base.py @@ -25,19 +25,18 @@ DestinationSyncMode, SyncMode, ) -from airbyte_cdk.models.airbyte_protocol_serializers import ( - AirbyteCatalogSerializer, - AirbyteStreamSerializer, -) from airbyte_cdk.models.connector_metadata import MetadataFile from airbyte_cdk.test.entrypoint_wrapper import EntrypointOutput from airbyte_cdk.test.models import ConnectorTestScenario -from airbyte_cdk.test.utils.reading import catalog from airbyte_cdk.utils.connector_paths import ( ACCEPTANCE_TEST_CONFIG, find_connector_root, ) -from airbyte_cdk.utils.docker import build_connector_image, run_docker_command +from airbyte_cdk.utils.docker import ( + build_connector_image, + run_docker_airbyte_command, + run_docker_command, +) class DockerConnectorTestSuite: @@ -55,6 +54,17 @@ def get_connector_root_dir(cls) -> Path: """Get the root directory of the connector.""" return find_connector_root([cls.get_test_class_dir(), Path.cwd()]) + @classproperty + def connector_name(self) -> str: + """Get the name of the connector.""" + connector_root = self.get_connector_root_dir() + return connector_root.absolute().name + + @classmethod + def is_destination_connector(cls) -> bool: + """Check if the connector is a destination.""" + return cls.connector_name.startswith("destination-") + @classproperty def acceptance_test_config_path(cls) -> Path: """Get the path to the acceptance test config file.""" @@ -145,23 +155,16 @@ def test_docker_image_build_and_spec( no_verify=False, ) - try: - result: CompletedProcess[str] = run_docker_command( - [ - "docker", - "run", - "--rm", - connector_image, - "spec", - ], - check=True, # Raise an error if the command fails - capture_stderr=True, - capture_stdout=True, - ) - except SubprocessError as ex: - raise AssertionError( - f"Failed to run `spec` command in docker image {connector_image!r}. Error: {ex!s}" - ) from None + _ = run_docker_airbyte_command( + [ + "docker", + "run", + "--rm", + connector_image, + "spec", + ], + check=True, + ) @pytest.mark.skipif( shutil.which("docker") is None, @@ -203,7 +206,7 @@ def test_docker_image_build_and_check( with scenario.with_temp_config_file( connector_root=connector_root, ) as temp_config_file: - _ = run_docker_command( + _ = run_docker_airbyte_command( [ "docker", "run", @@ -215,9 +218,7 @@ def test_docker_image_build_and_check( "--config", container_config_path, ], - check=True, # Raise an error if the command fails - capture_stderr=True, - capture_stdout=True, + check=True, ) @pytest.mark.skipif( @@ -242,6 +243,9 @@ def test_docker_image_build_and_read( the local docker image cache using `docker image prune -a` command. - If the --connector-image arg is provided, it will be used instead of building the image. 
""" + if self.is_destination_connector(): + pytest.skip("Skipping read test for destination connector.") + if scenario.expected_outcome.expect_exception(): pytest.skip("Skipping (expected to fail).") @@ -295,7 +299,7 @@ def test_docker_image_build_and_read( ) as temp_dir_str, ): temp_dir = Path(temp_dir_str) - discover_result = run_docker_command( + discover_result = run_docker_airbyte_command( [ "docker", "run", @@ -307,20 +311,12 @@ def test_docker_image_build_and_read( "--config", container_config_path, ], - check=True, # Raise an error if the command fails - capture_stderr=True, - capture_stdout=True, + check=True, ) - parsed_output = EntrypointOutput(messages=discover_result.stdout.splitlines()) - try: - catalog_message = parsed_output.catalog # Get catalog message - assert catalog_message.catalog is not None, "Catalog message missing catalog." - discovered_catalog: AirbyteCatalog = parsed_output.catalog.catalog - except Exception as ex: - raise AssertionError( - f"Failed to load discovered catalog from {discover_result.stdout}. " - f"Error: {ex!s}" - ) from None + + catalog_message = discover_result.catalog # Get catalog message + assert catalog_message.catalog is not None, "Catalog message missing catalog." + discovered_catalog: AirbyteCatalog = catalog_message.catalog if not discovered_catalog.streams: raise ValueError( f"Discovered catalog for connector '{connector_name}' is empty. " @@ -355,7 +351,7 @@ def test_docker_image_build_and_read( configured_catalog_path.write_text( orjson.dumps(asdict(configured_catalog)).decode("utf-8") ) - read_result: CompletedProcess[str] = run_docker_command( + read_result: EntrypointOutput = run_docker_airbyte_command( [ "docker", "run", @@ -371,18 +367,5 @@ def test_docker_image_build_and_read( "--catalog", container_catalog_path, ], - check=False, - capture_stderr=True, - capture_stdout=True, + check=True, ) - if read_result.returncode != 0: - raise AssertionError( - f"Failed to run `read` command in docker image {connector_image!r}. " - "\n-----------------" - f"EXIT CODE: {read_result.returncode}\n" - "STDERR:\n" - f"{read_result.stderr}\n" - f"STDOUT:\n" - f"{read_result.stdout}\n" - "\n-----------------" - ) from None diff --git a/airbyte_cdk/utils/docker.py b/airbyte_cdk/utils/docker.py index 88050d99f..591921d41 100644 --- a/airbyte_cdk/utils/docker.py +++ b/airbyte_cdk/utils/docker.py @@ -17,6 +17,8 @@ import requests from airbyte_cdk.models.connector_metadata import ConnectorLanguage, MetadataFile +from airbyte_cdk.sources.declarative import spec +from airbyte_cdk.test.entrypoint_wrapper import EntrypointOutput from airbyte_cdk.utils.connector_paths import resolve_airbyte_repo_root @@ -394,6 +396,10 @@ def run_docker_command( ) -> subprocess.CompletedProcess[str]: """Run a Docker command as a subprocess. + Note: When running Airbyte verbs such as `spec`, `discover`, `read`, etc., + use `run_docker_airbyte_command` instead to get an `EntrypointOutput` object as + the return value and to better handle exceptions sent as messages. + Args: cmd: The command to run as a list of strings. check: If True, raises an exception if the command fails. If False, the caller is @@ -439,6 +445,52 @@ def run_docker_command( return completed_process +def run_docker_airbyte_command( + cmd: list[str], + *, + check: bool = False, +) -> EntrypointOutput: + """Run an Airbyte command inside a Docker container. + + This wraps the `run_docker_command` function to process its results and + return an `EntrypointOutput` object. 
+ + Args: + cmd: The command to run as a list of strings. + check: If True, raises an exception if the command fails. If False, the caller is + responsible for checking the for errors. + + Returns: + The output of the command as an `EntrypointOutput` object. + """ + process_result = run_docker_command( + cmd, + capture_stdout=True, + capture_stderr=True, + check=False, # We want to handle failures ourselves. + ) + result_output = EntrypointOutput( + command=cmd, + messages=process_result.stdout.splitlines(), + uncaught_exception=( + subprocess.CalledProcessError( + cmd=cmd, + returncode=process_result.returncode, + output=process_result.stdout, + stderr=process_result.stderr, + ) + if process_result.returncode != 0 + else None + ), + ) + if check: + # If check is True, we raise an exception if there are errors. + # This will do nothing if there are no errors. + result_output.raise_if_errors() + + return result_output + + def verify_docker_installation() -> bool: """Verify Docker is installed and running.""" try: @@ -461,37 +513,24 @@ def verify_connector_image( """ logger.info(f"Verifying image {image_name} with 'spec' command...") - cmd = ["docker", "run", "--rm", image_name, "spec"] - try: - result = run_docker_command( - cmd, - check=True, - capture_stderr=True, - capture_stdout=True, + result = run_docker_airbyte_command( + ["docker", "run", "--rm", image_name, "spec"], ) - # check that the output is valid JSON - if result.stdout: - found_spec_output = False - for line in result.stdout.split("\n"): - if line.strip(): - try: - # Check if the line is a valid JSON object - msg = json.loads(line) - if isinstance(msg, dict) and "type" in msg and msg["type"] == "SPEC": - found_spec_output = True - - except json.JSONDecodeError as e: - logger.warning(f"Invalid JSON output from spec command: {e}: {line}") - - if not found_spec_output: - logger.error("No valid JSON output found for spec command.") - return False - else: - logger.error("No output from spec command.") + if not result.errors: + logger.error(result.get_formatted_error_message()) return False - except subprocess.CalledProcessError as e: - logger.error(f"Image verification failed: {e.stderr}") + + spec_messages = result.spec_messages + if not spec_messages: + logger.error( + "The container failed to produce valid output for the `spec` command.\nLog output:\n" + + str(result.logs) + ) + return False + + except Exception as ex: + logger.error(f"Unexpected error during image verification: {ex}") return False return True From efd9d7dba3d37ce00778721cd6f19c0b317d00c4 Mon Sep 17 00:00:00 2001 From: "Aaron (\"AJ\") Steers" Date: Thu, 3 Jul 2025 15:48:37 -0700 Subject: [PATCH 12/25] Apply suggestions from code review --- airbyte_cdk/test/entrypoint_wrapper.py | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/airbyte_cdk/test/entrypoint_wrapper.py b/airbyte_cdk/test/entrypoint_wrapper.py index 9eb21763d..7fdb52d0a 100644 --- a/airbyte_cdk/test/entrypoint_wrapper.py +++ b/airbyte_cdk/test/entrypoint_wrapper.py @@ -61,7 +61,7 @@ class AirbyteEntrypointException(Exception): Example Usage: output = EntrypointOutput(...) if output.errors: - raise output.as_exception("An error occurred during the connector execution.") + raise output.as_exception() """ @@ -217,17 +217,16 @@ def get_formatted_error_message(self) -> str: return result def as_exception(self) -> AirbyteEntrypointException: - """Convert the output to an exception with a custom message. 
- - This is useful for raising an exception in tests or other scenarios where you want to - provide a specific error message. - """ + """Convert the output to an exception.""" return AirbyteEntrypointException(self.get_formatted_error_message()) def raise_if_errors( self, ) -> None: - """Raise an exception if there are errors in the output.""" + """Raise an exception if there are errors in the output. + + Otherwise, do nothing. + """ if not self.errors: return None From aefed0ddf52a5b361779bacc61df1c47ca5c5077 Mon Sep 17 00:00:00 2001 From: Aaron Steers Date: Thu, 3 Jul 2025 16:43:24 -0700 Subject: [PATCH 13/25] fix missing import --- airbyte_cdk/test/entrypoint_wrapper.py | 1 + 1 file changed, 1 insertion(+) diff --git a/airbyte_cdk/test/entrypoint_wrapper.py b/airbyte_cdk/test/entrypoint_wrapper.py index 7fdb52d0a..c6e4dd9e4 100644 --- a/airbyte_cdk/test/entrypoint_wrapper.py +++ b/airbyte_cdk/test/entrypoint_wrapper.py @@ -21,6 +21,7 @@ import traceback from collections import deque from collections.abc import Generator, Mapping +from dataclasses import dataclass from io import StringIO from pathlib import Path from typing import Any, List, Literal, Optional, Union, final, overload From 0839137ffb24ecc2a7967a31bb32a2f5bac9fdc5 Mon Sep 17 00:00:00 2001 From: Aaron Steers Date: Thu, 3 Jul 2025 16:45:53 -0700 Subject: [PATCH 14/25] apply suggestion: rename to 'raise_if_errors' --- airbyte_cdk/test/standard_tests/docker_base.py | 8 ++++---- airbyte_cdk/utils/docker.py | 6 +++--- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/airbyte_cdk/test/standard_tests/docker_base.py b/airbyte_cdk/test/standard_tests/docker_base.py index de8c9df71..332879e64 100644 --- a/airbyte_cdk/test/standard_tests/docker_base.py +++ b/airbyte_cdk/test/standard_tests/docker_base.py @@ -163,7 +163,7 @@ def test_docker_image_build_and_spec( connector_image, "spec", ], - check=True, + raise_if_errors=True, ) @pytest.mark.skipif( @@ -218,7 +218,7 @@ def test_docker_image_build_and_check( "--config", container_config_path, ], - check=True, + raise_if_errors=True, ) @pytest.mark.skipif( @@ -311,7 +311,7 @@ def test_docker_image_build_and_read( "--config", container_config_path, ], - check=True, + raise_if_errors=True, ) catalog_message = discover_result.catalog # Get catalog message @@ -367,5 +367,5 @@ def test_docker_image_build_and_read( "--catalog", container_catalog_path, ], - check=True, + raise_if_errors=True, ) diff --git a/airbyte_cdk/utils/docker.py b/airbyte_cdk/utils/docker.py index 591921d41..13db9358b 100644 --- a/airbyte_cdk/utils/docker.py +++ b/airbyte_cdk/utils/docker.py @@ -448,7 +448,7 @@ def run_docker_command( def run_docker_airbyte_command( cmd: list[str], *, - check: bool = False, + raise_if_errors: bool = False, ) -> EntrypointOutput: """Run an Airbyte command inside a Docker container. @@ -457,7 +457,7 @@ def run_docker_airbyte_command( Args: cmd: The command to run as a list of strings. - check: If True, raises an exception if the command fails. If False, the caller is + raise_if_errors: If True, raises an exception if the command fails. If False, the caller is responsible for checking the for errors. Returns: @@ -483,7 +483,7 @@ def run_docker_airbyte_command( else None ), ) - if check: + if raise_if_errors: # If check is True, we raise an exception if there are errors. # This will do nothing if there are no errors. 
result_output.raise_if_errors() From 3d14724c03a8b66c1a7e7637c20d8f9eae44d2d7 Mon Sep 17 00:00:00 2001 From: Aaron Steers Date: Thu, 3 Jul 2025 16:48:30 -0700 Subject: [PATCH 15/25] renames (round two) --- airbyte_cdk/utils/docker.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/airbyte_cdk/utils/docker.py b/airbyte_cdk/utils/docker.py index 13db9358b..1059c5e18 100644 --- a/airbyte_cdk/utils/docker.py +++ b/airbyte_cdk/utils/docker.py @@ -96,7 +96,7 @@ def _build_image( try: run_docker_command( docker_args, - check=True, + raise_if_errors=True, capture_stderr=True, ) except subprocess.CalledProcessError as e: @@ -133,7 +133,7 @@ def _tag_image( try: run_docker_command( docker_args, - check=True, + raise_if_errors=True, capture_stderr=True, ) except subprocess.CalledProcessError as e: @@ -390,7 +390,7 @@ def get_dockerfile_templates( def run_docker_command( cmd: list[str], *, - check: bool = True, + raise_if_errors: bool = True, capture_stdout: bool | Path = False, capture_stderr: bool | Path = False, ) -> subprocess.CompletedProcess[str]: @@ -438,7 +438,7 @@ def run_docker_command( completed_process: subprocess.CompletedProcess[str] = subprocess.run( cmd, text=True, - check=check, + check=raise_if_errors, stderr=stderr, stdout=stdout, ) @@ -467,7 +467,7 @@ def run_docker_airbyte_command( cmd, capture_stdout=True, capture_stderr=True, - check=False, # We want to handle failures ourselves. + raise_if_errors=False, # We want to handle failures ourselves. ) result_output = EntrypointOutput( command=cmd, From 623723fdc8b6de67383184b630ad4eccd1b0cb2e Mon Sep 17 00:00:00 2001 From: Aaron Steers Date: Thu, 3 Jul 2025 16:55:47 -0700 Subject: [PATCH 16/25] fix missing declaration --- airbyte_cdk/test/standard_tests/connector_base.py | 1 + 1 file changed, 1 insertion(+) diff --git a/airbyte_cdk/test/standard_tests/connector_base.py b/airbyte_cdk/test/standard_tests/connector_base.py index 376b7c7fe..3c87281c9 100644 --- a/airbyte_cdk/test/standard_tests/connector_base.py +++ b/airbyte_cdk/test/standard_tests/connector_base.py @@ -44,6 +44,7 @@ def connector(cls) -> type[IConnector] | Callable[[], IConnector] | None: This assumes a python connector and should be overridden by subclasses to provide the specific connector class to be tested. 
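 
         Resolution is purely name-based: for example (illustrative), a connector
         directory named `source-foo` is expected to expose a module named
         `source_foo` defining a class named `SourceFoo`.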
""" + connector_root = cls.get_connector_root_dir() connector_name = cls.connector_name expected_module_name = connector_name.replace("-", "_").lower() From 4a23c353c7df8d981bd7be9fafe358bab2461e6b Mon Sep 17 00:00:00 2001 From: Aaron Steers Date: Thu, 3 Jul 2025 17:18:12 -0700 Subject: [PATCH 17/25] fix lint issue --- airbyte_cdk/test/standard_tests/docker_base.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte_cdk/test/standard_tests/docker_base.py b/airbyte_cdk/test/standard_tests/docker_base.py index 332879e64..68bb4cf11 100644 --- a/airbyte_cdk/test/standard_tests/docker_base.py +++ b/airbyte_cdk/test/standard_tests/docker_base.py @@ -11,7 +11,7 @@ from dataclasses import asdict from pathlib import Path from subprocess import CompletedProcess, SubprocessError -from typing import Literal +from typing import Literal, cast import orjson import pytest @@ -63,7 +63,7 @@ def connector_name(self) -> str: @classmethod def is_destination_connector(cls) -> bool: """Check if the connector is a destination.""" - return cls.connector_name.startswith("destination-") + return cast(str, cls.connector_name).startswith("destination-") @classproperty def acceptance_test_config_path(cls) -> Path: From 6966f6dfeb2ebbbc15d8c112795a437429893fca Mon Sep 17 00:00:00 2001 From: Aaron Steers Date: Thu, 3 Jul 2025 17:41:18 -0700 Subject: [PATCH 18/25] fix verify, improve message output --- airbyte_cdk/utils/docker.py | 28 +++++++++++++++++----------- 1 file changed, 17 insertions(+), 11 deletions(-) diff --git a/airbyte_cdk/utils/docker.py b/airbyte_cdk/utils/docker.py index 1059c5e18..182715609 100644 --- a/airbyte_cdk/utils/docker.py +++ b/airbyte_cdk/utils/docker.py @@ -280,11 +280,14 @@ def build_connector_image( new_tags=[base_tag], ) if not no_verify: - if verify_connector_image(base_tag): + success, error_message = verify_connector_image(base_tag) + if success: click.echo(f"Build and verification completed successfully: {base_tag}") return base_tag - click.echo(f"Built image failed verification: {base_tag}", err=True) + click.echo( + f"Built image failed verification: {base_tag}\nError was:{error_message}", err=True + ) sys.exit(1) click.echo(f"Build completed successfully: {base_tag}") @@ -502,7 +505,7 @@ def verify_docker_installation() -> bool: def verify_connector_image( image_name: str, -) -> bool: +) -> tuple[bool, str]: """Verify the built image by running the spec command. 
Args: @@ -517,20 +520,23 @@ def verify_connector_image( result = run_docker_airbyte_command( ["docker", "run", "--rm", image_name, "spec"], ) - if not result.errors: - logger.error(result.get_formatted_error_message()) - return False + if result.errors: + err_msg = result.get_formatted_error_message() + logger.error(err_msg) + return False, err_msg spec_messages = result.spec_messages if not spec_messages: - logger.error( + err_msg = ( "The container failed to produce valid output for the `spec` command.\nLog output:\n" + str(result.logs) ) - return False + logger.error(err_msg) + return False, err_msg except Exception as ex: - logger.error(f"Unexpected error during image verification: {ex}") - return False + err_msg = f"Unexpected error during image verification: {ex}" + logger.error(err_msg) + return False, err_msg - return True + return True, "" From 51e2ad738b3688a96ebdf5cda9f27c542ffcbc97 Mon Sep 17 00:00:00 2001 From: Aaron Steers Date: Thu, 3 Jul 2025 17:50:05 -0700 Subject: [PATCH 19/25] more verbose format output --- .github/workflows/python_lint.yml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/python_lint.yml b/.github/workflows/python_lint.yml index e1d6647d2..cef83ef04 100644 --- a/.github/workflows/python_lint.yml +++ b/.github/workflows/python_lint.yml @@ -26,7 +26,7 @@ jobs: - name: Install dependencies run: poetry install --all-extras - # Job-specifc step(s): + # Job-specific step(s): - name: Run lint check run: poetry run ruff check . @@ -49,9 +49,9 @@ jobs: - name: Install dependencies run: poetry install --all-extras - # Job-specifc step(s): + # Job-specific step(s): - name: Check code format - run: poetry run ruff format --check . + run: poetry run ruff format --diff . mypy-check: name: MyPy Check From 10f39d2eb1b22ece0aeba39de595e9da243d182d Mon Sep 17 00:00:00 2001 From: Aaron Steers Date: Thu, 3 Jul 2025 17:52:55 -0700 Subject: [PATCH 20/25] remove extra space --- airbyte_cdk/sql/shared/catalog_providers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte_cdk/sql/shared/catalog_providers.py b/airbyte_cdk/sql/shared/catalog_providers.py index 5b145f649..d9016a37d 100644 --- a/airbyte_cdk/sql/shared/catalog_providers.py +++ b/airbyte_cdk/sql/shared/catalog_providers.py @@ -125,7 +125,7 @@ def get_primary_keys( We return `source_defined_primary_key` if set, or `primary_key` otherwise. If both are set, we assume they should not should differ, since Airbyte data integrity constraints do not permit overruling a source's pre-defined primary keys. If neither is set, we return `None`. - + Returns: A list of column names that constitute the primary key, or None if no primary key is defined. 
""" From 66df8380d7c2d34c99725994207aa6c96969fa0c Mon Sep 17 00:00:00 2001 From: "Aaron (\"AJ\") Steers" Date: Thu, 3 Jul 2025 18:11:02 -0700 Subject: [PATCH 21/25] always test destination-motherduck --- .github/workflows/connector-tests.yml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.github/workflows/connector-tests.yml b/.github/workflows/connector-tests.yml index d8f31af09..767338149 100644 --- a/.github/workflows/connector-tests.yml +++ b/.github/workflows/connector-tests.yml @@ -75,7 +75,8 @@ jobs: - connector: source-google-drive cdk_extra: file-based - connector: destination-motherduck - cdk_extra: sql + # For now, we mark as 'n/a' to always test this connector + cdk_extra: n/a # change to 'sql' to test less often - connector: source-amplitude cdk_extra: n/a - connector: source-intercom From ff9e8427aa753543fc4b43a3ffa29238380264b7 Mon Sep 17 00:00:00 2001 From: Aaron Steers Date: Thu, 3 Jul 2025 18:45:45 -0700 Subject: [PATCH 22/25] use proper destination test suite class --- airbyte_cdk/cli/airbyte_cdk/_connector.py | 12 ++++---- airbyte_cdk/test/standard_tests/util.py | 34 +++++++++++------------ 2 files changed, 24 insertions(+), 22 deletions(-) diff --git a/airbyte_cdk/cli/airbyte_cdk/_connector.py b/airbyte_cdk/cli/airbyte_cdk/_connector.py index 0d3240ab4..2042e0173 100644 --- a/airbyte_cdk/cli/airbyte_cdk/_connector.py +++ b/airbyte_cdk/cli/airbyte_cdk/_connector.py @@ -67,7 +67,7 @@ TEST_FILE_TEMPLATE = ''' # Copyright (c) 2025 Airbyte, Inc., all rights reserved. -"""FAST Airbyte Standard Tests for the {connector_name} source.""" +"""FAST Airbyte Standard Tests for the {connector_name} connector.""" #from airbyte_cdk.test.standard_tests import {base_class_name} from airbyte_cdk.test.standard_tests.util import create_connector_test_suite @@ -81,11 +81,13 @@ connector_directory=Path(), ) +# Uncomment the following lines to create a custom test suite class: +# # class TestSuite({base_class_name}): -# """Test suite for the {connector_name} source. - -# This class inherits from SourceTestSuiteBase and implements all of the tests in the suite. - +# """Test suite for the `{connector_name}` connector. +# +# This class inherits from `{base_class_name}` and implements all of the tests in the suite. +# # As long as the class name starts with "Test", pytest will automatically discover and run the # tests in this class. 
# """ diff --git a/airbyte_cdk/test/standard_tests/util.py b/airbyte_cdk/test/standard_tests/util.py index cb65c5260..1e8ea04de 100644 --- a/airbyte_cdk/test/standard_tests/util.py +++ b/airbyte_cdk/test/standard_tests/util.py @@ -10,6 +10,7 @@ from airbyte_cdk.test.standard_tests.declarative_sources import ( DeclarativeSourceTestSuite, ) +from airbyte_cdk.test.standard_tests.destination_base import DestinationTestSuiteBase from airbyte_cdk.test.standard_tests.docker_base import DockerConnectorTestSuite from airbyte_cdk.test.standard_tests.source_base import SourceTestSuiteBase from airbyte_cdk.utils.connector_paths import ( @@ -17,15 +18,6 @@ find_connector_root_from_name, ) -TEST_CLASS_MAPPING: dict[ - Literal["python", "manifest-only", "java"], - type[DockerConnectorTestSuite], -] = { - "python": SourceTestSuiteBase, - "manifest-only": DeclarativeSourceTestSuite, - "java": DockerConnectorTestSuite, -} - def create_connector_test_suite( *, @@ -55,17 +47,25 @@ def create_connector_test_suite( ) metadata_dict: dict[str, Any] = yaml.safe_load(metadata_yaml_path.read_text()) metadata_tags = metadata_dict["data"].get("tags", []) - for language_option in TEST_CLASS_MAPPING: - if f"language:{language_option}" in metadata_tags: - language = language_option - test_suite_class = TEST_CLASS_MAPPING[language] - break - else: + language_tag = next((tag for tag in metadata_tags if tag.startswith("language:")), None) + if not language_tag: raise ValueError( - f"Unsupported connector type. " - f"Supported language values are: {', '.join(TEST_CLASS_MAPPING.keys())}. " + f"Metadata YAML file '{metadata_yaml_path}' does not contain a 'language' tag. " + "Please ensure the metadata file is correctly configured." f"Found tags: {', '.join(metadata_tags)}" ) + language = language_tag.split(":", 1)[1] + + if language == "java": + test_suite_class = DockerConnectorTestSuite + elif language == "manifest-only": + test_suite_class = DeclarativeSourceTestSuite + elif language == "python" and connector_name.startswith("source-"): + test_suite_class = SourceTestSuiteBase + elif language == "python" and connector_name.startswith("destination-"): + test_suite_class = DestinationTestSuiteBase + else: + raise ValueError("Unsupported language for connector '{connector_name}': {language}") subclass_overrides: dict[str, Any] = { "get_connector_root_dir": classmethod(lambda cls: connector_directory), From b0f28b7958f15ec4664adb94d41dc61e7e62487b Mon Sep 17 00:00:00 2001 From: Aaron Steers Date: Thu, 3 Jul 2025 18:46:26 -0700 Subject: [PATCH 23/25] remove unused import --- airbyte_cdk/utils/docker.py | 1 - 1 file changed, 1 deletion(-) diff --git a/airbyte_cdk/utils/docker.py b/airbyte_cdk/utils/docker.py index 182715609..24467ff3c 100644 --- a/airbyte_cdk/utils/docker.py +++ b/airbyte_cdk/utils/docker.py @@ -17,7 +17,6 @@ import requests from airbyte_cdk.models.connector_metadata import ConnectorLanguage, MetadataFile -from airbyte_cdk.sources.declarative import spec from airbyte_cdk.test.entrypoint_wrapper import EntrypointOutput from airbyte_cdk.utils.connector_paths import resolve_airbyte_repo_root From 0d8c317b19f647afce4617c33c8921ee57468534 Mon Sep 17 00:00:00 2001 From: Aaron Steers Date: Thu, 3 Jul 2025 18:51:42 -0700 Subject: [PATCH 24/25] fix language tags resolution --- airbyte_cdk/test/standard_tests/util.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/airbyte_cdk/test/standard_tests/util.py b/airbyte_cdk/test/standard_tests/util.py index 1e8ea04de..0a3250834 100644 --- 
From 0dd3cd9a626babaf3c4c229856b39ff46bcad969 Mon Sep 17 00:00:00 2001
From: Aaron Steers
Date: Thu, 3 Jul 2025 18:55:13 -0700
Subject: [PATCH 25/25] use absolute path to detect name

---
 airbyte_cdk/test/standard_tests/util.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/airbyte_cdk/test/standard_tests/util.py b/airbyte_cdk/test/standard_tests/util.py
index 0a3250834..fdbd02263 100644
--- a/airbyte_cdk/test/standard_tests/util.py
+++ b/airbyte_cdk/test/standard_tests/util.py
@@ -38,7 +38,7 @@ def create_connector_test_suite(
     # By here, we know that connector_directory is not None
     # but connector_name is None. Set the connector_name.
     assert connector_directory is not None, "connector_directory should not be None here."
-    connector_name = connector_directory.name
+    connector_name = connector_directory.absolute().name
 
     metadata_yaml_path = connector_directory / METADATA_YAML
     if not metadata_yaml_path.exists():
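
The `.absolute()` call matters because a bare relative `Path()` has an empty `.name`, while its absolute form recovers the actual directory name. A runnable illustration (the connector name shown is a hypothetical example):

    from pathlib import Path

    connector_directory = Path()  # e.g. the CWD when invoked inside a connector dir
    print(repr(connector_directory.name))             # -> ''
    print(repr(connector_directory.absolute().name))  # -> e.g. 'source-pokeapi'
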