
MBW pipeline migration: v2 job handler, CCHQ CLI OAuth, parity tests #24

Merged

jjackson merged 19 commits into labs-main from feature/mbw-pipeline-migration on Feb 26, 2026

Conversation

jjackson (Owner) commented Feb 26, 2026

Summary

  • MBW Monitoring v2 workflow template: Pipeline-based replacement for the SSE streaming approach. Uses 3 pipeline sources (Connect visits, CCHQ registrations, CCHQ GS forms) and an mbw_monitoring job handler for GPS analysis, follow-up rates, quality metrics, and FLW performance.
  • CCHQ OAuth CLI support: create_cli_request() now auto-loads the CommCare HQ token into the mock session. Fixed get_commcare_token to use correct CCHQ OAuth endpoints (/oauth/ not /o/).
  • GPS parity fix: Added visit_date to v2's _build_gps_visit_dicts() output so analyze_gps_metrics can compute trailing 7-day and daily travel metrics.
  • Pipeline cleanup: SQL-only backend (removed python_redis), lazy imports to avoid AppRegistryNotReady, shared data_transforms.py module.
  • Comprehensive parity testing: 21 unit tests + test_mbw_parity management command for live end-to-end payload comparison. Verified identical output on opp 765 (45,652 visits, 100 FLWs, 23,881 registration forms).
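The OAuth endpoint fix in the second bullet can be sketched as follows. This is a minimal illustration under assumptions — `build_oauth_urls` is a hypothetical helper, not the real `get_oauth_token()` — showing why configurable paths matter: Connect's provider is mounted at `/o/`, while CommCare HQ's lives at `/oauth/`. Using `urljoin` also keeps the URLs robust to trailing/leading slashes:

```python
from urllib.parse import urlencode, urljoin

def build_oauth_urls(base_url, client_id,
                     authorize_path="/o/authorize/", token_path="/o/token/"):
    """Sketch: build authorize/token URLs with configurable provider paths.

    Defaults match a django-oauth-toolkit mount at /o/; CommCare HQ callers
    would pass /oauth/authorize/ and /oauth/token/ explicitly.
    """
    query = urlencode({"client_id": client_id, "response_type": "code"})
    auth_url = urljoin(base_url, authorize_path) + "?" + query
    token_url = urljoin(base_url, token_path)
    return auth_url, token_url
```

For CCHQ the call site would pass `authorize_path="/oauth/authorize/"` and `token_path="/oauth/token/"`; with the old hard-coded `/o/` paths the token exchange 404s against HQ.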

Test plan

  • 21 unit parity tests pass (pytest commcare_connect/workflow/tests/test_mbw_v1_v2_parity.py --no-migrations)
  • Live parity test passes on opp 765: all 6 dashboard sections (GPS, follow-up, visit status, overview, performance, drilldown) match between v1 and v2
  • CCHQ OAuth flow works end-to-end via python manage.py get_commcare_token
  • Visual comparison: load v2 template in browser and compare with v1 dashboard

🤖 Generated with Claude Code


Note

High Risk
High risk: this PR removes the python_redis analysis backend and forces the pipeline onto the SQL backend, while also adding a new CommCare HQ Form API data source and a new MBW v2 computation path. Together these changes can alter caching/data-fetch behavior and dashboard outputs.

Overview
Labs analysis is migrated to SQL-only. The PR removes the python_redis backend (and backend protocol/exports), updates imports (e.g. compute_visit_fields), and makes AnalysisPipeline always instantiate SQLBackend.

The pipeline gains an alternate raw-data source from CommCare HQ forms. AnalysisPipelineConfig now includes data_source (connect_csv vs cchq_forms), and the pipeline can fetch/normalize CCHQ Form API results into visit-shaped dicts via a new SQL backend cchq_fetcher.

MBW Monitoring v2 is added. A new mbw_monitoring workflow job handler consumes multi-pipeline outputs (Connect visits + CCHQ registrations + CCHQ GS forms) to compute GPS, follow-up, quality, performance, and overview sections; registration is wired via workflow.job_handlers import, and a new test_mbw_parity command plus unit tests validate v1/v2 parity.

CLI OAuth support is extended. get_oauth_token() now allows configurable authorize/token paths (used to fix CommCare HQ /oauth/* endpoints), and create_cli_request() can inject a saved CommCare token into the mock session.
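The token-injection behavior can be illustrated like this — a hedged sketch, with the session key name and mock-request shape assumed rather than taken from the real `create_cli_request()`:

```python
import json
from pathlib import Path
from types import SimpleNamespace

# Path matches the PR description; everything else here is illustrative.
TOKEN_PATH = Path.home() / ".commcare-connect" / "commcare_token.json"

def load_commcare_token(path=TOKEN_PATH):
    """Return the saved CommCare token dict, or None if no file exists."""
    if not path.exists():
        return None
    return json.loads(path.read_text())

def create_cli_request(token_path=TOKEN_PATH):
    """Sketch: build a mock request whose session carries the saved token,
    so session-based data access works transparently in CLI scripts."""
    request = SimpleNamespace(session={})
    token = load_commcare_token(token_path)
    if token:
        request.session["commcare_token"] = token  # key name is an assumption
    return request
```

The point is that CLI scripts get the same session shape the web views see, so `CommCareDataAccess` needs no CLI-specific branch.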

Written by Cursor Bugbot for commit d0b56b4. This will update automatically on new commits.

Summary by CodeRabbit

  • New Features

    • CommCare HQ form data supported as a pipeline data source.
    • MBW Monitoring v2: pipeline + job-based dashboard workflow and job handler.
    • CLI: improved CommCare OAuth handling for CLI requests.
  • Refactor

    • Analysis pipeline standardized on a SQL/PostgreSQL backend; legacy Redis backend and related caching removed.
    • Pipeline flow simplified and unified; CCHQ form handling integrated.
    • Cache tolerance and TTL made configurable.
  • Documentation

    • Architecture docs updated to reflect SQL-backed pipeline and data-source options.
  • Tests

    • Added parity and data-source tests for CCHQ and MBW flows.
  • Chores

    • .gitignore expanded to ignore local token/worktree files.

jjackson and others added 16 commits February 24, 2026 19:22
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Add cchq_fetcher.py that normalizes CCHQ form dicts to Connect visit
dict shape so FieldComputation path extraction works identically. Integrate
the CCHQ data source into all 3 fetch paths in pipeline.py (cache miss
with filters, force refresh with filters, and normal unfiltered path).
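The normalization idea reads roughly like this sketch — key names such as `xform_id` and `form_json` are assumptions; the real mapping is in `cchq_fetcher.py`:

```python
# Sketch: map a CommCare HQ Form API result onto the flat dict shape Connect
# visit rows use, so the same dotted-path FieldComputation extraction
# (e.g. "form.case.update.*") works identically on both sources.
def normalize_cchq_form(form: dict) -> dict:
    metadata = form.get("metadata") or {}
    return {
        "xform_id": form.get("id"),
        "visit_date": form.get("received_on"),
        "username": (metadata.get("username") or "").lower(),
        # Keep the raw form body so path extraction can reach into it.
        "form_json": form.get("form") or {},
    }
```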

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Create job handler that orchestrates MBW dashboard computations using
existing computation modules. The handler receives pipeline data (visits,
registrations, gs_forms) and runs GPS analysis, follow-up rate computation,
quality metrics, FLW performance, and overview summary.

Includes a PipelineRowAdapter to bridge serialized pipeline row dicts
to the VisitRow-like interface expected by followup_analysis functions.
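A minimal version of such an adapter — illustrative only; the real `PipelineRowAdapter` lives in the job handler module:

```python
class PipelineRowAdapter:
    """Wrap a serialized pipeline row dict so code written against
    attribute-style VisitRow access (row.username) keeps working.
    Sketch under assumptions; the real adapter may expose more."""

    def __init__(self, row: dict):
        self._row = row

    def __getattr__(self, name):
        # Only invoked for attributes not found normally, so _row lookups
        # from __init__ are unaffected.
        try:
            return self._row[name]
        except KeyError:
            raise AttributeError(name) from None
```

Raising `AttributeError` (not letting `KeyError` escape) matters: callers using `getattr(row, "x", default)` rely on it.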

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Replace the SSE streaming data loading layer with pipeline + job handler
approach. The v2 render code:

- Reads from the `pipelines` prop (visits, registrations, gs_forms) which
  are auto-loaded by the workflow runner infrastructure
- Shows pipeline loading status with row counts for each data source
- Presents a "Run Analysis" button when all 3 pipelines are loaded
- Triggers actions.startJob() to run the mbw_monitoring job handler
- Streams job progress via actions.streamJobProgress()
- Assembles dashData from job results in the shape the tabs expect

All 4 tab renderings (Overview, GPS, Follow-Up, FLW Performance) and all
worker result management code remain identical to v1.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Add 21 parity tests comparing v1 (MBWMonitoringStreamView) and v2
(job handler) data transformation paths. Tests cover:
- GPS dict-building and full GPS analysis
- Per-mother field extraction (parity, ANC/PNC dates, baby DOB)
- EBF % computation
- PipelineRowAdapter attribute access fidelity
- Follow-up analysis (build_followup, aggregate, status distribution)
- End-to-end job handler output comparison

The tests caught a real bug: _build_gps_visit_dicts was not lowercasing
usernames, causing mixed-case usernames to be treated as separate FLWs
(v1 normalizes to lowercase at line 458 of views.py).
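The parity-sensitive pieces of the fix can be sketched as follows (field names assumed; the real `_build_gps_visit_dicts` builds from pipeline rows):

```python
# Sketch: the two details the parity tests pinned down — v1 lowercases
# usernames before grouping, and analyze_gps_metrics needs visit_date to
# compute trailing_7_days and avg_daily_travel_km.
def build_gps_visit_dict(row: dict) -> dict:
    return {
        "username": (row.get("username") or "").lower(),  # match v1 normalization
        "visit_date": row.get("visit_date"),              # required for trailing metrics
        "latitude": row.get("latitude"),
        "longitude": row.get("longitude"),
    }
```

Without the `.lower()`, "FLW_One" and "flw_one" group as two FLWs; without `visit_date`, the daily-travel windowing silently produces nothing.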

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- Add CCHQ OAuth to CLI: `create_cli_request()` now auto-loads CommCare
  HQ token from `~/.commcare-connect/commcare_token.json` into the mock
  session so `CommCareDataAccess` works transparently in CLI scripts.
- Fix CommCare HQ OAuth endpoints: `get_oauth_token()` now accepts
  `authorize_path` and `token_path` params (CCHQ uses `/oauth/` not `/o/`).
  `get_commcare_token` command passes the correct paths.
- Fix GPS parity: add `visit_date` to `_build_gps_visit_dicts()` output
  in the v2 job handler — without it, `analyze_gps_metrics` couldn't
  compute trailing_7_days or avg_daily_travel_km.
- Fix pipeline import: use lazy `get_backend()` instead of direct
  `SQLBackend()` to avoid AppRegistryNotReady errors.
- Add `COMMCARE_OAUTH_CLI_CLIENT_ID` to local settings.
- Extract v1 transforms to shared `data_transforms.py` module, update
  parity tests to use them instead of duplicated inline code.
- Add `test_mbw_parity` management command for live end-to-end payload
  comparison with `--gs-app-id` flag.
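The lazy-import fix follows a standard pattern: defer the model-touching import to call time so nothing loads Django models before the app registry is ready. A generic sketch (the real `get_backend()` presumably imports `SQLBackend` directly rather than via `importlib`):

```python
import importlib

def get_backend(module_path="commcare_connect.labs.analysis.backends.sql.backend",
                class_name="SQLBackend"):
    """Resolve and instantiate the backend class at call time.

    Importing SQLBackend at module load would pull in Django models during
    app setup and raise AppRegistryNotReady; deferring the import sidesteps
    that. Default path/class mirror the PR but are assumptions here.
    """
    module = importlib.import_module(module_path)
    return getattr(module, class_name)()
```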

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

coderabbitai bot commented Feb 26, 2026

📝 Walkthrough

This PR removes the Python/Redis labs analysis backend and Protocol, consolidates analysis on an SQL/PostgreSQL backend with SQL-backed cache tables, adds CCHQ Forms as a data source, refactors visit-level computations, and introduces pipeline/job-driven MBW Monitoring V2 with tests and parity tooling.

Changes

  • Backend removal (python_redis) — commcare_connect/labs/analysis/backends/protocol.py, commcare_connect/labs/analysis/backends/python_redis/*
    Deleted the AnalysisBackend Protocol and the entire python_redis backend implementation (backend, cache, computations, flw_analyzer, visit_analyzer, package re-export). Removes Redis-based caching and backend-specific compute/caching code.
  • SQL backend & CCHQ fetcher — commcare_connect/labs/analysis/backends/sql/cchq_fetcher.py, commcare_connect/labs/analysis/backends/sql/*, commcare_connect/labs/analysis/pipeline.py
    Added a CCHQ form fetcher that normalizes forms to visit dicts; the pipeline now lazily imports and uses SQLBackend only, handles the cchq_forms data_source, and threads tolerance_pct/TTL-based cache validation into SQL cache calls.
  • Computation utilities — commcare_connect/labs/analysis/computations.py, commcare_connect/labs/analysis/__init__.py
    Introduced backend-agnostic visit-level computation utilities (field & histogram extraction) and removed legacy exported analyzers (FLWAnalyzer, VisitAnalyzer, compute_flw/visit_analysis) from module exports.
  • Config & schema plumbing — commcare_connect/labs/analysis/config.py, commcare_connect/workflow/data_access.py
    Added DataSourceConfig and threaded data_source through AnalysisPipelineConfig and schema parsing so pipelines can target connect_csv or cchq_forms.
  • MBW monitoring job handlers & templates — commcare_connect/workflow/job_handlers/*, commcare_connect/workflow/tasks.py, commcare_connect/workflow/templates/mbw_monitoring_v2.py, commcare_connect/workflow/templates/mbw_monitoring/data_transforms.py, commcare_connect/workflow/templates/__init__.py
    Added job handler registration and the mbw_monitoring job handler implementing multi-step pipeline-driven analytics with progress reporting; added the MBW V2 template, pipeline schemas (visits/registrations/gs_forms), shared transforms, and template re-exports.
  • MBW parity tooling & tests — commcare_connect/workflow/management/commands/test_mbw_parity.py, commcare_connect/workflow/tests/test_mbw_v1_v2_parity.py
    Added a CLI parity command and comprehensive v1/v2 parity tests validating transformation and analytics parity across flows.
  • Removed legacy parity & python-redis test paths — commcare_connect/labs/management/commands/test_backend_parity.py, commcare_connect/custom_analysis/chc_nutrition/management/commands/test_chc_nutrition.py
    Removed the legacy backend parity command and deleted python_redis cache testing paths; the nutrition test now runs pipeline-based analysis without cache flags.
  • CCHQ pipeline tests & config tests — commcare_connect/labs/tests/test_cchq_pipeline.py, commcare_connect/labs/tests/test_data_source_config.py
    Added tests for CCHQ form normalization into visit dicts and for DataSourceConfig validation/defaults.
  • OAuth / CLI changes — commcare_connect/labs/integrations/connect/cli/client.py, commcare_connect/labs/management/commands/get_commcare_token.py
    Made OAuth endpoints configurable (authorize_path, token_path), added a CLI helper to inject the stored CommCare token into requests, and updated call sites to pass explicit paths.
  • Docs, settings, misc — .claude/AGENTS.md, commcare_connect/labs/analysis/backends/__init__.py, docs/LABS_ARCHITECTURE.md, config/settings/*, .gitignore
    Documentation updated to the SQL-only architecture; removed the LABS_ANALYSIS_BACKEND setting; added COMMCARE_OAUTH_CLI_CLIENT_ID, PIPELINE_CACHE_TOLERANCE_PCT, and PIPELINE_CACHE_TTL_HOURS; minor .gitignore additions.
  • SQL cache API surface — commcare_connect/labs/analysis/backends/sql/cache.py, commcare_connect/labs/analysis/backends/sql/backend.py
    Introduced tolerance_pct and dynamic TTL usage across SQL cache methods and streaming/download paths; added raw_line_count logging and mismatch warnings.
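The tolerance/TTL rule described above can be sketched as a pure function. Parameter names mirror the settings (PIPELINE_CACHE_TOLERANCE_PCT, PIPELINE_CACHE_TTL_HOURS) but the exact signature and thresholds are assumptions:

```python
from datetime import datetime, timedelta, timezone

def cache_is_valid(cached_count, expected_count, cached_at,
                   tolerance_pct=2.0, ttl_hours=24, now=None):
    """Sketch: a cached row set is reusable when it is younger than the TTL
    and its row count is within tolerance_pct of the expected count."""
    now = now or datetime.now(timezone.utc)
    if now - cached_at > timedelta(hours=ttl_hours):
        return False  # expired regardless of counts
    if expected_count == 0:
        return cached_count == 0
    drift = abs(cached_count - expected_count) / expected_count * 100
    return drift <= tolerance_pct
```

The tolerance exists because live opportunities keep accruing visits; a small drift should not force a full re-download.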

Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant UI as User/UI
    participant Runner as Job Runner
    participant Pipeline as AnalysisPipeline
    participant SQL as SQLBackend / PostgreSQL
    participant CCHQ as CommCare HQ API

    UI->>Runner: start MBW job (pipeline config)
    Runner->>Pipeline: initialize pipeline (data_source)
    alt data_source == cchq_forms
        Pipeline->>CCHQ: fetch forms (OAuth token)
        CCHQ-->>Pipeline: return normalized visit dicts
    else connect_csv
        Pipeline->>SQL: stream raw visits / raw cache
        SQL-->>Pipeline: stream visit dicts
    end
    Pipeline->>SQL: process & cache computed visits/FLW
    SQL-->>Runner: job progress/events + final results
    Runner-->>UI: stream results / completion
```

Estimated code review effort

🎯 5 (Critical) | ⏱️ ~120 minutes

Poem

🐇 I hopped through code, left Redis behind,

SQL gardens now neatly aligned.
CCHQ forms pranced into tidy rows,
Jobs hummed along as progress flows—
V2 took wing and the rabbit glows.

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

  • Docstring Coverage ⚠️ Warning — Docstring coverage is 67.68%, below the required 80.00% threshold. Resolution: write docstrings for the functions missing them.

✅ Passed checks (2 passed)

  • Description Check ✅ Passed — Check skipped; CodeRabbit's high-level summary is enabled.
  • Title Check ✅ Passed — The title 'MBW pipeline migration: v2 job handler, CCHQ CLI OAuth, parity tests' is fully related to the changeset: it captures the three main themes (MBW pipeline v2 migration, CCHQ CLI OAuth enhancements, and parity testing) and aligns with the PR objectives.


✨ Finishing Touches
  • 📝 Generate docstrings (stacked PR)
  • 📝 Generate docstrings (commit on current branch)

🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch feature/mbw-pipeline-migration

Comment @coderabbitai help to get the list of available commands and usage tips.

coderabbitai bot left a comment

Actionable comments posted: 5

🧹 Nitpick comments (9)
commcare_connect/custom_analysis/chc_nutrition/management/commands/test_chc_nutrition.py (2)

245-255: The use_cache parameter is now unused.

The run_full_analysis method accepts use_cache=False (line 245), but the new AnalysisPipeline.stream_analysis_ignore_events() call (line 255) doesn't use it. The --use-cache CLI argument (lines 61-64) has no effect.

Consider removing the dead parameter or implementing cache control in the SQL backend.

♻️ Suggested cleanup
-    def run_full_analysis(self, request, opportunity_id, use_cache=False):
+    def run_full_analysis(self, request, opportunity_id):
         """Run the full analysis and show results."""

And remove the --use-cache argument from add_arguments, or keep it with a deprecation warning until SQL cache control is implemented.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@commcare_connect/custom_analysis/chc_nutrition/management/commands/test_chc_nutrition.py`
around lines 245 - 255, The run_full_analysis method currently accepts a
use_cache parameter but never passes it to
AnalysisPipeline.stream_analysis_ignore_events, so the CLI --use-cache flag has
no effect; either remove the dead parameter and the --use-cache argument in
add_arguments (or emit a deprecation warning from add_arguments) or wire cache
control through the pipeline by updating run_full_analysis to pass use_cache
into AnalysisPipeline (e.g., call
stream_analysis_ignore_events(CHC_NUTRITION_CONFIG, use_cache=use_cache)) and
implement cache handling in AnalysisPipeline.stream_analysis_ignore_events/SQL
backend accordingly.

234-243: Cache test stub is acceptable, but consider removing the CLI argument.

The TODO comment is helpful for tracking the incomplete migration. However, the --test-cache argument still exists in add_arguments (line 66-69), which may confuse users. Consider either:

  1. Removing the argument entirely until SQL cache testing is implemented
  2. Adding a deprecation warning when the argument is provided
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@commcare_connect/custom_analysis/chc_nutrition/management/commands/test_chc_nutrition.py`
around lines 234 - 243, Remove the obsolete CLI flag by deleting the
"--test-cache" argument from the command's add_arguments method and any related
handling, and remove or simplify test_cache_functionality's warning block (or
keep the stub but no longer reference the flag), so users aren't offered a
non-functional option; locate add_arguments and test_cache_functionality in the
management command and update help text/docstring to reflect that SQL-backend
cache testing is not implemented (alternatively, if you prefer a softer change,
add a deprecation warning inside add_arguments that logs a clear message when
"--test-cache" is present and points to the TODO in test_cache_functionality).
commcare_connect/labs/tests/test_cchq_pipeline.py (2)

28-28: Remove unused imports.

MagicMock and patch are imported but not used in the current tests.

♻️ Cleanup unused imports
-from unittest.mock import MagicMock, patch  # noqa: E402
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@commcare_connect/labs/tests/test_cchq_pipeline.py` at line 28, The test file
imports unused symbols MagicMock and patch; remove them from the import
statement in commcare_connect/labs/tests/test_cchq_pipeline.py (the line
containing "from unittest.mock import MagicMock, patch") so the module only
imports what it actually uses; if other tests later need mocking, add the
specific imports at that time.

8-25: Consider using pytest-django instead of manual configuration.

The manual settings.configure() block is fragile:

  • It may conflict if tests run after Django is already configured
  • It sets up a minimal in-memory SQLite that may not match production schema

If pytest-django is available, use @pytest.mark.django_db and conftest.py fixtures instead. If this file must run standalone (e.g., during CI without full Django setup), add a comment explaining why.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@commcare_connect/labs/tests/test_cchq_pipeline.py` around lines 8 - 25, The
manual Django bootstrap (settings.configure(...) and django.setup() in this test
module) is fragile; replace it by relying on pytest-django fixtures or make the
intent explicit: remove the inline settings/configure and annotate tests with
`@pytest.mark.django_db` and add any required DB fixtures in conftest.py, or if
this module must run standalone, add a brief comment above the
settings.configure/django.setup explaining why standalone configuration is
required and guard it with a check to avoid reconfiguring Django (e.g., only
configure when not settings.configured), keeping the existing settings keys
moved to a project conftest.py fixture named e.g. django_db_settings to
centralize configuration.
commcare_connect/workflow/templates/__init__.py (1)

148-190: Consider making pipeline_schema and pipeline_schemas mutually exclusive.

If a template provides both pipeline_schema (singular) and pipeline_schemas (plural), both blocks will execute and append to pipeline_sources. This could lead to unexpected behavior.

Consider adding a guard or documenting the expected usage:

♻️ Proposed fix to make schemas mutually exclusive
+    # Handle pipeline creation - singular OR plural, not both
+    pipeline_schemas = template.get("pipeline_schemas", [])
+    if pipeline_schema and pipeline_schemas:
+        raise ValueError(
+            f"Template {template_key} has both pipeline_schema and pipeline_schemas; use only one"
+        )
+
     # Create pipeline if template has one (singular schema)
     if pipeline_schema and request:

Alternatively, if both are intentionally supported (singular is "main" + plural are "additional"), document this behavior clearly.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@commcare_connect/workflow/templates/__init__.py` around lines 148 - 190, The
code currently processes both pipeline_schema and pipeline_schemas and appends
into pipeline_sources, causing unexpected duplicates; update the logic in
__init__.py around pipeline_schema, pipeline_schemas, pipeline_sources so the
two modes are mutually exclusive (e.g., if pipeline_schemas is present
prefer/only process it, else process pipeline_schema) or add an explicit guard
that raises/logs when both are provided; ensure the branch that computes
pipeline_alias using template_key/alias_map remains applied only for the
singular pipeline_schema path and that pipeline_data_access usage and
pipeline_sources population occur in the chosen exclusive branch.
commcare_connect/labs/analysis/computations.py (1)

103-116: Broad exception handling is acceptable here, but consider being more specific.

The except Exception blocks (flagged by Ruff BLE001) are reasonable for batch data processing where one malformed record shouldn't crash the entire pipeline. However, for transforms specifically, you could narrow to common expected exceptions:

♻️ Optional: More specific exception handling for transforms
                 if value is not None and field_comp.transform and not field_comp.uses_extractor:
                     try:
                         value = field_comp.transform(value)
-                    except Exception:
+                    except (TypeError, ValueError, AttributeError, KeyError):
                         value = None

This makes the intent clearer while still catching the common failure modes. The outer except Exception on line 114 can remain broad since extraction failures are more varied.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@commcare_connect/labs/analysis/computations.py` around lines 103 - 116, The
inner transform error handling currently uses a bare except in the field
processing loop; update the try/except around field_comp.transform(value) to
catch specific expected errors (for example ValueError, TypeError, KeyError)
instead of Exception so malformed values are handled explicitly, while leaving
the outer except in the surrounding block (the handler that logs "Failed to
compute field {field_comp.name} for visit {visit.id}: {e}") as a broad except to
continue protecting the overall extraction; reference field_comp.transform and
the outer except that assigns visit_result[field_comp.name] = field_comp.default
when implementing this change.
commcare_connect/labs/analysis/pipeline.py (1)

340-386: Extract duplicated raw-fetch streaming logic into one helper.

These two branches duplicate the same event loop/translation behavior. A single internal helper would reduce drift risk and keep future fixes consistent.

♻️ Refactor sketch
+    def _stream_visit_fetch(self, opp_id: int, force_refresh: bool):
+        visit_dicts = None
+        for event in self.backend.stream_raw_visits(
+            opportunity_id=opp_id,
+            access_token=self.access_token,
+            expected_visit_count=self.visit_count,
+            force_refresh=force_refresh,
+        ):
+            # existing cached/progress/parsing/complete mapping
+            ...
+        return visit_dicts

Also applies to: 455-484

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@commcare_connect/labs/analysis/pipeline.py` around lines 340 - 386, The two
branches that produce and translate streaming events duplicate the same loop;
extract that logic into a single helper (e.g., a private method
_stream_and_collect_raw_visits(opportunity_id, access_token, force_refresh,
expected_visit_count) on the Pipeline class) and call it from both the
cchq_forms branch (after fetch_cchq_forms_as_visit_dicts produces its
visit_dicts) and the self.backend branch; the helper should iterate the same
event tuples from self.backend.stream_raw_visits (or accept an iterator of
events), map "cached"/"progress"/"parsing"/"complete" to the same yields
(EVENT_STATUS and EVENT_DOWNLOAD), set/return visit_dicts when "cached" or
"complete" occurs, and preserve logging using self.backend_name and logger so
the behavior in fetch_cchq_forms_as_visit_dicts, visit_dicts assignment,
EVENT_STATUS/EVENT_DOWNLOAD yields, and messages remain identical; apply the
same refactor for the other duplicated region around the second occurrence.
commcare_connect/workflow/job_handlers/mbw_monitoring.py (2)

186-222: The access_token parameter is unused but required by the handler interface.

The static analysis correctly flags that access_token is not used in this handler. However, the @register_job_handler decorator requires all handlers to have the signature (job_config, access_token, progress_callback). Consider adding an underscore prefix (_access_token) to signal intentional non-use and silence the linter, or add a brief comment explaining it's unused for this handler.

💡 Optional: Signal intentional non-use
 @register_job_handler("mbw_monitoring")
-def handle_mbw_monitoring_job(job_config: dict, access_token: str, progress_callback) -> dict:
+def handle_mbw_monitoring_job(job_config: dict, _access_token: str, progress_callback) -> dict:
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@commcare_connect/workflow/job_handlers/mbw_monitoring.py` around lines 186 -
222, The access_token parameter of handle_mbw_monitoring_job is intentionally
unused but flagged by linters; update the function signature to rename
access_token to _access_token (or prefix it with an underscore) and/or add a
brief inline comment like "# intentionally unused — required by
register_job_handler interface" inside handle_mbw_monitoring_job to explicitly
signal non-use and silence static analysis warnings while keeping the required
(job_config, access_token, progress_callback) shape for `@register_job_handler`.

357-370: Non-idiomatic variable existence check using dir().

Line 370 uses "adapted_visit_rows" not in dir() to check if a local variable exists, which is unusual and confusing. This check is needed because adapted_visit_rows is defined inside the try block (line 296) and may not exist if an exception occurs before that line.

A cleaner approach is to initialize the variable before the try block:

♻️ Suggested refactor: Initialize variable before try block

Move the initialization before Step 2's try block:

+    # Initialize before try block so it exists even if step 2 fails early
+    adapted_visit_rows: list[_PipelineRowAdapter] = []
+
     # =========================================================================
     # Step 2: Follow-up Rate Analysis
     # =========================================================================
     try:
         progress_callback("Computing follow-up rates...", processed=1, total=5)

         # Adapt visit rows for followup_analysis functions
         adapted_visit_rows = _adapt_rows(visit_rows)

Then simplify the exception handler:

-        adapted_visit_rows = _adapt_rows(visit_rows) if "adapted_visit_rows" not in dir() else adapted_visit_rows
+        if not adapted_visit_rows:
+            adapted_visit_rows = _adapt_rows(visit_rows)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@commcare_connect/workflow/job_handlers/mbw_monitoring.py` around lines 357 -
370, The code uses "adapted_visit_rows" not in dir() to detect whether
adapted_visit_rows was created inside the try; instead initialize
adapted_visit_rows (e.g., set to None or an empty list) immediately before the
try block that defines it (the try beginning around Step 2 where
adapted_visit_rows is assigned), and in the except handler replace the dir()
check with a direct None (or empty) check (e.g., if adapted_visit_rows is None)
and then call _adapt_rows(visit_rows) as needed; update references to
adapted_visit_rows in the except block accordingly so the variable always exists
without using dir().
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@commcare_connect/labs/analysis/backends/sql/cchq_fetcher.py`:
- Around line 103-126: The fetch currently always passes app_id to
client.fetch_forms even when xmlns was resolved from data_source.gs_app_id or
via client.discover_form_xmlns, which can incorrectly filter out matches; modify
the logic to track where xmlns was found (e.g., a flag like xmlns_source or
xmlns_from_app) when calling client.get_form_xmlns(data_source.gs_app_id, ...)
or client.get_form_xmlns(app_id, ...) or client.discover_form_xmlns(...), and
only pass app_id into client.fetch_forms when xmlns was specifically discovered
via the main app_id (i.e., xmlns_from_app is True); otherwise call
client.fetch_forms(xmlns=xmlns, app_id=None) so forms from other apps are not
excluded.

In `@commcare_connect/labs/integrations/connect/cli/client.py`:
- Line 153: The current construction of auth_url via string concatenation
(auth_url = f"{production_url}{authorize_path}?{urlencode(auth_params)}") is
fragile when production_url may end with a slash or authorize_path may/may not
start with one; replace this with proper URL joining using urllib.parse.urljoin
to combine production_url and authorize_path, then append the query string built
with urlencode; do the same normalization for the token endpoint (the similar
construction around token_url or wherever production_url + token_path is used)
so both auth and token URLs are robust to trailing/leading slashes.

In `@commcare_connect/workflow/data_access.py`:
- Around line 1745-1752: Guard against schema["data_source"] being null/non-dict
before calling .get on it: replace the current data_source_dict =
schema.get("data_source", {}) with a type check such as data_source_raw =
schema.get("data_source"); if not isinstance(data_source_raw, dict):
data_source_dict = {} else: data_source_dict = data_source_raw, then proceed to
construct DataSourceConfig(...) using data_source_dict; this prevents
AttributeError when schema contains "data_source": null while keeping the same
defaults for missing keys.

In `@commcare_connect/workflow/management/commands/test_mbw_parity.py`:
- Around line 151-153: The except block catching "except Exception as e" in the
command (inside the Command.handle method) currently only logs via
self.stdout.write(self.style.WARNING(...)) and then allows the parity comparison
to continue; change it to abort immediately after logging by raising a
CommandError (or returning from handle) with the error message so the command
stops and does not run with incomplete inputs, and ensure CommandError is
imported (or use return) to prevent further parity checks after the CCHQ fetch
failure.
- Around line 590-601: The command prints "FAILED" when all_diffs is non-empty
but still exits 0; update the failure branch in test_mbw_parity.py (inside the
management command's handle method where all_diffs is checked) to terminate with
a non-zero exit code—either call sys.exit(1) (and add import sys) or raise
django.core.management.CommandError with a clear message—immediately after
writing the failure output so CI sees a failure when all_diffs is present.

---

Nitpick comments:
In
`@commcare_connect/custom_analysis/chc_nutrition/management/commands/test_chc_nutrition.py`:
- Around line 245-255: The run_full_analysis method currently accepts a
use_cache parameter but never passes it to
AnalysisPipeline.stream_analysis_ignore_events, so the CLI --use-cache flag has
no effect; either remove the dead parameter and the --use-cache argument in
add_arguments (or emit a deprecation warning from add_arguments) or wire cache
control through the pipeline by updating run_full_analysis to pass use_cache
into AnalysisPipeline (e.g., call
stream_analysis_ignore_events(CHC_NUTRITION_CONFIG, use_cache=use_cache)) and
implement cache handling in AnalysisPipeline.stream_analysis_ignore_events/SQL
backend accordingly.
- Around line 234-243: Remove the obsolete CLI flag by deleting the
"--test-cache" argument from the command's add_arguments method and any related
handling, and remove or simplify test_cache_functionality's warning block (or
keep the stub but no longer reference the flag), so users aren't offered a
non-functional option; locate add_arguments and test_cache_functionality in the
management command and update help text/docstring to reflect that SQL-backend
cache testing is not implemented (alternatively, if you prefer a softer change,
add a deprecation warning inside add_arguments that logs a clear message when
"--test-cache" is present and points to the TODO in test_cache_functionality).

In `@commcare_connect/labs/analysis/computations.py`:
- Around line 103-116: The inner transform error handling currently uses a bare
except in the field processing loop; update the try/except around
field_comp.transform(value) to catch specific expected errors (for example
ValueError, TypeError, KeyError) instead of Exception so malformed values are
handled explicitly, while leaving the outer except in the surrounding block (the
handler that logs "Failed to compute field {field_comp.name} for visit
{visit.id}: {e}") as a broad except to continue protecting the overall
extraction; reference field_comp.transform and the outer except that assigns
visit_result[field_comp.name] = field_comp.default when implementing this
change.
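A sketch of the narrowed inner handler, with dicts standing in for the real field-computation objects (names here are hypothetical):

```python
def extract_fields(visit, field_comps):
    # field_comps: hypothetical dicts standing in for FieldComputation objects
    result = {}
    for fc in field_comps:
        try:
            result[fc["name"]] = fc["transform"](visit.get(fc["source"]))
        except (ValueError, TypeError, KeyError):
            # Only expected malformed-value errors fall back to the default;
            # programming errors still propagate to the broad outer handler
            result[fc["name"]] = fc["default"]
    return result
```

This keeps per-field resilience for bad data while letting genuine bugs surface at the outer `except`.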

In `@commcare_connect/labs/analysis/pipeline.py`:
- Around line 340-386: The two branches that produce and translate streaming
events duplicate the same loop; extract that logic into a single helper (e.g., a
private method _stream_and_collect_raw_visits(opportunity_id, access_token,
force_refresh, expected_visit_count) on the Pipeline class) and call it from
both the cchq_forms branch (after fetch_cchq_forms_as_visit_dicts produces its
visit_dicts) and the self.backend branch; the helper should iterate the same
event tuples from self.backend.stream_raw_visits (or accept an iterator of
events), map "cached"/"progress"/"parsing"/"complete" to the same yields
(EVENT_STATUS and EVENT_DOWNLOAD), set/return visit_dicts when "cached" or
"complete" occurs, and preserve logging using self.backend_name and logger so
the behavior in fetch_cchq_forms_as_visit_dicts, visit_dicts assignment,
EVENT_STATUS/EVENT_DOWNLOAD yields, and messages remain identical; apply the
same refactor for the other duplicated region around the second occurrence.

In `@commcare_connect/labs/tests/test_cchq_pipeline.py`:
- Line 28: The test file imports unused symbols MagicMock and patch; remove them
from the import statement in commcare_connect/labs/tests/test_cchq_pipeline.py
(the line containing "from unittest.mock import MagicMock, patch") so the module
only imports what it actually uses; if other tests later need mocking, add the
specific imports at that time.
- Around line 8-25: The manual Django bootstrap (settings.configure(...) and
django.setup() in this test module) is fragile; replace it by relying on
pytest-django fixtures or make the intent explicit: remove the inline
settings/configure and annotate tests with `@pytest.mark.django_db` and add any
required DB fixtures in conftest.py, or if this module must run standalone, add
a brief comment above the settings.configure/django.setup explaining why
standalone configuration is required and guard it with a check to avoid
reconfiguring Django (e.g., only configure when not settings.configured),
keeping the existing settings keys moved to a project conftest.py fixture named
e.g. django_db_settings to centralize configuration.

In `@commcare_connect/workflow/job_handlers/mbw_monitoring.py`:
- Around line 186-222: The access_token parameter of handle_mbw_monitoring_job
is intentionally unused but flagged by linters; update the function signature to
rename access_token to _access_token (or prefix it with an underscore) and/or
add a brief inline comment like "# intentionally unused — required by
register_job_handler interface" inside handle_mbw_monitoring_job to explicitly
signal non-use and silence static analysis warnings while keeping the required
(job_config, access_token, progress_callback) shape for `@register_job_handler`.
- Around line 357-370: The code uses "adapted_visit_rows" not in dir() to detect
whether adapted_visit_rows was created inside the try; instead initialize
adapted_visit_rows (e.g., set to None or an empty list) immediately before the
try block that defines it (the try beginning around Step 2 where
adapted_visit_rows is assigned), and in the except handler replace the dir()
check with a direct None (or empty) check (e.g., if adapted_visit_rows is None)
and then call _adapt_rows(visit_rows) as needed; update references to
adapted_visit_rows in the except block accordingly so the variable always exists
without using dir().
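A runnable sketch of the initialize-before-try pattern, with hypothetical stand-ins for `_adapt_rows` and the compute step:

```python
def _adapt_rows(rows):
    # Hypothetical adapter stand-in
    return [dict(r, adapted=True) for r in rows]

def run_analysis(visit_rows, compute):
    # Bind the name before the try so the except can test it directly,
    # instead of probing locals with `"adapted_visit_rows" not in dir()`
    adapted_visit_rows = None
    try:
        adapted_visit_rows = _adapt_rows(visit_rows)
        return compute(adapted_visit_rows)
    except RuntimeError:
        if adapted_visit_rows is None:  # adaptation itself never completed
            adapted_visit_rows = _adapt_rows(visit_rows)
        return adapted_visit_rows  # degraded fallback path
```

The `is None` check is both faster and unambiguous, whereas `dir()` scans the whole local namespace and breaks if the variable name ever changes.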

In `@commcare_connect/workflow/templates/__init__.py`:
- Around line 148-190: The code currently processes both pipeline_schema and
pipeline_schemas and appends into pipeline_sources, causing unexpected
duplicates; update the logic in __init__.py around pipeline_schema,
pipeline_schemas, pipeline_sources so the two modes are mutually exclusive
(e.g., if pipeline_schemas is present prefer/only process it, else process
pipeline_schema) or add an explicit guard that raises/logs when both are
provided; ensure the branch that computes pipeline_alias using
template_key/alias_map remains applied only for the singular pipeline_schema
path and that pipeline_data_access usage and pipeline_sources population occur
in the chosen exclusive branch.
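If exclusivity is wanted, the guard could look like the following sketch (function and key names are hypothetical; the real template dict may differ):

```python
def collect_pipeline_sources(template):
    schemas = template.get("pipeline_schemas")
    schema = template.get("pipeline_schema")
    if schemas is not None and schema is not None:
        raise ValueError("Template may set pipeline_schema or pipeline_schemas, not both")
    if schemas is not None:
        return list(schemas)
    return [schema] if schema is not None else []
```

Raising eagerly when both keys are present turns a silent duplicate-source bug into an immediate, debuggable configuration error.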

ℹ️ Review info

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 371893a and d0b56b4.

📒 Files selected for processing (35)
  • .claude/AGENTS.md
  • .gitignore
  • commcare_connect/audit/data_access.py
  • commcare_connect/custom_analysis/chc_nutrition/management/commands/test_chc_nutrition.py
  • commcare_connect/labs/analysis/__init__.py
  • commcare_connect/labs/analysis/backends/__init__.py
  • commcare_connect/labs/analysis/backends/protocol.py
  • commcare_connect/labs/analysis/backends/python_redis/__init__.py
  • commcare_connect/labs/analysis/backends/python_redis/backend.py
  • commcare_connect/labs/analysis/backends/python_redis/cache.py
  • commcare_connect/labs/analysis/backends/python_redis/computations.py
  • commcare_connect/labs/analysis/backends/python_redis/flw_analyzer.py
  • commcare_connect/labs/analysis/backends/python_redis/visit_analyzer.py
  • commcare_connect/labs/analysis/backends/sql/cchq_fetcher.py
  • commcare_connect/labs/analysis/computations.py
  • commcare_connect/labs/analysis/config.py
  • commcare_connect/labs/analysis/pipeline.py
  • commcare_connect/labs/integrations/connect/cli/client.py
  • commcare_connect/labs/management/commands/get_commcare_token.py
  • commcare_connect/labs/management/commands/test_backend_parity.py
  • commcare_connect/labs/tests/test_cchq_pipeline.py
  • commcare_connect/labs/tests/test_data_source_config.py
  • commcare_connect/workflow/data_access.py
  • commcare_connect/workflow/job_handlers/__init__.py
  • commcare_connect/workflow/job_handlers/mbw_monitoring.py
  • commcare_connect/workflow/management/commands/test_mbw_parity.py
  • commcare_connect/workflow/tasks.py
  • commcare_connect/workflow/templates/__init__.py
  • commcare_connect/workflow/templates/mbw_monitoring/data_transforms.py
  • commcare_connect/workflow/templates/mbw_monitoring/views.py
  • commcare_connect/workflow/templates/mbw_monitoring_v2.py
  • commcare_connect/workflow/tests/test_mbw_v1_v2_parity.py
  • config/settings/labs.py
  • config/settings/local.py
  • docs/LABS_ARCHITECTURE.md
💤 Files with no reviewable changes (9)
  • commcare_connect/labs/analysis/backends/python_redis/computations.py
  • config/settings/labs.py
  • commcare_connect/labs/management/commands/test_backend_parity.py
  • commcare_connect/labs/analysis/backends/python_redis/visit_analyzer.py
  • commcare_connect/labs/analysis/backends/python_redis/flw_analyzer.py
  • commcare_connect/labs/analysis/backends/python_redis/cache.py
  • commcare_connect/labs/analysis/backends/python_redis/backend.py
  • commcare_connect/labs/analysis/backends/python_redis/__init__.py
  • commcare_connect/labs/analysis/backends/protocol.py

Comment on lines +103 to +126
    # Strategy 1: Try gs_app_id first (for GS forms in separate supervisor app)
    if data_source.gs_app_id:
        xmlns = client.get_form_xmlns(data_source.gs_app_id, form_name)
        if xmlns:
            logger.info(f"[CCHQ Fetcher] Found xmlns via gs_app_id: {xmlns}")

    # Strategy 2: Try the main app_id
    if not xmlns and app_id:
        xmlns = client.get_form_xmlns(app_id, form_name)
        if xmlns:
            logger.info(f"[CCHQ Fetcher] Found xmlns via app_id: {xmlns}")

    # Strategy 3: Search all apps
    if not xmlns:
        xmlns = client.discover_form_xmlns(form_name)
        if xmlns:
            logger.info(f"[CCHQ Fetcher] Discovered xmlns via search: {xmlns}")

    if not xmlns:
        logger.warning(f"[CCHQ Fetcher] Could not discover xmlns for '{form_name}', returning empty")
        return []

    forms = client.fetch_forms(xmlns=xmlns, app_id=app_id if app_id else None)
    logger.info(f"[CCHQ Fetcher] Fetched {len(forms)} '{form_name}' forms from {cc_domain}")

⚠️ Potential issue | 🟠 Major

app_id filter is applied even when xmlns came from a different app.

When xmlns is resolved via gs_app_id (or global discovery), forcing fetch_forms(..., app_id=<main app>) can filter out matching forms and return empty results.

💡 Suggested fix
-    # Strategy 1: Try gs_app_id first (for GS forms in separate supervisor app)
+    fetch_app_id = None
+
+    # Strategy 1: Try gs_app_id first (for GS forms in separate supervisor app)
     if data_source.gs_app_id:
         xmlns = client.get_form_xmlns(data_source.gs_app_id, form_name)
         if xmlns:
+            fetch_app_id = data_source.gs_app_id
             logger.info(f"[CCHQ Fetcher] Found xmlns via gs_app_id: {xmlns}")

     # Strategy 2: Try the main app_id
     if not xmlns and app_id:
         xmlns = client.get_form_xmlns(app_id, form_name)
         if xmlns:
+            fetch_app_id = app_id
             logger.info(f"[CCHQ Fetcher] Found xmlns via app_id: {xmlns}")

     # Strategy 3: Search all apps
     if not xmlns:
         xmlns = client.discover_form_xmlns(form_name)
         if xmlns:
+            fetch_app_id = None  # app unknown from discovery; don't over-filter
             logger.info(f"[CCHQ Fetcher] Discovered xmlns via search: {xmlns}")

-    forms = client.fetch_forms(xmlns=xmlns, app_id=app_id if app_id else None)
+    forms = client.fetch_forms(xmlns=xmlns, app_id=fetch_app_id)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@commcare_connect/labs/analysis/backends/sql/cchq_fetcher.py` around lines 103
- 126, The fetch currently always passes app_id to client.fetch_forms even when
xmlns was resolved from data_source.gs_app_id or via client.discover_form_xmlns,
which can incorrectly filter out matches; modify the logic to track where xmlns
was found (e.g., a flag like xmlns_source or xmlns_from_app) when calling
client.get_form_xmlns(data_source.gs_app_id, ...) or
client.get_form_xmlns(app_id, ...) or client.discover_form_xmlns(...), and only
pass app_id into client.fetch_forms when xmlns was specifically discovered via
the main app_id (i.e., xmlns_from_app is True); otherwise call
client.fetch_forms(xmlns=xmlns, app_id=None) so forms from other apps are not
excluded.

"code_challenge_method": "S256",
}
auth_url = f"{production_url}/o/authorize/?{urlencode(auth_params)}"
auth_url = f"{production_url}{authorize_path}?{urlencode(auth_params)}"
@coderabbitai coderabbitai bot Feb 26, 2026

⚠️ Potential issue | 🟡 Minor

Normalize URL joining for OAuth endpoints to avoid malformed URLs.
Current concatenation is fragile if production_url has a trailing / or endpoint paths are passed without a leading /.

Suggested hardening
 def get_oauth_token(
@@
 ) -> dict | None:
@@
-    # Build authorization URL
+    # Build authorization/token URLs safely
+    base_url = production_url.rstrip("/")
+    authorize_endpoint = authorize_path if authorize_path.startswith("/") else f"/{authorize_path}"
+    token_endpoint = token_path if token_path.startswith("/") else f"/{token_path}"
+
+    # Build authorization URL
     auth_params = {
@@
-    auth_url = f"{production_url}{authorize_path}?{urlencode(auth_params)}"
+    auth_url = f"{base_url}{authorize_endpoint}?{urlencode(auth_params)}"
@@
         response = httpx.post(
-            f"{production_url}{token_path}",
+            f"{base_url}{token_endpoint}",
             data=token_data,
             timeout=10,
         )

Also applies to: 201-201

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@commcare_connect/labs/integrations/connect/cli/client.py` at line 153, The
current construction of auth_url via string concatenation (auth_url =
f"{production_url}{authorize_path}?{urlencode(auth_params)}") is fragile when
production_url may end with a slash or authorize_path may/may not start with
one; replace this with proper URL joining using urllib.parse.urljoin to combine
production_url and authorize_path, then append the query string built with
urlencode; do the same normalization for the token endpoint (the similar
construction around token_url or wherever production_url + token_path is used)
so both auth and token URLs are robust to trailing/leading slashes.
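A minimal sketch of the slash normalization the reviewer suggests (the author dismisses it below as unnecessary for this internal CLI); `join_url` and `build_auth_url` are hypothetical helper names:

```python
from urllib.parse import urlencode

def join_url(base, path):
    # Robust to a trailing slash on base and a leading (or missing) slash on path
    return base.rstrip("/") + "/" + path.lstrip("/")

def build_auth_url(production_url, authorize_path, params):
    return join_url(production_url, authorize_path) + "?" + urlencode(params)
```

Plain `rstrip`/`lstrip` joining avoids the `urljoin` pitfall where an absolute path like `/o/authorize/` silently discards any path segment already present in the base URL.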

Owner Author

Dismissed — both callers pass well-formed URLs with explicit leading slashes (defaults are /o/authorize/ and /oauth/authorize/). Adding urljoin would over-engineer for an internal CLI tool with two known call sites.

@jjackson, understood—acknowledging that the inputs are controlled and the defaults already enforce the leading-slash convention makes the suggestion unnecessary for this internal CLI tool. I'll note this context for future reference.


✏️ Learnings added
Learnt from: jjackson
Repo: jjackson/commcare-connect PR: 24
File: commcare_connect/labs/integrations/connect/cli/client.py:153-153
Timestamp: 2026-02-26T13:45:55.693Z
Learning: In commcare_connect/labs/integrations/connect/cli/client.py, the get_oauth_token function is used by an internal CLI tool with only two known call sites. The authorize_path and token_path parameters always have explicit leading slashes by convention (defaults are "/o/authorize/" and "/o/token/"), so URL normalization via urljoin is considered over-engineering given the controlled inputs.

Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.


🧠 Learnings used
Learnt from: akkaouim
Repo: jjackson/commcare-connect PR: 22
File: commcare_connect/workflow/data_access.py:572-572
Timestamp: 2026-02-18T16:40:33.334Z
Learning: In Python code, prefer timezone-aware UTC datetimes for timestamp fields by using datetime.now(timezone.utc) instead of naive datetime.now(). This avoids timezone ambiguity and ensures consistent UTC timestamps. In commcare_connect/workflow/data_access.py, apply this pattern for all timestamp fields, ensuring you import timezone from datetime.


Comment on lines +1745 to +1752
data_source_dict = schema.get("data_source", {})
data_source = DataSourceConfig(
type=data_source_dict.get("type", "connect_csv"),
form_name=data_source_dict.get("form_name", ""),
app_id=data_source_dict.get("app_id", ""),
app_id_source=data_source_dict.get("app_id_source", ""),
gs_app_id=data_source_dict.get("gs_app_id", ""),
)

⚠️ Potential issue | 🟠 Major

Guard schema["data_source"] against null/non-object values.

If persisted schema contains "data_source": null, this block crashes on data_source_dict.get(...) before config creation.

💡 Suggested fix
-        data_source_dict = schema.get("data_source", {})
+        data_source_dict = schema.get("data_source") or {}
+        if not isinstance(data_source_dict, dict):
+            raise ValueError("schema.data_source must be an object")
         data_source = DataSourceConfig(
             type=data_source_dict.get("type", "connect_csv"),
             form_name=data_source_dict.get("form_name", ""),
             app_id=data_source_dict.get("app_id", ""),
             app_id_source=data_source_dict.get("app_id_source", ""),
             gs_app_id=data_source_dict.get("gs_app_id", ""),
         )
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@commcare_connect/workflow/data_access.py` around lines 1745 - 1752, Guard
against schema["data_source"] being null/non-dict before calling .get on it:
replace the current data_source_dict = schema.get("data_source", {}) with a type
check such as data_source_raw = schema.get("data_source"); if not
isinstance(data_source_raw, dict): data_source_dict = {} else: data_source_dict
= data_source_raw, then proceed to construct DataSourceConfig(...) using
data_source_dict; this prevents AttributeError when schema contains
"data_source": null while keeping the same defaults for missing keys.

Comment on lines +151 to +153
except Exception as e:
self.stdout.write(self.style.WARNING(f" -> CCHQ fetch failed: {e}"))

@coderabbitai coderabbitai bot Feb 26, 2026

⚠️ Potential issue | 🟠 Major

Don’t continue parity comparison after CCHQ fetch failure.

Swallowing this error allows the command to run with incomplete inputs and can incorrectly report parity.

💡 Suggested fix
-        except Exception as e:
-            self.stdout.write(self.style.WARNING(f"  -> CCHQ fetch failed: {e}"))
+        except Exception as e:
+            raise CommandError(f"CCHQ fetch failed: {e}") from e
🧰 Tools
🪛 Ruff (0.15.2)

[warning] 151-151: Do not catch blind exception: Exception

(BLE001)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@commcare_connect/workflow/management/commands/test_mbw_parity.py` around
lines 151 - 153, The except block catching "except Exception as e" in the
command (inside the Command.handle method) currently only logs via
self.stdout.write(self.style.WARNING(...)) and then allows the parity comparison
to continue; change it to abort immediately after logging by raising a
CommandError (or returning from handle) with the error message so the command
stops and does not run with incomplete inputs, and ensure CommandError is
imported (or use return) to prevent further parity checks after the CCHQ fetch
failure.

Owner Author

By design — CCHQ is optional for the parity test. When the CommCare token isn't available, the command continues with 0 registration/GS forms and validates all other sections. The output clearly shows the counts so it's transparent.


@jjackson, thank you for clarifying! That makes sense for a diagnostic command where partial data can still provide value. The transparent output with counts is a good approach.

To help future reviewers (and developers) understand this is intentional, consider adding a brief inline comment explaining that CCHQ fetch failures are expected and handled gracefully:

except Exception as e:
    # CCHQ data is optional; continue with 0 forms to validate other sections
    self.stdout.write(self.style.WARNING(f"  -> CCHQ fetch failed: {e}"))

This will make the design intent explicit in the code.


✏️ Learnings added
Learnt from: jjackson
Repo: jjackson/commcare-connect PR: 24
File: commcare_connect/workflow/management/commands/test_mbw_parity.py:151-153
Timestamp: 2026-02-26T13:46:12.419Z
Learning: In the test_mbw_parity management command (commcare_connect/workflow/management/commands/test_mbw_parity.py), CCHQ form fetching (registrations and GS forms) is intentionally optional. The command catches and logs fetch exceptions but continues with 0 forms to allow parity validation of other sections when the CommCare token is unavailable. This is by design to support partial testing scenarios.

Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.


🧠 Learnings used
Learnt from: akkaouim
Repo: jjackson/commcare-connect PR: 22
File: commcare_connect/workflow/data_access.py:572-572
Timestamp: 2026-02-18T16:40:33.334Z
Learning: In Python code, prefer timezone-aware UTC datetimes for timestamp fields by using datetime.now(timezone.utc) instead of naive datetime.now(). This avoids timezone ambiguity and ensures consistent UTC timestamps. In commcare_connect/workflow/data_access.py, apply this pattern for all timestamp fields, ensuring you import timezone from datetime.

…eanup

- cchq_fetcher: track which app resolved xmlns and only pass that app_id
  to fetch_forms (fixes GS forms being filtered out by main app_id)
- data_access: use `or {}` instead of default={} for data_source to
  handle null values in persisted schemas
- test_mbw_parity: raise CommandError on parity failure for non-zero exit
- mbw_monitoring: initialize adapted_visit_rows before try block instead
  of using dir() check; rename unused access_token to _access_token
- test_cchq_pipeline: remove unused MagicMock/patch imports

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@jjackson
Owner Author

Addressed CodeRabbit review — implemented 6 fixes, dismissed 8 as out-of-scope or by-design.

Implemented:

  • cchq_fetcher.py: Track which app resolved xmlns — only pass that app_id to fetch_forms
  • data_access.py: Use or {} to handle "data_source": null in persisted schemas
  • test_mbw_parity.py: Raise CommandError on parity failure for non-zero exit
  • mbw_monitoring.py: Initialize adapted_visit_rows before try block (removes dir() hack)
  • mbw_monitoring.py: Rename access_token → _access_token (unused param)
  • test_cchq_pipeline.py: Remove unused MagicMock/patch imports

Dismissed (nitpicks):

  • URL normalization (client.py): Over-engineering for internal CLI with 2 known callers
  • CCHQ fetch failure abort (test_mbw_parity.py): By design — CCHQ is optional
  • use_cache/test_cache dead params (test_chc_nutrition.py): Pre-existing, not changed in this PR
  • settings.configure() fragility (test_cchq_pipeline.py): Intentional standalone test setup
  • pipeline_schema vs pipeline_schemas guard (__init__.py): By design — both are valid
  • Broad exception handling (computations.py): Intentional for batch data resilience
  • Duplicated streaming logic (pipeline.py): Refactoring suggestion, not a bug

@coderabbitai coderabbitai bot left a comment

♻️ Duplicate comments (1)
commcare_connect/workflow/data_access.py (1)

1745-1752: ⚠️ Potential issue | 🟠 Major

Guard schema["data_source"] against non-dict values before .get(...).

Line 1745 still allows truthy non-dict payloads (e.g., list/string), which will crash at Line 1747 with AttributeError.

💡 Suggested fix
-        data_source_dict = schema.get("data_source") or {}
+        data_source_raw = schema.get("data_source")
+        data_source_dict = data_source_raw if isinstance(data_source_raw, dict) else {}
         data_source = DataSourceConfig(
             type=data_source_dict.get("type", "connect_csv"),
             form_name=data_source_dict.get("form_name", ""),
             app_id=data_source_dict.get("app_id", ""),
             app_id_source=data_source_dict.get("app_id_source", ""),
             gs_app_id=data_source_dict.get("gs_app_id", ""),
         )
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@commcare_connect/workflow/data_access.py` around lines 1745 - 1752, Guard
against non-dict values for schema["data_source"] before calling .get: replace
the current unconditional assignment of data_source_dict =
schema.get("data_source") or {} with a type check so that if
schema.get("data_source") is not an instance of dict you fall back to an empty
dict; then construct DataSourceConfig(...) from that safe data_source_dict. This
change touches the data_source_dict variable used to build the DataSourceConfig
in the data access code (the schema -> data_source_dict -> DataSourceConfig
flow).
🧹 Nitpick comments (1)
commcare_connect/workflow/job_handlers/mbw_monitoring.py (1)

255-256: Consider using UTC for current_date.

date.today() returns a naive date based on the server's local timezone. If the server runs in a non-UTC timezone, this could return a different calendar day than expected, affecting date comparisons in follow-up analysis.

🔧 Suggested fix using UTC
+from datetime import date, timezone, datetime
...
-    current_date = date.today()
+    current_date = datetime.now(timezone.utc).date()

Based on learnings: "In Python code, prefer timezone-aware UTC datetimes for timestamp fields by using datetime.now(timezone.utc) instead of naive datetime.now(). This avoids timezone ambiguity and ensures consistent UTC timestamps."

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@commcare_connect/workflow/job_handlers/mbw_monitoring.py` around lines 255 -
256, Replace the naive local date call current_date = date.today() with a
UTC-derived date to avoid timezone drift: import datetime/timezone and set
current_date = datetime.datetime.now(datetime.timezone.utc).date(), updating any
references in this module (e.g., in functions within mbw_monitoring.py where
current_date is used) so the date is consistently derived from UTC; ensure
necessary imports (datetime, timezone) are added at the top of the file.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Duplicate comments:
In `@commcare_connect/workflow/data_access.py`:
- Around line 1745-1752: Guard against non-dict values for schema["data_source"]
before calling .get: replace the current unconditional assignment of
data_source_dict = schema.get("data_source") or {} with a type check so that if
schema.get("data_source") is not an instance of dict you fall back to an empty
dict; then construct DataSourceConfig(...) from that safe data_source_dict. This
change touches the data_source_dict variable used to build the DataSourceConfig
in the data access code (the schema -> data_source_dict -> DataSourceConfig
flow).

---

Nitpick comments:
In `@commcare_connect/workflow/job_handlers/mbw_monitoring.py`:
- Around lines 255-256: replace the naive local date call current_date =
date.today() with a UTC-derived date to avoid timezone drift: import
datetime/timezone and set current_date =
datetime.datetime.now(datetime.timezone.utc).date(), then update any references
in this module (e.g., in functions within mbw_monitoring.py where current_date
is used) so the date is consistently derived from UTC; ensure the necessary
imports (datetime, timezone) are added at the top of the file.

ℹ️ Review info

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between d0b56b4 and 97d989a.

📒 Files selected for processing (5)
  • commcare_connect/labs/analysis/backends/sql/cchq_fetcher.py
  • commcare_connect/labs/tests/test_cchq_pipeline.py
  • commcare_connect/workflow/data_access.py
  • commcare_connect/workflow/job_handlers/mbw_monitoring.py
  • commcare_connect/workflow/management/commands/test_mbw_parity.py
🚧 Files skipped from review as they are similar to previous changes (1)
  • commcare_connect/labs/tests/test_cchq_pipeline.py

jjackson and others added 2 commits February 26, 2026 07:16
Remove --use-cache, --test-cache flags and test_cache_functionality stub
that were left over from the removed python_redis backend. Also remove
the stale TODO comment referencing that removal.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Production streaming export is silently truncated by gunicorn worker
timeout (300s) due to N+1 BlobMeta query in UserVisitDataSerializer.
Sentry confirms SystemExit:1 on every download of large opportunities.

- Add raw CSV line counting to detect server-side truncation vs parsing drops
- Add configurable cache tolerance (PIPELINE_CACHE_TOLERANCE_PCT) so pipeline
  can work with partial downloads while server-side fix is pending
- Add configurable cache TTL (PIPELINE_CACHE_TTL_HOURS) for local dev
- Pass tolerance through all cache validation layers (raw, computed, FLW)
- Skip visit count validation for CCHQ form sources (different row counts)
- Fix mbw_monitoring_v2 render code string concatenation

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
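The configurable cache tolerance described in this commit might look like the following hypothetical helper (PIPELINE_CACHE_TOLERANCE_PCT is the real setting name from the commit message; the function itself is a sketch, not the actual implementation):

```python
def within_tolerance(expected_rows: int, actual_rows: int, tolerance_pct: float) -> bool:
    """Accept a partial download when the shortfall stays within tolerance.

    tolerance_pct would come from settings.PIPELINE_CACHE_TOLERANCE_PCT,
    letting the pipeline work with truncated exports while the server-side
    gunicorn-timeout fix is pending.
    """
    if expected_rows == 0:
        return actual_rows == 0
    shortfall_pct = (expected_rows - actual_rows) / expected_rows * 100
    return shortfall_pct <= tolerance_pct
```

With a 5% tolerance, a 45,652-visit opportunity truncated to 44,000 rows would still validate; a half-complete download would not.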

@coderabbitai coderabbitai bot left a comment


🧹 Nitpick comments (3)
commcare_connect/workflow/templates/mbw_monitoring_v2.py (2)

955-978: Safety check for SSE references is a good defensive measure.

The code logs warnings when SSE terms remain after transformation, which helps catch incomplete migrations. However, the import inside the loop is inefficient.

♻️ Move import outside the loop
+    import logging
+    _logger = logging.getLogger(__name__)
+
     for term in _sse_terms:
         if term in code:
-            import logging
-
-            logging.getLogger(__name__).warning(
+            _logger.warning(
                 "MBW V2 render code still contains SSE reference: %s", term
             )
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@commcare_connect/workflow/templates/mbw_monitoring_v2.py` around lines 955-978:
the import logging statement sits inside the for loop, re-executing on every
matching iteration; move the import and logger creation out of the loop (import
logging once and assign logger = logging.getLogger(__name__) before iterating
over _sse_terms), then call logger.warning(...) inside the loop when term in
code is true, keeping the rest of the logic (_sse_terms, term, and the warning
message) unchanged.

146-162: String replacement helpers fail loudly, but their errors lack context.

The _replace_between and _replace_between_inclusive functions raise ValueError when markers aren't found, which is good. However, the error messages could be more actionable by including the markers and the surrounding code context.

Consider this a minor suggestion, since the code already fails loudly on marker mismatch.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@commcare_connect/workflow/templates/mbw_monitoring_v2.py` around lines 146-162:
the ValueError raised by _replace_between and _replace_between_inclusive is
unhelpful; update the exception message to include the start and end marker
strings, their computed indices, and a short preview of the surrounding code
(e.g., a 120-char slice around the computed start, or the whole code if short)
so callers can quickly see why the markers weren't found. Modify both functions'
raise ValueError lines to construct this richer message so debugging marker
mismatches is easier.
commcare_connect/labs/analysis/pipeline.py (1)

361-408: CCHQ fetch logic is duplicated across three code paths.

The CCHQ form fetching code appears in three places: cache miss with filters (lines 361-408), force refresh with filters (lines 466-512), and main CCHQ path (lines 548-572). This increases maintenance burden.

Per the comments summary, this was noted as a refactoring suggestion and dismissed as out-of-scope for this PR. Consider extracting a helper method in a follow-up.

Also applies to: 466-512, 548-572

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@commcare_connect/labs/analysis/pipeline.py` around lines 361-408: the
duplicated CCHQ fetch logic should be extracted into a single helper to avoid
three copies; create a private method on the Pipeline class (e.g.,
_fetch_cchq_forms) that encapsulates the import and the call to
fetch_cchq_forms_as_visit_dicts(request=self.request,
data_source=unfiltered_config.data_source, access_token=self.access_token,
opportunity_id=opp_id) and yields or returns the visit_dicts consistently. Then
replace the three inlined blocks that check unfiltered_config.data_source.type
== "cchq_forms" (the cache-miss, force-refresh, and main CCHQ paths) with a
single call to that helper, preserving the same logging/yield behavior
(EVENT_STATUS messages) and the visit_dicts assignment used later.
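A runnable sketch of that extraction, with a stub standing in for the real fetch_cchq_forms_as_visit_dicts (the actual function calls the CommCare HQ Form API and is not reproduced here):

```python
def fetch_cchq_forms_as_visit_dicts(request, data_source, access_token, opportunity_id):
    # Stub for illustration only; the real fetcher hits the CCHQ Form API.
    return [{"opportunity_id": opportunity_id, "source_type": data_source}]


class Pipeline:
    def __init__(self, request, access_token):
        self.request = request
        self.access_token = access_token

    def _fetch_cchq_forms(self, data_source, opp_id):
        # Single shared path replacing the three inlined cchq_forms blocks
        # (cache miss, force refresh, main CCHQ path).
        return fetch_cchq_forms_as_visit_dicts(
            request=self.request,
            data_source=data_source,
            access_token=self.access_token,
            opportunity_id=opp_id,
        )
```

Each former call site then shrinks to `visit_dicts = self._fetch_cchq_forms(...)`, so a change to the fetch arguments only has to be made once.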
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Nitpick comments:
In `@commcare_connect/labs/analysis/pipeline.py`:
- Around lines 361-408: the duplicated CCHQ fetch logic should be extracted into
a single helper to avoid three copies; create a private method on the Pipeline
class (e.g., _fetch_cchq_forms) that encapsulates the import and the call to
fetch_cchq_forms_as_visit_dicts(request=self.request,
data_source=unfiltered_config.data_source, access_token=self.access_token,
opportunity_id=opp_id) and yields or returns the visit_dicts consistently, then
replace the three inlined blocks that check unfiltered_config.data_source.type
== "cchq_forms" (the cache-miss, force-refresh, and main CCHQ paths) with a
single call to that helper; ensure the helper preserves the same logging/yield
behavior (EVENT_STATUS messages) and assigns the visit_dicts used later.

In `@commcare_connect/workflow/templates/mbw_monitoring_v2.py`:
- Around lines 955-978: the import logging statement is inside the for loop,
re-executing on every matching iteration; move the import and logger creation
out of the loop (import logging once and assign logger =
logging.getLogger(__name__) before iterating over _sse_terms), then call
logger.warning(...) inside the loop when term in code is true, keeping the rest
of the logic (_sse_terms, term, and the warning message) unchanged.
- Around lines 146-162: the ValueError raised by _replace_between and
_replace_between_inclusive is unhelpful; update the exception message to include
the start and end marker strings, their computed indices, and a short preview of
the surrounding code (e.g., a 120-char slice around the computed start, or the
whole code if short) so callers can quickly see why the markers weren't found;
modify both functions' raise ValueError lines to construct this richer message
so debugging marker mismatches is easier.

ℹ️ Review info

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 2be6a4d and 08ede0d.

📒 Files selected for processing (6)
  • .gitignore
  • commcare_connect/labs/analysis/backends/sql/backend.py
  • commcare_connect/labs/analysis/backends/sql/cache.py
  • commcare_connect/labs/analysis/pipeline.py
  • commcare_connect/workflow/templates/mbw_monitoring_v2.py
  • config/settings/local.py
🚧 Files skipped from review as they are similar to previous changes (1)
  • .gitignore

@jjackson jjackson merged commit a538b42 into labs-main Feb 26, 2026
2 checks passed