fix(workflow): retry transient DB errors in pipeline instead of crashing #93
allenhutchison merged 5 commits into main
Conversation
The pipeline orchestrator crashed after ~10.8 days because a transient Supabase connection timeout (OperationalError) propagated out of _pipeline_iteration() and was caught by the top-level except, shutting down the entire pipeline. Now transient DB errors are caught inside the main loop and retried with exponential backoff (5s→10s→20s→40s→60s cap), only shutting down after 5 consecutive failures.
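The backoff schedule described above follows a standard doubling-with-cap pattern; a minimal sketch of the computation (the helper name is illustrative, not taken from the PR):

```python
def backoff_wait(consecutive_errors: int, base_wait: float = 5.0, max_wait: float = 60.0) -> float:
    """Double the wait on each consecutive failure, capped at max_wait."""
    return min(base_wait * (2 ** (consecutive_errors - 1)), max_wait)

# Failures 1..5 yield 5s, 10s, 20s, 40s, then 80s capped to 60s.
schedule = [backoff_wait(n) for n in range(1, 6)]
```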
Walkthrough
Adds float env parsing and three DB-retry fields to PipelineConfig; updates PipelineConfig.from_env to read and validate the new vars; updates the orchestrator loop to catch and retry transient DB errors.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Orchestrator as Orchestrator
    participant Config as Config (PipelineConfig)
    participant DB as Database
    participant Logger as Logger
    rect rgba(240,248,255,0.5)
        Orchestrator->>Config: load retry settings (max_consecutive_db_errors, db_retry_base_wait, db_retry_max_wait)
    end
    Orchestrator->>DB: perform pipeline iteration
    alt iteration succeeds
        DB-->>Orchestrator: success
        Orchestrator->>Orchestrator: reset consecutive_db_errors
        Orchestrator->>Logger: log progress or idle
    else OperationalError occurs
        DB-->>Orchestrator: OperationalError
        rect rgba(255,228,196,0.5)
            Orchestrator->>Orchestrator: increment consecutive_db_errors
            Orchestrator->>Logger: warn, compute backoff (base_wait * 2^n, capped at max_wait)
            Orchestrator->>Orchestrator: sleep(backoff, small increments)
            Orchestrator->>Orchestrator: if consecutive_db_errors >= max_consecutive_db_errors -> shutdown
        end
    else other Exception
        DB-->>Orchestrator: Exception
        Orchestrator->>Orchestrator: reset consecutive_db_errors
        Orchestrator->>Logger: log and sleep idle interval
    end
    Orchestrator->>Logger: finalize and restore signal handlers
```
🚥 Pre-merge checks: ✅ 3 passed
Actionable comments posted: 3
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/workflow/config.py`:
- Around line 79-96: After parsing raw into value in the config parsing logic
(the block that sets value = float(raw) and uses variables raw, name, min_val,
max_val), reject non-finite floats by checking math.isfinite(value) and raising
a ValueError if it returns False (e.g., "Invalid value for {name}: '{raw}' is
not a finite number"); place this check immediately after the except block and
before the min_val/max_val comparisons so "nan", "inf", "-inf" are rejected
early.
In `@src/workflow/orchestrator.py`:
- Around line 217-219: The generic except block should be split to handle DB
operational errors separately: replace the bare "except Exception" with a
specific "except OperationalError as e" handler that increments
consecutive_db_errors, logs the DB error via logger.exception and enforces
max_consecutive_db_errors (triggering appropriate shutdown/retry logic),
followed by a separate "except Exception as e" handler that resets
consecutive_db_errors to 0 and logs the unexpected error and sleeps using
self.pipeline_config.idle_wait_seconds; ensure you import the correct
OperationalError (e.g., from sqlalchemy.exc) and update any code paths that
check consecutive_db_errors and max_consecutive_db_errors accordingly.
- Around line 200-205: The fatal DB error branch logs and breaks out but never
flips the orchestrator's running flag, so _running remains True; update the
branch in the block that checks consecutive_db_errors >= max_errors (where
logger.error is called and currently uses break) to mark the orchestrator as
stopped before exiting the loop—call the instance stop routine or set _running =
False (consistent with how run() normally ends) and then break so get_status()
reflects the stopped state; ensure you reference the same control path that
currently uses consecutive_db_errors, max_errors, logger.error, and break.
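Putting the comments above together, the loop they steer toward might look roughly like this. This is a hedged sketch: parameter names follow the review text, the caller/flag wiring is assumed, and `OperationalError` is stood in by a local class so the snippet runs without sqlalchemy (real code would import it from `sqlalchemy.exc`):

```python
import logging
import time

logger = logging.getLogger(__name__)

class OperationalError(Exception):
    """Stand-in for sqlalchemy.exc.OperationalError so this sketch is self-contained."""

def run_with_db_retry(iterate, is_running, max_errors=5,
                      base_wait=5.0, max_wait=60.0, idle_wait=1.0):
    """DB errors feed a consecutive-error budget with exponential backoff;
    any other exception resets the budget and idles. Returns False on DB shutdown."""
    consecutive_db_errors = 0
    while is_running():
        try:
            iterate()
            consecutive_db_errors = 0  # a healthy iteration breaks the error streak
        except OperationalError:
            consecutive_db_errors += 1
            if consecutive_db_errors >= max_errors:
                logger.exception("Database unreachable after %s consecutive errors, shutting down",
                                 consecutive_db_errors)
                return False  # caller should also flip its running flag
            wait = min(base_wait * 2 ** (consecutive_db_errors - 1), max_wait)
            logger.warning("Transient DB error (%s/%s), retrying in %.0fs",
                           consecutive_db_errors, max_errors, wait)
            time.sleep(wait)
        except Exception:
            consecutive_db_errors = 0  # non-DB failures don't count toward the DB budget
            logger.exception("Unexpected error in pipeline iteration")
            time.sleep(idle_wait)
    return True
```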
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: a86dfdf6-1c1f-4d9e-8d68-3fae4006eed3
📒 Files selected for processing (2)
src/workflow/config.py, src/workflow/orchestrator.py
📜 Review details
🧰 Additional context used
📓 Path-based instructions (3)
**/*.py
📄 CodeRabbit inference engine (AGENTS.md)
**/*.py: Use PascalCase for class names (e.g., TranscriptionManager, GeminiFileSearchManager)
Use snake_case for function and method names (e.g., handle_transcription, search_vector_db)
Use UPPER_SNAKE_CASE for constants (e.g., TRANSCRIPTION_OUTPUT_SUFFIX)
Use leading underscore for private methods (e.g., _parse_response)
Use try/except with specific exception types instead of bare exceptions
Log errors with context using logger.error with traceback
Implement retry logic with exponential backoff for network operations
Validate inputs with Pydantic schemas
Files:
src/workflow/config.py, src/workflow/orchestrator.py
src/**/*.py
📄 CodeRabbit inference engine (AGENTS.md)
src/**/*.py: Use Factory pattern for model loading (Whisper, AI clients)
Use Service Layer pattern for separation of business logic from routes
Include docstrings in all modules, classes, and functions to describe purpose and behavior
Maintain type hints on function signatures and class attributes for better IDE support and type checking
Files:
src/workflow/config.py, src/workflow/orchestrator.py
src/workflow/orchestrator.py
📄 CodeRabbit inference engine (AGENTS.md)
Implement PipelineOrchestrator with GPU-optimized continuous processing for podcast pipeline workflow
Files:
src/workflow/orchestrator.py
🧠 Learnings (1)
📚 Learning: 2025-12-30T23:15:26.251Z
Learnt from: CR
Repo: allenhutchison/podcast-rag PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-12-30T23:15:26.251Z
Learning: Applies to src/workflow/orchestrator.py : Implement PipelineOrchestrator with GPU-optimized continuous processing for podcast pipeline workflow
Applied to files:
src/workflow/orchestrator.py
🧬 Code graph analysis (1)
src/workflow/orchestrator.py (1)
tests/test_workflow.py (1)
pipeline_config(42-53)
🪛 Ruff (0.15.4)
src/workflow/config.py
[warning] 82-84: Avoid specifying long messages outside the exception class
(TRY003)
[warning] 87-89: Avoid specifying long messages outside the exception class
(TRY003)
[warning] 92-94: Avoid specifying long messages outside the exception class
(TRY003)
src/workflow/orchestrator.py
[warning] 201-204: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
- Reject non-finite floats (nan, inf) in _get_float_env to prevent broken backoff behavior
- Set _running = False before breaking on max consecutive DB errors so get_status() reflects the stopped state
- Reset consecutive_db_errors in generic except handler so non-DB errors break the consecutive DB error streak
♻️ Duplicate comments (1)
src/workflow/orchestrator.py (1)
223-230: ⚠️ Potential issue | 🟠 Major — Clear `_running` on the outer failure path too.
If `_startup()` or any other exception escapes to line 223, `run()` still returns after `_shutdown()` with `_running == True`, so `get_status()` reports the orchestrator as running even though it has already stopped.
Suggested fix:

```diff
     finally:
+        self._running = False
         # Restore signal handlers
         signal.signal(signal.SIGINT, original_sigint)
         signal.signal(signal.SIGTERM, original_sigterm)
         self._shutdown()
```
Set self._running = False in the finally block of run() so get_status() accurately reflects stopped state after startup failures or any other exit path.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/workflow/orchestrator.py`:
- Around line 207-216: The retry handler currently calls time.sleep(wait) which
blocks shutdown because PipelineOrchestrator.stop() only flips self._running;
modify the retry backoff in the function containing the shown block (the code
that computes wait using self.pipeline_config.db_retry_base_wait /
db_retry_max_wait and increments consecutive_db_errors) to replace the single
blocking time.sleep(wait) with a loop that sleeps in 1-second slices (or min(1,
remaining)) while checking self._running between slices and aborts early if
self._running is False; keep the same wait calculation and the existing
logger.warning call but ensure the loop decrements remaining time until zero or
until self._running is False so shutdown can proceed promptly.
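A minimal sketch of the sliced sleep the comment describes (the helper name is mine, not from the PR):

```python
import time

def interruptible_sleep(total_seconds: float, still_running) -> None:
    """Sleep in at-most-1-second slices, re-checking the running flag between
    slices so a stop request interrupts a long backoff promptly."""
    remaining = total_seconds
    while remaining > 0 and still_running():
        step = min(1.0, remaining)
        time.sleep(step)
        remaining -= step

# In the retry handler, time.sleep(wait) would become:
#     interruptible_sleep(wait, lambda: self._running)
```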
🔇 Additional comments (2)
src/workflow/orchestrator.py (2)
178-205: Nice separation of DB and non-DB failures.
Resetting `consecutive_db_errors` on successful iterations and on unexpected non-DB failures keeps mixed error sequences from tripping the DB shutdown budget.
Also applies to: 218-221
225-230: Good exit-path cleanup.
Restoring the original signal handlers and clearing `_running` in `finally` keeps `get_status()` accurate after startup failures and fatal DB exits.
Replace blocking time.sleep(wait) with 1-second slices that check self._running between iterations, so graceful shutdown proceeds promptly instead of waiting up to 60s for the backoff to complete.
♻️ Duplicate comments (1)
src/workflow/orchestrator.py (1)
201-204: ⚠️ Potential issue | 🟡 Minor — Keep the traceback on the final DB failure path.
This is the last `OperationalError` before shutdown, but `logger.error(...{e})` only records the exception string. Use `logger.exception(...)` or `logger.error(..., exc_info=True)` here so the terminal failure keeps its stack trace.
💡 Minimal fix:

```diff
-            logger.error(
-                f"Database unreachable after {consecutive_db_errors} "
-                f"consecutive errors, shutting down: {e}"
-            )
+            logger.exception(
+                "Database unreachable after %s consecutive errors, shutting down",
+                consecutive_db_errors,
+            )
```

As per coding guidelines, "Log errors with context using logger.error with traceback."
🧹 Nitpick comments (1)
src/workflow/orchestrator.py (1)
182-224: Verify the retry boundary is safe for non-DB side effects.
`OperationalError` now retries the entire `_pipeline_iteration()`. If that exception can escape after `transcribe_single()`, `_help_post_process()`, or post-processing submission has already done work, the next pass reruns the whole unit of work and can duplicate expensive processing. Please either narrow the retry to DB-only calls or confirm those side effects are idempotent before keeping the outer retry loop.
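One way to realize the idempotency option above is a completion marker checked before the expensive step. A hypothetical sketch — the DB methods, status value, and function names here are invented for illustration, not the repository's API:

```python
def process_episode(db, episode_id: str, transcribe) -> None:
    """Skip work already completed in a previous attempt, so a retried
    iteration cannot duplicate expensive processing."""
    if db.get_status(episode_id) == "transcribed":
        return  # marker found: an earlier attempt already finished this step
    transcribe(episode_id)                      # expensive, non-idempotent work
    db.set_status(episode_id, "transcribed")    # marker written only after success
```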
Use exc_info=True so the full stack trace is logged when the pipeline shuts down after max consecutive DB errors.
Summary
- Catch `sqlalchemy.exc.OperationalError` (connection timeouts, server gone away) inside the pipeline main loop instead of letting it crash the orchestrator
- An unexpected `Exception` in a single iteration now logs and continues instead of killing the pipeline
- New `PipelineConfig` fields (`max_consecutive_db_errors`, `db_retry_base_wait`, `db_retry_max_wait`) configurable via environment variables

Test plan