Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 24 additions & 11 deletions airbyte/_executors/base.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -69,7 +69,13 @@ def _stream_from_subprocess(
log_file: IO[str] | None = None,
suppress_stderr: bool = False,
) -> Generator[Iterable[str], None, None]:
"""Stream lines from a subprocess."""
"""Stream lines from a subprocess.

When stdin is an AirbyteMessageIterator, input is pumped to the subprocess
in a separate thread while output is read concurrently. This avoids a
potential deadlock where the subprocess blocks on stdout (buffer full)
while we're waiting for input to finish before reading stdout.
"""
input_thread: Thread | None = None
exception_holder = ExceptionHolder()
if isinstance(stdin, AirbyteMessageIterator):
Expand All @@ -89,16 +95,11 @@ def _stream_from_subprocess(
stdin,
exception_holder,
),
daemon=True, # Prevent blocking interpreter shutdown if thread gets stuck
)
input_thread.start()
input_thread.join() # Ensure the input thread has finished

# Don't bother raising broken pipe errors, as they only
# indicate that a subprocess has terminated early.
if exception_holder.exception and not isinstance(
exception_holder.exception, BrokenPipeError
):
raise exception_holder.exception
# Don't join here - let input and output happen concurrently to avoid deadlock.
# The input thread will be joined in the finally block after reading is done.

else:
# stdin is None or a file-like object
Expand Down Expand Up @@ -136,20 +137,32 @@ def _stream_from_subprocess(
# If the process does not terminate within the timeout, force kill it
process.kill()

# Join the input thread if it exists (after process termination so stdin closes)
if input_thread is not None:
input_thread.join(timeout=10)

# Now, the process is either terminated or killed. Check the exit code.
exit_code = process.wait()

# If the exit code is not 0 or -15 (SIGTERM), raise an exception
# If the exit code is not 0 or -15 (SIGTERM), raise AirbyteSubprocessFailedError.
# Include input thread exception as original_exception if present.
if exit_code not in {0, -15}:
raise exc.AirbyteSubprocessFailedError(
run_args=args,
exit_code=exit_code,
original_exception=(
exception_holder.exception
if not isinstance(exception_holder.exception, BrokenPipeError)
if exception_holder.exception
and not isinstance(exception_holder.exception, BrokenPipeError)
else None
),
)

# Only raise input thread exception if exit code was OK (ignore BrokenPipeError)
if exception_holder.exception and not isinstance(
exception_holder.exception, BrokenPipeError
):
raise exception_holder.exception
finally:
# Close the stdout stream
if process.stdout:
Expand Down