fix(api): rewrite rls_transaction to retry mid-query replica failures with primary fallback #10374
Conversation
```python
def _handle_retry(self, error):
    try:
        connections[self._alias].close()
    except Exception:
```

Check notice (Code scanning / CodeQL): Empty except

Copilot Autofix (AI, 5 days ago)
In general, to fix an "empty except" issue, either narrow the exception type and handle it explicitly, or at least document and log why the exception is being ignored, so that failures are observable and justified. Avoid a bare `except Exception: pass`, especially in infrastructure code like database utilities.

Here, the best minimal fix that doesn't change existing external behavior is to keep swallowing the exception (so retries continue unimpeded) but add a log message in the `except` block explaining that closing the connection failed and that the error is being ignored. We already have a logger in this module, so no new imports are needed. Concretely, in `api/src/backend/api/db_utils.py`, inside the `_RLSAttempt._handle_retry` method, replace:

```python
try:
    connections[self._alias].close()
except Exception:
    pass
```

with something like:

```python
try:
    connections[self._alias].close()
except Exception as close_error:
    logger.warning(
        "Failed to close DB connection for alias %s during RLS retry; "
        "continuing with retry. Error: %r",
        self._alias,
        close_error,
    )
```

This preserves the semantics (no re-raise), adds a clear explanation for the ignored exception, and provides diagnostic information if connection closing starts failing.
```diff
@@ -234,8 +234,13 @@
     def _handle_retry(self, error):
         try:
             connections[self._alias].close()
-        except Exception:
-            pass
+        except Exception as close_error:
+            logger.warning(
+                "Failed to close DB connection for alias %s during RLS retry; "
+                "continuing with retry. Error: %r",
+                self._alias,
+                close_error,
+            )
         attempt = self._iterator._attempt
         max_att = self._iterator._max_attempts
         delay = REPLICA_RETRY_BASE_DELAY * (2 ** (attempt - 1))
```
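The last line of the hunk is a standard exponential backoff; the resulting delay schedule can be tabulated. The base delay value below is an assumption for illustration, not the project's actual setting:

```python
# Illustrative backoff schedule for the formula above.
# REPLICA_RETRY_BASE_DELAY = 0.5 is an assumed value, not taken from the PR.
REPLICA_RETRY_BASE_DELAY = 0.5

# attempt 1 waits the base delay, each further attempt doubles it
delays = [REPLICA_RETRY_BASE_DELAY * (2 ** (attempt - 1)) for attempt in (1, 2, 3)]
print(delays)  # [0.5, 1.0, 2.0]
```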
🔒 Container Security Scan

📊 Vulnerability Summary: 3 package(s) affected
Pull request overview
This PR refactors the API’s Postgres RLS transaction helper to correctly retry when a read-replica fails mid-query, including a primary DB fallback, and migrates the codebase to the new retryable for/with usage pattern.
Changes:
- Replaces `rls_transaction` from a single-yield `@contextmanager` into an iterable that yields per-attempt context managers (replica retries + primary fallback).
- Migrates production call sites to `for attempt in rls_transaction(...): with attempt:` so a mid-body `OperationalError` can trigger a full re-execution.
- Updates/adjusts unit + integration tests and removes the now-redundant hand-rolled retry logic in integrations.
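The per-attempt context-manager protocol described in these bullets can be sketched framework-free. Every name here (`RetryableError`, `retry_loop`, `Attempt`) is a stand-in for illustration; the real `rls_transaction` adds connection handling, backoff, and the primary fallback on top of this skeleton:

```python
# Framework-free sketch of an iterable that yields per-attempt context
# managers, so a for/with loop can re-run its body on retryable failures.
class RetryableError(Exception):
    pass

class Attempt:
    def __init__(self, loop):
        self._loop = loop

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        if exc_type is None:
            self._loop.succeeded = True
            return False
        # Swallow retryable errors while attempts remain, so the for loop
        # re-enters the with-body; otherwise let the error propagate.
        return issubclass(exc_type, RetryableError) and self._loop.attempts_left > 0

class retry_loop:
    def __init__(self, max_attempts=3):
        self.attempts_left = max_attempts
        self.succeeded = False

    def __iter__(self):
        return self

    def __next__(self):
        if self.succeeded or self.attempts_left == 0:
            raise StopIteration
        self.attempts_left -= 1
        return Attempt(self)

# The body runs three times: two simulated mid-query failures, then success.
runs = []
for attempt in retry_loop(max_attempts=3):
    with attempt:
        runs.append(len(runs) + 1)
        if len(runs) < 3:
            raise RetryableError("replica died mid-query")
print(runs)  # [1, 2, 3]
```

Because the retry decision lives in `__exit__`, exceptions raised anywhere in the with-body (not just during connection setup) are caught and retried, which is exactly the property the single-yield `@contextmanager` could not provide.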
Reviewed changes
Copilot reviewed 36 out of 36 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| api/src/backend/api/db_utils.py | Core rls_transaction rewrite (iterator + retry/fallback semantics). |
| api/src/backend/tasks/jobs/integrations.py | Removes manual replica retry loop; relies on rls_transaction retries. |
| api/src/backend/tasks/jobs/scan.py | Migrates scan DB operations to new for/with retry pattern. |
| api/src/backend/tasks/tasks.py | Migrates task DB access (scheduled scans, outputs, integrations) to new pattern. |
| api/src/backend/tasks/jobs/backfill.py | Migrates backfill jobs to new pattern. |
| api/src/backend/tasks/jobs/deletion.py | Migrates deletion workflows to new pattern. |
| api/src/backend/tasks/jobs/export.py | Migrates export path DB access to new pattern. |
| api/src/backend/tasks/jobs/muting.py | Migrates muting job DB access to new pattern. |
| api/src/backend/tasks/jobs/report.py | Migrates report generation DB access to new pattern. |
| api/src/backend/tasks/jobs/reports/base.py | Migrates report data loading to new pattern. |
| api/src/backend/tasks/jobs/threatscore.py | Migrates threatscore DB access to new pattern. |
| api/src/backend/tasks/jobs/threatscore_utils.py | Migrates threatscore data-loading/aggregation to new pattern. |
| api/src/backend/tasks/jobs/attack_paths/scan.py | Migrates attack paths scan DB access to new pattern. |
| api/src/backend/tasks/jobs/attack_paths/findings.py | Migrates attack paths findings fetch/enrichment to new pattern. |
| api/src/backend/tasks/jobs/attack_paths/db_utils.py | Migrates attack paths DB helpers to new pattern. |
| api/src/backend/tasks/beat.py | Migrates beat-scheduled scan creation to new pattern. |
| api/src/backend/config/celery.py | Migrates task-result persistence to new pattern. |
| api/src/backend/api/base_views.py | Migrates request initialization tenant/user RLS setup to new pattern. |
| api/src/backend/api/renderers.py | Migrates conditional include-render RLS wrapping to new pattern. |
| api/src/backend/api/v1/views.py | Migrates SAML domain/config lookups to new pattern. |
| api/src/backend/api/utils.py | Migrates integration config updates to new pattern. |
| api/src/backend/api/decorators.py | Migrates provider-deletion guard queries to new pattern. |
| api/src/backend/api/adapters.py | Migrates social signup post-create writes to new pattern. |
| api/src/backend/api/migrations/0008_daily_scheduled_tasks_update.py | Updates migration DB writes to new pattern. |
| api/src/backend/api/management/commands/findings.py | Updates management command DB writes to new pattern. |
| api/src/backend/conftest.py | Updates fixtures to new pattern. |
| api/src/backend/api/tests/test_db_utils.py | Adds/updates unit tests for mid-body retry + attempt semantics. |
| api/src/backend/api/tests/integration/test_rls_transaction.py | Updates integration tests to new for/with usage. |
| api/src/backend/api/tests/test_utils.py | Updates mocks for iterable rls_transaction. |
| api/src/backend/api/tests/test_decorators.py | Updates mocks for iterable rls_transaction. |
| api/src/backend/tasks/tests/test_export.py | Updates mocks for iterable rls_transaction. |
| api/src/backend/tasks/tests/test_integrations.py | Updates mocks; removes obsolete manual-retry test. |
| api/src/backend/tasks/tests/test_scan.py | Updates mocks for iterable rls_transaction. |
| api/src/backend/tasks/tests/test_tasks.py | Updates mocks for iterable rls_transaction. |
| api/src/backend/tasks/tests/test_attack_paths_scan.py | Updates mocks for iterable rls_transaction. |
```python
else:
    tag_instance = tag_cache[tag_key]
tags.append(tag_instance)
resource_instance.upsert_or_delete_tags(tags=tags)
```
```python
# Fetch all compliance requirement overview rows for this scan
requirement_rows = ComplianceRequirementOverview.objects.filter(
    tenant_id=tenant_id, scan_id=scan_id
).values(
    "compliance_id",
    "requirement_id",
    "requirement_status",
)

# Group by (compliance_id, requirement_id) across regions
requirement_statuses = defaultdict(
    lambda: {"fail_count": 0, "pass_count": 0, "total_count": 0}
)

if not requirement_rows:
    return {"status": "no compliance data to backfill"}
```
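As a standalone illustration of the grouping step (the sample rows below are invented, not real scan data), the `defaultdict` accumulates pass/fail counts per `(compliance_id, requirement_id)` key across regions:

```python
from collections import defaultdict

# Sample rows standing in for the ComplianceRequirementOverview queryset.
requirement_rows = [
    {"compliance_id": "cis_2.0", "requirement_id": "1.1", "requirement_status": "PASS"},
    {"compliance_id": "cis_2.0", "requirement_id": "1.1", "requirement_status": "FAIL"},
    {"compliance_id": "cis_2.0", "requirement_id": "1.2", "requirement_status": "PASS"},
]

requirement_statuses = defaultdict(
    lambda: {"fail_count": 0, "pass_count": 0, "total_count": 0}
)
for row in requirement_rows:
    counts = requirement_statuses[(row["compliance_id"], row["requirement_id"])]
    counts["total_count"] += 1
    if row["requirement_status"] == "FAIL":
        counts["fail_count"] += 1
    elif row["requirement_status"] == "PASS":
        counts["pass_count"] += 1

print(requirement_statuses[("cis_2.0", "1.1")])
# {'fail_count': 1, 'pass_count': 1, 'total_count': 2}
```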
```python
completed_scans = (
    Scan.objects.filter(**scan_filter)
    .order_by("provider_id", "-completed_at")
    .values("id", "provider_id", "completed_at")
)

if not completed_scans:
    return {"status": "no scans to backfill"}

# Keep only latest scan per provider/day
latest_scans_by_day = {}
for scan in completed_scans:
    key = (scan["provider_id"], scan["completed_at"].date())
    if key not in latest_scans_by_day:
        latest_scans_by_day[key] = scan
```
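A small worked example of the first-wins dedup above (sample data, not real scans): because the rows arrive ordered by provider and descending `completed_at`, the first row seen for each `(provider, day)` key is that day's latest scan:

```python
from datetime import datetime

# Rows as the .values() queryset would yield them, already ordered by
# ("provider_id", "-completed_at"). Invented sample data.
completed_scans = [
    {"id": "s2", "provider_id": "p1", "completed_at": datetime(2024, 5, 1, 18, 0)},
    {"id": "s1", "provider_id": "p1", "completed_at": datetime(2024, 5, 1, 9, 0)},
    {"id": "s3", "provider_id": "p2", "completed_at": datetime(2024, 5, 1, 12, 0)},
]

latest_scans_by_day = {}
for scan in completed_scans:
    key = (scan["provider_id"], scan["completed_at"].date())
    if key not in latest_scans_by_day:
        latest_scans_by_day[key] = scan  # first row per key wins

# s1 is dropped: s2 is the later scan for provider p1 on 2024-05-01.
print(sorted(s["id"] for s in latest_scans_by_day.values()))  # ['s2', 's3']
```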
```python
completed_scans = (
    Scan.objects.filter(**scan_filter)
    .order_by("-completed_at")
    .values("id", "completed_at")
)

if not completed_scans:
    return {"status": "no scans to backfill"}

# Keep only latest scan per day
latest_scans_by_day = {}
for scan in completed_scans:
    key = scan["completed_at"].date()
    if key not in latest_scans_by_day:
        latest_scans_by_day[key] = scan
```
Codecov Report ❌ Patch coverage is …

Additional details and impacted files:

```
@@            Coverage Diff             @@
##           master   #10374       +/-  ##
===========================================
+ Coverage   56.85%   93.32%   +36.47%
===========================================
  Files          87      218      +131
  Lines        2846    30567    +27721
===========================================
+ Hits         1618    28528    +26910
- Misses       1228     2039      +811
```

Flags with carried forward coverage won't be shown.
EDIT: Superseded by #10379. Closing this in favor of a simpler approach that fixes the same bug without changing any call sites. The new PR uses Django's `execute_wrapper` API inside `rls_transaction` to intercept a mid-query `OperationalError` on the replica, retry with backoff, and fall back to primary. Same `with rls_transaction(...)` interface, ~80 lines changed in one file instead of 2400 across 35.

Context
When the read replica dies mid-query, the retry and primary fallback logic in `rls_transaction` never executes. The function is a `@contextmanager` generator that can only yield once: after yielding the cursor to the caller, any error hits a guard that re-raises immediately, so the retry loop is unreachable.
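The single-yield limitation can be reproduced in isolation with toy code (an illustration, not the project's actual `rls_transaction`): even if a retry loop is written around the `yield`, the generator cannot yield a second time after the body's exception is thrown into it, and `contextlib` itself raises:

```python
from contextlib import contextmanager

@contextmanager
def naive_retrying_cm():
    # A retry loop around the yield looks plausible but cannot work:
    # a @contextmanager generator may only yield once per with-block.
    for attempt in range(3):
        try:
            yield attempt
            return
        except ValueError:
            continue  # "retry" -> second yield -> contextlib complains

def demo():
    try:
        with naive_retrying_cm():
            raise ValueError("replica died mid-query")
    except RuntimeError as exc:
        return str(exc)

print(demo())  # generator didn't stop after throw()
```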
Also, `REPLICA_MAX_ATTEMPTS=3` only gives 2 replica tries because the primary fallback consumes one.

Description
Rewrites `rls_transaction` from a `@contextmanager` to a class with an iterator protocol. All call sites change from `with rls_transaction(...)` to `for attempt in rls_transaction(...): with attempt:`. The for loop re-executes the entire body on failure, so both connection-setup errors and mid-query errors are retried.

Changes:
- `rls_transaction` is now a class that yields `_RLSAttempt` context managers
- `_RLSAttempt.__enter__` retries connection-setup failures inline
- `_RLSAttempt.__exit__` catches mid-query failures and lets the loop continue
- `REPLICA_MAX_ATTEMPTS=3` now means 3 replica tries + 1 primary fallback
- Removes the hand-rolled retry loop in `integrations.py`
- Migrates all call sites to the `for`/`with` pattern, over 100 of each

Steps to review
Deep look at:
- `api/src/backend/api/db_utils.py`
- `renderers.py` (conditional), `integrations.py` (deleted manual retry), `scan.py` (deadlock loop)
- `test_db_utils.py` for mid-body retry and max-attempts semantics

Run tests and use the application locally.
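For comparison, the `execute_wrapper` interception idea adopted by the superseding PR can be sketched without Django (stand-in names throughout; Django's real hook is `connection.execute_wrapper()`, and the retry policy here is illustrative, not the #10379 implementation):

```python
# Minimal stand-in for an execute-wrapper retry: every statement execution
# goes through the wrapper, which retries transient failures in place,
# so call sites keep their plain with-block interface.
class OperationalError(Exception):
    pass

def wrap_with_retry(execute, max_attempts=3):
    def wrapper(sql, params=None):
        for attempt in range(1, max_attempts + 1):
            try:
                return execute(sql, params)
            except OperationalError:
                if attempt == max_attempts:
                    raise  # out of retries (a real version would fall back to primary)
    return wrapper

calls = {"count": 0}

def flaky_execute(sql, params=None):
    # Simulates a replica that fails twice, then succeeds.
    calls["count"] += 1
    if calls["count"] < 3:
        raise OperationalError("replica died mid-query")
    return [("row",)]

execute = wrap_with_retry(flaky_execute)
print(execute("SELECT 1"))  # [('row',)]
print(calls["count"])  # 3
```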
Checklist
API
License
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.