fix: stream-parse-and-store to prevent OOM on large CSV downloads #27
akkaouim merged 4 commits into jjackson:labs-main
Conversation
The MBW dashboard crashed on production with "Connection lost" when loading large opportunities (44K-50K records, ~669 MB CSV). Root cause: stream_raw_visits held the full CSV as bytes, BytesIO copied it (1.34 GB), then parsing accumulated another ~669 MB, totaling ~2 GB on a 1 GB container.

Fix: download the CSV to a temp file (0 bytes in Python memory), parse in 1000-row chunks, store each chunk to the DB immediately with form_json, and keep only slim dicts (no form_json) in Python. Peak memory drops from ~2 GB to ~50 MB.

Changes:
- csv_parsing.py: add parse_csv_file_chunks() generator
- backend.py: stream to temp file + _parse_and_store_streaming()
- cache.py: add store_raw_visits_start/batch for batched inserts
- pipeline.py: track raw_data_already_stored, pass skip_raw_store
- views.py: del all_pipeline_rows after last usage

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
📝 Walkthrough

Adds disk-backed, chunked CSV parsing and streaming storage in the SQL backend with batched cache-insert APIs; the pipeline now consumes backend streaming events and can skip redundant raw storage; MBW monitoring gains a non-blocking mid-stream CommCare re-auth helper and UI/state guard logic.
Sequence Diagram

sequenceDiagram
participant Pipeline
participant SQLBackend
participant CSVParser
participant CacheManager
participant Database
Pipeline->>SQLBackend: request processing (may stream)
SQLBackend->>SQLBackend: download CSV -> temp file
SQLBackend->>CSVParser: parse_csv_file_chunks(csv_path)
loop per chunk
CSVParser-->>SQLBackend: yield list[visit dicts]
SQLBackend->>CacheManager: store_raw_visits_start(total_estimate) (once)
SQLBackend->>CacheManager: store_raw_visits_batch(chunk_dicts)
CacheManager->>Database: bulk insert pending rows
Database-->>CacheManager: insert result
SQLBackend->>SQLBackend: strip form_json -> slim dicts (if storing)
SQLBackend-->>Pipeline: emit progress / yield slim dicts
end
SQLBackend->>SQLBackend: store_raw_visits_finalize(actual_count)
SQLBackend->>SQLBackend: delete temp file
SQLBackend-->>Pipeline: final result
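In Python terms, the per-chunk loop in the diagram might look like the following sketch (store_batch, the event tuples, and the slim-dict filtering are illustrative stand-ins, not the actual backend API):

```python
def stream_and_store(chunks, store_batch):
    """Consume row-dict chunks: persist each chunk immediately (with
    form_json), keep only slim dicts in memory, and emit progress events."""
    slim_rows = []
    total = 0
    for chunk in chunks:
        store_batch(chunk)  # full rows, including form_json, go straight to the DB
        for row in chunk:
            # strip form_json so only slim dicts stay resident in Python
            slim_rows.append({k: v for k, v in row.items() if k != "form_json"})
        total += len(chunk)
        yield ("progress", total)
    yield ("complete", slim_rows)
```

The key property is that the heavyweight form_json payloads never accumulate in the process: they exist only for the lifetime of one chunk.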
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~50 minutes
🚥 Pre-merge checks: ✅ 3 passed
Actionable comments posted: 2
🧹 Nitpick comments (1)
commcare_connect/labs/analysis/pipeline.py (1)
370-421: Consider extracting the repeated stream_raw_visits event loop into a helper.

The same event-handling logic appears in three branches; consolidating it would reduce drift risk and simplify future fixes.
Also applies to: 499-532, 603-630
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@commcare_connect/labs/analysis/pipeline.py` around lines 370 - 421, The event-handling for self.backend.stream_raw_visits (handling "cached"/"progress"/"parsing"/"complete" events and setting visit_dicts and raw_data_already_stored, yielding EVENT_STATUS/EVENT_DOWNLOAD) is duplicated; extract it into a single helper on the Pipeline class (e.g., _stream_raw_visits_handler or _consume_stream_raw_visits) that accepts opportunity_id, access_token, expected_visit_count, force_refresh, tolerance and yields the same event tuples while returning or yielding the final visit_dicts and raw_data_already_stored flag; replace the three inline loops (the block around self.backend.stream_raw_visits and the similar blocks at the other locations) with calls to this helper so the logic for logging, setting visit_dicts/raw_data_already_stored, and yielding EVENT_STATUS/EVENT_DOWNLOAD is centralized.
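One way to realize the suggested helper, sketched with hypothetical event names and a callback in place of re-yielding (the real Pipeline yields EVENT_STATUS/EVENT_DOWNLOAD tuples from a generator):

```python
def consume_raw_visits_stream(stream, emit):
    """Drive a stream_raw_visits generator once, forwarding UI events via
    emit() and returning the final payload plus a storage flag."""
    visit_dicts = []
    raw_data_already_stored = False
    for event, payload in stream:
        if event == "cached":
            visit_dicts = payload
            raw_data_already_stored = True  # rows were served from the DB cache
        elif event == "complete":
            visit_dicts = payload
            raw_data_already_stored = True  # streaming path stored chunks as it went
        elif event in ("parsing", "progress"):
            emit(("EVENT_DOWNLOAD", payload))
    return visit_dicts, raw_data_already_stored
```

Each of the three call sites would then reduce to one call plus whatever branch-specific logging it needs, so a future change to the event protocol touches a single function.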
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@commcare_connect/labs/analysis/backends/sql/backend.py`:
- Around line 237-241: The code detects parsed-row mismatch (visit_count vs
raw_line_count) but only logs it; fix by updating the cache metadata to reflect
the true parsed count (set actual_count to raw_line_count - 1 or visit_count as
appropriate) after batch ingestion completes so subsequent cache validity checks
use accurate metadata: locate the block using variables visit_count,
raw_line_count and the logger.warning and update the cache finalization step to
write actual_count (the corrected row count) into the raw-cache metadata store
(the same place where the estimated visit_count was originally written),
ensuring this update occurs in all the branches noted around the existing checks
(including the blocks around lines where visit_count is first used to write
metadata and the later logging branch).
In `@commcare_connect/labs/analysis/backends/sql/cache.py`:
- Around line 152-163: store_raw_visits_start currently clears the cache and
sets _pending_visit_count/_pending_expires_at but metadata is visible while
batches are still being written; fix by implementing a two-phase commit: keep
rows invisible until finalize. Concretely, add a "committed" marker on
RawVisitCache (or use a pending boolean/NULL expires_at) and ensure
store_raw_visits_batch inserts rows with committed=False (and does not set final
metadata), keep store_raw_visits_start only initializing _pending_* and clearing
old rows, and add a new store_raw_visits_finalize (called after the last batch)
that flips committed=True (or writes the final expires_at/_pending_visit_count)
in a single atomic update so readers only see complete cache when finalized;
update any reader queries to filter committed=True (or non-NULL expires_at) and
modify store_raw_visits_batch/store_raw_visits_start to use these fields.
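A minimal in-memory model of the proposed two-phase commit (the real code would use Django ORM rows; the committed flag here stands in for the suggested boolean or non-NULL expires_at):

```python
class RawVisitStore:
    """Rows written in batches stay invisible until finalize() flips them
    to committed in one step, so readers never observe a half-written cache."""

    def __init__(self):
        self.rows = []  # (opportunity_id, row_dict, committed)

    def start(self, opportunity_id):
        # Clear any previous cache for this opportunity before re-ingesting.
        self.rows = [r for r in self.rows if r[0] != opportunity_id]

    def store_batch(self, opportunity_id, batch):
        # Pending rows: committed=False keeps them hidden from readers.
        self.rows.extend((opportunity_id, row, False) for row in batch)

    def finalize(self, opportunity_id):
        # Single atomic flip: all pending rows become visible together.
        self.rows = [
            (opp, row, True if opp == opportunity_id else committed)
            for opp, row, committed in self.rows
        ]

    def read(self, opportunity_id):
        return [row for opp, row, committed in self.rows
                if opp == opportunity_id and committed]
```

In the database version, "atomic flip" would be a single UPDATE (or a metadata write) inside a transaction, and reader queries would add the committed=True filter.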
ℹ️ Review info
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (5)
- commcare_connect/labs/analysis/backends/csv_parsing.py
- commcare_connect/labs/analysis/backends/sql/backend.py
- commcare_connect/labs/analysis/backends/sql/cache.py
- commcare_connect/labs/analysis/pipeline.py
- commcare_connect/workflow/templates/mbw_monitoring/views.py
- Add `cancelled` flag to SSE useEffect preventing React StrictMode double-mount from creating duplicate SSE connections
- Use per-writer negative sentinel in store_raw_visits_finalize to prevent cross-contamination when concurrent writers share the same opportunity
- Skip FLW history fetch when reopening a saved run (workers already selected)
- Extract _consume_raw_visits_stream helper to deduplicate 3 inline loops

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
When CCHQ OAuth expires during the ~5-minute Connect data download, the stream now attempts auto-refresh via refresh_token first. If that fails, it pauses the SSE stream, sends an auth_required event to the frontend (which shows a modal overlay prompting re-authorization in a new tab), and polls the session DB every 10s for up to 5 minutes. The stream resumes automatically once the user re-authenticates.

Backend: _ensure_cchq_oauth() generator helper called at Steps 2b and 4
Frontend: sseAuthRequired state + modal overlay with auto-dismiss

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
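The described flow can be sketched as a generator that yields SSE payloads while waiting (the function name, event shapes, and callables below are illustrative; the real helper is _ensure_cchq_oauth() backed by Django session storage):

```python
import time

def ensure_oauth(is_active, try_refresh, authorize_url,
                 poll_interval=10, timeout=300):
    """Yield SSE-style payloads until the OAuth session is usable again.

    Order of attempts: silent refresh_token first, then prompt the user
    and poll the session store until they re-authorize or we time out.
    """
    if is_active():
        return
    if try_refresh():  # silent refresh succeeded; stream continues untouched
        return
    yield {"auth_required": True, "authorize_url": authorize_url,
           "message": "Re-authorization required"}
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if is_active():  # user completed re-auth in the other tab
            yield {"message": "Re-authenticated, resuming..."}
            return
        time.sleep(poll_interval)
    raise RuntimeError("OAuth re-authorization timed out")
```

Because it is a generator, the caller can interleave these payloads with its own SSE events and the download simply resumes after the generator returns.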
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
commcare_connect/workflow/templates/mbw_monitoring/template.py (1)
292-314: ⚠️ Potential issue | 🟠 Major

Add cancelled guard inside async callbacks in checkOAuthAndStream.

The guard at line 292 only prevents the function from being invoked, but the .then() and .catch() handlers at lines 297-313 still execute after unmount and trigger state updates. Add if (cancelled) return; inside both callbacks to match the pattern used in the snapshot fetch at line 323.

🔧 Suggested fix
  function checkOAuthAndStream(bustCache) {
    if (cancelled) return;
    setOauthStatus(null);
    setSseMessages(['Checking authentication...']);
    fetch('/custom_analysis/mbw_monitoring/api/oauth-status/?next=' + encodeURIComponent(window.location.pathname + window.location.search))
      .then(function(r) { return r.json(); })
      .then(function(status) {
+       if (cancelled) return;
        var expired = [];
        if (!status.connect?.active) expired.push('connect');
        if (!status.commcare?.active) expired.push('commcare');
        if (!status.ocs?.active) expired.push('ocs');
        // Always store OAuth status (used by inline task OCS check)
        setOauthStatus(status);
        // Connect + CommCare are required; OCS is optional
        if (!status.connect?.active || !status.commcare?.active) {
          setSseMessages([]);
          return;
        }
        startSSEStream(bustCache);
      })
      .catch(function() {
+       if (cancelled) return;
        // Network error checking OAuth: proceed anyway, SSE will fail with its own error
        startSSEStream(bustCache);
      });
  }

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@commcare_connect/workflow/templates/mbw_monitoring/template.py` around lines 292 - 314, In checkOAuthAndStream add the same cancelled guard inside the async callbacks: at the start of the .then(function(status) { ... }) handler and at the start of the .catch(function() { ... }) handler check "if (cancelled) return;" before calling setOauthStatus, setSseMessages or startSSEStream so state updates and stream startup are skipped when the operation was cancelled; ensure you guard both the success path (before setOauthStatus / setSseMessages / startSSEStream) and the error path (before startSSEStream).
🧹 Nitpick comments (1)
commcare_connect/workflow/templates/mbw_monitoring/template.py (1)
1034-1059: Add modal accessibility semantics.

The overlay should expose dialog semantics and keyboard handling (role="dialog", aria-modal="true", focus management, Escape close) so keyboard/screen-reader users can complete re-auth reliably.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@commcare_connect/workflow/templates/mbw_monitoring/template.py` around lines 1034 - 1059, The overlay rendered when sseAuthRequired is truthy lacks modal/dialog semantics and keyboard handling; update the wrapper div that currently renders this UI to include role="dialog" and aria-modal="true", ensure focus is moved into the modal when it opens (focus the first interactive element such as the authorize link), trap focus within the dialog while open, handle Escape to call setSseAuthRequired(null) to close, and restore focus to the previously focused element when the modal closes; implement these behaviors in the component that uses sseAuthRequired and setSseAuthRequired so screen-reader and keyboard users can interact with the re-auth flow reliably.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@commcare_connect/labs/analysis/backends/sql/backend.py`:
- Around line 172-174: The slim-cache path causes N+1 queries because
_load_from_cache(..., skip_form_json=True) defers form_json but
_model_to_visit_dict(row) unconditionally accesses row.form_json; change
_model_to_visit_dict to accept a skip_form_json (or include_form_json) parameter
and, when skip_form_json is True, do NOT access row.form_json (set form_json to
{} or omit it) so the deferred field is never touched. Update callers (notably
_load_from_cache and the place that yields ("cached", visit_dicts)) to pass the
flag through to _model_to_visit_dict so cached/slim loads avoid per-row DB
fetches.
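The fix is to thread the flag through so the deferred column is never touched. A sketch with a stand-in row object (the real rows are Django model instances, where accessing a field deferred via .defer() issues one query per row):

```python
def model_to_visit_dict(row, include_form_json=True):
    """Build a visit dict; skip the deferred form_json column entirely
    when the caller only needs slim rows, avoiding one query per row."""
    visit = {"id": row.id, "visit_date": row.visit_date}
    # Only touch row.form_json when asked: on a deferred Django field,
    # attribute access triggers a per-row database fetch.
    visit["form_json"] = row.form_json if include_form_json else {}
    return visit
```

Callers on the slim-cache path would pass include_form_json=False, turning an N+1 pattern into zero extra queries.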
In `@commcare_connect/labs/analysis/backends/sql/cache.py`:
- Around line 167-170: The current implementation deletes RawVisitCache rows at
the start of store_raw_visits_start() but allows concurrent writers for the same
opportunity to interleave across store_raw_visits_batch() and
store_raw_visits_finalize(), causing mixed/duplicate rows; fix this by
introducing a per-opportunity lock acquired at the beginning of
store_raw_visits_start() (before RawVisitCache.objects.filter(...).delete()) and
held across all subsequent store_raw_visits_batch() calls and released in
store_raw_visits_finalize(), so each opportunity’s writer runs serially; use a
reproducible lock key derived from opportunity_id and implement the lock with
your chosen mechanism (e.g. Django cache lock or DB advisory lock) and ensure
lock acquisition errors are handled and the lock is always released in
finalization.
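Within a single process, the serialization this comment asks for can be sketched with per-key locks; a production version would need a cross-process mechanism such as a PostgreSQL advisory lock or a Django cache lock, and the names below are illustrative:

```python
import threading
from collections import defaultdict

_opportunity_locks = defaultdict(threading.Lock)
_registry_lock = threading.Lock()

def raw_visits_lock(opportunity_id):
    """Return the (single, reusable) lock for one opportunity's raw-visit writes."""
    with _registry_lock:  # guard the defaultdict against concurrent creation
        return _opportunity_locks["raw-visits:%s" % opportunity_id]

def write_raw_visits(opportunity_id, chunks, store):
    """Hold the opportunity's lock across start -> batches -> finalize so
    concurrent writers for the same opportunity run strictly one at a time."""
    with raw_visits_lock(opportunity_id):
        store.start(opportunity_id)
        for chunk in chunks:
            store.store_batch(opportunity_id, chunk)
        store.finalize(opportunity_id)
```

The with-statement guarantees release even if a batch insert raises, which addresses the "always released in finalization" requirement.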
📒 Files selected for processing (5)
- commcare_connect/labs/analysis/backends/sql/backend.py
- commcare_connect/labs/analysis/backends/sql/cache.py
- commcare_connect/labs/analysis/pipeline.py
- commcare_connect/workflow/templates/mbw_monitoring/template.py
- commcare_connect/workflow/templates/mbw_monitoring/views.py
…d guards

1. Fix N+1 queries in slim-cache path: pass skip_form_json to _model_to_visit_dict so deferred form_json is never accessed
2. Concurrent writer safety: move DELETE from store_raw_visits_start to store_raw_visits_finalize (atomic), filter sentinel rows from reader queries via visit_count__gt=0
3. Add cancelled guards to checkOAuthAndStream .then/.catch callbacks
4. Modal accessibility: add role="dialog", aria-modal, Escape handler
5. Prevent compounding OAuth wait loops: per-request _cchq_oauth_unavailable flag short-circuits after first timeout

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
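The per-writer negative sentinel in item 2 could work roughly like this (an illustrative in-memory model; the real implementation keys off RawVisitCache metadata and the visit_count__gt=0 reader filter):

```python
import itertools

_writer_ids = itertools.count(1)

class SentinelCache:
    """Each writer tags its in-flight metadata with a unique negative
    visit_count; readers filter visit_count > 0, and finalize() publishes
    only this writer's metadata, discarding any stale concurrent write."""

    def __init__(self):
        self.meta = []  # (writer_id, visit_count)

    def start(self):
        writer_id = next(_writer_ids)
        # Negative sentinel marks this writer's metadata as pending.
        self.meta.append((writer_id, -writer_id))
        return writer_id

    def finalize(self, writer_id, actual_count):
        # Drop other writers' leftovers and flip our sentinel positive,
        # making the cache visible to readers in one step.
        self.meta = [(w, actual_count) for w, c in self.meta if w == writer_id]

    def visible_counts(self):
        return [c for _, c in self.meta if c > 0]
```

Because each sentinel is unique per writer, two concurrent writers cannot finalize each other's pending rows, which is the cross-contamination the commit message refers to.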
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
commcare_connect/workflow/templates/mbw_monitoring/template.py (1)
242-285: ⚠️ Potential issue | 🟠 Major

Guard EventSource callbacks with cancelled before mutating state.

Line 210 introduces cancellation control, but the handlers at Line 242 and Line 280 still update state unconditionally. Late SSE events from a previous stream can still race and overwrite current UI state.
Suggested fix
  es.onmessage = function(event) {
+   if (cancelled) {
+     es.close();
+     return;
+   }
    try {
      var parsed = JSON.parse(event.data);
      if (parsed.error) {
        setSseError(parsed.error);
        if (parsed.authorize_url) {
          setSseAuthorizeUrl(parsed.authorize_url);
        }
        es.close();
        return;
      }
      // Mid-stream auth prompt (stream is alive, don't close!)
      if (parsed.auth_required) {
        setSseAuthRequired({ message: parsed.message, authorize_url: parsed.authorize_url });
        return;
      }
      // Auto-dismiss modal when stream resumes
      setSseAuthRequired(null);
      if (parsed.message === 'Complete!' && parsed.data) {
        setDashData(parsed.data);
        setSseComplete(true);
        setFromSnapshot(false);
        setSnapshotTimestamp(null);
        if (parsed.data.monitoring_session?.flw_results) {
          setWorkerResults(parsed.data.monitoring_session.flw_results);
        }
        es.close();
      } else if (parsed.message) {
        setSseMessages(function(prev) { return prev.concat([parsed.message]); });
      }
    } catch (e) {
      console.error('SSE parse error:', e);
    }
  };
  es.onerror = function() {
+   if (cancelled) {
+     es.close();
+     return;
+   }
    if (!sseComplete) {
      setSseError('Connection lost. Please refresh the page.');
    }
    es.close();
  };

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@commcare_connect/workflow/templates/mbw_monitoring/template.py` around lines 242 - 285, The SSE handlers (es.onmessage and es.onerror) currently mutate React state even after a stream has been cancelled; guard both handlers with the cancellation flag introduced earlier by returning early when cancelled and only performing state updates if !cancelled. Specifically, at the top of es.onmessage return immediately if cancelled before any setSseError/setSseAuthRequired/setDashData/setSseComplete/setFromSnapshot/setSnapshotTimestamp/setWorkerResults/setSseMessages calls, and in es.onerror only call setSseError (and any other state setters) when !cancelled && !sseComplete, then close the EventSource as before.
🧹 Nitpick comments (1)
commcare_connect/labs/analysis/backends/sql/backend.py (1)
218-221: Consider using logging.exception for automatic traceback capture.

When logging errors from exception handlers, logging.exception automatically includes the traceback without needing exc_info=True.

♻️ Suggested improvement
  except httpx.TimeoutException as e:
-     logger.error(f"[SQL] Timeout downloading for opp {opportunity_id}: {e}")
+     logger.exception(f"[SQL] Timeout downloading for opp {opportunity_id}")
      sentry_sdk.capture_exception(e)
      raise RuntimeError("Connect API timeout") from e

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@commcare_connect/labs/analysis/backends/sql/backend.py` around lines 218 - 221, The except block catching httpx.TimeoutException (the handler that currently calls logger.error, sentry_sdk.capture_exception(e), then raises RuntimeError) should log the exception with traceback automatically by using logger.exception instead of logger.error (or logger.error(..., exc_info=True)); update the code in that except httpx.TimeoutException as e block to call logger.exception with the existing message before capturing the exception to sentry and re-raising the RuntimeError so the full traceback is included in the logs.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@commcare_connect/workflow/templates/mbw_monitoring/template.py`:
- Around line 1047-1051: The anchor renders even when
sseAuthRequired.authorize_url is missing; update the template to check
sseAuthRequired.authorize_url (or parsed.authorize_url) and render a safe
fallback when null: if authorize_url exists render the current <a> link,
otherwise render a disabled button or non-clickable element with the same
styling and an informative label (e.g., "Authorization unavailable") so the CTA
is not clickable; ensure the conditional uses the exact symbol
sseAuthRequired.authorize_url (or parsed.authorize_url if that variable is used
elsewhere) to locate the code to change.
📒 Files selected for processing (4)
- commcare_connect/labs/analysis/backends/sql/backend.py
- commcare_connect/labs/analysis/backends/sql/cache.py
- commcare_connect/workflow/templates/mbw_monitoring/template.py
- commcare_connect/workflow/templates/mbw_monitoring/views.py