Skip to content

Commit 09d36b1

Browse files
feat(cli,mcp): add namespace support to destination smoke tests
- Generate zz_deleteme_yyyymmdd_hhmm namespaces for test isolation - Add --namespace-suffix for concurrent run disambiguation - Add --reuse-namespace for multi-test runs on same namespace - Inject namespace into smoke test source streams and records - Include namespace in DestinationSmokeTestResult Co-Authored-By: AJ Steers <aj@airbyte.io>
1 parent 00ab7ca commit 09d36b1

File tree

4 files changed

+124
-2
lines changed

4 files changed

+124
-2
lines changed

airbyte/_util/destination_smoke_tests.py

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
from __future__ import annotations
1414

1515
import time
16+
from datetime import datetime, timezone
1617
from pathlib import Path
1718
from typing import TYPE_CHECKING, Any
1819

@@ -23,11 +24,37 @@
2324
from airbyte.exceptions import PyAirbyteInputError
2425

2526

27+
NAMESPACE_PREFIX = "zz_deleteme"
28+
"""Prefix for auto-generated smoke test namespaces.
29+
30+
The ``zz_`` prefix sorts last alphabetically; ``deleteme`` signals the
31+
namespace is safe for automated cleanup.
32+
"""
33+
34+
2635
if TYPE_CHECKING:
2736
from airbyte.destinations.base import Destination
2837
from airbyte.sources.base import Source
2938

3039

40+
def generate_namespace(
41+
*,
42+
namespace_suffix: str | None = None,
43+
) -> str:
44+
"""Generate a smoke-test namespace.
45+
46+
Format: ``zz_deleteme_yyyymmdd_hhmm`` with an optional ``_<suffix>``.
47+
The ``zz_`` prefix sorts last alphabetically and the ``deleteme``
48+
token acts as a guard for automated cleanup scripts.
49+
"""
50+
now = datetime.now(tz=timezone.utc)
51+
ts = now.strftime("%Y%m%d_%H%M")
52+
ns = f"{NAMESPACE_PREFIX}_{ts}"
53+
if namespace_suffix:
54+
ns = f"{ns}_{namespace_suffix}"
55+
return ns
56+
57+
3158
class DestinationSmokeTestResult(BaseModel):
3259
"""Result of a destination smoke test run."""
3360

@@ -37,6 +64,9 @@ class DestinationSmokeTestResult(BaseModel):
3764
destination: str
3865
"""The destination connector name."""
3966

67+
namespace: str
68+
"""The namespace used for this smoke test run."""
69+
4070
records_delivered: int
4171
"""Total number of records delivered to the destination."""
4272

@@ -53,6 +83,7 @@ class DestinationSmokeTestResult(BaseModel):
5383
def get_smoke_test_source(
5484
*,
5585
scenarios: str | list[str] = "fast",
86+
namespace: str | None = None,
5687
custom_scenarios: list[dict[str, Any]] | None = None,
5788
custom_scenarios_file: str | None = None,
5889
) -> Source:
@@ -70,6 +101,9 @@ def get_smoke_test_source(
70101
71102
`custom_scenarios` is an optional list of scenario dicts to inject directly.
72103
104+
`namespace` is an optional namespace to set on all streams. When provided,
105+
the destination will write data into this namespace (schema, database, etc.).
106+
73107
`custom_scenarios_file` is an optional path to a JSON or YAML file containing
74108
additional scenario definitions. Each scenario should have `name`, `json_schema`,
75109
and optionally `records` and `primary_key`.
@@ -134,6 +168,9 @@ def get_smoke_test_source(
134168
existing = source_config.get("custom_scenarios", [])
135169
source_config["custom_scenarios"] = existing + file_scenarios
136170

171+
if namespace:
172+
source_config["namespace"] = namespace
173+
137174
return get_source(
138175
name="source-smoke-test",
139176
config=source_config,
@@ -157,6 +194,8 @@ def run_destination_smoke_test(
157194
*,
158195
destination: Destination,
159196
scenarios: str | list[str] = "fast",
197+
namespace_suffix: str | None = None,
198+
reuse_namespace: str | None = None,
160199
custom_scenarios: list[dict[str, Any]] | None = None,
161200
custom_scenarios_file: str | None = None,
162201
) -> DestinationSmokeTestResult:
@@ -177,13 +216,25 @@ def run_destination_smoke_test(
177216
- `'all'`: runs every scenario including `large_batch_stream`.
178217
- A comma-separated string or list of specific scenario names.
179218
219+
`namespace_suffix` is an optional suffix appended to the auto-generated
220+
namespace (e.g. ``zz_deleteme_20260318_2256_mysuffix``).
221+
222+
`reuse_namespace` is an exact namespace string to reuse from a previous
223+
run. When set, no new namespace is generated and cleanup is skipped.
224+
180225
`custom_scenarios` is an optional list of scenario dicts to inject.
181226
182227
`custom_scenarios_file` is an optional path to a JSON/YAML file with
183228
additional scenario definitions.
184229
"""
230+
# Determine namespace
231+
namespace = reuse_namespace or generate_namespace(
232+
namespace_suffix=namespace_suffix,
233+
)
234+
185235
source_obj = get_smoke_test_source(
186236
scenarios=scenarios,
237+
namespace=namespace,
187238
custom_scenarios=custom_scenarios,
188239
custom_scenarios_file=custom_scenarios_file,
189240
)
@@ -214,6 +265,7 @@ def run_destination_smoke_test(
214265
return DestinationSmokeTestResult(
215266
success=success,
216267
destination=destination.name,
268+
namespace=namespace,
217269
records_delivered=records_delivered,
218270
scenarios_requested=scenarios_str,
219271
elapsed_seconds=round(elapsed, 2),

airbyte/cli/pyab.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -690,6 +690,26 @@ def sync(
690690
"and 'primary_key'. These are unioned with the predefined scenarios."
691691
),
692692
)
693+
@click.option(
694+
"--namespace-suffix",
695+
type=str,
696+
default=None,
697+
help=(
698+
"Optional suffix appended to the auto-generated namespace. "
699+
"The namespace format is 'zz_deleteme_yyyymmdd_hhmm_{suffix}'. "
700+
"Use this to distinguish concurrent runs."
701+
),
702+
)
703+
@click.option(
704+
"--reuse-namespace",
705+
type=str,
706+
default=None,
707+
help=(
708+
"Exact namespace to reuse from a previous run. "
709+
"When set, no new namespace is generated and cleanup is skipped. "
710+
"Useful for running a second test against an already-populated namespace."
711+
),
712+
)
693713
def destination_smoke_test(
694714
*,
695715
destination: str,
@@ -698,6 +718,8 @@ def destination_smoke_test(
698718
use_python: str | None = None,
699719
scenarios: str = "fast",
700720
custom_scenarios: str | None = None,
721+
namespace_suffix: str | None = None,
722+
reuse_namespace: str | None = None,
701723
) -> None:
702724
"""Run smoke tests against a destination connector.
703725
@@ -723,6 +745,13 @@ def destination_smoke_test(
723745
724746
`pyab destination-smoke-test --destination=destination-snowflake
725747
--config=./secrets/snowflake.json --scenarios=all`
748+
749+
`pyab destination-smoke-test --destination=destination-snowflake
750+
--config=./secrets/snowflake.json --namespace-suffix=run2`
751+
752+
`pyab destination-smoke-test --destination=destination-snowflake
753+
--config=./secrets/snowflake.json
754+
--reuse-namespace=zz_deleteme_20260318_2256`
726755
"""
727756
click.echo("Resolving destination...", file=sys.stderr)
728757
destination_obj = _resolve_destination_job(
@@ -736,6 +765,8 @@ def destination_smoke_test(
736765
result = run_destination_smoke_test(
737766
destination=destination_obj,
738767
scenarios=scenarios,
768+
namespace_suffix=namespace_suffix,
769+
reuse_namespace=reuse_namespace,
739770
custom_scenarios_file=custom_scenarios,
740771
)
741772

airbyte/cli/smoke_test_source/source.py

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,11 +54,13 @@
5454

5555
def _build_streams_from_scenarios(
5656
scenarios: list[dict[str, Any]],
57+
namespace: str | None = None,
5758
) -> list[AirbyteStream]:
5859
"""Build AirbyteStream objects from scenario definitions."""
5960
return [
6061
AirbyteStream(
6162
name=scenario["name"],
63+
namespace=namespace,
6264
json_schema=scenario["json_schema"],
6365
supported_sync_modes=[SyncMode.full_refresh],
6466
source_defined_cursor=False,
@@ -174,6 +176,16 @@ def spec(
174176
"items": {"type": "string"},
175177
"default": [],
176178
},
179+
"namespace": {
180+
"type": ["string", "null"],
181+
"title": "Namespace",
182+
"description": (
183+
"Namespace (schema/database) to set on all "
184+
"streams. When provided, the destination will "
185+
"write data into this namespace."
186+
),
187+
"default": None,
188+
},
177189
},
178190
},
179191
)
@@ -320,7 +332,8 @@ def discover(
320332
) -> AirbyteCatalog:
321333
"""Return the catalog with all test scenario streams."""
322334
scenarios = self._get_all_scenarios(config)
323-
streams = _build_streams_from_scenarios(scenarios)
335+
namespace = config.get("namespace")
336+
streams = _build_streams_from_scenarios(scenarios, namespace=namespace)
324337
logger.info(f"Discovered {len(streams)} smoke test streams.")
325338
return AirbyteCatalog(streams=streams)
326339

@@ -368,11 +381,13 @@ def read(
368381
records = get_scenario_records(scenario)
369382
logger.info(f"Emitting {len(records)} records for stream '{stream_name}'.")
370383

384+
namespace = config.get("namespace")
371385
for record in records:
372386
yield AirbyteMessage(
373387
type=Type.RECORD,
374388
record=AirbyteRecordMessage(
375389
stream=stream_name,
390+
namespace=namespace,
376391
data=record,
377392
emitted_at=now_ms,
378393
),

airbyte/mcp/local.py

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -812,7 +812,7 @@ def run_sql_query(
812812
@mcp_tool(
813813
destructive=True,
814814
)
815-
def destination_smoke_test(
815+
def destination_smoke_test( # noqa: PLR0913, PLR0917
816816
destination_connector_name: Annotated[
817817
str,
818818
Field(
@@ -879,6 +879,28 @@ def destination_smoke_test(
879879
default=None,
880880
),
881881
],
882+
namespace_suffix: Annotated[
883+
str | None,
884+
Field(
885+
description=(
886+
"Optional suffix appended to the auto-generated namespace. "
887+
"The namespace format is 'zz_deleteme_yyyymmdd_hhmm_{suffix}'. "
888+
"Use this to distinguish concurrent runs."
889+
),
890+
default=None,
891+
),
892+
],
893+
reuse_namespace: Annotated[
894+
str | None,
895+
Field(
896+
description=(
897+
"Exact namespace to reuse from a previous run. "
898+
"When set, no new namespace is generated and cleanup is skipped. "
899+
"Useful for running a second test against an already-populated namespace."
900+
),
901+
default=None,
902+
),
903+
],
882904
) -> DestinationSmokeTestResult:
883905
"""Run smoke tests against a destination connector.
884906
@@ -920,6 +942,8 @@ def destination_smoke_test(
920942
return run_destination_smoke_test(
921943
destination=destination_obj,
922944
scenarios=resolved_scenarios,
945+
namespace_suffix=namespace_suffix,
946+
reuse_namespace=reuse_namespace,
923947
custom_scenarios=custom_scenarios,
924948
)
925949

0 commit comments

Comments
 (0)