Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
209 changes: 209 additions & 0 deletions airbyte/cli/pyab.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,10 @@

from __future__ import annotations

import json
import re
import sys
import time
from pathlib import Path
from typing import TYPE_CHECKING, Any

Expand Down Expand Up @@ -631,6 +633,212 @@ def sync(
)


def _get_smoke_test_source(
    *,
    scenario_filter: str | None = None,
    include_large_batch: bool = False,
    custom_scenarios: str | None = None,
) -> Source:
    """Create a smoke test source with the given configuration.

    The smoke test source generates synthetic data across predefined scenarios
    that cover common destination failure patterns.

    Args:
        scenario_filter: Optional comma-separated list of scenario names to include.
            Whitespace around each name is stripped and empty entries (e.g. from a
            trailing comma) are ignored. If not provided, all fast
            (non-high-volume) scenarios are included.
        include_large_batch: Whether to include the large_batch_stream scenario.
        custom_scenarios: Optional path to a JSON or YAML file containing additional
            scenario definitions. Each scenario should have 'name', 'json_schema',
            and optionally 'records' and 'primary_key'.

    Returns:
        The `source-smoke-test` source, created with `streams="*"`.

    Raises:
        PyAirbyteInputError: If `custom_scenarios` does not point to an existing
            file, or if the file's top-level content is neither a list nor a dict
            containing a 'custom_scenarios' key.
    """
    source_config: dict[str, Any] = {
        "all_fast_streams": True,
        "all_slow_streams": include_large_batch,
    }

    if scenario_filter:
        # Drop blank entries so inputs like "a,,b" or "a," never forward an
        # empty scenario name to the connector.
        scenario_names = [
            name.strip() for name in scenario_filter.split(",") if name.strip()
        ]
        if scenario_names:
            source_config["scenario_filter"] = scenario_names

    if custom_scenarios:
        custom_path = Path(custom_scenarios)
        # `is_file()` (rather than `exists()`) also rejects directories, which
        # would otherwise surface as a raw `IsADirectoryError` from `read_text()`.
        if not custom_path.is_file():
            raise PyAirbyteInputError(
                message="Custom scenarios file not found.",
                input_value=str(custom_path),
            )

        # A single YAML load is used for both of the documented file formats
        # (JSON or YAML).
        loaded = yaml.safe_load(custom_path.read_text(encoding="utf-8"))
        if isinstance(loaded, list):
            source_config["custom_scenarios"] = loaded
        elif isinstance(loaded, dict) and "custom_scenarios" in loaded:
            source_config["custom_scenarios"] = loaded["custom_scenarios"]
        else:
            raise PyAirbyteInputError(
                message=(
                    "Custom scenarios file must contain a list of scenarios "
                    "or a dict with a 'custom_scenarios' key."
                ),
                input_value=str(custom_path),
            )

    return get_source(
        name="source-smoke-test",
        config=source_config,
        streams="*",
        local_executable="source-smoke-test",
    )


@click.command(name="destination-smoke-test")
@click.option(
    "--destination",
    type=str,
    required=True,
    help=(
        "The destination connector to test. Can be a connector name "
        "(e.g. 'destination-snowflake'), a Docker image with tag "
        "(e.g. 'airbyte/destination-snowflake:3.14.0'), or a path to a local executable."
    ),
)
@click.option(
    "--config",
    type=str,
    help="The destination configuration. " + CONFIG_HELP,
)
@click.option(
    "--pip-url",
    type=str,
    help="Optional pip URL for the destination (Python connectors only). " + PIP_URL_HELP,
)
@click.option(
    "--use-python",
    type=str,
    help=(
        "Python interpreter specification. Use 'true' for current Python, "
        "'false' for Docker, a path for specific interpreter, or a version "
        "string for uv-managed Python (e.g., '3.11', 'python3.12')."
    ),
)
@click.option(
    "--scenario-filter",
    type=str,
    default=None,
    help=(
        "Comma-separated list of smoke test scenario names to run. "
        "If not provided, all fast (non-high-volume) scenarios are included. "
        "Available scenarios include: basic_types_stream, timestamp_stream, "
        "large_decimal_stream, nested_json_stream, null_values_stream, "
        "column_name_edge_cases, table_name_with_dots, table_name_with_spaces, "
        "CamelCaseStream, wide_table_50_cols, empty_stream, single_record_stream, "
        "unicode_special_strings, no_pk_stream, long_column_names."
    ),
)
@click.option(
    "--include-large-batch",
    is_flag=True,
    default=False,
    help="Include the large_batch_stream scenario (1000 records by default).",
)
@click.option(
    "--custom-scenarios",
    type=str,
    default=None,
    help=(
        "Path to a JSON or YAML file containing additional custom test scenarios. "
        "Each scenario should define 'name', 'json_schema', and optionally 'records' "
        "and 'primary_key'. These are unioned with the predefined scenarios."
    ),
)
def destination_smoke_test(
    *,
    destination: str,
    config: str | None = None,
    pip_url: str | None = None,
    use_python: str | None = None,
    scenario_filter: str | None = None,
    include_large_batch: bool = False,
    custom_scenarios: str | None = None,
) -> None:
    r"""Run smoke tests against a destination connector.

    Sends synthetic test data from the smoke test source to the specified
    destination and reports success or failure. The smoke test source
    generates data across predefined scenarios covering common destination
    failure patterns: type variations, null handling, naming edge cases,
    schema variations, and batch sizes.

    This command does NOT read back data from the destination or compare
    results. It only verifies that the destination accepts the data without
    errors.

    Examples:
    \b
        # Test with a Docker destination:
        pyab destination-smoke-test \\
            --destination=airbyte/destination-dev-null:latest

    \b
        # Test with config file:
        pyab destination-smoke-test \\
            --destination=destination-snowflake \\
            --config=./secrets/snowflake.json

    \b
        # Run only specific scenarios:
        pyab destination-smoke-test \\
            --destination=destination-motherduck \\
            --config='{motherduck_api_key: "SECRET:MOTHERDUCK_API_KEY"}' \\
            --scenario-filter=basic_types_stream,null_values_stream
    """
    # Progress messages go to stderr so that stdout carries only the JSON report.
    click.echo("Resolving destination...", file=sys.stderr)
    destination_obj = _resolve_destination_job(
        destination=destination,
        config=config,
        pip_url=pip_url,
        use_python=use_python,
    )

    click.echo("Creating smoke test source...", file=sys.stderr)
    source_obj = _get_smoke_test_source(
        scenario_filter=scenario_filter,
        include_large_batch=include_large_batch,
        custom_scenarios=custom_scenarios,
    )

    click.echo("Running destination smoke test...", file=sys.stderr)
    start_time = time.monotonic()
    success = False
    error_message: str | None = None
    records_delivered = 0
    try:
        write_result = destination_obj.write(
            source_data=source_obj,
            cache=False,
            state_cache=False,
        )
        records_delivered = write_result.processed_records
        success = True
    except Exception as ex:
        # Any failure during the write is reported as a failed smoke test rather
        # than a traceback. Fall back to `repr()` so that exceptions with an
        # empty message still produce a non-empty error in the report.
        error_message = str(ex) or repr(ex)
        click.echo(f"Smoke test FAILED: {error_message}", file=sys.stderr)

    elapsed = time.monotonic() - start_time

    result: dict[str, Any] = {
        "success": success,
        "destination": _get_connector_name(destination),
        "records_delivered": records_delivered,
        "scenarios_requested": scenario_filter or "all_fast",
        "include_large_batch": include_large_batch,
        "elapsed_seconds": round(elapsed, 2),
    }
    # Compare against None (not truthiness) so a failure always carries an
    # "error" entry in the report.
    if error_message is not None:
        result["error"] = error_message

    click.echo(json.dumps(result, indent=2))

    # Signal failure to the calling shell / CI job via a non-zero exit status.
    if not success:
        sys.exit(1)


# Root command group; subcommands are attached to it with `cli.add_command(...)`.
@click.group()
def cli() -> None:
    """@private PyAirbyte CLI."""
Expand All @@ -640,6 +848,7 @@ def cli() -> None:
cli.add_command(validate)
cli.add_command(benchmark)
cli.add_command(sync)
# Expose the `destination-smoke-test` command on the root CLI group.
cli.add_command(destination_smoke_test)

# Invoke the CLI group when this module is executed directly as a script.
if __name__ == "__main__":
    cli()
Loading
Loading