
fix: Prevent deadlock in subprocess executor by allowing concurrent I/O#901

Merged
Aaron ("AJ") Steers (aaronsteers) merged 4 commits into main from devin/1765249790-fix-executor-deadlock on Dec 17, 2025


@aaronsteers
Contributor

@aaronsteers Aaron ("AJ") Steers (aaronsteers) commented Dec 9, 2025

Summary

Fixes a potential deadlock in _stream_from_subprocess that could cause destination writes to hang indefinitely (reported in #871 as MySQL → Postgres sync hanging during write phase).

Root cause: The previous implementation called input_thread.join() immediately after starting the input thread, before reading from stdout. This creates a classic pipe deadlock:

  1. Input thread writes to subprocess stdin
  2. Subprocess processes data and writes to stdout
  3. If the stdout pipe buffer fills up, the subprocess blocks on its write
  4. But we're blocked waiting for the input thread to finish before reading stdout
  5. The input thread can't finish because the blocked subprocess has stopped reading stdin
  6. Deadlock
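The sequence above can be reproduced with a generic child process. This minimal sketch does not reproduce the MySQL → Postgres hang from #871. It assumes a hypothetical Python "cat" child (ECHO_CHILD) and a hypothetical helper (join_before_read_stalls), neither of which comes from PyAirbyte. Because the old code's join() had no timeout, the sketch substitutes a short timeout so that it returns instead of hanging.

```python
import subprocess
import sys
import threading

# Hypothetical child process: copies stdin to stdout line by line.
ECHO_CHILD = [sys.executable, "-c", "import sys\nfor line in sys.stdin:\n    sys.stdout.write(line)"]


def join_before_read_stalls(n_lines: int, timeout: float = 1.0) -> bool:
    """Mimic the old ordering: join the stdin writer *before* reading stdout.

    Returns True if the writer was still blocked when the join timed out.
    """
    proc = subprocess.Popen(ECHO_CHILD, stdin=subprocess.PIPE, stdout=subprocess.PIPE, text=True)
    stdin, stdout = proc.stdin, proc.stdout
    assert stdin is not None and stdout is not None

    def feed() -> None:
        for i in range(n_lines):
            stdin.write(f"record {i}\n")
        stdin.close()

    writer = threading.Thread(target=feed, daemon=True)
    writer.start()
    writer.join(timeout=timeout)  # Old ordering: nobody is reading stdout yet.
    stalled = writer.is_alive()

    # Cleanup: draining stdout unblocks the child, which in turn unblocks the writer.
    stdout.read()
    writer.join()
    proc.wait()
    return stalled
```

A small input fits inside the pipe buffers and completes. An input of roughly a megabyte cannot fit, so the writer is still blocked when the join times out.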

Fix: Move input_thread.join() to the finally block after reading is complete, allowing input and output to happen concurrently.
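The fixed ordering can be sketched the same way. This is a sketch under stated assumptions, not PyAirbyte's _stream_from_subprocess: stream_through_subprocess and UPPER_CHILD are hypothetical names, and exit-code checks and error propagation are left out for brevity. The writer thread starts, stdout is drained right away, and the bounded join happens only in the finally block.

```python
import contextlib
import subprocess
import sys
import threading
from collections.abc import Iterable, Iterator


def stream_through_subprocess(args: list[str], lines: Iterable[str]) -> Iterator[str]:
    """Hypothetical helper: feed `lines` to the child's stdin on a background
    thread while this generator concurrently yields the child's stdout lines."""
    proc = subprocess.Popen(args, stdin=subprocess.PIPE, stdout=subprocess.PIPE, text=True)
    stdin, stdout = proc.stdin, proc.stdout
    assert stdin is not None and stdout is not None

    def feed_stdin() -> None:
        try:
            for line in lines:
                stdin.write(line)
            stdin.close()  # EOF tells the child there is no more input.
        except BrokenPipeError:
            # The child exited before reading everything (e.g. it was terminated).
            with contextlib.suppress(BrokenPipeError):
                stdin.close()

    writer = threading.Thread(target=feed_stdin, daemon=True)
    writer.start()
    # No writer.join() here: stdout must be drained while the writer is still running.
    try:
        for out_line in stdout:
            yield out_line
    except GeneratorExit:
        proc.terminate()  # Consumer stopped early: stop the child rather than wait on it.
        raise
    finally:
        stdout.close()
        proc.wait()
        writer.join(timeout=10)  # Bounded, so cleanup itself cannot hang.


# Hypothetical child for trying it out: upper-cases each line it reads.
UPPER_CHILD = [sys.executable, "-c", "import sys\nfor line in sys.stdin:\n    sys.stdout.write(line.upper())"]
```

With this ordering, an input far larger than the pipe buffers streams through without stalling. Closing the generator early also terminates the child, so the writer thread exits on BrokenPipeError.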

Closes #871

Updates since last revision

Addressed CodeRabbit feedback on exception-handling order: the exit code is now checked before any input-thread exception is raised. When the subprocess fails, this ensures AirbyteSubprocessFailedError is raised, with the input-thread exception attached as original_exception. This preserves the error-handling contract for callers that catch AirbyteSubprocessFailedError.
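The check order can be sketched as a small function. Several pieces are assumptions. SubprocessFailedError is a hypothetical stand-in for AirbyteSubprocessFailedError: only the original_exception attribute name comes from this PR, and the constructor is invented. The ok_exit_codes parameter is an assumed stand-in for the "expected terminations" the CodeRabbit summary mentions.

```python
from __future__ import annotations


class SubprocessFailedError(Exception):
    """Hypothetical stand-in for AirbyteSubprocessFailedError (not its real signature)."""

    def __init__(self, exit_code: int, original_exception: BaseException | None = None) -> None:
        super().__init__(f"Subprocess exited with code {exit_code}")
        self.exit_code = exit_code
        self.original_exception = original_exception


def raise_for_outcome(
    exit_code: int,
    input_error: BaseException | None,
    ok_exit_codes: frozenset[int] = frozenset({0}),
) -> None:
    # 1. Exit code first: a failed subprocess always surfaces as the wrapper error,
    #    carrying any input-thread exception as context.
    if exit_code not in ok_exit_codes:
        raise SubprocessFailedError(exit_code, original_exception=input_error) from input_error
    # 2. Only after a clean exit does a raw input-thread error surface;
    #    BrokenPipeError is treated as normal shutdown noise.
    if input_error is not None and not isinstance(input_error, BrokenPipeError):
        raise input_error
```

Checking the exit code first means callers that catch only the wrapper type still see every subprocess failure.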

Review & Testing Checklist for Human

  • Verify the deadlock analysis is correct - The fix is based on theoretical understanding of pipe buffering, not empirical reproduction of the MySQL → Postgres hang
  • Test a high-volume destination write - Run a sync with significant data volume (e.g., source-faker → destination-postgres) to verify no hangs occur
  • Verify exception handling order - Exit code is now checked before input thread exceptions; confirm subprocess failures are properly wrapped in AirbyteSubprocessFailedError
  • Check for regressions - This is core executor code affecting all connectors; run integration tests across multiple connector types

Recommended test plan:

  1. Run python examples/run_faker.py with a large record count
  2. If possible, reproduce the original reporter's scenario (MySQL → Postgres incremental sync)
  3. Verify early termination scenarios (Ctrl+C during sync) still clean up properly

Notes

  • The 10-second timeout on input_thread.join() in the finally block prevents indefinite hangs during cleanup
  • BrokenPipeError continues to be ignored as it's expected during graceful shutdown
  • All 218 unit tests pass locally
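Why BrokenPipeError is expected during shutdown: CPython ignores SIGPIPE by default, so writing to a pipe whose read end has closed raises BrokenPipeError instead of killing the process. The sketch below shows that writing to a child that has already exited fails immediately. The names spawn_python and write_all are hypothetical helpers, not PyAirbyte code.

```python
from __future__ import annotations

import contextlib
import subprocess
import sys
from collections.abc import Iterable


def spawn_python(code: str) -> subprocess.Popen[str]:
    """Start a child Python process with a text-mode stdin pipe."""
    return subprocess.Popen([sys.executable, "-c", code], stdin=subprocess.PIPE, text=True)


def write_all(proc: subprocess.Popen[str], lines: Iterable[str]) -> bool:
    """Write lines to the child's stdin. Return False if the child hung up early."""
    assert proc.stdin is not None
    try:
        for line in lines:
            proc.stdin.write(line)
            proc.stdin.flush()
        proc.stdin.close()
        return True
    except BrokenPipeError:
        # The read end is gone (the child exited or was terminated): expected on shutdown.
        with contextlib.suppress(BrokenPipeError):
            proc.stdin.close()
        return False
```

A child that exits without reading stdin yields False. A child that reads all of its input yields True.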

Requested by: AJ Steers (Aaron ("AJ") Steers (@aaronsteers))
Link to Devin run: https://app.devin.ai/sessions/5094f54055bd4fc68a357aa705357fce

Summary by CodeRabbit

  • Bug Fixes
    • Run input streaming concurrently with output processing to prevent deadlocks and ensure clean shutdown sequencing.
    • Input streaming now runs as a background thread and is properly awaited during termination.
    • Improved error handling: non‑zero subprocess exits are surfaced as errors (except expected terminations and broken pipe), and input-thread exceptions are reported after shutdown.


Important

Auto-merge enabled.

This PR is set to merge automatically when all requirements are met.

The previous implementation called input_thread.join() before reading
from stdout, which could cause a deadlock when:
1. The input thread writes to stdin
2. The subprocess processes and writes to stdout
3. If the stdout buffer fills up, the subprocess blocks
4. But we're waiting for the input thread to finish before reading stdout
5. The input thread can't finish because the subprocess is blocked on stdout

This fix allows input and output to happen concurrently by moving the
input_thread.join() to the finally block after reading is complete.

Closes #871

Co-Authored-By: AJ Steers <aj@airbyte.io>
@devin-ai-integration
Contributor

Original prompt from AJ Steers
Received message in Slack channel #ask-devin-ai:

@Devin - Triage the latest 10 issues in the PyAirbyte repo that are from external community members / contributors. Don't comment on them directly but look into them and prepare for me a set of suggestions on how to disposition each.
Thread URL: https://airbytehq-team.slack.com/archives/C08BHPUMEPJ/p1765247038245949

@devin-ai-integration
Contributor

🤖 Devin AI Engineer

I'll be helping with this pull request! Here's what you should know:

✅ I will automatically:

  • Address comments on this PR. Add '(aside)' to your comment to have me ignore it.
  • Look at CI failures and help fix them

Note: I can only respond to comments from users who have write access to this repository.


@github-actions

github-actions bot commented Dec 9, 2025

👋 Greetings, Airbyte Team Member!

Here are some helpful tips and reminders for your convenience.

Testing This PyAirbyte Version

You can test this version of PyAirbyte using the following:

# Run PyAirbyte CLI from this branch:
uvx --from 'git+https://github.com/airbytehq/PyAirbyte.git@devin/1765249790-fix-executor-deadlock' pyairbyte --help

# Install PyAirbyte from this branch for development:
pip install 'git+https://github.com/airbytehq/PyAirbyte.git@devin/1765249790-fix-executor-deadlock'

Helpful Resources

PR Slash Commands

Airbyte Maintainers can execute the following slash commands on your PR:

  • /fix-pr - Fixes most formatting and linting issues
  • /poetry-lock - Updates poetry.lock file
  • /test-pr - Runs tests with the updated PyAirbyte

Community Support

Questions? Join the #pyairbyte channel in our Slack workspace.


@coderabbitai
Contributor

coderabbitai bot commented Dec 9, 2025

📝 Walkthrough

Pre-merge checks and finishing touches

✅ Passed checks (5 passed)

  • Title check: ✅ Passed. The title accurately and concisely summarizes the main change: fixing a deadlock by allowing concurrent I/O in the subprocess executor.
  • Linked Issues check: ✅ Passed. The PR directly addresses all coding objectives from issue #871: prevents indefinite blocking during write phase, fixes the concurrent I/O issue by moving join() to finally block, implements proper error handling with exit codes, and maintains existing cleanup behavior.
  • Out of Scope Changes check: ✅ Passed. All changes are directly related to fixing the deadlock issue in the subprocess executor as specified in issue #871; no unrelated modifications detected.
  • Docstring Coverage: ✅ Passed. Docstring coverage is 100.00%, which meets the required 80.00% threshold.
  • Description Check: ✅ Passed. Check skipped because CodeRabbit’s high-level summary is enabled.

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

🧹 Nitpick comments (1)
airbyte/_executors/base.py (1)

72-78: Nice docstring explaining the deadlock scenario

The docstring clearly captures the stdout-buffer deadlock and the rationale for concurrent stdin/stdout handling; this should make future maintenance much easier, and the behavior matches the new control flow as far as I can see. 👍 Wdyt about keeping this as the single source of truth and linking to issue #871 here for extra context, or is that overkill?

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 1b48476 and 656b5da.

📒 Files selected for processing (1)
  • airbyte/_executors/base.py (3 hunks)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (6)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.11, Windows)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.10, Windows)
  • GitHub Check: Pytest (Fast)
  • GitHub Check: Pytest (No Creds)
🔇 Additional comments (1)
airbyte/_executors/base.py (1)

100-102: Comment accurately documents the key concurrency change

The inline comment about not joining the input thread immediately matches the new behavior and the deadlock analysis from the PR description; letting the stdin thread run while stdout is consumed should avoid the prior pipe back‑pressure stall. I don’t see any mismatch between the comment and the code here, so this looks good to me—anything specific you’d want to mention about the subsequent timeout join behavior, or is that sufficiently covered by the code below already, wdyt?

@github-actions

github-actions bot commented Dec 9, 2025

PyTest Results (Fast Tests Only, No Creds)

  • 348 tests (±0): 348 ✅ passed (±0), 0 💤 skipped (±0), 0 ❌ failed (±0)
  • 1 suite (±0), 1 file (±0)
  • Duration: 5m 55s ⏱️ (+6s)

Results for commit 60e87fa. ± Comparison against base commit d9aba9c.

♻️ This comment has been updated with latest results.

@github-actions

github-actions bot commented Dec 9, 2025

PyTest Results (Full)

  • 416 tests (±0): 399 ✅ passed (±0), 17 💤 skipped (±0), 0 ❌ failed (±0)
  • 1 suite (±0), 1 file (±0)
  • Duration: 23m 40s ⏱️ (-1m 44s)

Results for commit 60e87fa. ± Comparison against base commit d9aba9c.

♻️ This comment has been updated with latest results.

@aaronsteers
Contributor Author

Devin, please review bot feedback.

…ror wrapper

Addresses CodeRabbit feedback: check exit code first and raise
AirbyteSubprocessFailedError (with input thread exception as original_exception)
before raising raw input thread exceptions. This ensures callers that catch
AirbyteSubprocessFailedError receive the proper wrapped exception.

Co-Authored-By: AJ Steers <aj@airbyte.io>
@devin-ai-integration
Contributor

Reviewed and addressed the CodeRabbit feedback. The bot correctly identified that the exception-handling order was wrong: I was raising the raw input-thread exception before checking the exit code. That bypassed the AirbyteSubprocessFailedError wrapper that callers expect to catch.

Fix (commit d26a8a1): Reordered the checks so that:

  1. Exit code is checked first - if non-zero, raise AirbyteSubprocessFailedError with the input thread exception as original_exception
  2. Only if exit code is OK, then raise the raw input thread exception (if present and not BrokenPipeError)

This ensures callers like airbyte/_connector_base.py:493 that catch AirbyteSubprocessFailedError receive the proper wrapped exception.

All 218 unit tests pass locally.

Per CodeRabbit feedback: if input_thread.join(timeout=10) expires and the
thread is still alive, a non-daemon thread would block Python interpreter
shutdown. Setting daemon=True ensures the thread won't prevent process
shutdown while still allowing graceful cleanup via the join timeout.

Co-Authored-By: AJ Steers <aj@airbyte.io>
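A minimal sketch of the pattern this commit describes, a daemon input thread with a bounded join, using only the standard threading module. start_daemon and join_bounded are hypothetical helper names. Thread.join(timeout=...) returns None whether or not the thread finished, so is_alive() is the only way to detect a timeout.

```python
import threading
from typing import Callable


def start_daemon(target: Callable[[], object], name: str = "stdin-writer") -> threading.Thread:
    """Start a daemon thread, which cannot keep the interpreter alive at exit."""
    thread = threading.Thread(target=target, name=name, daemon=True)
    thread.start()
    return thread


def join_bounded(thread: threading.Thread, timeout: float = 10.0) -> bool:
    """Wait at most `timeout` seconds. Return True if the thread actually finished."""
    thread.join(timeout=timeout)
    # join() returns None either way; is_alive() tells a timeout apart from completion.
    return not thread.is_alive()
```

If the bounded join times out, a stuck non-daemon thread would still block interpreter shutdown. The daemon flag is what lets the process exit anyway.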
Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

🧹 Nitpick comments (1)
airbyte/_executors/base.py (1)

140-142: Thread join placement looks correct!

Joining the input thread after process termination (with a 10-second timeout) makes sense. Once the process is terminated, the read end of its stdin pipe is closed. The input thread's next write then fails with BrokenPipeError, which is ignored, and the thread exits.

One optional thought: would it be helpful to log a debug message if the join times out and the thread is still alive? This could aid troubleshooting if the input thread ever gets stuck, wdyt?

Example:

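if input_thread is not None:
    input_thread.join(timeout=10)
    if input_thread.is_alive():
        # The daemon thread will not block interpreter shutdown, but record it for troubleshooting.
        # Assumes a module-level `logger = logging.getLogger(__name__)`.
        logger.debug("Input thread still alive after 10s join timeout; continuing shutdown.")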
📜 Review details

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between d26a8a1 and 60e87fa.

📒 Files selected for processing (1)
  • airbyte/_executors/base.py (3 hunks)
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-07-11T19:53:44.427Z
Learnt from: yohannj
Repo: airbytehq/PyAirbyte PR: 716
File: airbyte/logs.py:384-402
Timestamp: 2025-07-11T19:53:44.427Z
Learning: In the PyAirbyte project, when reviewing PRs, maintain clear separation of concerns. Don't suggest changes that are outside the scope of the PR's main objective, even if they would improve consistency or fix other issues. This helps with reviewing changes and potential reverts.

Applied to files:

  • airbyte/_executors/base.py
🧬 Code graph analysis (1)
airbyte/_executors/base.py (1)
airbyte/exceptions.py (1)
  • AirbyteSubprocessFailedError (274-277)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (6)
  • GitHub Check: Pytest (All, Python 3.11, Windows)
  • GitHub Check: Pytest (All, Python 3.10, Windows)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Pytest (No Creds)
  • GitHub Check: Pytest (Fast)
🔇 Additional comments (4)
airbyte/_executors/base.py (4)

72-78: Excellent documentation of the concurrent I/O approach!

The updated docstring clearly explains why the input thread runs concurrently with stdout reading, making the deadlock prevention strategy explicit for future maintainers.


91-102: LGTM - Daemon thread and concurrent I/O strategy properly implemented!

The daemon thread setting (line 98) and the deferred join approach (lines 101-102) correctly address the deadlock issue and prevent interpreter shutdown hangs. This incorporates the feedback from the previous review.


147-165: Exception handling logic is spot-on!

The reordered exception handling correctly prioritizes subprocess failure (lines 147-159) over input thread exceptions (lines 161-165), ensuring that AirbyteSubprocessFailedError is raised when the subprocess fails, with the input thread exception attached as context when appropriate.

This properly addresses the previous review feedback and maintains the error-handling contract for callers like airbyte/_connector_base.py:493 that catch AirbyteSubprocessFailedError.

The logic correctly handles all scenarios:

  • Subprocess failure always raises AirbyteSubprocessFailedError (with input exception as context if relevant)
  • Input thread exceptions only surface when the subprocess succeeded
  • BrokenPipeError is appropriately ignored during graceful shutdown

64-169: Overall implementation successfully fixes the deadlock issue!

This PR correctly addresses the root cause of issue #871 by enabling concurrent stdin/stdout I/O. The key improvements:

  1. ✅ Input thread runs concurrently without blocking stdout reading (prevents pipe deadlock)
  2. ✅ Daemon thread prevents interpreter shutdown hangs if thread gets stuck
  3. ✅ Thread join moved to finally block with timeout for proper cleanup
  4. ✅ Exception handling prioritizes subprocess failures while preserving input thread context
  5. ✅ BrokenPipeError appropriately ignored during graceful shutdown

Both previous review concerns have been incorporated, and the implementation handles all error scenarios correctly. The code is well-documented and maintains proper resource cleanup.

@aaronsteers Aaron ("AJ") Steers (aaronsteers) merged commit 9c9f4a7 into main Dec 17, 2025
24 checks passed
@aaronsteers Aaron ("AJ") Steers (aaronsteers) deleted the devin/1765249790-fix-executor-deadlock branch December 17, 2025 05:01

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Incremental MySQL → Postgres sync hangs during write phase (PyAirbyte stuck in tally_confirmed_writes)

1 participant