Skip to content

Commit 3873093

Browse files
feat: Add preflight check to destination smoke test tool (#1007)
Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
1 parent 65e1c03 commit 3873093

File tree

3 files changed

+260
-11
lines changed

3 files changed

+260
-11
lines changed

airbyte/_util/destination_smoke_tests.py

Lines changed: 235 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
from __future__ import annotations
1919

20+
import json
2021
import logging
2122
import time
2223
from datetime import datetime, timezone
@@ -98,6 +99,13 @@ class DestinationSmokeTestResult(BaseModel):
9899
error: str | None = None
99100
"""Error message if the smoke test failed."""
100101

102+
preflight_passed: bool | None = None
103+
"""Whether the preflight `basic_types` check passed.
104+
105+
`True` if the preflight check succeeded, `False` if it failed,
106+
`None` if the preflight check was skipped.
107+
"""
108+
101109
table_statistics: dict[str, TableStatistics] | None = None
102110
"""Map of stream name to table statistics (row counts, columns, stats).
103111
@@ -114,6 +122,14 @@ class DestinationSmokeTestResult(BaseModel):
114122
`None` when readback was not performed.
115123
"""
116124

125+
warnings: list[str] | None = None
126+
"""Non-fatal warnings encountered during the smoke test.
127+
128+
Includes readback errors (e.g. cache connection failures) that
129+
prevented `table_statistics` from being populated, as well as
130+
any other non-fatal issues worth surfacing to the caller.
131+
"""
132+
117133

118134
def get_smoke_test_source(
119135
*,
@@ -249,25 +265,201 @@ def _prepare_destination_config(
249265
destination.set_config(config, validate=False)
250266

251267

268+
def _extract_trace_error_from_log(ex: Exception) -> str | None:
    """Search the connector log file for a TRACE ERROR `internal_message`.

    Many Java-based connectors emit a generic user-facing `message` in
    their TRACE ERROR output (e.g. "Something went wrong in the connector")
    while the actionable detail lives in the `internal_message` field.
    PyAirbyte only captures the `message` in the exception chain, so this
    helper reads the connector's log file to recover the `internal_message`.

    The exception chain is walked through `original_exception` until an
    exception exposing a non-`None` `log_file` attribute is found. That file
    is scanned from its last line to its first, and the first non-empty
    `trace.error.internal_message` belonging to a message whose `type` is
    `"TRACE"` is returned (i.e. the last such message written to the log).

    The message type is checked on the decoded JSON object rather than on the
    raw text, so the payload is recognised regardless of serializer
    whitespace (`{"type":"TRACE"` vs `{"type": "TRACE"`), key order, a
    leading log prefix, or trailing text after the JSON object.

    Returns the `internal_message` string if found, or `None` when no log
    file is referenced, the file cannot be read, or no TRACE message carries
    an `internal_message`.
    """
    # Walk the exception chain looking for a log_file path.
    log_file: Path | None = None
    current: Exception | None = ex
    while current is not None:
        candidate = getattr(current, "log_file", None)
        if candidate is not None:
            log_file = candidate
            break
        current = getattr(current, "original_exception", None)

    if log_file is None:
        return None

    # Read directly instead of checking `exists()` first: a missing or
    # unreadable file surfaces as OSError, and there is no check/read race.
    try:
        lines = log_file.read_text(encoding="utf-8", errors="replace").splitlines()
    except OSError:
        return None

    decoder = json.JSONDecoder()

    # Scan in reverse: only the last TRACE ERROR (the fatal one) is needed.
    for line in reversed(lines):
        # Cheap pre-filter so ordinary log lines are never JSON-decoded.
        if '"TRACE"' not in line:
            continue

        # The JSON payload may be preceded by a timestamp/level prefix, so try
        # each opening brace until one decodes to a TRACE message.
        start = line.find("{")
        while start != -1:
            try:
                payload, end = decoder.raw_decode(line, start)
            except json.JSONDecodeError:
                start = line.find("{", start + 1)
                continue

            if isinstance(payload, dict) and payload.get("type") == "TRACE":
                trace = payload.get("trace")
                error = trace.get("error") if isinstance(trace, dict) else None
                internal = error.get("internal_message") if isinstance(error, dict) else None
                if internal:
                    return str(internal)

            # Skip past the object just decoded; its nested braces are not
            # separate top-level messages.
            start = line.find("{", end)

    return None
313+
314+
252315
def _sanitize_error(ex: Exception) -> str:
    """Build a concise, actionable error string for `ex` without leaking secrets.

    The result always has the form `"<top-level exception class>: <message>"`.
    The message is chosen from the first source that yields one:

    1. **Connector log file** - the TRACE ERROR `internal_message` recovered
       by `_extract_trace_error_from_log`, which is typically the most
       specific error available (e.g.
       `password authentication failed for user "bob"`).
    2. **Exception chain** - the message of the deepest exception reachable
       through `original_exception`, used only when it is non-empty and
       differs from the top-level wrapper's message.
    3. **Top-level exception** - the message of `ex` itself.

    Wherever an exception exposes `get_message()` (PyAirbyte exceptions) that
    is preferred over `str()` to avoid including full config/context in the
    error string.
    """

    def _message_of(exc: BaseException) -> str:
        # PyAirbyte exceptions expose get_message(); standard exceptions
        # (ConnectionError, ValueError, etc.) fall back to str().
        return exc.get_message() if hasattr(exc, "get_message") else str(exc)

    label = type(ex).__name__

    # 1. The connector log file carries the most detail, so consult it first.
    from_log = _extract_trace_error_from_log(ex)
    if from_log:
        return f"{label}: {from_log}"

    # 2. Descend to the innermost wrapped exception.
    innermost = ex
    while getattr(innermost, "original_exception", None) is not None:
        innermost = innermost.original_exception

    if innermost is not ex:
        inner_msg = _message_of(innermost)
        # Only prefer the inner message when it adds information.
        if inner_msg and inner_msg != _message_of(ex):
            return f"{label}: {inner_msg}"

    # 3. Nothing more specific was found: report the wrapper's own message.
    return f"{label}: {_message_of(ex)}"
261357

262358

263-
def run_destination_smoke_test(
359+
# Name of the predefined scenario the preflight check is modelled on.
PREFLIGHT_SCENARIO = "basic_types"
"""The predefined scenario whose schema/records are reused for the preflight check."""

# Leading underscore plus a distinct name keeps the preflight stream separate
# from the predefined `basic_types` stream.
PREFLIGHT_STREAM_NAME = "_preflight_basic_types"
"""Stream name used by the preflight write.

This is deliberately different from the predefined `basic_types` stream
so that the preflight data lands in its own table and never collides with
the main smoke-test run."""
368+
369+
370+
def _build_preflight_scenario() -> dict[str, Any]:
    """Assemble the custom scenario definition used by the preflight check.

    The scenario is named `PREFLIGHT_STREAM_NAME` so that the preflight data
    lands in its own table, and its description references
    `PREFLIGHT_SCENARIO`, the predefined scenario it mirrors.

    It declares four columns (`id`, `name`, `amount`, `is_active`), a primary
    key on `id`, and three records.

    The schema and records are defined inline to avoid a circular import
    from `airbyte.cli.smoke_test_source._scenarios`.
    """
    # Column name -> JSON-schema type, in the order the columns are emitted.
    column_types = {
        "id": "integer",
        "name": "string",
        "amount": "number",
        "is_active": "boolean",
    }
    # One tuple per record, positionally aligned with `column_types`.
    rows = [
        (1, "Alice", 100.50, True),
        (2, "Bob", 0.0, False),
        (3, "", -99.99, True),
    ]

    json_schema = {
        "$schema": "http://json-schema.org/draft-07/schema#",
        "type": "object",
        "properties": {
            column: {"type": json_type} for column, json_type in column_types.items()
        },
    }
    records = [dict(zip(column_types, row)) for row in rows]

    return {
        "name": PREFLIGHT_STREAM_NAME,
        "description": f"Preflight check (based on '{PREFLIGHT_SCENARIO}').",
        "json_schema": json_schema,
        "primary_key": [["id"]],
        "records": records,
    }
400+
401+
402+
def _run_preflight(
    *,
    destination: Destination,
    namespace: str,
) -> tuple[bool, str | None]:
    """Run the preflight check by writing a tiny dataset to `destination`.

    A smoke-test source is built with no predefined scenarios and a single
    custom scenario from `_build_preflight_scenario()`, targeting `namespace`.
    The scenario uses the dedicated stream name `PREFLIGHT_STREAM_NAME`, so
    the preflight data never collides with the main smoke-test run. The
    source is then written to the destination with both `cache` and
    `state_cache` disabled.

    Returns a `(passed, error_message)` tuple:

    * `(True, None)` when the write completed without raising.
    * `(False, '<sanitized error>')` when the write raised; the message
      comes from `_sanitize_error`.

    Write failures are logged as warnings and not re-raised, so the caller
    can return a structured result that includes the actionable connector
    error. Errors raised while building the source are not caught here.
    """
    logger.info(
        "Running preflight check ('%s') for destination '%s'...",
        PREFLIGHT_STREAM_NAME,
        destination.name,
    )

    source = get_smoke_test_source(
        scenarios="",  # No predefined scenarios
        namespace=namespace,
        custom_scenarios=[_build_preflight_scenario()],
    )

    outcome: tuple[bool, str | None]
    try:
        destination.write(
            source_data=source,
            cache=False,
            state_cache=False,
        )
    except Exception as write_error:
        # Deliberately broad: any failure here means the preflight did not
        # pass, and it is reported to the caller rather than raised.
        failure_reason = _sanitize_error(write_error)
        logger.warning(
            "Preflight check failed for destination '%s': %s",
            destination.name,
            failure_reason,
        )
        outcome = (False, failure_reason)
    else:
        logger.info(
            "Preflight check passed for destination '%s'.",
            destination.name,
        )
        outcome = (True, None)

    return outcome
452+
453+
454+
def run_destination_smoke_test( # noqa: PLR0914
264455
*,
265456
destination: Destination,
266457
scenarios: str | list[str] = "fast",
267458
namespace_suffix: str | None = None,
268459
reuse_namespace: str | None = None,
269460
custom_scenarios: list[dict[str, Any]] | None = None,
270461
custom_scenarios_file: str | None = None,
462+
skip_preflight: bool = False,
271463
) -> DestinationSmokeTestResult:
272464
"""Run a smoke test against a destination connector.
273465
@@ -300,12 +492,42 @@ def run_destination_smoke_test(
300492
301493
`custom_scenarios_file` is an optional path to a JSON/YAML file with
302494
additional scenario definitions.
495+
496+
`skip_preflight` disables the automatic `basic_types` preflight check.
303497
"""
304498
# Determine namespace
305499
namespace = reuse_namespace or generate_namespace(
306500
namespace_suffix=namespace_suffix,
307501
)
308502

503+
# Prepare the destination config for smoke testing (e.g. ensure
504+
# disable_type_dedupe is off so final tables are created for readback).
505+
_prepare_destination_config(destination)
506+
507+
# --- Preflight check ---------------------------------------------------
508+
preflight_passed: bool | None = None
509+
if not skip_preflight:
510+
preflight_passed, preflight_error = _run_preflight(
511+
destination=destination,
512+
namespace=namespace,
513+
)
514+
if not preflight_passed:
515+
return DestinationSmokeTestResult(
516+
success=False,
517+
destination=destination.name,
518+
namespace=namespace,
519+
records_delivered=0,
520+
scenarios_requested=(
521+
",".join(scenarios) if isinstance(scenarios, list) else scenarios
522+
),
523+
elapsed_seconds=0.0,
524+
error=(
525+
f"Preflight check failed for '{PREFLIGHT_STREAM_NAME}': "
526+
f"{preflight_error or 'unknown error'}"
527+
),
528+
preflight_passed=False,
529+
)
530+
309531
source_obj = get_smoke_test_source(
310532
scenarios=scenarios,
311533
namespace=namespace,
@@ -322,12 +544,6 @@ def run_destination_smoke_test(
322544
else:
323545
scenarios_str = scenarios
324546

325-
# Prepare the destination config for smoke testing (e.g. ensure
326-
# disable_type_dedupe is off so final tables are created for readback).
327-
# The catalog namespace on each stream is the primary mechanism that
328-
# directs the destination to write into the test schema.
329-
_prepare_destination_config(destination)
330-
331547
start_time = time.monotonic()
332548
success = False
333549
error_message: str | None = None
@@ -348,6 +564,7 @@ def run_destination_smoke_test(
348564
# Perform readback introspection (runs even on write failure for partial-success support)
349565
table_statistics: dict[str, TableStatistics] | None = None
350566
tables_not_found: dict[str, str] | None = None
567+
readback_warnings: list[str] = []
351568
if destination.is_cache_supported:
352569
try:
353570
cache = destination.get_sql_cache(schema_name=namespace)
@@ -357,10 +574,16 @@ def run_destination_smoke_test(
357574
for name in stream_names
358575
if name not in table_statistics
359576
}
360-
except Exception:
577+
except Exception as readback_ex:
578+
sanitized_readback = _sanitize_error(readback_ex)
579+
readback_msg = (
580+
f"Readback failed for destination '{destination.name}': {sanitized_readback}"
581+
)
582+
readback_warnings.append(readback_msg)
361583
logger.warning(
362-
"Readback failed for destination '%s'.",
584+
"Readback failed for destination '%s': %s",
363585
destination.name,
586+
sanitized_readback,
364587
exc_info=True,
365588
)
366589
else:
@@ -379,6 +602,8 @@ def run_destination_smoke_test(
379602
scenarios_requested=scenarios_str,
380603
elapsed_seconds=round(elapsed, 2),
381604
error=error_message,
605+
preflight_passed=preflight_passed,
382606
table_statistics=table_statistics,
383607
tables_not_found=tables_not_found,
608+
warnings=readback_warnings or None,
384609
)

airbyte/cli/pyab.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -710,7 +710,17 @@ def sync(
710710
"Useful for running a second test against an already-populated namespace."
711711
),
712712
)
713-
def destination_smoke_test(
713+
@click.option(
714+
"--skip-preflight",
715+
is_flag=True,
716+
default=False,
717+
help=(
718+
"Skip the automatic preflight check that runs basic_types before "
719+
"the requested scenarios. Use when you expect basic_types itself to fail "
720+
"or want to save time on repeated runs."
721+
),
722+
)
723+
def destination_smoke_test( # noqa: PLR0913
714724
*,
715725
destination: str,
716726
config: str | None = None,
@@ -720,6 +730,7 @@ def destination_smoke_test(
720730
custom_scenarios: str | None = None,
721731
namespace_suffix: str | None = None,
722732
reuse_namespace: str | None = None,
733+
skip_preflight: bool = False,
723734
) -> None:
724735
"""Run smoke tests against a destination connector.
725736
@@ -770,6 +781,7 @@ def destination_smoke_test(
770781
namespace_suffix=namespace_suffix,
771782
reuse_namespace=reuse_namespace,
772783
custom_scenarios_file=custom_scenarios,
784+
skip_preflight=skip_preflight,
773785
)
774786

775787
click.echo(json.dumps(result.model_dump(), indent=2))

airbyte/mcp/local.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -901,6 +901,17 @@ def destination_smoke_test( # noqa: PLR0913, PLR0917
901901
default=None,
902902
),
903903
],
904+
skip_preflight: Annotated[
905+
bool,
906+
Field(
907+
description=(
908+
"Skip the automatic preflight check that runs basic_types before "
909+
"the requested scenarios. Set to true when you expect basic_types "
910+
"itself to fail or want to save time on repeated runs."
911+
),
912+
default=False,
913+
),
914+
],
904915
) -> DestinationSmokeTestResult:
905916
"""Run smoke tests against a destination connector.
906917
@@ -949,6 +960,7 @@ def destination_smoke_test( # noqa: PLR0913, PLR0917
949960
namespace_suffix=namespace_suffix,
950961
reuse_namespace=reuse_namespace,
951962
custom_scenarios=custom_scenarios,
963+
skip_preflight=skip_preflight,
952964
)
953965

954966

0 commit comments

Comments
 (0)