Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions commcare_connect/labs/analysis/backends/csv_parsing.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,12 +179,16 @@ def parse_csv_bytes(
visits = []

try:
csv_reader = pd.read_csv(io.BytesIO(csv_bytes), usecols=usecols, chunksize=chunksize)
csv_reader = pd.read_csv(
io.BytesIO(csv_bytes), usecols=usecols, chunksize=chunksize, on_bad_lines="warn"
)
except ValueError as e:
# Handle case where some slim columns don't exist in CSV
if "not in list" in str(e) and skip_form_json:
logger.warning(f"Some slim columns not found in CSV, falling back to all columns: {e}")
csv_reader = pd.read_csv(io.BytesIO(csv_bytes), chunksize=chunksize)
csv_reader = pd.read_csv(
io.BytesIO(csv_bytes), chunksize=chunksize, on_bad_lines="warn"
)
else:
raise

Expand Down
330 changes: 326 additions & 4 deletions commcare_connect/labs/integrations/commcare/api_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import httpx
from django.conf import settings
from django.http import HttpRequest
from django.utils import timezone

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -43,22 +44,85 @@ def check_token_valid(self) -> bool:
"""
Check if CommCare OAuth token is configured and not expired.

If the token is expired, attempts automatic refresh using the stored
refresh token before returning False.

Returns:
True if token is valid, False otherwise
True if token is valid (or was successfully refreshed), False otherwise
"""
from django.utils import timezone

if not self.access_token:
return False

# Check expiration
expires_at = self.commcare_oauth.get("expires_at", 0)
if timezone.now().timestamp() >= expires_at:
logger.warning(f"CommCare OAuth token expired at {expires_at}")
logger.info("CommCare OAuth token expired, attempting refresh...")
if self._refresh_token():
logger.info("Successfully refreshed CommCare OAuth token")
return True
logger.warning(f"CommCare OAuth token expired at {expires_at} and refresh failed")
return False

return True

def _refresh_token(self) -> bool:
"""
Attempt to refresh the CommCare OAuth token using the stored refresh token.

Updates both the instance state and the session so the new token persists.

Returns:
True if refresh succeeded, False otherwise
"""
refresh_token = self.commcare_oauth.get("refresh_token")
if not refresh_token:
logger.debug("No refresh token available for CommCare OAuth")
return False

client_id = getattr(settings, "COMMCARE_OAUTH_CLIENT_ID", "")
client_secret = getattr(settings, "COMMCARE_OAUTH_CLIENT_SECRET", "")
if not client_id or not client_secret:
logger.warning("CommCare OAuth client credentials not configured for token refresh")
return False

try:
response = httpx.post(
f"{self.base_url}/oauth/token/",
data={
"grant_type": "refresh_token",
"client_id": client_id,
"client_secret": client_secret,
"refresh_token": refresh_token,
},
timeout=30.0,
)

if response.status_code != 200:
logger.warning(f"CommCare token refresh failed: {response.status_code} - {response.text}")
return False

token_data = response.json()
new_oauth = {
"access_token": token_data["access_token"],
"refresh_token": token_data.get("refresh_token", refresh_token),
"expires_at": timezone.now().timestamp() + token_data.get("expires_in", 3600),
"token_type": token_data.get("token_type", "Bearer"),
}

# Update instance state
self.access_token = new_oauth["access_token"]
self.commcare_oauth = new_oauth

# Update session so it persists across requests
self.request.session["commcare_oauth"] = new_oauth
if hasattr(self.request.session, "modified"):
self.request.session.modified = True

return True
except Exception as e:
logger.warning(f"CommCare token refresh error: {e}")
return False

def fetch_cases(
self,
case_type: str,
Expand Down Expand Up @@ -128,6 +192,264 @@ def fetch_cases(
logger.info(f"Fetched total of {len(all_cases)} {case_type} cases from CommCare")
return all_cases

def fetch_cases_by_ids(self, case_ids: list[str], batch_size: int = 100) -> list[dict]:
"""
Fetch multiple cases by their IDs in batches using comma-separated IDs.

Uses GET /api/case/v2/{id1},{id2},.../ to fetch many cases per request.
Batch size is limited to ~100 to stay within URL length limits.

Args:
case_ids: List of case IDs to fetch
batch_size: Number of cases per request (default 100)

Returns:
List of case dictionaries

Raises:
ValueError: If OAuth token is not configured or expired
"""
if not self.check_token_valid():
raise ValueError(
"CommCare OAuth not configured or expired. "
"Please authorize CommCare access at /labs/commcare/initiate/"
)

if not case_ids:
return []

headers = {
"Authorization": f"Bearer {self.access_token}",
"Content-Type": "application/json",
}

all_cases = []
total = len(case_ids)

for i in range(0, total, batch_size):
batch = case_ids[i : i + batch_size]
batch_num = i // batch_size + 1
total_batches = (total + batch_size - 1) // batch_size
logger.info(
f"Bulk-fetching case batch {batch_num}/{total_batches} "
f"({len(batch)} cases) from CommCare"
)

ids_param = ",".join(batch)
url = f"{self.base_url}/a/{self.domain}/api/case/v2/{ids_param}/"

try:
# Follow pagination within this batch
while url:
response = httpx.get(url, headers=headers, timeout=60.0)
response.raise_for_status()
data = response.json()

if isinstance(data, dict):
cases = data.get("cases", [])
all_cases.extend(cases)
url = data.get("next") # follow pagination
elif isinstance(data, list):
all_cases.extend(data)
url = None
else:
url = None
except httpx.HTTPStatusError as e:
if e.response.status_code == 404:
logger.warning(f"Batch {batch_num}: some cases not found")
else:
logger.error(f"Bulk fetch batch {batch_num} failed: {e}")
except httpx.TimeoutException:
logger.warning(f"Timeout on bulk fetch batch {batch_num}")

logger.info(f"Fetched {len(all_cases)}/{total} cases from CommCare")
return all_cases

def fetch_forms(
self,
xmlns: str | None = None,
app_id: str | None = None,
limit: int = 1000,
received_on_start: str | None = None,
received_on_end: str | None = None,
) -> list[dict]:
"""Fetch form submissions from CCHQ Form API v1.

Uses /a/{domain}/api/form/v1/ with optional xmlns filter.
Paginates via meta.next URLs.
"""
if not self.check_token_valid():
raise ValueError(
"CommCare OAuth not configured or expired. "
"Please authorize CommCare access at /labs/commcare/initiate/"
)

endpoint = f"{self.base_url}/a/{self.domain}/api/form/v1/"
params = {"limit": limit}
if xmlns:
params["xmlns"] = xmlns
if app_id:
params["app_id"] = app_id
if received_on_start:
params["received_on_start"] = received_on_start
if received_on_end:
params["received_on_end"] = received_on_end

headers = {"Authorization": f"Bearer {self.access_token}"}

all_forms = []
next_url = endpoint
page = 0
while next_url:
page += 1
logger.info(f"Fetching forms page {page} from {next_url}")

response = httpx.get(
next_url,
params=params if next_url == endpoint else None,
headers=headers,
timeout=60.0,
)
response.raise_for_status()
data = response.json()
forms = data.get("objects", [])
all_forms.extend(forms)

logger.info(f"Retrieved {len(forms)} forms (total so far: {len(all_forms)})")

next_url = data.get("meta", {}).get("next")
if next_url and not next_url.startswith("http"):
if next_url.startswith("?"):
# Query-params-only relative URL — prepend full endpoint path
next_url = f"{endpoint}{next_url}"
else:
# Path-based relative URL (e.g., /a/domain/api/...)
next_url = f"{self.base_url}{next_url}"

logger.info(f"Fetched total of {len(all_forms)} forms from CommCare")
return all_forms

def get_form_xmlns(self, app_id: str, form_name: str = "Register Mother") -> str | None:
"""Look up a form's xmlns from the Application Structure API.

Calls GET /a/{domain}/api/application/v1/{app_id}/ and walks
modules[] -> forms[] matching by the form's multilingual name dict.

Args:
app_id: CommCare application ID
form_name: Human-readable form name to search for (matched against
the values of each form's ``name`` dict, e.g. ``{"en": "Register Mother"}``)

Returns:
The xmlns string for the matching form, or None if not found.
"""
if not self.check_token_valid():
logger.warning("Cannot look up form xmlns: OAuth token invalid")
return None

url = f"{self.base_url}/a/{self.domain}/api/application/v1/{app_id}/"
headers = {"Authorization": f"Bearer {self.access_token}"}

try:
response = httpx.get(url, headers=headers, timeout=30.0)
response.raise_for_status()
except httpx.HTTPStatusError as e:
logger.warning(f"Application Structure API error for app {app_id}: {e.response.status_code}")
return None
except httpx.TimeoutException:
logger.warning(f"Timeout fetching application structure for app {app_id}")
return None

app_data = response.json()

for module in app_data.get("modules", []):
for form in module.get("forms", []):
name_dict = form.get("name", {})
# name_dict is multilingual, e.g. {"en": "Register Mother"}
if isinstance(name_dict, dict):
if form_name in name_dict.values():
xmlns = form.get("xmlns")
if xmlns:
logger.info(f"Discovered xmlns for '{form_name}': {xmlns}")
return xmlns
elif isinstance(name_dict, str) and name_dict == form_name:
xmlns = form.get("xmlns")
if xmlns:
logger.info(f"Discovered xmlns for '{form_name}': {xmlns}")
return xmlns

logger.warning(f"Form '{form_name}' not found in app {app_id}")
return None

def list_applications(self) -> list[dict]:
"""List all applications in the domain via Application API v1.

Returns list of app summary dicts (each has 'id', 'name', etc.).
Paginates through results using meta.next.
"""
if not self.check_token_valid():
logger.warning("Cannot list applications: OAuth token invalid")
return []

endpoint = f"{self.base_url}/a/{self.domain}/api/application/v1/"
headers = {"Authorization": f"Bearer {self.access_token}"}
params = {"limit": 100}

all_apps = []
next_url = endpoint
page = 0
while next_url:
page += 1
logger.info(f"Fetching applications page {page} from {next_url}")

try:
response = httpx.get(
next_url,
params=params if next_url == endpoint else None,
headers=headers,
timeout=30.0,
)
response.raise_for_status()
except httpx.HTTPStatusError as e:
logger.warning(f"Application API error: {e.response.status_code}")
break
except httpx.TimeoutException:
logger.warning("Timeout fetching applications list")
break

data = response.json()
apps = data.get("objects", [])
all_apps.extend(apps)

logger.info(f"Retrieved {len(apps)} applications (total so far: {len(all_apps)})")

next_url = data.get("meta", {}).get("next")
if next_url and not next_url.startswith("http"):
if next_url.startswith("?"):
next_url = f"{endpoint}{next_url}"
else:
next_url = f"{self.base_url}{next_url}"

logger.info(f"Listed {len(all_apps)} applications in domain {self.domain}")
return all_apps

def discover_form_xmlns(self, form_name: str) -> str | None:
"""Search all apps in the domain for a form by name, return its xmlns.

Useful when the form is in a different app than the deliver app.
Calls list_applications(), then get_form_xmlns() for each app until found.
"""
apps = self.list_applications()
for app in apps:
app_id = app.get("id")
if app_id:
xmlns = self.get_form_xmlns(app_id, form_name)
if xmlns:
logger.info(f"Discovered xmlns for '{form_name}' in app {app_id}")
return xmlns
logger.warning(f"Form '{form_name}' not found in any of {len(apps)} apps in domain {self.domain}")
return None

def fetch_case_by_id(self, case_id: str) -> dict | None:
"""
Fetch a single case by ID.
Expand Down
Loading