Skip to content

Commit 9f5cc54

Browse files
feat: modify run_docker_command to always return EntrypointOutput
- Always store STDOUT to disk for memory efficiency - Add returncode and stderr properties to EntrypointOutput for backward compatibility - Update all callers in standard tests to use EntrypointOutput return type - Improve error analysis and readability in test failure outputs - Maintain strict backward compatibility through enhanced EntrypointOutput properties Co-Authored-By: AJ Steers <[email protected]>
1 parent 0afea4a commit 9f5cc54

File tree

3 files changed

+58
-44
lines changed

3 files changed

+58
-44
lines changed

airbyte_cdk/test/entrypoint_wrapper.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import json
1818
import logging
1919
import re
20+
import subprocess
2021
import tempfile
2122
import traceback
2223
from collections import deque
@@ -68,6 +69,7 @@ def __init__(
6869
uncaught_exception: Optional[BaseException] = None,
6970
*,
7071
message_file: Path | None = None,
72+
completed_process: Optional[subprocess.CompletedProcess[str]] = None,
7173
) -> None:
7274
if messages is None and message_file is None:
7375
raise ValueError("Either messages or message_file must be provided")
@@ -76,6 +78,7 @@ def __init__(
7678

7779
self._messages: list[AirbyteMessage] | None = None
7880
self._message_file: Path | None = message_file
81+
self._completed_process: Optional[subprocess.CompletedProcess[str]] = completed_process
7982
if messages:
8083
try:
8184
self._messages = [self._parse_message(message) for message in messages]
@@ -291,6 +294,16 @@ def is_not_in_logs(self, pattern: str) -> bool:
291294
"""Check if no log message matches the case-insensitive pattern."""
292295
return not self.is_in_logs(pattern)
293296

297+
@property
298+
def returncode(self) -> int | None:
299+
"""Return the exit code of the process, if available."""
300+
return self._completed_process.returncode if self._completed_process else None
301+
302+
@property
303+
def stderr(self) -> str | None:
304+
"""Return the stderr output of the process, if available."""
305+
return self._completed_process.stderr if self._completed_process else None
306+
294307

295308
def _run_command(
296309
source: Source,

airbyte_cdk/test/standard_tests/docker_base.py

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
import warnings
1111
from dataclasses import asdict
1212
from pathlib import Path
13-
from subprocess import CompletedProcess, SubprocessError
13+
from subprocess import SubprocessError
1414
from typing import Literal
1515

1616
import orjson
@@ -146,7 +146,7 @@ def test_docker_image_build_and_spec(
146146
)
147147

148148
try:
149-
result: CompletedProcess[str] = run_docker_command(
149+
result: EntrypointOutput = run_docker_command(
150150
[
151151
"docker",
152152
"run",
@@ -203,7 +203,7 @@ def test_docker_image_build_and_check(
203203
with scenario.with_temp_config_file(
204204
connector_root=connector_root,
205205
) as temp_config_file:
206-
_ = run_docker_command(
206+
result: EntrypointOutput = run_docker_command(
207207
[
208208
"docker",
209209
"run",
@@ -295,7 +295,7 @@ def test_docker_image_build_and_read(
295295
) as temp_dir_str,
296296
):
297297
temp_dir = Path(temp_dir_str)
298-
discover_result = run_docker_command(
298+
discover_result: EntrypointOutput = run_docker_command(
299299
[
300300
"docker",
301301
"run",
@@ -311,14 +311,14 @@ def test_docker_image_build_and_read(
311311
capture_stderr=True,
312312
capture_stdout=True,
313313
)
314-
parsed_output = EntrypointOutput(messages=discover_result.stdout.splitlines())
314+
parsed_output = discover_result
315315
try:
316316
catalog_message = parsed_output.catalog # Get catalog message
317317
assert catalog_message.catalog is not None, "Catalog message missing catalog."
318318
discovered_catalog: AirbyteCatalog = parsed_output.catalog.catalog
319319
except Exception as ex:
320320
raise AssertionError(
321-
f"Failed to load discovered catalog from {discover_result.stdout}. "
321+
f"Failed to load discovered catalog from discover command output. "
322322
f"Error: {ex!s}"
323323
) from None
324324
if not discovered_catalog.streams:
@@ -355,7 +355,7 @@ def test_docker_image_build_and_read(
355355
configured_catalog_path.write_text(
356356
orjson.dumps(asdict(configured_catalog)).decode("utf-8")
357357
)
358-
read_result: CompletedProcess[str] = run_docker_command(
358+
read_result: EntrypointOutput = run_docker_command(
359359
[
360360
"docker",
361361
"run",
@@ -376,13 +376,18 @@ def test_docker_image_build_and_read(
376376
capture_stdout=True,
377377
)
378378
if read_result.returncode != 0:
379+
error_messages = (
380+
[f"Error message: {error.trace.error.message}" for error in read_result.errors if error.trace and error.trace.error]
381+
if read_result.errors
382+
else ["No error messages found"]
383+
)
379384
raise AssertionError(
380385
f"Failed to run `read` command in docker image {connector_image!r}. "
381386
"\n-----------------"
382387
f"EXIT CODE: {read_result.returncode}\n"
383388
"STDERR:\n"
384389
f"{read_result.stderr}\n"
385-
f"STDOUT:\n"
386-
f"{read_result.stdout}\n"
390+
"ERROR MESSAGES:\n"
391+
f"{chr(10).join(error_messages)}\n"
387392
"\n-----------------"
388393
) from None

airbyte_cdk/utils/docker.py

Lines changed: 31 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import platform
88
import subprocess
99
import sys
10+
import tempfile
1011
from contextlib import ExitStack
1112
from dataclasses import dataclass
1213
from enum import Enum
@@ -17,6 +18,7 @@
1718
import requests
1819

1920
from airbyte_cdk.models.connector_metadata import ConnectorLanguage, MetadataFile
21+
from airbyte_cdk.test.entrypoint_wrapper import EntrypointOutput
2022
from airbyte_cdk.utils.connector_paths import resolve_airbyte_repo_root
2123

2224

@@ -391,52 +393,53 @@ def run_docker_command(
391393
check: bool = True,
392394
capture_stdout: bool | Path = False,
393395
capture_stderr: bool | Path = False,
394-
) -> subprocess.CompletedProcess[str]:
396+
) -> EntrypointOutput:
395397
"""Run a Docker command as a subprocess.
396398
397399
Args:
398400
cmd: The command to run as a list of strings.
399401
check: If True, raises an exception if the command fails. If False, the caller is
400402
responsible for checking the return code.
401-
capture_stdout: How to process stdout.
403+
capture_stdout: How to process stdout. Always stored to disk for memory efficiency.
402404
capture_stderr: If True, captures stderr in memory and returns to the caller.
403405
If a Path is provided, the output is written to the specified file.
404406
405-
For stdout and stderr process:
406-
- If False (the default), stdout is not captured.
407-
- If True, output is captured in memory and returned within the `CompletedProcess` object.
408-
- If a Path is provided, the output is written to the specified file. (Recommended for large syncs.)
407+
Returns:
408+
EntrypointOutput: An object containing parsed Airbyte protocol messages and process info.
409409
410410
Raises:
411411
subprocess.CalledProcessError: If the command fails and check is True.
412412
"""
413413
print(f"Running command: {' '.join(cmd)}")
414414

415415
with ExitStack() as stack:
416-
# Shared context manager to handle file closing, if needed.
417-
stderr: TextIOWrapper | int | None
418-
stdout: TextIOWrapper | int | None
416+
stdout_temp_file = stack.enter_context(
417+
tempfile.NamedTemporaryFile(mode="w", delete=False, encoding="utf-8")
418+
)
419+
stdout_path = Path(stdout_temp_file.name)
419420

420-
# If capture_stderr or capture_stdout is a Path, we open the file in write mode.
421-
# If it's a boolean, we set it to either subprocess.PIPE or None.
421+
stderr: TextIOWrapper | int | None
422422
if isinstance(capture_stderr, Path):
423423
stderr = stack.enter_context(capture_stderr.open("w", encoding="utf-8"))
424424
elif isinstance(capture_stderr, bool):
425425
stderr = subprocess.PIPE if capture_stderr is True else None
426-
427-
if isinstance(capture_stdout, Path):
428-
stdout = stack.enter_context(capture_stdout.open("w", encoding="utf-8"))
429-
elif isinstance(capture_stdout, bool):
430-
stdout = subprocess.PIPE if capture_stdout is True else None
426+
else:
427+
stderr = None
431428

432429
completed_process: subprocess.CompletedProcess[str] = subprocess.run(
433430
cmd,
434431
text=True,
435432
check=check,
436433
stderr=stderr,
437-
stdout=stdout,
434+
stdout=stdout_temp_file,
435+
)
436+
437+
stdout_temp_file.close()
438+
439+
return EntrypointOutput(
440+
message_file=stdout_path,
441+
completed_process=completed_process,
438442
)
439-
return completed_process
440443

441444

442445
def verify_docker_installation() -> bool:
@@ -471,27 +474,20 @@ def verify_connector_image(
471474
capture_stdout=True,
472475
)
473476
# check that the output is valid JSON
474-
if result.stdout:
475-
found_spec_output = False
476-
for line in result.stdout.split("\n"):
477-
if line.strip():
478-
try:
479-
# Check if the line is a valid JSON object
480-
msg = json.loads(line)
481-
if isinstance(msg, dict) and "type" in msg and msg["type"] == "SPEC":
482-
found_spec_output = True
483-
484-
except json.JSONDecodeError as e:
485-
logger.warning(f"Invalid JSON output from spec command: {e}: {line}")
486-
487-
if not found_spec_output:
488-
logger.error("No valid JSON output found for spec command.")
477+
spec_messages = result.spec_messages
478+
if spec_messages:
479+
if len(spec_messages) > 0:
480+
logger.info("Found valid SPEC message in output.")
481+
else:
482+
logger.error("No valid SPEC message found in output.")
489483
return False
490484
else:
491-
logger.error("No output from spec command.")
485+
logger.error("No spec messages found in output.")
492486
return False
493487
except subprocess.CalledProcessError as e:
494-
logger.error(f"Image verification failed: {e.stderr}")
488+
logger.error(
489+
f"Image verification failed: {result.stderr if 'result' in locals() else e.stderr}"
490+
)
495491
return False
496492

497493
return True

0 commit comments

Comments
 (0)