Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
221 changes: 221 additions & 0 deletions airbyte/_util/destination_smoke_tests.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,221 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""Shared implementation for destination smoke tests.

This module provides the core logic for running smoke tests against destination
connectors. It is used by both the CLI (`pyab destination-smoke-test`) and the
MCP tool (`destination_smoke_test`).

Smoke tests send synthetic data from the built-in smoke test source to a
destination connector and report whether the destination accepted the data
without errors. No readback or comparison is performed.
"""

from __future__ import annotations

import time
from pathlib import Path
from typing import TYPE_CHECKING, Any

import yaml
from pydantic import BaseModel

from airbyte import get_source
from airbyte.exceptions import PyAirbyteInputError


if TYPE_CHECKING:
from airbyte.destinations.base import Destination
from airbyte.sources.base import Source


class DestinationSmokeTestResult(BaseModel):
"""Result of a destination smoke test run."""

success: bool
"""Whether the smoke test passed (destination accepted all data without errors)."""

destination: str
"""The destination connector name."""

records_delivered: int
"""Total number of records delivered to the destination."""

scenarios_requested: str
"""Which scenarios were requested ('all' or a comma-separated list)."""

elapsed_seconds: float
"""Time taken for the smoke test in seconds."""

error: str | None = None
"""Error message if the smoke test failed."""


def get_smoke_test_source(
*,
scenarios: str | list[str] = "fast",
custom_scenarios: list[dict[str, Any]] | None = None,
custom_scenarios_file: str | None = None,
) -> Source:
"""Create a smoke test source with the given configuration.

The smoke test source generates synthetic data across predefined scenarios
that cover common destination failure patterns.

`scenarios` controls which scenarios to run:

- `'fast'` (default): runs all fast (non-high-volume) predefined scenarios,
excluding `large_batch_stream`.
- `'all'`: runs every predefined scenario including `large_batch_stream`.
- A comma-separated string or list of specific scenario names.

`custom_scenarios` is an optional list of scenario dicts to inject directly.

`custom_scenarios_file` is an optional path to a JSON or YAML file containing
additional scenario definitions. Each scenario should have `name`, `json_schema`,
and optionally `records` and `primary_key`.
"""
# Normalize empty list to "fast" (default)
if isinstance(scenarios, list) and not scenarios:
scenarios = "fast"

scenarios_str = ",".join(scenarios) if isinstance(scenarios, list) else scenarios
keyword = scenarios_str.strip().lower()
is_all = keyword == "all"
is_fast = keyword == "fast"

if is_all:
source_config: dict[str, Any] = {
"all_fast_streams": True,
"all_slow_streams": True,
}
elif is_fast:
source_config: dict[str, Any] = {
"all_fast_streams": True,
"all_slow_streams": False,
}
else:
source_config: dict[str, Any] = {
"all_fast_streams": False,
"all_slow_streams": False,
}
if isinstance(scenarios, list):
source_config["scenario_filter"] = [s.strip() for s in scenarios if s.strip()]
else:
source_config["scenario_filter"] = [
s.strip() for s in scenarios.split(",") if s.strip()
]

# Handle custom scenarios passed as a list of dicts (MCP path)
if custom_scenarios:
source_config["custom_scenarios"] = custom_scenarios

# Handle custom scenarios from a file path (CLI path)
if custom_scenarios_file:
custom_path = Path(custom_scenarios_file)
if not custom_path.exists():
raise PyAirbyteInputError(
message="Custom scenarios file not found.",
input_value=str(custom_path),
)
loaded = yaml.safe_load(custom_path.read_text(encoding="utf-8"))
if isinstance(loaded, list):
file_scenarios = loaded
elif isinstance(loaded, dict) and "custom_scenarios" in loaded:
file_scenarios = loaded["custom_scenarios"]
else:
raise PyAirbyteInputError(
message=(
"Custom scenarios file must contain a list of scenarios "
"or a dict with a 'custom_scenarios' key."
),
input_value=str(custom_path),
)
# Merge with any directly-provided custom scenarios
existing = source_config.get("custom_scenarios", [])
source_config["custom_scenarios"] = existing + file_scenarios

return get_source(
name="source-smoke-test",
config=source_config,
streams="*",
local_executable="source-smoke-test",
)


def _sanitize_error(ex: Exception) -> str:
"""Extract an error message from an exception without leaking secrets.

Uses `get_message()` when available (PyAirbyte exceptions) to avoid
including full config/context in the error string.
"""
if hasattr(ex, "get_message"):
return f"{type(ex).__name__}: {ex.get_message()}"
return f"{type(ex).__name__}: {ex}"


def run_destination_smoke_test(
*,
destination: Destination,
scenarios: str | list[str] = "fast",
custom_scenarios: list[dict[str, Any]] | None = None,
custom_scenarios_file: str | None = None,
) -> DestinationSmokeTestResult:
"""Run a smoke test against a destination connector.

Sends synthetic test data from the smoke test source to the specified
destination and returns a structured result.

This function does NOT read back data from the destination or compare
results. It only verifies that the destination accepts the data without
errors.

`destination` is a resolved `Destination` object ready for writing.

`scenarios` controls which predefined scenarios to run:

- `'fast'` (default): runs all fast (non-high-volume) predefined scenarios.
- `'all'`: runs every scenario including `large_batch_stream`.
- A comma-separated string or list of specific scenario names.

`custom_scenarios` is an optional list of scenario dicts to inject.

`custom_scenarios_file` is an optional path to a JSON/YAML file with
additional scenario definitions.
"""
source_obj = get_smoke_test_source(
scenarios=scenarios,
custom_scenarios=custom_scenarios,
custom_scenarios_file=custom_scenarios_file,
)

# Normalize scenarios to a display string
if isinstance(scenarios, list):
scenarios_str = ",".join(scenarios) if scenarios else "fast"
else:
scenarios_str = scenarios

start_time = time.monotonic()
success = False
error_message: str | None = None
records_delivered = 0
try:
write_result = destination.write(
source_data=source_obj,
cache=False,
state_cache=False,
)
records_delivered = write_result.processed_records
success = True
except Exception as ex:
error_message = _sanitize_error(ex)

elapsed = time.monotonic() - start_time

return DestinationSmokeTestResult(
success=success,
destination=destination.name,
records_delivered=records_delivered,
scenarios_requested=scenarios_str,
elapsed_seconds=round(elapsed, 2),
error=error_message,
)
117 changes: 117 additions & 0 deletions airbyte/cli/pyab.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@

from __future__ import annotations

import json
import re
import sys
from pathlib import Path
Expand All @@ -68,6 +69,7 @@
import click
import yaml

from airbyte._util.destination_smoke_tests import run_destination_smoke_test
from airbyte.destinations.util import get_destination, get_noop_destination
from airbyte.exceptions import PyAirbyteInputError
from airbyte.secrets.util import get_secret
Expand Down Expand Up @@ -631,6 +633,120 @@ def sync(
)


@click.command(name="destination-smoke-test")
@click.option(
"--destination",
type=str,
required=True,
help=(
"The destination connector to test. Can be a connector name "
"(e.g. 'destination-snowflake'), a Docker image with tag "
"(e.g. 'airbyte/destination-snowflake:3.14.0'), or a path to a local executable."
),
)
@click.option(
"--config",
type=str,
help="The destination configuration. " + CONFIG_HELP,
)
@click.option(
"--pip-url",
type=str,
help="Optional pip URL for the destination (Python connectors only). " + PIP_URL_HELP,
)
@click.option(
"--use-python",
type=str,
help=(
"Python interpreter specification. Use 'true' for current Python, "
"'false' for Docker, a path for specific interpreter, or a version "
"string for uv-managed Python (e.g., '3.11', 'python3.12')."
),
)
@click.option(
"--scenarios",
type=str,
default="fast",
help=(
"Which smoke test scenarios to run. "
"Use 'fast' (default) for all fast predefined scenarios "
"(excludes large_batch_stream), 'all' for every predefined scenario "
"including large batch, or provide a comma-separated list of scenario names. "
"Available scenarios: basic_types, timestamp_types, "
"large_decimals_and_numbers, nested_json_objects, null_handling, "
"column_naming_edge_cases, table_naming_edge_cases, "
"CamelCaseStreamName, wide_table_50_columns, empty_stream, "
"single_record_stream, unicode_and_special_strings, "
"schema_with_no_primary_key, long_column_names, large_batch_stream."
),
)
@click.option(
"--custom-scenarios",
type=str,
default=None,
help=(
"Path to a JSON or YAML file containing additional custom test scenarios. "
"Each scenario should define 'name', 'json_schema', and optionally 'records' "
"and 'primary_key'. These are unioned with the predefined scenarios."
),
)
def destination_smoke_test(
*,
destination: str,
config: str | None = None,
pip_url: str | None = None,
use_python: str | None = None,
scenarios: str = "fast",
custom_scenarios: str | None = None,
) -> None:
"""Run smoke tests against a destination connector.

Sends synthetic test data from the smoke test source to the specified
destination and reports success or failure. The smoke test source
generates data across predefined scenarios covering common destination
failure patterns: type variations, null handling, naming edge cases,
schema variations, and batch sizes.

This command does NOT read back data from the destination or compare
results. It only verifies that the destination accepts the data without
errors.

Usage examples:

`pyab destination-smoke-test --destination=destination-dev-null`

`pyab destination-smoke-test --destination=destination-snowflake
--config=./secrets/snowflake.json`

`pyab destination-smoke-test --destination=destination-motherduck
--scenarios=basic_types,null_handling`

`pyab destination-smoke-test --destination=destination-snowflake
--config=./secrets/snowflake.json --scenarios=all`
"""
click.echo("Resolving destination...", file=sys.stderr)
destination_obj = _resolve_destination_job(
destination=destination,
config=config,
pip_url=pip_url,
use_python=use_python,
)

click.echo("Running destination smoke test...", file=sys.stderr)
result = run_destination_smoke_test(
destination=destination_obj,
scenarios=scenarios,
custom_scenarios_file=custom_scenarios,
)

click.echo(json.dumps(result.model_dump(), indent=2))

if not result.success:
if result.error:
click.echo(f"Smoke test FAILED: {result.error}", file=sys.stderr)
sys.exit(1)


@click.group()
def cli() -> None:
"""@private PyAirbyte CLI."""
Expand All @@ -640,6 +756,7 @@ def cli() -> None:
cli.add_command(validate)
cli.add_command(benchmark)
cli.add_command(sync)
cli.add_command(destination_smoke_test)

if __name__ == "__main__":
cli()
Loading
Loading