
Continued work on mbw v1 using updated pipeline work #25

Merged
akkaouim merged 5 commits into jjackson:labs-main from akkaouim:labs-mbw-v3
Feb 27, 2026

Conversation


@akkaouim akkaouim commented Feb 27, 2026

feat: port v2 features to labs-mbw-v3, fix GPS/pipeline/V2 template issues

Port 8 commits from labs-mbw-workflow-v2 branch and fix several blocking issues discovered during testing:

  • Fix refreshTrigger render error by removing the dead toggleGuide function from the SSE useEffect (the dead code broke V2 template string matching)
  • Fix the SQL backend casting UUID visit_id values to int (CCHQ form IDs are UUIDs, not integers)
  • Make registrations & GS forms pipelines optional in V2 template so dashboards work on QA domains without gold standard forms
  • Fix empty GPS tab: add field.extractor support to SQL backend (query_builder + backend post-processing) and convert visit_datetime and app_build_version to path-based extraction

Other ported changes: SSE heartbeat, deliver_unit_id fix, pghistory LabsUser middleware fix, duplicate run prevention, case-insensitive FLW username matching, visits_by_flw embedding, OCS session linking, getAISessions action, dashboard guide.
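The extractor mechanism mentioned above, where the SQL query emits a NULL placeholder and Python fills the value after the query, works roughly like the sketch below. The `FieldSpec` shape, the Postgres `#>>` path expression, and the row format are illustrative assumptions, not the actual codebase API:

```python
# Sketch of extractor-backed fields: the query selects NULL as a placeholder,
# and a Python callable fills the value in post-processing. FieldSpec and the
# row shape here are illustrative, not the real codebase types.
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class FieldSpec:
    name: str
    path: Optional[str] = None            # path-based extraction, handled in SQL
    extractor: Optional[Callable] = None  # Python callable, handled post-query


def build_select(fields):
    """Emit NULL placeholders for extractor fields; path expressions otherwise."""
    parts = []
    for f in fields:
        if f.extractor is not None:
            parts.append(f"NULL AS {f.name}")  # filled in later in Python
        else:
            parts.append(f"form_json #>> '{{{f.path}}}' AS {f.name}")
    return ", ".join(parts)


def post_process(rows, fields):
    """Call each field's extractor on the row dict to fill the placeholders."""
    for row in rows:
        for f in fields:
            if f.extractor is not None:
                try:
                    row[f.name] = f.extractor(row)
                except Exception:
                    row[f.name] = None  # defensive: extractors may raise
    return rows
```

The payoff is that fields whose extraction logic lives only in Python, such as gps_location, can still ride along in a single SQL result set.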

Summary by CodeRabbit

  • New Features

    • AI session retrieval for tasks; SSE heartbeat support for long-running streams; create-and-redirect workflow run flow
  • Improvements

    • Per-FLW last-active tracking and propagation; GPS visits grouped for per-FLW drill-downs
    • Pipeline extraction refined for visit datetime, app build version, and GPS handling
  • Bug Fixes

    • More resilient visit-data post-processing and extractor handling; null-handling for certain form fields; safer AI session linking; skip transient users in middleware
  • Documentation

    • Added MBW Monitoring Dashboard guide and updated pipeline extraction docs

coderabbitai bot commented Feb 27, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

📝 Walkthrough

Adds SQL visit-context normalization with a new _build_visit_dict, defers extractor-backed field extraction to post-query processing, propagates per-FLW last-active via fetch_flw_names, adds SSE heartbeat wrapping, ties AI triggers to returned session IDs, updates MBW pipeline extraction/docs/UI readiness, and introduces run-creation redirect behavior.

Changes

| Cohort / File(s) | Summary |
|------------------|---------|
| **SQL backend & extractor handling**<br>`commcare_connect/labs/analysis/backends/sql/backend.py`, `commcare_connect/labs/analysis/backends/sql/query_builder.py` | Adds `_build_visit_dict(row)` and makes visit-dict construction lazy; query_builder detects `field.extractor` and emits NULL placeholders while marking fields for post-query extraction; `visit_id` uses `row.id` and `visit_date` uses `row.visit_date`. |
| **CCHQ form normalization**<br>`commcare_connect/labs/analysis/backends/sql/cchq_fetcher.py` | `normalize_cchq_form_to_visit_dict` now uses `None` instead of `""` for `deliver_unit_id` and `completed_work_id`. |
| **Data access & FLW last-active**<br>`commcare_connect/labs/analysis/data_access.py`, `commcare_connect/workflow/templates/mbw_monitoring/flw_api.py`, `commcare_connect/workflow/templates/mbw_monitoring/views.py` | `fetch_flw_names` signature gains optional `last_active_out`; caches and returns per-FLW `last_active` and populates `last_active_out` when provided; MBW views prefer lowercased username lookup and `worker_results` with backward fallback to `flw_results`, and integrate last-active + GS form data into summaries. |
| **SSE streaming**<br>`commcare_connect/labs/analysis/sse_streaming.py` | Adds `heartbeat_enabled`, `heartbeat_interval`, and `_with_heartbeat(...)` to wrap stream generators and emit periodic SSE heartbeats when idle. |
| **AI task flows & frontend API**<br>`commcare_connect/tasks/views.py`, `commcare_connect/static/js/workflow-runner.tsx`, `components/workflow/types.ts` | AI trigger flow captures the `trigger_bot` result, derives `session_id`/`status` from it, and records the session accordingly; frontend adds a `getAISessions(taskId)` ActionHandler and type. |
| **Workflow run creation & middleware**<br>`commcare_connect/workflow/views.py`, `commcare_connect/utils/middleware.py` | `WorkflowRunView.get` can create a run and redirect when no `run_id`; middleware skips adding user details unless the authenticated user object has `_meta`. |
| **MBW pipeline extraction & docs**<br>`commcare_connect/workflow/templates/mbw_monitoring/pipeline_config.py`, `commcare_connect/workflow/templates/mbw_monitoring/DOCUMENTATION.md`, `commcare_connect/workflow/templates/mbw_monitoring/DASHBOARD_GUIDE.md` | Moves `visit_datetime` and `app_build_version` from extractor to path-based extraction (adds `_safe_parse_int`), documents `gps_location` as an extractor (SQL NULL placeholder + Python post-processing), and adds the dashboard guide. |
| **UI pipeline readiness & templates**<br>`commcare_connect/workflow/templates/mbw_monitoring_v2.py`, `commcare_connect/workflow/templates/mbw_monitoring/pipeline_config.py` | Adjusts readiness checks and indicator text/icon logic to treat pipeline presence vs strict row-count and updates status messaging. |
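The SSE heartbeat wrapping summarized above, where idle streams emit periodic comment frames so proxies keep the connection open, follows a standard queue-plus-producer-thread pattern. This is a minimal sketch, not the actual `_with_heartbeat` implementation; names and defaults are assumed:

```python
# Sketch of wrapping a generator so that idle periods emit SSE heartbeat
# comments. The real _with_heartbeat may differ; this shows the pattern.
import queue
import threading


def with_heartbeat(gen, interval=15.0):
    """Yield items from gen, emitting an SSE comment when idle for `interval` s."""
    q = queue.Queue(maxsize=100)  # bounded: a slow consumer applies backpressure
    done = object()               # sentinel marking end of stream

    def producer():
        try:
            for item in gen:
                q.put(item)
        finally:
            q.put(done)

    threading.Thread(target=producer, daemon=True).start()
    while True:
        try:
            item = q.get(timeout=interval)
        except queue.Empty:
            yield ": heartbeat\n\n"  # SSE comment line, ignored by clients
            continue
        if item is done:
            return
        yield item
```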

Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Client
    participant Frontend as Workflow UI
    participant Server as tasks.views
    participant Trigger as trigger_bot
    participant AIService as AI backend
    participant DB

    Client->>Frontend: user triggers AI task
    Frontend->>Server: POST /tasks/<id>/trigger
    Server->>Trigger: trigger_bot(payload)
    Trigger->>AIService: start session / request
    AIService-->>Trigger: response (may include session_id, status)
    Trigger-->>Server: result (contains session_id/status)
    Server->>DB: create AI session record (session_id, status)
    Server-->>Frontend: JSON { success, session_id, status }
    Frontend-->>Client: display session info
```

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes


Poem

🐇 I hop through rows and parse each nested field,
Heartbeats hum to keep the streams from sleep,
Last-active tracked, sessions tied to what we yield,
Extractors wait — then Python stitches what they keep,
A tiny rabbit cheers the new code we now seed.

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 inconclusive)

| Check name | Status | Explanation | Resolution |
|------------|--------|-------------|------------|
| Title check | ❓ Inconclusive | The title is partially related to the changeset, referring to mbw v1 pipeline updates, but does not clearly summarize the primary changes; it lacks specificity about the key fixes and additions. | Consider a more specific title that highlights the main blocking issue resolved (e.g., "Fix blocking issues in mbw v1: extractor support, optional pipelines, and GS forms") or key additions (visit_id casting, extractor field support, SSE heartbeat, etc.). |
✅ Passed checks (2 passed)
| Check name | Status | Explanation |
|------------|--------|-------------|
| Description Check | ✅ Passed | Check skipped: CodeRabbit's high-level summary is enabled. |
| Docstring Coverage | ✅ Passed | Docstring coverage is 82.61%, which meets the required threshold of 80.00%. |

✏️ Tip: You can configure your own custom pre-merge checks in the settings.


Comment @coderabbitai help to get the list of available commands and usage tips.

coderabbitai bot left a comment

Actionable comments posted: 6

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
commcare_connect/workflow/templates/mbw_monitoring_v2.py (1)

346-355: ⚠️ Potential issue | 🟠 Major

Optional pipeline handling is still effectively required.

pipelinesReady still hard-requires registrations and gs_forms objects to exist. If either alias is intentionally omitted, analysis never becomes ready and the UI stays stuck waiting.

Suggested fix
-    var pipelinesReady = pipelines
-        && pipelines.visits && pipelines.visits.rows && pipelines.visits.rows.length > 0
-        && pipelines.registrations && pipelines.registrations.rows
-        && pipelines.gs_forms && pipelines.gs_forms.rows;
+    var visitsReady = !!(pipelines && pipelines.visits && Array.isArray(pipelines.visits.rows) && pipelines.visits.rows.length > 0);
+    var registrationsReady = !pipelines?.registrations || Array.isArray(pipelines.registrations.rows);
+    var gsFormsReady = !pipelines?.gs_forms || Array.isArray(pipelines.gs_forms.rows);
+    var pipelinesReady = visitsReady && registrationsReady && gsFormsReady;
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@commcare_connect/workflow/templates/mbw_monitoring_v2.py` around lines 346 -
355, The readiness check currently forces registrations and gs_forms to exist
(pipelinesReady) even when those aliases are intentionally omitted; change
pipelinesReady to only require that every pipeline that is present has non-empty
rows. Replace the hard-coded AND chain with a dynamic check over the keys (e.g.,
['visits','registrations','gs_forms']) that: for each key, if pipelines[key] is
defined then require pipelines[key].rows && pipelines[key].rows.length > 0; set
pipelinesReady true if pipelines is defined and all present pipelines pass that
check (pipelinesPartial should keep its current semantics of "any present
pipeline has rows"). Use the symbols pipelinesReady and pipelinesPartial to
locate and update the logic.
🧹 Nitpick comments (3)
commcare_connect/workflow/views.py (1)

208-220: Close WorkflowDataAccess in a finally block.

If create_run() raises, data_access.close() is skipped. This is a reliability footgun in request handling.

Suggested fix
-                data_access = WorkflowDataAccess(request=request)
-                today = datetime.now().date()
-                week_start = today - timedelta(days=today.weekday())
-                week_end = week_start + timedelta(days=6)
-                run = data_access.create_run(
-                    definition_id=definition_id,
-                    opportunity_id=opportunity_id,
-                    period_start=week_start.isoformat(),
-                    period_end=week_end.isoformat(),
-                    initial_state={"worker_states": {}},
-                )
-                data_access.close()
+                data_access = WorkflowDataAccess(request=request)
+                try:
+                    today = datetime.now().date()
+                    week_start = today - timedelta(days=today.weekday())
+                    week_end = week_start + timedelta(days=6)
+                    run = data_access.create_run(
+                        definition_id=definition_id,
+                        opportunity_id=opportunity_id,
+                        period_start=week_start.isoformat(),
+                        period_end=week_end.isoformat(),
+                        initial_state={"worker_states": {}},
+                    )
+                finally:
+                    data_access.close()
                 return redirect(f"{request.path}?run_id={run.id}")
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@commcare_connect/workflow/views.py` around lines 208 - 220, The
WorkflowDataAccess instance is closed only after create_run succeeds, so if
create_run raises the connection isn't closed; wrap the usage of
WorkflowDataAccess in a try/finally (or context manager) so data_access.close()
is always called: create the instance via data_access =
WorkflowDataAccess(request=request), call data_access.create_run(...) inside the
try block, and invoke data_access.close() in the finally block (or implement/use
__enter__/__exit__ on WorkflowDataAccess and use a with-statement) to ensure
cleanup even on exceptions.
commcare_connect/labs/analysis/backends/sql/backend.py (1)

472-494: LGTM with optional refactor suggestion.

The extractor field post-processing correctly mirrors the existing transform handling pattern. The broad Exception catch (line 492) is appropriate here since extractors are callable functions that could raise arbitrary exceptions — logging the failure and returning None is the right defensive approach.

Optional: The visit_dict construction (lines 484-490) is duplicated from the transform block (lines 460-466). Consider extracting a helper function to reduce duplication.

♻️ Optional: Extract visit_dict construction helper
def _build_visit_dict_for_transform(row: dict) -> dict:
    """Build visit dict for transform/extractor post-processing."""
    import json

    form_json = row.get("form_json", {})
    if isinstance(form_json, str):
        form_json = json.loads(form_json) if form_json else {}

    images = row.get("images", [])
    if isinstance(images, str):
        images = json.loads(images) if images else []

    return {
        "form_json": form_json,
        "images": images,
        "username": row.get("username"),
        "visit_date": row.get("visit_date"),
        "entity_name": row.get("entity_name"),
    }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@commcare_connect/labs/analysis/backends/sql/backend.py` around lines 472 -
494, Extract the duplicated visit_dict construction into a shared helper and
call it from both the transform and extractor branches: create a function (e.g.
_build_visit_dict_for_transform) that takes row, handles JSON decoding for
"form_json" and "images" and returns the dict with "form_json", "images",
"username", "visit_date", and "entity_name"; then replace the inline visit_dict
creation in the field.extractor branch (where field.extractor is called) and the
corresponding transform branch (where transform is applied) to call this helper
instead.
commcare_connect/workflow/templates/mbw_monitoring/DASHBOARD_GUIDE.md (1)

661-693: Minor: Add blank lines around tables for markdown best practices.

The Color Coding Reference section has tables that should be surrounded by blank lines per markdownlint MD058.

📝 Example fix for one table section
 ### Follow-Up Rate / GS Score Colors
+
 | Color | Follow-up Rate | GS Score |
 |-------|---------------|----------|
 | Green | ≥ 80% | ≥ 70% |
 | Yellow | 60-79% | 50-69% |
 | Red | < 60% | < 50% |
+
 ### EBF Colors
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@commcare_connect/workflow/templates/mbw_monitoring/DASHBOARD_GUIDE.md` around
lines 661 - 693, The tables in the "Color Coding Reference" area (the 3-column
Follow-up/GS table, the "EBF Colors" table, the "Eligible 5+ / % Still Eligible
Colors" table, the "Last Active Colors" table, and the "GPS Flags" table) need a
blank line before and after each table to satisfy markdownlint MD058; edit
DASHBOARD_GUIDE.md (the Color Coding Reference section) and insert a single
blank line above and below each pipe-table block so each table is separated from
surrounding headings and paragraphs.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@commcare_connect/labs/analysis/data_access.py`:
- Around line 53-61: The early return after reading cached =
cache.get(cache_key) incorrectly returns cached FLW names even when
last_active_out was requested but flw_last_active_{opportunity_id} is missing;
change the logic in the block around cache_key/last_active_out/opportunity_id so
that if last_active_out is not None you only return the cached names when the
corresponding la_cached (cache.get(f"flw_last_active_{opportunity_id}")) is
present — otherwise do not return early and allow the function to load fresh
last_active and names (i.e., skip the return when la_cached is falsy so the code
will proceed to populate last_active_out and then return).
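The cache fix described in this prompt can be sketched as follows; a plain dict stands in for Django's cache, and `fetch_names_cached`/`fetch_fresh` are hypothetical names for illustration:

```python
# Sketch of the described logic: only take the early-return path when BOTH
# caches are hits; otherwise fall through to a fresh fetch that repopulates
# both. A dict stands in for django.core.cache here.
def fetch_names_cached(cache, opportunity_id, fetch_fresh, last_active_out=None):
    cached = cache.get(f"flw_names_{opportunity_id}")
    if cached is not None:
        if last_active_out is None:
            return cached
        la_cached = cache.get(f"flw_last_active_{opportunity_id}")
        if la_cached is not None:  # an empty dict is still a valid hit
            last_active_out.update(la_cached)
            return cached
        # last_active cache missing: fall through to a fresh fetch
    names, last_active = fetch_fresh()
    cache[f"flw_names_{opportunity_id}"] = names
    cache[f"flw_last_active_{opportunity_id}"] = last_active
    if last_active_out is not None:
        last_active_out.update(last_active)
    return names
```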

In `@commcare_connect/labs/analysis/sse_streaming.py`:
- Around line 114-123: The producer currently uses an unbounded data_queue which
can grow without bound in _producer when iterating over generator; change
data_queue to a bounded queue (e.g., queue.Queue(maxsize=MAX_QUEUE_SIZE)) and
update _producer to use data_queue.put(item, timeout=PUT_TIMEOUT) in a loop that
catches queue.Full and either retries or drops/backs off while periodically
checking stop_event; ensure the put call respects stop_event (break if set) and
handle Interrupted/Full exceptions so the producer blocks or drops safely
instead of unboundedly buffering.
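The bounded-queue backpressure described above boils down to `put` with a timeout inside a loop that re-checks the stop flag. A sketch under assumed names (`data_queue` and `stop_event` follow the comment; `producer_loop` is hypothetical):

```python
# Sketch of a producer that respects backpressure and a stop signal: a bounded
# queue blocks the put, and queue.Full triggers a retry that re-checks the
# stop_event instead of buffering without bound.
import queue
import threading


def producer_loop(generator, data_queue, stop_event, put_timeout=1.0):
    """Feed items into a bounded queue, bailing out when stop_event is set."""
    for item in generator:
        while not stop_event.is_set():
            try:
                data_queue.put(item, timeout=put_timeout)
                break  # item enqueued; move on to the next one
            except queue.Full:
                continue  # consumer is slow: retry, re-checking stop_event
        if stop_event.is_set():
            return
```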

In `@commcare_connect/tasks/views.py`:
- Around line 810-815: The current session_id extraction calls
result.get("session", {}).get("id") which will raise if result["session"] exists
but is not a dict; change the logic in the session_id computation to first fetch
session = result.get("session") and only call session.get("id") when
isinstance(session, dict), otherwise fall back to result.get("session_id") or
result.get("id"); update the expression around the existing session_id variable
so it safely handles non-dict session values without raising.
- Around line 804-806: The current logger.info in the trigger_bot handling logs
the entire result (variable result) which can leak participant/session data;
change the logging in the view where trigger_bot is invoked (the logger.info
line referencing task_id and result) to only log minimal diagnostics by
extracting the top-level status (e.g., result.get("status")) and the list of
top-level keys (e.g., list(result.keys())) and log those along with task_id
instead of the full payload; ensure you do this in the same function/method that
currently calls logger.info with "trigger_bot response for task {task_id}:
{result}" so only status and keys are written to logs.
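The safe `session_id` extraction described in the first item reduces to a type check before chaining `.get()`; a minimal sketch (the surrounding view code is assumed):

```python
# Sketch of the defensive extraction: only chain .get("id") when "session"
# is actually a dict, otherwise fall back to flat keys.
def extract_session_id(result: dict):
    session = result.get("session")
    if isinstance(session, dict):
        return session.get("id")
    return result.get("session_id") or result.get("id")
```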

In `@commcare_connect/workflow/templates/mbw_monitoring/pipeline_config.py`:
- Around line 131-133: The transform for the "form.meta.app_build_version" field
currently uses transform=lambda x: int(x) if x else None which will raise
ValueError on malformed values; update the transform used in pipeline_config.py
to a safe parser that: returns None for falsy input, strips whitespace, ignores
non-numeric characters or attempts int conversion inside a try/except, and falls
back to None (or a safe default) on any conversion error; locate the transform
for the path "form.meta.app_build_version" and replace the inline lambda with a
small helper or inline logic that catches exceptions and returns None for bad
input.

In `@commcare_connect/workflow/views.py`:
- Line 220: The redirect currently loses existing query params by using return
redirect(f"{request.path}?run_id={run.id}"); change it to preserve request.GET:
copy the request.GET QueryDict, set/update the 'run_id' key to run.id, urlencode
the modified QueryDict and redirect to f"{request.path}?{query_string}". Update
the redirect logic where the snippet return
redirect(f"{request.path}?run_id={run.id}") appears so existing params like
sync, template, tolerance are retained.
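Preserving the query string on the redirect is a copy-mutate-encode of the incoming params. This sketch uses the stdlib `urlencode` so it runs standalone; in the Django view the copy would come from `request.GET.copy()`, and `run_redirect_url` is a hypothetical helper name:

```python
# Sketch of the redirect fix described above. In the actual view this would
# operate on a QueryDict copied from request.GET rather than a plain dict.
from urllib.parse import urlencode


def run_redirect_url(path, query_params, run_id):
    """Build the redirect target, keeping existing params like sync/template."""
    params = dict(query_params)  # copy so the original mapping is untouched
    params["run_id"] = str(run_id)
    return f"{path}?{urlencode(params)}"
```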


ℹ️ Review info

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 4fb7998 and e82ace0.

📒 Files selected for processing (16)
  • commcare_connect/labs/analysis/backends/sql/backend.py
  • commcare_connect/labs/analysis/backends/sql/cchq_fetcher.py
  • commcare_connect/labs/analysis/backends/sql/query_builder.py
  • commcare_connect/labs/analysis/data_access.py
  • commcare_connect/labs/analysis/sse_streaming.py
  • commcare_connect/static/js/workflow-runner.tsx
  • commcare_connect/tasks/views.py
  • commcare_connect/utils/middleware.py
  • commcare_connect/workflow/templates/mbw_monitoring/DASHBOARD_GUIDE.md
  • commcare_connect/workflow/templates/mbw_monitoring/flw_api.py
  • commcare_connect/workflow/templates/mbw_monitoring/pipeline_config.py
  • commcare_connect/workflow/templates/mbw_monitoring/template.py
  • commcare_connect/workflow/templates/mbw_monitoring/views.py
  • commcare_connect/workflow/templates/mbw_monitoring_v2.py
  • commcare_connect/workflow/views.py
  • components/workflow/types.ts

akkaouim and others added 2 commits February 27, 2026 20:57
- data_access.py: Skip cache return when last_active_out requested but
  last_active cache is missing (force fresh fetch)
- tasks/views.py: Safe session_id extraction (handle non-dict session);
  log only status/keys instead of full trigger_bot response
- pipeline_config.py: Replace inline lambda with _safe_parse_int that
  catches ValueError/TypeError (SQL backend pattern matching preserved)
- workflow/views.py: Preserve query params on create-run redirect;
  wrap WorkflowDataAccess in try/finally for cleanup on error
- sse_streaming.py: Bound data_queue to maxsize=100 with backpressure
- mbw_monitoring_v2.py: Dynamic pipelinesReady — tolerates missing
  pipeline keys, only visits must have data
- backend.py: Extract _build_visit_dict helper, removing duplication
  between transform and extractor branches
- DASHBOARD_GUIDE.md: Add blank lines around tables (MD058)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Reflect that visit_datetime and app_build_version now use path-based
extraction instead of extractors. Document SQL backend extractor
support (NULL placeholder + Python post-processing) for gps_location.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
coderabbitai bot left a comment

Actionable comments posted: 3

♻️ Duplicate comments (1)
commcare_connect/labs/analysis/data_access.py (1)

58-61: ⚠️ Potential issue | 🟠 Major

Treat empty flw_last_active cache as a valid hit.

Line 58 uses a truthy check (if la_cached:). An empty cached dict is valid data, but this path forces a fresh API fetch every time.

✅ Minimal fix
-                    if la_cached:
+                    if la_cached is not None:
                         last_active_out.update(la_cached)
                     else:
                         cached = None  # Force fresh fetch to populate last_active
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@commcare_connect/labs/analysis/data_access.py` around lines 58 - 61, The code
treats an empty cached dict as a cache miss by using a truthy check on
la_cached; change the condition to an explicit None check so empty dicts are
accepted as valid hits: replace "if la_cached:" with "if la_cached is not None:"
(keeping the existing branches that call last_active_out.update(la_cached) and
set cached = None), so the flw_last_active cache returns empty dicts without
forcing a fresh fetch.
🧹 Nitpick comments (2)
commcare_connect/tasks/views.py (1)

1095-1095: Consider using timezone-aware UTC datetime.

datetime.now() returns a naive datetime without timezone info. For consistent UTC timestamps, consider using datetime.now(timezone.utc).

 from datetime import datetime
+from datetime import timezone
-                    "saved_at": datetime.now().isoformat(),
+                    "saved_at": datetime.now(timezone.utc).isoformat(),

Based on learnings: "prefer timezone-aware UTC datetimes for timestamp fields by using datetime.now(timezone.utc) instead of naive datetime.now()."

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@commcare_connect/tasks/views.py` at line 1095, The "saved_at" assignment is
using a naive datetime via datetime.now(); replace it with a timezone-aware UTC
timestamp by using datetime.now(timezone.utc) and ensure the module imports
timezone (e.g., add or update the import to include timezone) so the "saved_at"
value is consistent and timezone-aware across the codebase.
commcare_connect/labs/analysis/backends/sql/backend.py (1)

466-475: Build visit context once per row to avoid repeated parsing.

Line 466 and Line 474 rebuild the same visit context repeatedly inside the field loop, which adds avoidable per-row overhead.

♻️ Suggested refactor
-            for field in config.fields:
+            visit_context = None
+            for field in config.fields:
                 if field.name not in computed_field_names:
                     continue
@@
                     if "visit_data" in params or len(params) == 0:
                         try:
-                            visit_dict = _build_visit_dict(row)
-                            computed[field.name] = field.transform(visit_dict)
+                            if visit_context is None:
+                                visit_context = _build_visit_dict(row)
+                            computed[field.name] = field.transform(visit_context)
                         except Exception as e:
                             logger.warning(f"Transform for {field.name} failed: {e}")
                             computed[field.name] = None

                 elif field.extractor and callable(field.extractor):
                     try:
-                        visit_dict = _build_visit_dict(row)
-                        computed[field.name] = field.extractor(visit_dict)
+                        if visit_context is None:
+                            visit_context = _build_visit_dict(row)
+                        computed[field.name] = field.extractor(visit_context)
                     except Exception as e:
                         logger.warning(f"Extractor for {field.name} failed: {e}")
                         computed[field.name] = None
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@commcare_connect/labs/analysis/backends/sql/backend.py` around lines 466 -
475, The loop calls _build_visit_dict(row) multiple times per row (once for
field.transform and again for field.extractor), causing redundant parsing;
compute the visit context once per row and reuse it for all fields instead.
Refactor the code around the field processing so that a single call to
_build_visit_dict(row) (e.g., assign to visit_dict) is done before handling each
field, then pass that visit_dict into field.transform(visit_dict) and
field.extractor(visit_dict); preserve the existing try/except behavior for
field.transform and the callable check for field.extractor while only removing
the duplicated calls to _build_visit_dict.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@commcare_connect/labs/analysis/backends/sql/backend.py`:
- Around line 60-66: In _build_visit_dict, json.loads on row.get("form_json")
and row.get("images") can raise decode errors and cause downstream computed
fields to be dropped; wrap the json.loads calls for form_json and images in
try/except (catching ValueError/JSONDecodeError) and on failure fall back to an
empty dict for form_json and empty list for images (and optionally log/debug the
offending payload), ensuring the function continues to return the visit dict
rather than letting decode errors propagate from _build_visit_dict.

In `@commcare_connect/labs/analysis/sse_streaming.py`:
- Around line 128-132: The except/finally blocks currently call
data_queue.put(...) without timeouts, which can block the producer thread;
update the put calls inside the except and finally in the SSE streaming producer
(the data_queue.put(("error", e)) and data_queue.put(("done", None)) sites) to
use data_queue.put(item, timeout=1) and catch queue.Full (or ignore/skip the
write) so the producer won't hang — mirror the non-blocking behavior used
earlier on the data path (the other data_queue.put call that already uses a
timeout).

In `@commcare_connect/workflow/views.py`:
- Around line 208-225: The view currently calls WorkflowDataAccess.create_run()
without handling exceptions, so any write error bubbles up and returns a 500;
wrap the create_run call in a try/except that catches Exception, logs the error
(using the existing logger or request logger), ensures data_access.close() still
runs (keep the finally), and then return the render path with a user-facing
creation error in the context (e.g., an "error" or "creation_error" key) instead
of redirecting; update the block around WorkflowDataAccess, create_run and the
redirect to check for a successful run before building params/redirect, and
apply the same pattern to the other occurrence noted (lines ~328-332).

---

Duplicate comments:
In `@commcare_connect/labs/analysis/data_access.py`:
- Around line 58-61: The code treats an empty cached dict as a cache miss by
using a truthy check on la_cached; change the condition to an explicit None
check so empty dicts are accepted as valid hits: replace "if la_cached:" with
"if la_cached is not None:" (keeping the existing branches that call
last_active_out.update(la_cached) and set cached = None), so the flw_last_active
cache returns empty dicts without forcing a fresh fetch.
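The distinction between a cache miss and a cached empty dict can be sketched as follows (function and key names are illustrative, not the actual `data_access.py` API):

```python
def read_last_active_cache(cache, key):
    # `if la_cached:` would treat a cached empty dict as a miss;
    # an explicit None check accepts {} as a valid hit.
    la_cached = cache.get(key)
    if la_cached is not None:
        return la_cached, True   # hit, even when la_cached == {}
    return None, False           # genuine miss: caller fetches fresh
```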

---

Nitpick comments:
In `@commcare_connect/labs/analysis/backends/sql/backend.py`:
- Around line 466-475: The loop calls _build_visit_dict(row) multiple times per
row (once for field.transform and again for field.extractor), causing redundant
parsing; compute the visit context once per row and reuse it for all fields
instead. Refactor the code around the field processing so that a single call to
_build_visit_dict(row) (e.g., assign to visit_dict) is done before handling each
field, then pass that visit_dict into field.transform(visit_dict) and
field.extractor(visit_dict); preserve the existing try/except behavior for
field.transform and the callable check for field.extractor while only removing
the duplicated calls to _build_visit_dict.
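The hoisting refactor described in this entry can be sketched as below. The `Field` class and loop shape are illustrative assumptions, not the backend's real types; the point is the single `build_visit_dict(row)` call per row:

```python
class Field:
    # Minimal illustrative field: optional transform and extractor callables.
    def __init__(self, name, transform=None, extractor=None):
        self.name = name
        self.transform = transform
        self.extractor = extractor

def process_rows(rows, fields, build_visit_dict):
    results = []
    for row in rows:
        visit_dict = build_visit_dict(row)  # built once, reused for all fields
        record = {}
        for field in fields:
            value = None
            if field.transform is not None:
                try:
                    value = field.transform(visit_dict)
                except Exception:
                    value = None  # mirror the existing swallow-and-continue behavior
            if callable(field.extractor):
                value = field.extractor(visit_dict)
            record[field.name] = value
        results.append(record)
    return results
```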

In `@commcare_connect/tasks/views.py`:
- Line 1095: The "saved_at" assignment is using a naive datetime via
datetime.now(); replace it with a timezone-aware UTC timestamp by using
datetime.now(timezone.utc) and ensure the module imports timezone (e.g., add or
update the import to include timezone) so the "saved_at" value is consistent and
timezone-aware across the codebase.
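A minimal sketch of the timezone-aware replacement (the function name is illustrative; in `tasks/views.py` this would be the inline expression assigned to `"saved_at"`):

```python
from datetime import datetime, timezone

def saved_at_timestamp():
    # Aware UTC timestamp; naive datetime.now() has tzinfo=None and
    # cannot be compared safely against aware values elsewhere.
    return datetime.now(timezone.utc)
```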

ℹ️ Review info

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between e82ace0 and 30c578b.

📒 Files selected for processing (8)
  • commcare_connect/labs/analysis/backends/sql/backend.py
  • commcare_connect/labs/analysis/data_access.py
  • commcare_connect/labs/analysis/sse_streaming.py
  • commcare_connect/tasks/views.py
  • commcare_connect/workflow/templates/mbw_monitoring/DASHBOARD_GUIDE.md
  • commcare_connect/workflow/templates/mbw_monitoring/pipeline_config.py
  • commcare_connect/workflow/templates/mbw_monitoring_v2.py
  • commcare_connect/workflow/views.py
🚧 Files skipped from review as they are similar to previous changes (1)
  • commcare_connect/workflow/templates/mbw_monitoring_v2.py

Comment on lines +208 to +225
data_access = WorkflowDataAccess(request=request)
try:
today = datetime.now().date()
week_start = today - timedelta(days=today.weekday())
week_end = week_start + timedelta(days=6)
run = data_access.create_run(
definition_id=definition_id,
opportunity_id=opportunity_id,
period_start=week_start.isoformat(),
period_end=week_end.isoformat(),
initial_state={"worker_states": {}},
)
finally:
data_access.close()
params = request.GET.copy()
params["run_id"] = str(run.id)
return redirect(f"{request.path}?{params.urlencode()}")


⚠️ Potential issue | 🟠 Major

Handle run creation failures gracefully instead of returning a 500.

create_run() (Line 213) is an external write call. If it throws, get() fails hard and users never reach a friendly error state. Please catch/log the exception and pass a clear creation error into the render path.

💡 Proposed fix
                 data_access = WorkflowDataAccess(request=request)
                 try:
                     today = datetime.now().date()
                     week_start = today - timedelta(days=today.weekday())
                     week_end = week_start + timedelta(days=6)
                     run = data_access.create_run(
                         definition_id=definition_id,
                         opportunity_id=opportunity_id,
                         period_start=week_start.isoformat(),
                         period_end=week_end.isoformat(),
                         initial_state={"worker_states": {}},
                     )
+                except Exception as e:
+                    logger.error(
+                        f"Failed to create workflow run for definition {definition_id}: {e}",
+                        exc_info=True,
+                    )
+                    request._workflow_create_error = "Could not create a new run. Please try again."
+                    return super().get(request, *args, **kwargs)
                 finally:
                     data_access.close()
                 params = request.GET.copy()
                 params["run_id"] = str(run.id)
                 return redirect(f"{request.path}?{params.urlencode()}")
@@
             else:
                 # No run_id and not edit mode — get() should have redirected.
                 # This branch only executes if opportunity_id was missing at
                 # get() time (no labs context), so show a friendly error.
-                context["error"] = "Could not create a new run. Please select an opportunity."
+                context["error"] = getattr(
+                    self.request,
+                    "_workflow_create_error",
+                    "Could not create a new run. Please select an opportunity.",
+                )
                 return context

Also applies to: 328-332


@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@commcare_connect/workflow/templates/mbw_monitoring/DOCUMENTATION.md`:
- Line 572: The doc text is inconsistent: update the earlier summary that
currently says "3 use extractor, 9 use path" to reflect that only gps_location
remains extractor-based (so "1 uses extractor, 11 use path" or equivalent). Edit
the sentence referencing counts (the one mentioning "3 use extractor, 9 use
path") to match the note that gps_location uses extractor and visit_datetime and
app_build_version were converted to path-based; ensure the terms gps_location,
visit_datetime, app_build_version, extractor and path are mentioned if present.

ℹ️ Review info

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 30c578b and af0c493.

📒 Files selected for processing (1)
  • commcare_connect/workflow/templates/mbw_monitoring/DOCUMENTATION.md

- backend.py: Add try/except around json.loads in _build_visit_dict; lazy visit_dict per row
- sse_streaming.py: Add timeout=1 to error/done queue puts with queue.Full catch
- workflow/views.py: Catch create_run exceptions, log and fall through to error page
- data_access.py: Use `is not None` for la_cached check
- tasks/views.py: Use timezone-aware datetime.now(timezone.utc) for saved_at

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@coderabbitai coderabbitai bot left a comment

🧹 Nitpick comments (3)
commcare_connect/workflow/views.py (2)

210-212: Use timezone-aware datetime for consistent date calculation.

datetime.now().date() depends on the server's local timezone, which could produce different "today" values near midnight compared to UTC. For consistency with other timestamp fields in the codebase, use timezone-aware UTC.

Suggested fix
-                    today = datetime.now().date()
+                    today = datetime.now(timezone.utc).date()

Also update the import on line 204:

-                from datetime import datetime, timedelta
+                from datetime import datetime, timedelta, timezone

Based on learnings: "In Python code, prefer timezone-aware UTC datetimes for timestamp fields by using datetime.now(timezone.utc) instead of naive datetime.now()."

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@commcare_connect/workflow/views.py` around lines 210 - 212, Replace the naive
local datetime usage when computing today/week_start/week_end: use a UTC-aware
datetime (e.g., datetime.now(timezone.utc).date() or
datetime.now(timezone.utc).astimezone(timezone.utc).date()) instead of
datetime.now().date(); update the imports to include timezone from datetime (so
the variables today, week_start, week_end are computed from a timezone-aware
datetime) to ensure consistent UTC-based date calculations across the codebase.

221-221: Remove redundant exception object from logging.exception call.

logger.exception() automatically includes the full exception traceback. Including {e} in the message duplicates the exception text.

Suggested fix
-                    logger.exception(f"Failed to create run for opp {opportunity_id}: {e}")
+                    logger.exception(f"Failed to create run for opp {opportunity_id}")
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@commcare_connect/workflow/views.py` at line 221, The log call in
commcare_connect.workflow.views where logger.exception is used to report "Failed
to create run for opp {opportunity_id}: {e}" should be changed to avoid
duplicating the exception text; update the logger.exception invocation (in the
function that creates runs for an opportunity, referencing opportunity_id) to
pass only the descriptive message (e.g. "Failed to create run for opp
{opportunity_id}") and let logger.exception include the traceback automatically
(alternatively use logger.exception with a format string and opportunity_id as
an argument).
commcare_connect/labs/analysis/sse_streaming.py (1)

117-138: Consider explicitly closing the generator on early exit.

When the producer breaks due to stop_event, the wrapped generator isn't explicitly closed. While it will eventually be garbage collected (triggering any finally blocks), explicit cleanup ensures immediate resource release.

♻️ Proposed improvement
         def _producer():
             try:
                 for item in generator:
                     if stop_event.is_set():
                         break
                     while not stop_event.is_set():
                         try:
                             data_queue.put(("data", item), timeout=1)
                             break
                         except queue.Full:
                             continue
             except Exception as e:
                 try:
                     data_queue.put(("error", e), timeout=1)
                 except queue.Full:
                     pass
             finally:
+                # Explicitly close generator to ensure cleanup
+                if hasattr(generator, 'close'):
+                    generator.close()
                 try:
                     data_queue.put(("done", None), timeout=1)
                 except queue.Full:
                     pass

Regarding the broad except Exception at line 128: this is acceptable here since the producer thread must catch all non-system-exiting exceptions from the generator and propagate them to the consumer thread for re-raising. Adding a # noqa: BLE001 comment would silence the Ruff warning.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@commcare_connect/labs/analysis/sse_streaming.py` around lines 117 - 138, The
_producer function should explicitly close the wrapped generator when breaking
out early due to stop_event: after detecting stop_event.is_set() (inside the for
loop and before break), call generator.close() in a try/except (or ensure
closure in the finally) so resources are released immediately; ensure this
closure is safe (catch GeneratorExit or RuntimeError if needed) and still allows
the existing error/done messages to be queued to data_queue. Also add a `# noqa:
BLE001` to the broad `except Exception as e:` in _producer to silence the Ruff
warning while preserving the intended behavior of propagating generator
exceptions to the consumer.

ℹ️ Review info

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between af0c493 and 3247eac.

📒 Files selected for processing (5)
  • commcare_connect/labs/analysis/backends/sql/backend.py
  • commcare_connect/labs/analysis/data_access.py
  • commcare_connect/labs/analysis/sse_streaming.py
  • commcare_connect/tasks/views.py
  • commcare_connect/workflow/views.py

- workflow/views.py: Use timezone-aware datetime.now(timezone.utc).date() for
  week boundary calculation, matching get_or_create_run() in data_access.py
- workflow/views.py: Remove redundant {e} from logger.exception (traceback
  already included automatically)
- sse_streaming.py: Explicitly close wrapped generator in _producer finally
  block to release resources promptly; add # noqa: BLE001 for intentional
  broad except
- DOCUMENTATION.md: Fix stale extractor count (was "3 extractor, 9 path",
  now "1 extractor (gps_location), 12 path")

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@coderabbitai coderabbitai bot left a comment

♻️ Duplicate comments (1)
commcare_connect/workflow/views.py (1)

220-223: ⚠️ Potential issue | 🟠 Major

Propagate run-creation failure state to avoid misleading fallback error.

At Line 222, the fallback render path loses creation-failure context. Downstream, Line 334 can show “Please select an opportunity” even when an opportunity is present and creation simply failed.

Suggested fix
-                except Exception as e:
+                except Exception:
                     logger.exception("Failed to create run for opp %s", opportunity_id)
+                    request._workflow_create_error = "Could not create a new run. Please try again."
                     return super().get(request, *args, **kwargs)
                 finally:
                     data_access.close()
@@
-                context["error"] = "Could not create a new run. Please select an opportunity."
+                context["error"] = getattr(
+                    self.request,
+                    "_workflow_create_error",
+                    "Could not create a new run. Please try again.",
+                )
                 return context
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@commcare_connect/workflow/views.py` around lines 220 - 223, The fallback path
swallows the run-creation failure causing downstream UI to show misleading
prompts; after logger.exception("Failed to create run for opp %s",
opportunity_id) propagate the failure by adding a user-visible error (use
Django's messages framework: messages.error(request, ... ) including the
exception text or context) before returning super().get(request, *args,
**kwargs) so the template/flow (downstream logic around the selection prompt)
can detect and surface the actual creation failure instead of the generic
"Please select an opportunity" message; ensure messages is imported and include
opportunity_id and the exception message for clarity.

ℹ️ Review info

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 3247eac and 2d73077.

📒 Files selected for processing (3)
  • commcare_connect/labs/analysis/sse_streaming.py
  • commcare_connect/workflow/templates/mbw_monitoring/DOCUMENTATION.md
  • commcare_connect/workflow/views.py

@akkaouim
Collaborator Author

All concerns addressed and tested.
