diff --git a/.gitignore b/.gitignore index 3bc5b4929..0fc7cd804 100644 --- a/.gitignore +++ b/.gitignore @@ -15,3 +15,5 @@ dist .idea .vscode **/__pycache__ +**/.secrets/ +**/secrets/ diff --git a/airbyte_cdk/test/README.md b/airbyte_cdk/test/README.md new file mode 100644 index 000000000..3709e6d44 --- /dev/null +++ b/airbyte_cdk/test/README.md @@ -0,0 +1,80 @@ +# Airbyte CDK Test Extras + +This module provides test utilities and fixtures for Airbyte connectors, including pre-built test suites that connector developers can easily use to run a full suite of tests. + +## Usage + +### Option 1: Using the built-in pytest plugin (recommended) + +The CDK includes a pytest plugin that automatically discovers connectors and their tests, eliminating the need for scaffolding files. To use it: + +1. Install the Airbyte CDK with test extras: + +```bash +poetry add airbyte-cdk[tests] +``` + +Or if you're developing the CDK itself: + +```bash +poetry install --extras tests +``` + +2. Run pytest in your connector directory with auto-discovery enabled: + +```bash +pytest --auto-discover +``` + +If your connector is in a different directory, you can specify it: + +```bash +pytest --auto-discover --connector-dir /path/to/connector +``` + +The plugin will: +- Automatically discover your connector type (source or destination) +- Find your connector class +- Load test scenarios from your acceptance test config file +- Run the appropriate tests + +### Option 2: Creating a minimal test scaffold (traditional approach) + +If you prefer more control over test discovery and execution, you can create a minimal test scaffold: + +1. Create a test file (e.g., `test_connector.py`): + +```python +from airbyte_cdk.test.declarative.test_suites.source_base import SourceTestSuiteBase +from your_connector.source import YourConnector + +class TestYourConnector(SourceTestSuiteBase): + connector = YourConnector + + @classmethod + def create_connector(cls, scenario): + return cls.connector() +``` + +2. 
Run pytest with the connector option: + +```bash +pytest --run-connector +``` + +## Acceptance Test Config + +The test suites will automatically look for and use an acceptance test config file named either: +- `connector-acceptance-tests.yml` +- `acceptance-test-config.yml` + +The config file is used to: +- Discover test scenarios +- Configure test behavior +- Set expectations for test results + +## Available Test Suites + +- `ConnectorTestSuiteBase`: Base test suite for all connectors +- `SourceTestSuiteBase`: Test suite for source connectors +- `DestinationTestSuiteBase`: Test suite for destination connectors diff --git a/airbyte_cdk/test/pytest_config/plugin.py b/airbyte_cdk/test/pytest_config/plugin.py index b24cdd332..d04a25b1b 100644 --- a/airbyte_cdk/test/pytest_config/plugin.py +++ b/airbyte_cdk/test/pytest_config/plugin.py @@ -1,40 +1,406 @@ +import importlib.util +import inspect +import os +import sys from pathlib import Path +from typing import Any, Callable, Dict, Generator, List, Optional, Tuple, Type, TypeVar, Union, cast import pytest +import yaml +from _pytest.config import Config +from _pytest.config.argparsing import Parser +from _pytest.nodes import Item +from _pytest.python import Metafunc, Module +from airbyte_cdk.sources import AbstractSource, Source +from airbyte_cdk.sources.declarative.concurrent_declarative_source import ( + ConcurrentDeclarativeSource, +) +from airbyte_cdk.test.declarative.models import ConnectorTestScenario +from airbyte_cdk.test.declarative.test_suites.connector_base import ConnectorTestSuiteBase +from airbyte_cdk.test.declarative.test_suites.destination_base import DestinationTestSuiteBase +from airbyte_cdk.test.declarative.test_suites.source_base import SourceTestSuiteBase -def pytest_collect_file(parent, path): - if path.basename == "test_connector.py": - return pytest.Module.from_parent(parent, path=path) +def pytest_collect_file(parent: pytest.Collector, path: Any) -> Optional[Module]: + """Handle file collection 
def pytest_configure(config: Config) -> None:
    """Register the connector markers and publish discovery options early.

    Besides declaring the ``connector``, ``source``, ``destination`` and
    ``auto_discover`` markers, this hook copies the ``--auto-discover`` and
    ``--connector-dir`` command-line options into the ``AUTO_DISCOVER`` and
    ``CONNECTOR_DIR`` environment variables.

    The export has to happen here: the collection code in this plugin reads
    those variables while files are being collected, and pytest only calls
    ``pytest_collection_modifyitems`` after collection has finished, which is
    too late for the values to influence what gets collected.

    Args:
        config: The pytest configuration object for the current session.
    """
    marker_lines = (
        "connector: mark test as a connector test",
        "source: mark test as a source connector test",
        "destination: mark test as a destination connector test",
        "auto_discover: mark test as auto-discovered",
    )
    for marker_line in marker_lines:
        config.addinivalue_line("markers", marker_line)

    if config.getoption("--auto-discover", False):
        os.environ["AUTO_DISCOVER"] = "true"

    connector_dir = config.getoption("--connector-dir", None)
    if connector_dir:
        # Collected file paths are absolute, so a relative ``--connector-dir``
        # would never compare equal to a collected file's parent directory.
        os.environ["CONNECTOR_DIR"] = os.path.abspath(connector_dir)
class AutoDiscoveredTestItem(pytest.Item):
    """Pytest item wrapping one ``test_*`` method of an auto-discovered suite.

    The item holds a suite class and one of its test functions. Running the
    item instantiates the suite once and calls the function once per scenario.
    """

    def __init__(
        self,
        name: str,
        parent: pytest.Collector,
        test_class: Type[Any],
        test_method: Callable[..., Any],
        **kwargs: Any,
    ) -> None:
        """Store the suite class and test function and tag the item.

        Args:
            name: Name of the test method; used as the item name.
            parent: The collector that produced this item.
            test_class: Suite class to instantiate when the item runs.
            test_method: Plain function taken from ``test_class``; it is
                called as ``test_method(instance, scenario)``.
            **kwargs: Any additional node arguments, forwarded unchanged to
                ``pytest.Item`` so the constructor stays compatible with
                ``from_parent``.
        """
        super().__init__(name, parent, **kwargs)
        self.test_class = test_class
        self.test_method = test_method
        self.add_marker(pytest.mark.auto_discover)

    def runtest(self) -> None:
        """Run the wrapped test method against every scenario of the suite.

        Scenarios come from the suite's ``get_scenarios`` callable when it has
        one; otherwise a single scenario with id ``"default"`` is used. The
        first failing scenario aborts the item.
        """
        instance = self.test_class()

        get_scenarios = getattr(self.test_class, "get_scenarios", None)
        scenarios = (
            get_scenarios() if callable(get_scenarios) else [ConnectorTestScenario(id="default")]
        )

        for scenario in scenarios:
            instance_name = f"{self.name}[{scenario.id or 'default'}]"
            # Printed so that runs with ``-s`` show which scenario is executing.
            print(f"Running {instance_name}")
            self.test_method(instance, scenario)

    def reportinfo(self) -> Tuple[Path, Optional[int], str]:
        """Return ``(path, line number, test name)`` for pytest's reports.

        The line number is ``None`` because the item is synthesized rather
        than read from a specific source line.
        """
        # ``self.path`` is the pathlib-based attribute; ``fspath`` is the
        # legacy py.path object and does not match the annotated ``Path``.
        return self.path, None, self.name
= [ + path / "metadata.yaml", + path / "source.py", + path / "destination.py", + path / "manifest.yaml", + ] + + return any(file.exists() for file in indicator_files) + + +def _determine_connector_type(directory_path: str) -> str: + """Determine if a directory contains a source or destination connector.""" + path = Path(directory_path) + + if (path / "source.py").exists(): + return "source" + if (path / "destination.py").exists(): + return "destination" + + metadata_path = path / "metadata.yaml" + if metadata_path.exists(): + with open(metadata_path, "r") as f: + content = f.read() + if "sourceDefinitionId" in content: + return "source" + elif "destinationDefinitionId" in content: + return "destination" + + return "source" + + +def _create_dynamic_source_test_suite(connector_dir: str) -> Optional[Type[Any]]: + """Create a dynamic source test suite class for a discovered connector.""" + connector_path = Path(connector_dir) + + source_file = None + for file in connector_path.glob("**/*.py"): + if file.name == "source.py": + source_file = file + break + + if not source_file: + return None + + try: + module_name = f"discovered_source_{connector_path.name.replace('-', '_')}" + spec = importlib.util.spec_from_file_location(module_name, source_file) + if spec is None: + return None + + module = importlib.util.module_from_spec(spec) + if spec.loader is None: + return None + + spec.loader.exec_module(module) + + source_class = None + for name, obj in inspect.getmembers(module): + if inspect.isclass(obj) and name.startswith("Source"): + source_class = obj + break + + if not source_class: + return None + + class DiscoveredSourceTestSuite(SourceTestSuiteBase): + connector = source_class + working_dir = connector_path + + acceptance_test_config_path = next( + ( + path + for path in [ + connector_path / "connector-acceptance-tests.yml", + connector_path / "acceptance-test-config.yml", + ] + if path.exists() + ), + None, # type: ignore + ) + + @classmethod + def 
def _create_dynamic_destination_test_suite(connector_dir: str) -> Optional[Type[Any]]:
    """Build a ``DestinationTestSuiteBase`` subclass for a discovered connector.

    The function looks for a ``destination.py`` anywhere under
    ``connector_dir``, loads it as a standalone module, and picks the first
    class (by name order) that is defined in that module and whose name starts
    with ``Destination``. That class becomes the ``connector`` of a generated
    suite.

    Args:
        connector_dir: Root directory of the connector.

    Returns:
        The generated suite class, or ``None`` when no ``destination.py`` or
        matching class is found, or when loading the module fails (the error
        is printed and swallowed so discovery stays best-effort).
    """
    connector_path = Path(connector_dir)

    # Sorting makes the choice deterministic when several files match.
    destination_file = next(iter(sorted(connector_path.rglob("destination.py"))), None)
    if destination_file is None:
        return None

    try:
        module_name = f"discovered_destination_{connector_path.name.replace('-', '_')}"
        spec = importlib.util.spec_from_file_location(module_name, destination_file)
        if spec is None or spec.loader is None:
            return None

        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)

        # Only accept classes defined in the loaded file. Without the
        # ``__module__`` check, an imported base class whose name also starts
        # with "Destination" could be selected instead of the connector.
        destination_class = next(
            (
                obj
                for name, obj in inspect.getmembers(module, inspect.isclass)
                if name.startswith("Destination") and obj.__module__ == module.__name__
            ),
            None,
        )
        if destination_class is None:
            return None

        class DiscoveredDestinationTestSuite(DestinationTestSuiteBase):
            connector = destination_class
            working_dir = connector_path

            # First acceptance-test config file that exists, if any.
            acceptance_test_config_path = next(
                (
                    path
                    for path in [
                        connector_path / "connector-acceptance-tests.yml",
                        connector_path / "acceptance-test-config.yml",
                    ]
                    if path.exists()
                ),
                None,  # type: ignore
            )

            @classmethod
            def create_connector(cls, scenario: ConnectorTestScenario) -> Any:
                """Instantiate the discovered connector class (no arguments)."""
                return cls.connector() if callable(cls.connector) else None

            @classmethod
            def get_scenarios(cls) -> List[ConnectorTestScenario]:
                """Get test scenarios from acceptance test config if it exists.

                Reads ``test_read.config_path`` from the acceptance config and,
                when that file exists, returns one ``"default"`` scenario
                carrying its parsed content. Falls back to a bare
                ``"default"`` scenario otherwise.
                """
                fallback = [ConnectorTestScenario(id="default")]
                config_file = cls.acceptance_test_config_path
                if not config_file or not config_file.exists():
                    return fallback

                with open(config_file, "r", encoding="utf-8") as f:
                    # An empty YAML document parses to None.
                    config = yaml.safe_load(f) or {}

                test_read = config.get("test_read") if isinstance(config, dict) else None
                relative_path = test_read.get("config_path") if isinstance(test_read, dict) else None
                if not relative_path:
                    return fallback

                config_path = Path(connector_path) / relative_path
                if not config_path.exists():
                    return fallback

                with open(config_path, "r", encoding="utf-8") as f:
                    config_dict = yaml.safe_load(f)

                return [ConnectorTestScenario(id="default", config_dict=config_dict)]

        return DiscoveredDestinationTestSuite

    except Exception as e:
        # Deliberately broad: a connector file that fails to import must not
        # abort the whole pytest session.
        print(f"Error creating dynamic test suite: {e}")
        return None
def pytest_generate_tests(metafunc: Metafunc) -> None:
    """Parametrize the ``instance`` fixture with a test class's scenarios.

    When a test requests ``instance`` and its class exposes a callable
    ``get_scenarios``, every returned scenario becomes one parametrized case.
    The case id is the scenario's ``id``, or ``scenario_<index>`` when the id
    is empty. Tests outside a class, classes without ``get_scenarios``, and
    empty scenario lists are left untouched.
    """
    if "instance" not in metafunc.fixturenames:
        return

    owner = metafunc.cls
    if owner is None:
        return

    if not callable(getattr(owner, "get_scenarios", None)):
        return

    found = owner.get_scenarios()
    if not found:
        return

    case_ids = []
    for position, entry in enumerate(found):
        case_ids.append(entry.id or f"scenario_{position}")

    metafunc.parametrize("instance", found, ids=case_ids)
python-versions = ">=3.10,<3.13" -content-hash = "c8731f26643e07136e524d5e0d6e0f5c2229cf63d43bf5644de9f1cf8e565197" +content-hash = "e179fe02d4aea69cff7d86b93989ca033b7a2160a82d4e5e563b46772e8acbc1" diff --git a/pyproject.toml b/pyproject.toml index d236c0b9d..b98aab637 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -110,11 +110,17 @@ deptry = "^0.23.0" file-based = ["avro", "fastavro", "pyarrow", "unstructured", "pdf2image", "pdfminer.six", "unstructured.pytesseract", "pytesseract", "markdown", "python-calamine", "python-snappy"] vector-db-based = ["langchain", "openai", "cohere", "tiktoken"] sql = ["sqlalchemy"] +tests = ["pytest", "pytest-mock", "pytest-cov"] [tool.poetry.scripts] source-declarative-manifest = "airbyte_cdk.cli.source_declarative_manifest:run" +[tool.poetry.plugins.pytest11] +# pytest11 is the entry point group name for pytest plugins +# https://docs.pytest.org/en/7.3.x/how-to/writing_plugins.html#making-your-plugin-installable-by-others +airbyte_cdk = "airbyte_cdk.test.pytest_config.plugin" + [tool.isort] skip = ["__init__.py"] # TODO: Remove after this is fixed: https://github.com/airbytehq/airbyte-python-cdk/issues/12 @@ -252,4 +258,5 @@ DEP004 = [ # TODO: These should probably be declared within a `tests` extra: "pytest", "requests_mock", + "_pytest", ] diff --git a/unit_tests/resources/sample_connector/connector-acceptance-tests.yml b/unit_tests/resources/sample_connector/connector-acceptance-tests.yml new file mode 100644 index 000000000..93370f2bd --- /dev/null +++ b/unit_tests/resources/sample_connector/connector-acceptance-tests.yml @@ -0,0 +1,4 @@ +connector_image: sample-connector +test_read: + config_path: secrets/config.json + empty_streams: [] diff --git a/unit_tests/resources/sample_connector/source.py b/unit_tests/resources/sample_connector/source.py new file mode 100644 index 000000000..bbe3db541 --- /dev/null +++ b/unit_tests/resources/sample_connector/source.py @@ -0,0 +1,17 @@ +from airbyte_cdk.sources import 
class SampleStream(Stream):
    """Minimal stream used by the sample connector test fixture.

    It emits a single hard-coded record and needs no configuration.
    """

    name = "sample"

    # The CDK ``Stream`` base class declares ``primary_key`` as an abstract
    # property; without a definition here the class cannot be instantiated.
    # ``None`` means "no primary key".
    primary_key = None

    def get_json_schema(self):
        """Return the schema of the single record this stream emits.

        NOTE(review): defined inline because the base implementation
        presumably loads ``schemas/<name>.json`` from the connector package,
        which does not exist for this fixture. Confirm against the CDK.
        """
        return {
            "type": "object",
            "properties": {
                "id": {"type": "integer"},
                "name": {"type": "string"},
            },
        }

    def read_records(self, *args, **kwargs):
        """Yield the one sample record, ignoring all read arguments."""
        yield {"id": 1, "name": "Test"}
_determine_connector_type(str(metadata_dir)) == "source" + + def test_create_dynamic_source_test_suite(self, tmpdir): + source_dir = tmpdir / "source" + source_dir.mkdir() + source_file = source_dir / "source.py" + source_file.write_text( + """ + from airbyte_cdk.sources import AbstractSource + + class SourceTest(AbstractSource): + def check_connection(self, logger, config): + return True, None + + def streams(self, config): + return [] + """, + encoding="utf-8", + ) + + config_file = source_dir / "connector-acceptance-tests.yml" + config_file.write_text( + """ + test_read: + config_path: config.json + """, + encoding="utf-8", + ) + + (source_dir / "config.json").write_text('{"api_key": "test"}', encoding="utf-8") + + with ( + patch("importlib.util.spec_from_file_location"), + patch("importlib.util.module_from_spec"), + patch("inspect.getmembers", return_value=[("SourceTest", type("SourceTest", (), {}))]), + ): + test_suite_class = _create_dynamic_source_test_suite(str(source_dir)) + + assert test_suite_class is not None + assert hasattr(test_suite_class, "working_dir") + assert test_suite_class.working_dir == source_dir + assert hasattr(test_suite_class, "acceptance_test_config_path") + + def test_pytest_generate_tests(self): + metafunc = MagicMock() + metafunc.fixturenames = ["instance"] + + scenarios = [MagicMock(), MagicMock()] + scenarios[0].id = "scenario1" + scenarios[1].id = "scenario2" + + mock_class = MagicMock() + mock_class.get_scenarios.return_value = scenarios + metafunc.cls = mock_class + + pytest_generate_tests(metafunc) + + metafunc.parametrize.assert_called_once() + args, kwargs = metafunc.parametrize.call_args + assert args[0] == "instance" + assert args[1] == scenarios + assert kwargs["ids"] == ["scenario1", "scenario2"] diff --git a/unit_tests/test/pytest_config/test_plugin_direct.py b/unit_tests/test/pytest_config/test_plugin_direct.py new file mode 100644 index 000000000..fd5adef7c --- /dev/null +++ 
@pytest.mark.integration
def test_plugin_auto_discovery():
    """Test that the plugin can auto-discover and run tests.

    Runs pytest in a subprocess against the bundled sample connector and
    checks that both auto-discovered tests ran and passed.
    """
    import sys

    sample_connector_path = Path(__file__).parent.parent.parent / "resources" / "sample_connector"

    result = subprocess.run(
        [
            # Use the interpreter running this test so the subprocess sees the
            # same installed plugin, not whichever ``pytest`` is first on PATH.
            sys.executable,
            "-m",
            "pytest",
            "-xvs",
            "--auto-discover",
            f"--connector-dir={sample_connector_path}",
            # Restrict collection to the sample connector. Without an explicit
            # path pytest collects from the current directory, which would
            # re-run this test and spawn pytest recursively.
            str(sample_connector_path),
        ],
        capture_output=True,
        text=True,
        check=False,
    )

    # pytest reports failures on stdout, so show both streams.
    assert result.returncode == 0, (
        f"Pytest failed: {result.stderr}\n--- stdout ---\n{result.stdout}"
    )

    assert "Running test_connection[default]" in result.stdout
    assert "Running test_discover[default]" in result.stdout
    assert "2 passed" in result.stdout