Skip to content

Commit 34e0c27

Browse files
authored
Merge pull request #22 from akkaouim/labs-mbw-workflow-v2
Labs mbw workflow v2
2 parents aa1af6f + 0841302 commit 34e0c27

File tree

27 files changed

+7556
-39
lines changed

27 files changed

+7556
-39
lines changed

commcare_connect/labs/analysis/backends/csv_parsing.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -179,12 +179,16 @@ def parse_csv_bytes(
179179
visits = []
180180

181181
try:
182-
csv_reader = pd.read_csv(io.BytesIO(csv_bytes), usecols=usecols, chunksize=chunksize)
182+
csv_reader = pd.read_csv(
183+
io.BytesIO(csv_bytes), usecols=usecols, chunksize=chunksize, on_bad_lines="warn"
184+
)
183185
except ValueError as e:
184186
# Handle case where some slim columns don't exist in CSV
185187
if "not in list" in str(e) and skip_form_json:
186188
logger.warning(f"Some slim columns not found in CSV, falling back to all columns: {e}")
187-
csv_reader = pd.read_csv(io.BytesIO(csv_bytes), chunksize=chunksize)
189+
csv_reader = pd.read_csv(
190+
io.BytesIO(csv_bytes), chunksize=chunksize, on_bad_lines="warn"
191+
)
188192
else:
189193
raise
190194

commcare_connect/labs/integrations/commcare/api_client.py

Lines changed: 326 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import httpx
1010
from django.conf import settings
1111
from django.http import HttpRequest
12+
from django.utils import timezone
1213

1314
logger = logging.getLogger(__name__)
1415

@@ -43,22 +44,85 @@ def check_token_valid(self) -> bool:
4344
"""
4445
Check if CommCare OAuth token is configured and not expired.
4546
47+
If the token is expired, attempts automatic refresh using the stored
48+
refresh token before returning False.
49+
4650
Returns:
47-
True if token is valid, False otherwise
51+
True if token is valid (or was successfully refreshed), False otherwise
4852
"""
49-
from django.utils import timezone
50-
5153
if not self.access_token:
5254
return False
5355

5456
# Check expiration
5557
expires_at = self.commcare_oauth.get("expires_at", 0)
5658
if timezone.now().timestamp() >= expires_at:
57-
logger.warning(f"CommCare OAuth token expired at {expires_at}")
59+
logger.info("CommCare OAuth token expired, attempting refresh...")
60+
if self._refresh_token():
61+
logger.info("Successfully refreshed CommCare OAuth token")
62+
return True
63+
logger.warning(f"CommCare OAuth token expired at {expires_at} and refresh failed")
5864
return False
5965

6066
return True
6167

68+
def _refresh_token(self) -> bool:
69+
"""
70+
Attempt to refresh the CommCare OAuth token using the stored refresh token.
71+
72+
Updates both the instance state and the session so the new token persists.
73+
74+
Returns:
75+
True if refresh succeeded, False otherwise
76+
"""
77+
refresh_token = self.commcare_oauth.get("refresh_token")
78+
if not refresh_token:
79+
logger.debug("No refresh token available for CommCare OAuth")
80+
return False
81+
82+
client_id = getattr(settings, "COMMCARE_OAUTH_CLIENT_ID", "")
83+
client_secret = getattr(settings, "COMMCARE_OAUTH_CLIENT_SECRET", "")
84+
if not client_id or not client_secret:
85+
logger.warning("CommCare OAuth client credentials not configured for token refresh")
86+
return False
87+
88+
try:
89+
response = httpx.post(
90+
f"{self.base_url}/oauth/token/",
91+
data={
92+
"grant_type": "refresh_token",
93+
"client_id": client_id,
94+
"client_secret": client_secret,
95+
"refresh_token": refresh_token,
96+
},
97+
timeout=30.0,
98+
)
99+
100+
if response.status_code != 200:
101+
logger.warning(f"CommCare token refresh failed: {response.status_code} - {response.text}")
102+
return False
103+
104+
token_data = response.json()
105+
new_oauth = {
106+
"access_token": token_data["access_token"],
107+
"refresh_token": token_data.get("refresh_token", refresh_token),
108+
"expires_at": timezone.now().timestamp() + token_data.get("expires_in", 3600),
109+
"token_type": token_data.get("token_type", "Bearer"),
110+
}
111+
112+
# Update instance state
113+
self.access_token = new_oauth["access_token"]
114+
self.commcare_oauth = new_oauth
115+
116+
# Update session so it persists across requests
117+
self.request.session["commcare_oauth"] = new_oauth
118+
if hasattr(self.request.session, "modified"):
119+
self.request.session.modified = True
120+
121+
return True
122+
except Exception as e:
123+
logger.warning(f"CommCare token refresh error: {e}")
124+
return False
125+
62126
def fetch_cases(
63127
self,
64128
case_type: str,
@@ -128,6 +192,264 @@ def fetch_cases(
128192
logger.info(f"Fetched total of {len(all_cases)} {case_type} cases from CommCare")
129193
return all_cases
130194

195+
def fetch_cases_by_ids(self, case_ids: list[str], batch_size: int = 100) -> list[dict]:
196+
"""
197+
Fetch multiple cases by their IDs in batches using comma-separated IDs.
198+
199+
Uses GET /api/case/v2/{id1},{id2},.../ to fetch many cases per request.
200+
Batch size is limited to ~100 to stay within URL length limits.
201+
202+
Args:
203+
case_ids: List of case IDs to fetch
204+
batch_size: Number of cases per request (default 100)
205+
206+
Returns:
207+
List of case dictionaries
208+
209+
Raises:
210+
ValueError: If OAuth token is not configured or expired
211+
"""
212+
if not self.check_token_valid():
213+
raise ValueError(
214+
"CommCare OAuth not configured or expired. "
215+
"Please authorize CommCare access at /labs/commcare/initiate/"
216+
)
217+
218+
if not case_ids:
219+
return []
220+
221+
headers = {
222+
"Authorization": f"Bearer {self.access_token}",
223+
"Content-Type": "application/json",
224+
}
225+
226+
all_cases = []
227+
total = len(case_ids)
228+
229+
for i in range(0, total, batch_size):
230+
batch = case_ids[i : i + batch_size]
231+
batch_num = i // batch_size + 1
232+
total_batches = (total + batch_size - 1) // batch_size
233+
logger.info(
234+
f"Bulk-fetching case batch {batch_num}/{total_batches} "
235+
f"({len(batch)} cases) from CommCare"
236+
)
237+
238+
ids_param = ",".join(batch)
239+
url = f"{self.base_url}/a/{self.domain}/api/case/v2/{ids_param}/"
240+
241+
try:
242+
# Follow pagination within this batch
243+
while url:
244+
response = httpx.get(url, headers=headers, timeout=60.0)
245+
response.raise_for_status()
246+
data = response.json()
247+
248+
if isinstance(data, dict):
249+
cases = data.get("cases", [])
250+
all_cases.extend(cases)
251+
url = data.get("next") # follow pagination
252+
elif isinstance(data, list):
253+
all_cases.extend(data)
254+
url = None
255+
else:
256+
url = None
257+
except httpx.HTTPStatusError as e:
258+
if e.response.status_code == 404:
259+
logger.warning(f"Batch {batch_num}: some cases not found")
260+
else:
261+
logger.error(f"Bulk fetch batch {batch_num} failed: {e}")
262+
except httpx.TimeoutException:
263+
logger.warning(f"Timeout on bulk fetch batch {batch_num}")
264+
265+
logger.info(f"Fetched {len(all_cases)}/{total} cases from CommCare")
266+
return all_cases
267+
268+
def fetch_forms(
269+
self,
270+
xmlns: str | None = None,
271+
app_id: str | None = None,
272+
limit: int = 1000,
273+
received_on_start: str | None = None,
274+
received_on_end: str | None = None,
275+
) -> list[dict]:
276+
"""Fetch form submissions from CCHQ Form API v1.
277+
278+
Uses /a/{domain}/api/form/v1/ with optional xmlns filter.
279+
Paginates via meta.next URLs.
280+
"""
281+
if not self.check_token_valid():
282+
raise ValueError(
283+
"CommCare OAuth not configured or expired. "
284+
"Please authorize CommCare access at /labs/commcare/initiate/"
285+
)
286+
287+
endpoint = f"{self.base_url}/a/{self.domain}/api/form/v1/"
288+
params = {"limit": limit}
289+
if xmlns:
290+
params["xmlns"] = xmlns
291+
if app_id:
292+
params["app_id"] = app_id
293+
if received_on_start:
294+
params["received_on_start"] = received_on_start
295+
if received_on_end:
296+
params["received_on_end"] = received_on_end
297+
298+
headers = {"Authorization": f"Bearer {self.access_token}"}
299+
300+
all_forms = []
301+
next_url = endpoint
302+
page = 0
303+
while next_url:
304+
page += 1
305+
logger.info(f"Fetching forms page {page} from {next_url}")
306+
307+
response = httpx.get(
308+
next_url,
309+
params=params if next_url == endpoint else None,
310+
headers=headers,
311+
timeout=60.0,
312+
)
313+
response.raise_for_status()
314+
data = response.json()
315+
forms = data.get("objects", [])
316+
all_forms.extend(forms)
317+
318+
logger.info(f"Retrieved {len(forms)} forms (total so far: {len(all_forms)})")
319+
320+
next_url = data.get("meta", {}).get("next")
321+
if next_url and not next_url.startswith("http"):
322+
if next_url.startswith("?"):
323+
# Query-params-only relative URL — prepend full endpoint path
324+
next_url = f"{endpoint}{next_url}"
325+
else:
326+
# Path-based relative URL (e.g., /a/domain/api/...)
327+
next_url = f"{self.base_url}{next_url}"
328+
329+
logger.info(f"Fetched total of {len(all_forms)} forms from CommCare")
330+
return all_forms
331+
332+
def get_form_xmlns(self, app_id: str, form_name: str = "Register Mother") -> str | None:
333+
"""Look up a form's xmlns from the Application Structure API.
334+
335+
Calls GET /a/{domain}/api/application/v1/{app_id}/ and walks
336+
modules[] -> forms[] matching by the form's multilingual name dict.
337+
338+
Args:
339+
app_id: CommCare application ID
340+
form_name: Human-readable form name to search for (matched against
341+
the values of each form's ``name`` dict, e.g. ``{"en": "Register Mother"}``)
342+
343+
Returns:
344+
The xmlns string for the matching form, or None if not found.
345+
"""
346+
if not self.check_token_valid():
347+
logger.warning("Cannot look up form xmlns: OAuth token invalid")
348+
return None
349+
350+
url = f"{self.base_url}/a/{self.domain}/api/application/v1/{app_id}/"
351+
headers = {"Authorization": f"Bearer {self.access_token}"}
352+
353+
try:
354+
response = httpx.get(url, headers=headers, timeout=30.0)
355+
response.raise_for_status()
356+
except httpx.HTTPStatusError as e:
357+
logger.warning(f"Application Structure API error for app {app_id}: {e.response.status_code}")
358+
return None
359+
except httpx.TimeoutException:
360+
logger.warning(f"Timeout fetching application structure for app {app_id}")
361+
return None
362+
363+
app_data = response.json()
364+
365+
for module in app_data.get("modules", []):
366+
for form in module.get("forms", []):
367+
name_dict = form.get("name", {})
368+
# name_dict is multilingual, e.g. {"en": "Register Mother"}
369+
if isinstance(name_dict, dict):
370+
if form_name in name_dict.values():
371+
xmlns = form.get("xmlns")
372+
if xmlns:
373+
logger.info(f"Discovered xmlns for '{form_name}': {xmlns}")
374+
return xmlns
375+
elif isinstance(name_dict, str) and name_dict == form_name:
376+
xmlns = form.get("xmlns")
377+
if xmlns:
378+
logger.info(f"Discovered xmlns for '{form_name}': {xmlns}")
379+
return xmlns
380+
381+
logger.warning(f"Form '{form_name}' not found in app {app_id}")
382+
return None
383+
384+
def list_applications(self) -> list[dict]:
385+
"""List all applications in the domain via Application API v1.
386+
387+
Returns list of app summary dicts (each has 'id', 'name', etc.).
388+
Paginates through results using meta.next.
389+
"""
390+
if not self.check_token_valid():
391+
logger.warning("Cannot list applications: OAuth token invalid")
392+
return []
393+
394+
endpoint = f"{self.base_url}/a/{self.domain}/api/application/v1/"
395+
headers = {"Authorization": f"Bearer {self.access_token}"}
396+
params = {"limit": 100}
397+
398+
all_apps = []
399+
next_url = endpoint
400+
page = 0
401+
while next_url:
402+
page += 1
403+
logger.info(f"Fetching applications page {page} from {next_url}")
404+
405+
try:
406+
response = httpx.get(
407+
next_url,
408+
params=params if next_url == endpoint else None,
409+
headers=headers,
410+
timeout=30.0,
411+
)
412+
response.raise_for_status()
413+
except httpx.HTTPStatusError as e:
414+
logger.warning(f"Application API error: {e.response.status_code}")
415+
break
416+
except httpx.TimeoutException:
417+
logger.warning("Timeout fetching applications list")
418+
break
419+
420+
data = response.json()
421+
apps = data.get("objects", [])
422+
all_apps.extend(apps)
423+
424+
logger.info(f"Retrieved {len(apps)} applications (total so far: {len(all_apps)})")
425+
426+
next_url = data.get("meta", {}).get("next")
427+
if next_url and not next_url.startswith("http"):
428+
if next_url.startswith("?"):
429+
next_url = f"{endpoint}{next_url}"
430+
else:
431+
next_url = f"{self.base_url}{next_url}"
432+
433+
logger.info(f"Listed {len(all_apps)} applications in domain {self.domain}")
434+
return all_apps
435+
436+
def discover_form_xmlns(self, form_name: str) -> str | None:
437+
"""Search all apps in the domain for a form by name, return its xmlns.
438+
439+
Useful when the form is in a different app than the deliver app.
440+
Calls list_applications(), then get_form_xmlns() for each app until found.
441+
"""
442+
apps = self.list_applications()
443+
for app in apps:
444+
app_id = app.get("id")
445+
if app_id:
446+
xmlns = self.get_form_xmlns(app_id, form_name)
447+
if xmlns:
448+
logger.info(f"Discovered xmlns for '{form_name}' in app {app_id}")
449+
return xmlns
450+
logger.warning(f"Form '{form_name}' not found in any of {len(apps)} apps in domain {self.domain}")
451+
return None
452+
131453
def fetch_case_by_id(self, case_id: str) -> dict | None:
132454
"""
133455
Fetch a single case by ID.

0 commit comments

Comments
 (0)