|
| 1 | +from urllib import response |
| 2 | +from responses import logger |
1 | 3 | import simplejson as json |
2 | 4 | import os |
3 | 5 | import time |
@@ -397,20 +399,46 @@ def find_immunizations(self, patient_identifier: str, vaccine_types: list): |
397 | 399 | condition = Key("PatientPK").eq(_make_patient_pk(patient_identifier)) |
398 | 400 | is_not_deleted = Attr("DeletedAt").not_exists() | Attr("DeletedAt").eq("reinstated") |
399 | 401 |
|
400 | | - response = self.table.query( |
401 | | - IndexName="PatientGSI", |
402 | | - KeyConditionExpression=condition, |
403 | | - FilterExpression=is_not_deleted, |
404 | | - ) |
| 402 | + raw_items = self.get_all_items(condition, is_not_deleted) |
405 | 403 |
|
406 | | - if "Items" in response: |
| 404 | + if raw_items: |
407 | 405 | # Filter the response to contain only the requested vaccine types |
408 | | - items = [x for x in response["Items"] if x["PatientSK"].split("#")[0] in vaccine_types] |
| 406 | + items = [x for x in raw_items if x["PatientSK"].split("#")[0] in vaccine_types] |
409 | 407 |
|
410 | 408 | # Return a list of the FHIR immunization resource JSON items |
411 | | - return [json.loads(item["Resource"]) for item in items] |
| 409 | + final_resources = [json.loads(item["Resource"]) for item in items] |
| 410 | + |
| 411 | + return final_resources |
412 | 412 | else: |
413 | | - raise UnhandledResponseError(message=f"Unhandled error. Query failed", response=response) |
| 413 | + logger.warning("no items matched patient_identifier filter!") |
| 414 | + return [] |
| 415 | + |
| 416 | + def get_all_items(self, condition, is_not_deleted): |
| 417 | + """Query DynamoDB and paginate through all results.""" |
| 418 | + all_items = [] |
| 419 | + last_evaluated_key = None |
| 420 | + |
| 421 | + while True: |
| 422 | + query_args = { |
| 423 | + "IndexName": "PatientGSI", |
| 424 | + "KeyConditionExpression": condition, |
| 425 | + "FilterExpression": is_not_deleted, |
| 426 | + } |
| 427 | + if last_evaluated_key: |
| 428 | + query_args["ExclusiveStartKey"] = last_evaluated_key |
| 429 | + |
| 430 | + response = self.table.query(**query_args) |
| 431 | + if "Items" not in response: |
| 432 | + raise UnhandledResponseError(message="No Items in DynamoDB response", response=response) |
| 433 | + |
| 434 | + items = response.get("Items", []) |
| 435 | + all_items.extend(items) |
| 436 | + |
| 437 | + last_evaluated_key = response.get("LastEvaluatedKey") |
| 438 | + if not last_evaluated_key: |
| 439 | + break |
| 440 | + |
| 441 | + return all_items |
414 | 442 |
|
415 | 443 | @staticmethod |
416 | 444 | def _handle_dynamo_response(response): |
|
0 commit comments