Skip to content

Commit 1c1cd2a

Browse files
committed
fix: "write /dev/stdout: broken pipe" error during execution
1 parent d064d42 commit 1c1cd2a

File tree

1 file changed

+29
-28
lines changed

1 file changed

+29
-28
lines changed

airbyte/_executors/base.py

Lines changed: 29 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -117,34 +117,35 @@ def _stream_from_subprocess(
117117
yield _stream_from_file(process.stdout)
118118
process.wait()
119119
finally:
120-
# Close the stdout stream
121-
if process.stdout:
122-
process.stdout.close()
123-
124-
# Terminate the process if it is still running
125-
if process.poll() is None: # Check if the process is still running
126-
process.terminate()
127-
try:
128-
# Wait for a short period to allow process to terminate gracefully
129-
process.wait(timeout=10)
130-
except subprocess.TimeoutExpired:
131-
# If the process does not terminate within the timeout, force kill it
132-
process.kill()
133-
134-
# Now, the process is either terminated or killed. Check the exit code.
135-
exit_code = process.wait()
136-
137-
# If the exit code is not 0 or -15 (SIGTERM), raise an exception
138-
if exit_code not in {0, -15}:
139-
raise exc.AirbyteSubprocessFailedError(
140-
run_args=args,
141-
exit_code=exit_code,
142-
original_exception=(
143-
exception_holder.exception
144-
if not isinstance(exception_holder.exception, BrokenPipeError)
145-
else None
146-
),
147-
)
120+
try:
121+
# Terminate the process if it is still running
122+
if process.poll() is None: # Check if the process is still running
123+
process.terminate()
124+
try:
125+
# Wait for a short period to allow process to terminate gracefully
126+
process.wait(timeout=10)
127+
except subprocess.TimeoutExpired:
128+
# If the process does not terminate within the timeout, force kill it
129+
process.kill()
130+
131+
# Now, the process is either terminated or killed. Check the exit code.
132+
exit_code = process.wait()
133+
134+
# If the exit code is not 0 or -15 (SIGTERM), raise an exception
135+
if exit_code not in {0, -15}:
136+
raise exc.AirbyteSubprocessFailedError(
137+
run_args=args,
138+
exit_code=exit_code,
139+
original_exception=(
140+
exception_holder.exception
141+
if not isinstance(exception_holder.exception, BrokenPipeError)
142+
else None
143+
),
144+
)
145+
finally:
146+
# Close the stdout stream
147+
if process.stdout:
148+
process.stdout.close()
148149

149150

150151
class Executor(ABC):

0 commit comments

Comments
 (0)