diff --git a/.github/workflows/connector-tests.yml b/.github/workflows/connector-tests.yml index 7fcc06a65..2eda73dda 100644 --- a/.github/workflows/connector-tests.yml +++ b/.github/workflows/connector-tests.yml @@ -75,7 +75,8 @@ jobs: - connector: source-google-drive cdk_extra: file-based - connector: destination-motherduck - cdk_extra: sql + # For now, we mark as 'n/a' to always test this connector + cdk_extra: n/a # change to 'sql' to test less often # source-amplitude failing for unrelated issue "date too far back" # e.g. https://github.com/airbytehq/airbyte-python-cdk/actions/runs/16053716569/job/45302638848?pr=639 # - connector: source-amplitude diff --git a/.github/workflows/python_lint.yml b/.github/workflows/python_lint.yml index e1d6647d2..cef83ef04 100644 --- a/.github/workflows/python_lint.yml +++ b/.github/workflows/python_lint.yml @@ -26,7 +26,7 @@ jobs: - name: Install dependencies run: poetry install --all-extras - # Job-specifc step(s): + # Job-specific step(s): - name: Run lint check run: poetry run ruff check . @@ -49,9 +49,9 @@ jobs: - name: Install dependencies run: poetry install --all-extras - # Job-specifc step(s): + # Job-specific step(s): - name: Check code format - run: poetry run ruff format --check . + run: poetry run ruff format --diff . mypy-check: name: MyPy Check diff --git a/airbyte_cdk/cli/airbyte_cdk/_connector.py b/airbyte_cdk/cli/airbyte_cdk/_connector.py index 0d3240ab4..2042e0173 100644 --- a/airbyte_cdk/cli/airbyte_cdk/_connector.py +++ b/airbyte_cdk/cli/airbyte_cdk/_connector.py @@ -67,7 +67,7 @@ TEST_FILE_TEMPLATE = ''' # Copyright (c) 2025 Airbyte, Inc., all rights reserved. -"""FAST Airbyte Standard Tests for the {connector_name} source.""" +"""FAST Airbyte Standard Tests for the {connector_name} connector.""" #from airbyte_cdk.test.standard_tests import {base_class_name} from airbyte_cdk.test.standard_tests.util import create_connector_test_suite @@ -81,11 +81,13 @@ connector_directory=Path(), ) +# Uncomment the following lines to create a custom test suite class: +# # class TestSuite({base_class_name}): -# """Test suite for the {connector_name} source. - -# This class inherits from SourceTestSuiteBase and implements all of the tests in the suite. - +# """Test suite for the `{connector_name}` connector. +# +# This class inherits from `{base_class_name}` and implements all of the tests in the suite. +# # As long as the class name starts with "Test", pytest will automatically discover and run the # tests in this class. # """ diff --git a/airbyte_cdk/test/entrypoint_wrapper.py b/airbyte_cdk/test/entrypoint_wrapper.py index 09a3fc75e..c6e4dd9e4 100644 --- a/airbyte_cdk/test/entrypoint_wrapper.py +++ b/airbyte_cdk/test/entrypoint_wrapper.py @@ -21,6 +21,7 @@ import traceback from collections import deque from collections.abc import Generator, Mapping +from dataclasses import dataclass from io import StringIO from pathlib import Path from typing import Any, List, Literal, Optional, Union, final, overload @@ -50,6 +51,21 @@ from airbyte_cdk.test.models.scenario import ExpectedOutcome +@dataclass +class AirbyteEntrypointException(Exception): + """Exception raised for errors in the AirbyteEntrypoint execution. + + Used to provide details of an Airbyte connector execution failure in the output + captured in an `EntrypointOutput` object. Use `EntrypointOutput.as_exception()` to + convert it to an exception. + + Example Usage: + output = EntrypointOutput(...) + if output.errors: + raise output.as_exception() + """ + + class EntrypointOutput: """A class to encapsulate the output of an Airbyte connector's execution. @@ -67,6 +83,7 @@ def __init__( messages: list[str] | None = None, uncaught_exception: Optional[BaseException] = None, *, + command: list[str] | None = None, message_file: Path | None = None, ) -> None: if messages is None and message_file is None: @@ -74,6 +91,7 @@ def __init__( if messages is not None and message_file is not None: raise ValueError("Only one of messages or message_file can be provided") + self._command = command self._messages: list[AirbyteMessage] | None = None self._message_file: Path | None = message_file if messages: @@ -182,6 +200,39 @@ def analytics_messages(self) -> List[AirbyteMessage]: def errors(self) -> List[AirbyteMessage]: return self._get_trace_message_by_trace_type(TraceType.ERROR) + def get_formatted_error_message(self) -> str: + """Returns a human-readable error message with the contents. + + If there are no errors, returns an empty string. + """ + errors = self.errors + if not errors: + # If there are no errors, return an empty string. + return "" + + result = "Failed to run airbyte command" + result += ": " + " ".join(self._command) if self._command else "." + result += "\n" + "\n".join( + [str(error.trace.error).replace("\\n", "\n") for error in errors if error.trace], + ) + return result + + def as_exception(self) -> AirbyteEntrypointException: + """Convert the output to an exception.""" + return AirbyteEntrypointException(self.get_formatted_error_message()) + + def raise_if_errors( + self, + ) -> None: + """Raise an exception if there are errors in the output. + + Otherwise, do nothing. + """ + if not self.errors: + return None + + raise self.as_exception() + @property def catalog(self) -> AirbyteMessage: catalog = self.get_message_by_types([Type.CATALOG]) diff --git a/airbyte_cdk/test/standard_tests/_job_runner.py b/airbyte_cdk/test/standard_tests/_job_runner.py index 8f4174b1a..ddd293ffa 100644 --- a/airbyte_cdk/test/standard_tests/_job_runner.py +++ b/airbyte_cdk/test/standard_tests/_job_runner.py @@ -21,26 +21,6 @@ ) -def _errors_to_str( - entrypoint_output: entrypoint_wrapper.EntrypointOutput, -) -> str: - """Convert errors from entrypoint output to a string.""" - if not entrypoint_output.errors: - # If there are no errors, return an empty string. - return "" - - return "\n" + "\n".join( - [ - str(error.trace.error).replace( - "\\n", - "\n", - ) - for error in entrypoint_output.errors - if error.trace - ], - ) - - @runtime_checkable class IConnector(Protocol): """A connector that can be run in a test scenario. @@ -125,9 +105,7 @@ def run_test_job( expected_outcome=test_scenario.expected_outcome, ) if result.errors and test_scenario.expected_outcome.expect_success(): - raise AssertionError( - f"Expected no errors but got {len(result.errors)}: \n" + _errors_to_str(result) - ) + raise result.as_exception() if verb == "check": # Check is expected to fail gracefully without an exception. @@ -137,7 +115,7 @@ def run_test_job( "Expected exactly one CONNECTION_STATUS message. Got " f"{len(result.connection_status_messages)}:\n" + "\n".join([str(msg) for msg in result.connection_status_messages]) - + _errors_to_str(result) + + result.get_formatted_error_message() ) if test_scenario.expected_outcome.expect_exception(): conn_status = result.connection_status_messages[0].connectionStatus @@ -161,7 +139,8 @@ def run_test_job( if test_scenario.expected_outcome.expect_success(): assert not result.errors, ( - f"Expected no errors but got {len(result.errors)}: \n" + _errors_to_str(result) + f"Test job failed with {len(result.errors)} error(s): \n" + + result.get_formatted_error_message() ) return result diff --git a/airbyte_cdk/test/standard_tests/connector_base.py b/airbyte_cdk/test/standard_tests/connector_base.py index b945f1572..3c87281c9 100644 --- a/airbyte_cdk/test/standard_tests/connector_base.py +++ b/airbyte_cdk/test/standard_tests/connector_base.py @@ -45,7 +45,7 @@ def connector(cls) -> type[IConnector] | Callable[[], IConnector] | None: specific connector class to be tested. """ connector_root = cls.get_connector_root_dir() - connector_name = connector_root.absolute().name + connector_name = cls.connector_name expected_module_name = connector_name.replace("-", "_").lower() expected_class_name = connector_name.replace("-", "_").title().replace("_", "") diff --git a/airbyte_cdk/test/standard_tests/docker_base.py b/airbyte_cdk/test/standard_tests/docker_base.py index c3ee1f060..68bb4cf11 100644 --- a/airbyte_cdk/test/standard_tests/docker_base.py +++ b/airbyte_cdk/test/standard_tests/docker_base.py @@ -11,7 +11,7 @@ from dataclasses import asdict from pathlib import Path from subprocess import CompletedProcess, SubprocessError -from typing import Literal +from typing import Literal, cast import orjson import pytest @@ -25,19 +25,18 @@ DestinationSyncMode, SyncMode, ) -from airbyte_cdk.models.airbyte_protocol_serializers import ( - AirbyteCatalogSerializer, - AirbyteStreamSerializer, -) from airbyte_cdk.models.connector_metadata import MetadataFile from airbyte_cdk.test.entrypoint_wrapper import EntrypointOutput from airbyte_cdk.test.models import ConnectorTestScenario -from airbyte_cdk.test.utils.reading import catalog from airbyte_cdk.utils.connector_paths import ( ACCEPTANCE_TEST_CONFIG, find_connector_root, ) -from airbyte_cdk.utils.docker import build_connector_image, run_docker_command +from airbyte_cdk.utils.docker import ( + build_connector_image, + run_docker_airbyte_command, + run_docker_command, +) class DockerConnectorTestSuite: @@ -55,6 +54,17 @@ def get_connector_root_dir(cls) -> Path: """Get the root directory of the connector.""" return find_connector_root([cls.get_test_class_dir(), Path.cwd()]) + @classproperty + def connector_name(self) -> str: + """Get the name of the connector.""" + connector_root = self.get_connector_root_dir() + return connector_root.absolute().name + + @classmethod + def is_destination_connector(cls) -> bool: + """Check if the connector is a destination.""" + return cast(str, cls.connector_name).startswith("destination-") + @classproperty def acceptance_test_config_path(cls) -> Path: """Get the path to the acceptance test config file.""" @@ -145,23 +155,16 @@ def test_docker_image_build_and_spec( no_verify=False, ) - try: - result: CompletedProcess[str] = run_docker_command( - [ - "docker", - "run", - "--rm", - connector_image, - "spec", - ], - check=True, # Raise an error if the command fails - capture_stderr=True, - capture_stdout=True, - ) - except SubprocessError as ex: - raise AssertionError( - f"Failed to run `spec` command in docker image {connector_image!r}. Error: {ex!s}" - ) from None + _ = run_docker_airbyte_command( + [ + "docker", + "run", + "--rm", + connector_image, + "spec", + ], + raise_if_errors=True, + ) @pytest.mark.skipif( shutil.which("docker") is None, @@ -203,7 +206,7 @@ def test_docker_image_build_and_check( with scenario.with_temp_config_file( connector_root=connector_root, ) as temp_config_file: - _ = run_docker_command( + _ = run_docker_airbyte_command( [ "docker", "run", @@ -215,9 +218,7 @@ def test_docker_image_build_and_check( "--config", container_config_path, ], - check=True, # Raise an error if the command fails - capture_stderr=True, - capture_stdout=True, + raise_if_errors=True, ) @pytest.mark.skipif( @@ -242,6 +243,9 @@ def test_docker_image_build_and_read( the local docker image cache using `docker image prune -a` command. - If the --connector-image arg is provided, it will be used instead of building the image. """ + if self.is_destination_connector(): + pytest.skip("Skipping read test for destination connector.") + if scenario.expected_outcome.expect_exception(): pytest.skip("Skipping (expected to fail).") @@ -295,7 +299,7 @@ def test_docker_image_build_and_read( ) as temp_dir_str, ): temp_dir = Path(temp_dir_str) - discover_result = run_docker_command( + discover_result = run_docker_airbyte_command( [ "docker", "run", @@ -307,20 +311,12 @@ def test_docker_image_build_and_read( "--config", container_config_path, ], - check=True, # Raise an error if the command fails - capture_stderr=True, - capture_stdout=True, + raise_if_errors=True, ) - parsed_output = EntrypointOutput(messages=discover_result.stdout.splitlines()) - try: - catalog_message = parsed_output.catalog # Get catalog message - assert catalog_message.catalog is not None, "Catalog message missing catalog." - discovered_catalog: AirbyteCatalog = parsed_output.catalog.catalog - except Exception as ex: - raise AssertionError( - f"Failed to load discovered catalog from {discover_result.stdout}. " - f"Error: {ex!s}" - ) from None + + catalog_message = discover_result.catalog # Get catalog message + assert catalog_message.catalog is not None, "Catalog message missing catalog." + discovered_catalog: AirbyteCatalog = catalog_message.catalog if not discovered_catalog.streams: raise ValueError( f"Discovered catalog for connector '{connector_name}' is empty. " @@ -355,7 +351,7 @@ def test_docker_image_build_and_read( configured_catalog_path.write_text( orjson.dumps(asdict(configured_catalog)).decode("utf-8") ) - read_result: CompletedProcess[str] = run_docker_command( + read_result: EntrypointOutput = run_docker_airbyte_command( [ "docker", "run", @@ -371,18 +367,5 @@ def test_docker_image_build_and_read( "--catalog", container_catalog_path, ], - check=False, - capture_stderr=True, - capture_stdout=True, + raise_if_errors=True, ) - if read_result.returncode != 0: - raise AssertionError( - f"Failed to run `read` command in docker image {connector_image!r}. " - "\n-----------------" - f"EXIT CODE: {read_result.returncode}\n" - "STDERR:\n" - f"{read_result.stderr}\n" - f"STDOUT:\n" - f"{read_result.stdout}\n" - "\n-----------------" - ) from None diff --git a/airbyte_cdk/test/standard_tests/util.py b/airbyte_cdk/test/standard_tests/util.py index cb65c5260..fdbd02263 100644 --- a/airbyte_cdk/test/standard_tests/util.py +++ b/airbyte_cdk/test/standard_tests/util.py @@ -10,6 +10,7 @@ from airbyte_cdk.test.standard_tests.declarative_sources import ( DeclarativeSourceTestSuite, ) +from airbyte_cdk.test.standard_tests.destination_base import DestinationTestSuiteBase from airbyte_cdk.test.standard_tests.docker_base import DockerConnectorTestSuite from airbyte_cdk.test.standard_tests.source_base import SourceTestSuiteBase from airbyte_cdk.utils.connector_paths import ( @@ -17,15 +18,6 @@ find_connector_root_from_name, ) -TEST_CLASS_MAPPING: dict[ - Literal["python", "manifest-only", "java"], - type[DockerConnectorTestSuite], -] = { - "python": SourceTestSuiteBase, - "manifest-only": DeclarativeSourceTestSuite, - "java": DockerConnectorTestSuite, -} - def create_connector_test_suite( *, @@ -46,7 +38,7 @@ def create_connector_test_suite( # By here, we know that connector_directory is not None # but connector_name is None. Set the connector_name. assert connector_directory is not None, "connector_directory should not be None here." - connector_name = connector_directory.name + connector_name = connector_directory.absolute().name metadata_yaml_path = connector_directory / METADATA_YAML if not metadata_yaml_path.exists(): @@ -55,17 +47,25 @@ def create_connector_test_suite( ) metadata_dict: dict[str, Any] = yaml.safe_load(metadata_yaml_path.read_text()) metadata_tags = metadata_dict["data"].get("tags", []) - for language_option in TEST_CLASS_MAPPING: - if f"language:{language_option}" in metadata_tags: - language = language_option - test_suite_class = TEST_CLASS_MAPPING[language] - break - else: + language_tags: list[str] = [tag for tag in metadata_tags if tag.startswith("language:")] + if not language_tags: raise ValueError( - f"Unsupported connector type. " - f"Supported language values are: {', '.join(TEST_CLASS_MAPPING.keys())}. " + f"Metadata YAML file '{metadata_yaml_path}' does not contain a 'language' tag. " + "Please ensure the metadata file is correctly configured." f"Found tags: {', '.join(metadata_tags)}" ) + language = language_tags[0].split(":")[1] + + if language == "java": + test_suite_class = DockerConnectorTestSuite + elif language == "manifest-only": + test_suite_class = DeclarativeSourceTestSuite + elif language == "python" and connector_name.startswith("source-"): + test_suite_class = SourceTestSuiteBase + elif language == "python" and connector_name.startswith("destination-"): + test_suite_class = DestinationTestSuiteBase + else: + raise ValueError(f"Unsupported language for connector '{connector_name}': {language}") subclass_overrides: dict[str, Any] = { "get_connector_root_dir": classmethod(lambda cls: connector_directory), diff --git a/airbyte_cdk/utils/docker.py b/airbyte_cdk/utils/docker.py index 88050d99f..24467ff3c 100644 --- a/airbyte_cdk/utils/docker.py +++ b/airbyte_cdk/utils/docker.py @@ -17,6 +17,7 @@ import requests from airbyte_cdk.models.connector_metadata import ConnectorLanguage, MetadataFile +from airbyte_cdk.test.entrypoint_wrapper import EntrypointOutput from airbyte_cdk.utils.connector_paths import resolve_airbyte_repo_root @@ -94,7 +95,7 @@ def _build_image( try: run_docker_command( docker_args, - check=True, + raise_if_errors=True, capture_stderr=True, ) except subprocess.CalledProcessError as e: @@ -131,7 +132,7 @@ def _tag_image( try: run_docker_command( docker_args, - check=True, + raise_if_errors=True, capture_stderr=True, ) except subprocess.CalledProcessError as e: @@ -278,11 +279,14 @@ def build_connector_image( new_tags=[base_tag], ) if not no_verify: - if verify_connector_image(base_tag): + success, error_message = verify_connector_image(base_tag) + if success: click.echo(f"Build and verification completed successfully: {base_tag}") return base_tag - click.echo(f"Built image failed verification: {base_tag}", err=True) + click.echo( + f"Built image failed verification: {base_tag}\nError was:{error_message}", err=True + ) sys.exit(1) click.echo(f"Build completed successfully: {base_tag}") @@ -388,12 +392,16 @@ def get_dockerfile_templates( def run_docker_command( cmd: list[str], *, - check: bool = True, + raise_if_errors: bool = True, capture_stdout: bool | Path = False, capture_stderr: bool | Path = False, ) -> subprocess.CompletedProcess[str]: """Run a Docker command as a subprocess. + Note: When running Airbyte verbs such as `spec`, `discover`, `read`, etc., + use `run_docker_airbyte_command` instead to get an `EntrypointOutput` object as + the return value and to better handle exceptions sent as messages. + Args: cmd: The command to run as a list of strings. check: If True, raises an exception if the command fails. If False, the caller is @@ -432,13 +440,59 @@ def run_docker_command( completed_process: subprocess.CompletedProcess[str] = subprocess.run( cmd, text=True, - check=check, + check=raise_if_errors, stderr=stderr, stdout=stdout, ) return completed_process +def run_docker_airbyte_command( + cmd: list[str], + *, + raise_if_errors: bool = False, +) -> EntrypointOutput: + """Run an Airbyte command inside a Docker container. + + This wraps the `run_docker_command` function to process its results and + return an `EntrypointOutput` object. + + Args: + cmd: The command to run as a list of strings. + raise_if_errors: If True, raises an exception if the command fails. If False, the caller is + responsible for checking the for errors. + + Returns: + The output of the command as an `EntrypointOutput` object. + """ + process_result = run_docker_command( + cmd, + capture_stdout=True, + capture_stderr=True, + raise_if_errors=False, # We want to handle failures ourselves. + ) + result_output = EntrypointOutput( + command=cmd, + messages=process_result.stdout.splitlines(), + uncaught_exception=( + subprocess.CalledProcessError( + cmd=cmd, + returncode=process_result.returncode, + output=process_result.stdout, + stderr=process_result.stderr, + ) + if process_result.returncode != 0 + else None + ), + ) + if raise_if_errors: + # If check is True, we raise an exception if there are errors. + # This will do nothing if there are no errors. + result_output.raise_if_errors() + + return result_output + + def verify_docker_installation() -> bool: """Verify Docker is installed and running.""" try: @@ -450,7 +504,7 @@ def verify_docker_installation() -> bool: def verify_connector_image( image_name: str, -) -> bool: +) -> tuple[bool, str]: """Verify the built image by running the spec command. Args: @@ -461,37 +515,27 @@ def verify_connector_image( """ logger.info(f"Verifying image {image_name} with 'spec' command...") - cmd = ["docker", "run", "--rm", image_name, "spec"] - try: - result = run_docker_command( - cmd, - check=True, - capture_stderr=True, - capture_stdout=True, + result = run_docker_airbyte_command( + ["docker", "run", "--rm", image_name, "spec"], ) - # check that the output is valid JSON - if result.stdout: - found_spec_output = False - for line in result.stdout.split("\n"): - if line.strip(): - try: - # Check if the line is a valid JSON object - msg = json.loads(line) - if isinstance(msg, dict) and "type" in msg and msg["type"] == "SPEC": - found_spec_output = True - - except json.JSONDecodeError as e: - logger.warning(f"Invalid JSON output from spec command: {e}: {line}") - - if not found_spec_output: - logger.error("No valid JSON output found for spec command.") - return False - else: - logger.error("No output from spec command.") - return False - except subprocess.CalledProcessError as e: - logger.error(f"Image verification failed: {e.stderr}") - return False + if result.errors: + err_msg = result.get_formatted_error_message() + logger.error(err_msg) + return False, err_msg + + spec_messages = result.spec_messages + if not spec_messages: + err_msg = ( + "The container failed to produce valid output for the `spec` command.\nLog output:\n" + + str(result.logs) + ) + logger.error(err_msg) + return False, err_msg + + except Exception as ex: + err_msg = f"Unexpected error during image verification: {ex}" + logger.error(err_msg) + return False, err_msg - return True + return True, ""