diff --git a/airbyte/_executors/base.py b/airbyte/_executors/base.py index cf11c925d..c3ad4e0c8 100644 --- a/airbyte/_executors/base.py +++ b/airbyte/_executors/base.py @@ -69,7 +69,13 @@ def _stream_from_subprocess( log_file: IO[str] | None = None, suppress_stderr: bool = False, ) -> Generator[Iterable[str], None, None]: - """Stream lines from a subprocess.""" + """Stream lines from a subprocess. + + When stdin is an AirbyteMessageIterator, input is pumped to the subprocess + in a separate thread while output is read concurrently. This avoids a + potential deadlock where the subprocess blocks on stdout (buffer full) + while we're waiting for input to finish before reading stdout. + """ input_thread: Thread | None = None exception_holder = ExceptionHolder() if isinstance(stdin, AirbyteMessageIterator): @@ -89,16 +95,11 @@ def _stream_from_subprocess( stdin, exception_holder, ), + daemon=True, # Prevent blocking interpreter shutdown if thread gets stuck ) input_thread.start() - input_thread.join() # Ensure the input thread has finished - - # Don't bother raising broken pipe errors, as they only - # indicate that a subprocess has terminated early. - if exception_holder.exception and not isinstance( - exception_holder.exception, BrokenPipeError - ): - raise exception_holder.exception + # Don't join here - let input and output happen concurrently to avoid deadlock. + # The input thread will be joined in the finally block after reading is done. else: # stdin is None or a file-like object @@ -136,20 +137,32 @@ def _stream_from_subprocess( # If the process does not terminate within the timeout, force kill it process.kill() + # Join the input thread if it exists (after process termination so stdin closes) + if input_thread is not None: + input_thread.join(timeout=10) + # Now, the process is either terminated or killed. Check the exit code. exit_code = process.wait() - # If the exit code is not 0 or -15 (SIGTERM), raise an exception + # If the exit code is not 0 or -15 (SIGTERM), raise AirbyteSubprocessFailedError. + # Include input thread exception as original_exception if present. if exit_code not in {0, -15}: raise exc.AirbyteSubprocessFailedError( run_args=args, exit_code=exit_code, original_exception=( exception_holder.exception - if not isinstance(exception_holder.exception, BrokenPipeError) + if exception_holder.exception + and not isinstance(exception_holder.exception, BrokenPipeError) else None ), ) + + # Only raise input thread exception if exit code was OK (ignore BrokenPipeError) + if exception_holder.exception and not isinstance( + exception_holder.exception, BrokenPipeError + ): + raise exception_holder.exception finally: # Close the stdout stream if process.stdout: