feat(cli,mcp): add destination-smoke-test command and MCP tool#998
Conversation
Adds a new `pyab destination-smoke-test` CLI command and a corresponding `destination_smoke_test` MCP tool that send synthetic smoke test data to a destination connector and report success/failure. Both use the existing SourceSmokeTest (15 predefined scenarios) via PyAirbyte's Destination.write() orchestration. Supports: - Destination resolution (name, Docker image, local executable) - Destination config via file, inline YAML, or secret reference - Scenario filtering via --scenario-filter - Large batch inclusion via --include-large-batch - Custom scenario injection via --custom-scenarios (CLI) or custom_scenarios param (MCP) - Structured JSON output with success/failure, record counts, timing, errors Co-Authored-By: AJ Steers <aj@airbyte.io>
🤖 Devin AI EngineerI'll be helping with this pull request! Here's what you should know: ✅ I will automatically:
Note: I can only respond to comments from users who have write access to this repository. ⚙️ Control Options:
|
|
Note 📝 PR Converted to Draft More info...Thank you for creating this PR. As a policy to protect our engineers' time, Airbyte requires all PRs to be created first in draft status. Your PR has been automatically converted to draft status in respect for this policy. As soon as your PR is ready for formal review, you can proceed to convert the PR to "ready for review" status by clicking the "Ready for review" button at the bottom of the PR page. To skip draft status in future PRs, please include |
👋 Greetings, Airbyte Team Member!Here are some helpful tips and reminders for your convenience. 💡 Show Tips and TricksTesting This PyAirbyte VersionYou can test this version of PyAirbyte using the following: # Run PyAirbyte CLI from this branch:
uvx --from 'git+https://github.com/airbytehq/PyAirbyte.git@devin/1773861379-destination-smoke-test' pyairbyte --help
# Install PyAirbyte from this branch for development:
pip install 'git+https://github.com/airbytehq/PyAirbyte.git@devin/1773861379-destination-smoke-test'PR Slash CommandsAirbyte Maintainers can execute the following slash commands on your PR:
📚 Show Repo GuidanceHelpful ResourcesCommunity SupportQuestions? Join the #pyairbyte channel in our Slack workspace. |
…ings, sanitize errors, destructive=True Co-Authored-By: AJ Steers <aj@airbyte.io>
📝 WalkthroughWalkthroughAdds a destination smoke-test feature: new utilities to build a smoke-test Source and run writes to a resolved Destination, plus CLI and MCP entrypoints that execute the test, return a structured JSON result (success, records_delivered, elapsed_seconds, error), and exit non‑zero on failure. Changes
Sequence Diagram(s)sequenceDiagram
participant User
participant CLI as CLI / MCP
participant Source as SmokeTest Source
participant Destination as Destination Connector
participant Result as Result Builder
User->>CLI: invoke destination-smoke-test(dest, config, scenarios)
CLI->>Destination: resolve & initialize destination (config / docker image)
CLI->>Source: construct smoke source (scenarios / custom)
CLI->>CLI: start timer
Source->>Destination: stream test records (write)
Destination-->>Source: acknowledge / record counts
Source-->>CLI: finish (records_delivered / error)
CLI->>CLI: stop timer
CLI->>Result: assemble DestinationSmokeTestResult
Result-->>User: print JSON and exit (non‑zero if failure)
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Would you like me to flag and remove the duplicate 🚥 Pre-merge checks | ✅ 3✅ Passed checks (3 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
📝 Coding Plan
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
🧹 Nitpick comments (4)
airbyte/mcp/local.py (3)
971-976: Duplicate call toresolve_list_of_strings
resolve_list_of_strings(scenario_filter)is already called at line 940 and stored inresolved_filter. Could we reuse that result here instead of calling the function again, wdyt?♻️ Proposed fix to reuse the resolved filter
scenarios_str: str - if scenario_filter: - resolved = resolve_list_of_strings(scenario_filter) - scenarios_str = ",".join(resolved) if resolved else "all_fast" + if resolved_filter: + scenarios_str = ",".join(resolved_filter) else: scenarios_str = "all_fast"Note: You'll also need to move
resolved_filterdeclaration to before the try block so it's accessible here, or refactor to compute it once at the start.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@airbyte/mcp/local.py` around lines 971 - 976, The code is redundantly calling resolve_list_of_strings(scenario_filter) twice; compute resolved_filter once (using resolve_list_of_strings(scenario_filter)) before the try block so it’s in scope, then replace the second call with that variable when building scenarios_str (set scenarios_str = ",".join(resolved_filter) if resolved_filter else "all_fast"); ensure any moved declaration still has correct error handling or fallbacks and that references to scenario_filter, resolved_filter, resolve_list_of_strings, and scenarios_str are updated accordingly.
915-920: Missingconfig_spec_jsonschemain config resolution - is this intentional?Other MCP tools in this file (e.g.,
validate_connector_config,list_source_streams) passconfig_spec_jsonschematoresolve_connector_configfor schema-based validation. This function omits it. Is this intentional because the destination connector validates the config downstream anyway, or would it be worth fetching the spec first for early validation, wdyt?🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@airbyte/mcp/local.py` around lines 915 - 920, The call to resolve_connector_config is missing the config_spec_jsonschema argument used elsewhere for early schema validation; update the call in this block to pass the connector spec JSON schema (as other functions like validate_connector_config and list_source_streams do) by fetching or reusing the destination spec (e.g., via the same get_*_spec helper used elsewhere) and supply it as config_spec_jsonschema to resolve_connector_config so the destination config is validated against its JSON schema before continuing.
966-968: Consider including traceback in error for debugging?Other MCP tools in this file capture
traceback.format_exc()for debugging purposes. The current approach only capturesstr(ex). For debugging connector issues, a full traceback might be helpful. Would it be worth adding it to the error field or logging it separately, wdyt?🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@airbyte/mcp/local.py` around lines 966 - 968, In the except Exception as ex block where error_message = str(ex), change handling to include the full traceback for debugging: import traceback at top (if not present), call traceback.format_exc() inside the except and either append or replace error_message with that value (e.g., error_message = f"{str(ex)}\n{traceback.format_exc()}") or log traceback separately via the same logger used elsewhere; update references to error_message so the stored/returned error includes the traceback for connector debugging.airbyte/cli/pyab.py (1)
825-836: Minor inconsistency with MCP result structureThe CLI result dict conditionally includes the
errorkey only on failure (lines 833-834), while the MCPDestinationSmokeTestResultmodel always includeserror(defaulting toNone). This slight schema difference could affect consumers expecting a consistent structure. Would it make sense to always include"error": Nonefor consistency with the MCP tool, or is the current behavior preferred for CLI output, wdyt?♻️ Optional: Always include error key for consistency
result = { "success": success, "destination": _get_connector_name(destination), "records_delivered": records_delivered, "scenarios_requested": scenario_filter or "all_fast", "include_large_batch": include_large_batch, "elapsed_seconds": round(elapsed, 2), + "error": error_message, } - if error_message: - result["error"] = error_message🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@airbyte/cli/pyab.py` around lines 825 - 836, The CLI builds a result dict in the smoke test flow that only adds the "error" key when error_message is truthy, which differs from the MCP DestinationSmokeTestResult that always includes error (possibly None); update the result construction around the result = {...} block so it always sets result["error"] = error_message (which may be None) before echoing via click.echo(json.dumps(result, indent=2)), ensuring the CLI output structure matches DestinationSmokeTestResult and keeping symbols like result, error_message, DestinationSmokeTestResult, and click.echo in mind when making the change.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Nitpick comments:
In `@airbyte/cli/pyab.py`:
- Around line 825-836: The CLI builds a result dict in the smoke test flow that
only adds the "error" key when error_message is truthy, which differs from the
MCP DestinationSmokeTestResult that always includes error (possibly None);
update the result construction around the result = {...} block so it always sets
result["error"] = error_message (which may be None) before echoing via
click.echo(json.dumps(result, indent=2)), ensuring the CLI output structure
matches DestinationSmokeTestResult and keeping symbols like result,
error_message, DestinationSmokeTestResult, and click.echo in mind when making
the change.
In `@airbyte/mcp/local.py`:
- Around line 971-976: The code is redundantly calling
resolve_list_of_strings(scenario_filter) twice; compute resolved_filter once
(using resolve_list_of_strings(scenario_filter)) before the try block so it’s in
scope, then replace the second call with that variable when building
scenarios_str (set scenarios_str = ",".join(resolved_filter) if resolved_filter
else "all_fast"); ensure any moved declaration still has correct error handling
or fallbacks and that references to scenario_filter, resolved_filter,
resolve_list_of_strings, and scenarios_str are updated accordingly.
- Around line 915-920: The call to resolve_connector_config is missing the
config_spec_jsonschema argument used elsewhere for early schema validation;
update the call in this block to pass the connector spec JSON schema (as other
functions like validate_connector_config and list_source_streams do) by fetching
or reusing the destination spec (e.g., via the same get_*_spec helper used
elsewhere) and supply it as config_spec_jsonschema to resolve_connector_config
so the destination config is validated against its JSON schema before
continuing.
- Around line 966-968: In the except Exception as ex block where error_message =
str(ex), change handling to include the full traceback for debugging: import
traceback at top (if not present), call traceback.format_exc() inside the except
and either append or replace error_message with that value (e.g., error_message
= f"{str(ex)}\n{traceback.format_exc()}") or log traceback separately via the
same logger used elsewhere; update references to error_message so the
stored/returned error includes the traceback for connector debugging.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: 098234f2-e81f-4e12-bca7-0b86fb416889
📒 Files selected for processing (2)
airbyte/cli/pyab.pyairbyte/mcp/local.py
- Move DestinationSmokeTestResult, get_smoke_test_source, run_destination_smoke_test into shared module used by both CLI and MCP - Fix incorrect scenario names in CLI help text (bug: names didn't match actual scenarios) - Remove duplicated orchestration logic from CLI and MCP - CLI and MCP are now thin presentation layers over the shared implementation Co-Authored-By: AJ Steers <aj@airbyte.io>
Co-Authored-By: AJ Steers <aj@airbyte.io>
|
Addressed the actionable bot findings in commits Fixed (real bugs/edge cases):
Acknowledged but not acting on (bot suggestions, not bugs):
|
…tinations Co-Authored-By: AJ Steers <aj@airbyte.io>
There was a problem hiding this comment.
🧹 Nitpick comments (3)
airbyte/cli/smoke_test_source/source.py (1)
327-343: Nice helper for stream status messages!The implementation looks good. One small observation: this method doesn't use
self, so it could be a@staticmethodif you wanted to be more explicit about that—but totally fine as-is too. What do you think, wdyt?Also, I noticed there's a similar
get_stream_status_messagefunction inairbyte/_message_iterators.py(context snippet 1), though that one appears to be specifically for success/complete status only. Your method is more flexible with thestatusparameter, so the duplication is justified here.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@airbyte/cli/smoke_test_source/source.py` around lines 327 - 343, The _stream_status_message method doesn't use self, so mark it as a `@staticmethod` and remove the unused self parameter: add the `@staticmethod` decorator above _stream_status_message and change its signature from (self, stream_name: str, status: AirbyteStreamStatus) to (stream_name: str, status: AirbyteStreamStatus); keep the function body and return value exactly the same so callers can continue invoking it as a bound method or via the class.airbyte/_util/destination_smoke_tests.py (2)
145-153: Reasonable error sanitization approach!Using
get_message()for PyAirbyte exceptions avoids leaking config/context data (as confirmed in thePyAirbyteErrorcontext snippet showing it only returns the message attribute).For the fallback at line 153,
str(ex)is used for non-PyAirbyte exceptions. Standard library exceptions typically don't include sensitive config data, but if a third-party library exception happens to include such data in its__str__, it could potentially be exposed. This is probably an acceptable tradeoff for debuggability—just wanted to flag it. Do you think it's worth adding a note in the docstring about this edge case, wdyt?🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@airbyte/_util/destination_smoke_tests.py` around lines 145 - 153, Update the _sanitize_error function docstring to note the tradeoff: it uses ex.get_message() for PyAirbyte exceptions (e.g. PyAirbyteError) to avoid leaking config/context, and falls back to str(ex) for other exception types which may, in rare third‑party cases, include sensitive data; explicitly document this edge case and the intentional choice for better debuggability so reviewers know this is an accepted risk.
137-142: Consider documenting thesource-smoke-testprerequisite.The smoke test already raises a descriptive
AirbyteConnectorExecutableNotFoundErrorif the executable isn't on PATH (via the executor utility), so the error handling is in good shape. However, it might be worth adding a note in the docstring or README that users needsource-smoke-testinstalled to userun_destination_smoke_test(). This would prevent users from wondering what's missing when they hit that error, wdyt?🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@airbyte/_util/destination_smoke_tests.py` around lines 137 - 142, Add documentation that the helper expects the external executable "source-smoke-test" on PATH: update the docstring for run_destination_smoke_test (or the module-level docstring) to state that get_source is invoked with local_executable="source-smoke-test" and users must have that executable installed/available on PATH before running run_destination_smoke_test(); optionally add a short note to the README pointing to installation instructions for the source-smoke-test binary.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Nitpick comments:
In `@airbyte/_util/destination_smoke_tests.py`:
- Around line 145-153: Update the _sanitize_error function docstring to note the
tradeoff: it uses ex.get_message() for PyAirbyte exceptions (e.g.
PyAirbyteError) to avoid leaking config/context, and falls back to str(ex) for
other exception types which may, in rare third‑party cases, include sensitive
data; explicitly document this edge case and the intentional choice for better
debuggability so reviewers know this is an accepted risk.
- Around line 137-142: Add documentation that the helper expects the external
executable "source-smoke-test" on PATH: update the docstring for
run_destination_smoke_test (or the module-level docstring) to state that
get_source is invoked with local_executable="source-smoke-test" and users must
have that executable installed/available on PATH before running
run_destination_smoke_test(); optionally add a short note to the README pointing
to installation instructions for the source-smoke-test binary.
In `@airbyte/cli/smoke_test_source/source.py`:
- Around line 327-343: The _stream_status_message method doesn't use self, so
mark it as a `@staticmethod` and remove the unused self parameter: add the
`@staticmethod` decorator above _stream_status_message and change its signature
from (self, stream_name: str, status: AirbyteStreamStatus) to (stream_name: str,
status: AirbyteStreamStatus); keep the function body and return value exactly
the same so callers can continue invoking it as a bound method or via the class.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: 7948578b-2991-44b0-b35b-7d5efa40bd1b
📒 Files selected for processing (4)
airbyte/_util/destination_smoke_tests.pyairbyte/cli/pyab.pyairbyte/cli/smoke_test_source/source.pyairbyte/mcp/local.py
🚧 Files skipped from review as they are similar to previous changes (2)
- airbyte/mcp/local.py
- airbyte/cli/pyab.py
Summary
Adds a new
pyab destination-smoke-testCLI command and a correspondingdestination_smoke_testMCP tool. Both send synthetic data from the existingSourceSmokeTest(15 predefined scenarios covering type variations, null handling, naming edge cases, etc.) to a destination connector viaDestination.write()and report pass/fail as structured JSON.This is the execution layer for destination regression testing — it answers "does this destination accept the smoke test data without errors?" No readback or comparison is performed (that's a future increment).
Architecture: Shared logic lives in
airbyte/_util/destination_smoke_tests.py— both CLI and MCP are thin presentation layers that delegate torun_destination_smoke_test(). Neither imports from the other.Scenario keywords:
fast(default) — all fast predefined scenarios, excludinglarge_batch_streamall— every predefined scenario includinglarge_batch_streamCLI usage:
pyab destination-smoke-test \ --destination=destination-snowflake \ --config=./secrets/snowflake.json pyab destination-smoke-test \ --destination=destination-dev-null \ --scenarios=basic_types,null_handling pyab destination-smoke-test \ --destination=destination-snowflake \ --config=./secrets/snowflake.json --scenarios=allMCP tool: Same functionality exposed as
destination_smoke_testin thelocaltools module, returning a structuredDestinationSmokeTestResultPydantic model. Also supportscustom_scenariosas inline dicts anddocker_imageoverride.Updates since last revision
destination-dev-null) requireSTARTED/RUNNING/COMPLETEstream status trace messages per the Airbyte protocol. The smoke test source was only emittingRECORDmessages, causing all JDK destinations to fail withTransientErrorException: streams did not receive a terminal stream status message. Fixed insource.pyby emitting the requiredTRACEmessages around each stream's records.fast/allscenario keywords implemented withfastas default. Empty scenario lists are normalized tofast.destination-dev-null— 28 records across 13 fast streams delivered successfully, exit code 0.Review & Testing Checklist for Human
source.py(shipped in PR feat(cli): addsource-smoke-testCLI #996). The sequence is STARTED → RUNNING → records → COMPLETE per stream. Verify this matches the protocol expectations for both JDK and Python CDK destinations — the order matters and incorrect ordering could cause different failures on other destinations.destination-dev-nullwhich is a no-op sink — it validates protocol compliance but not actual data delivery.docker_imageis specified and Docker is installed, the MCP tool defaults todocker_image=True. The CLI instead delegates to_resolve_destination_jobwhich has slightly different resolution logic. Confirm this divergence is acceptable._sanitize_error()usesget_message()for PyAirbyte exceptions but falls back tostr(ex)for others. Third-party exceptions could theoretically include config details.Recommended test plan
Notes
DestinationSmokeTestResultmodel and_sanitize_errorhelper are exported from the shared module for potential reuse by the ops repo test harness.Link to Devin session: https://app.devin.ai/sessions/9c72389579884c06bf18cef11c4550e8
Requested by: Aaron ("AJ") Steers (@aaronsteers)
Summary by CodeRabbit
New Features