Skip to content

Commit 81d01b1

Browse files
authored
[PRMT-611] Implement pagination for DynamoDB queries for the statistical reports (#780)
1 parent 2382df3 commit 81d01b1

File tree

5 files changed

+87
-19
lines changed

5 files changed

+87
-19
lines changed

lambdas/services/base/dynamo_service.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import boto3
55
from boto3.dynamodb.conditions import Attr, ConditionBase, Key
66
from botocore.exceptions import ClientError
7+
78
from utils.audit_logging_setup import LoggingService
89
from utils.dynamo_utils import (
910
create_expression_attribute_values,
@@ -76,6 +77,34 @@ def query_table_by_index(
7677
logger.error(str(e), {"Result": f"Unable to query table: {table_name}"})
7778
raise e
7879

80+
def query_with_pagination(
81+
self, table_name: str, search_key: str, search_condition: str
82+
):
83+
84+
try:
85+
table = self.get_table(table_name)
86+
results = table.query(
87+
KeyConditionExpression=Key(search_key).eq(search_condition)
88+
)
89+
if results is None or "Items" not in results:
90+
logger.error(f"Unusable results in DynamoDB: {results!r}")
91+
raise DynamoServiceException("Unrecognised response from DynamoDB")
92+
93+
dynamodb_scan_result = results["Items"]
94+
95+
while "LastEvaluatedKey" in results:
96+
start_key_for_next_page = results["LastEvaluatedKey"]
97+
results = table.query(
98+
KeyConditionExpression=Key(search_key).eq(search_condition),
99+
ExclusiveStartKey=start_key_for_next_page,
100+
)
101+
dynamodb_scan_result.extend(results["Items"])
102+
return dynamodb_scan_result
103+
104+
except ClientError as e:
105+
logger.error(str(e), {"Result": f"Unable to query table: {table_name}"})
106+
raise e
107+
79108
def query_all_fields(self, table_name: str, search_key: str, search_condition: str):
80109
"""
81110
Allow querying dynamodb table without explicitly defining the fields to retrieve.

lambdas/services/statistical_report_service.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import polars as pl
77
import polars.selectors as column_select
88
from inflection import humanize
9+
910
from models.report.statistics import (
1011
ApplicationData,
1112
LoadedStatisticData,
@@ -22,6 +23,7 @@
2223

2324
logger = LoggingService(__name__)
2425

26+
2527
class StatisticalReportService:
2628
def __init__(self):
2729
self.dynamo_service = DynamoDBService()
@@ -67,12 +69,12 @@ def get_statistic_data(self) -> LoadedStatisticData:
6769
logger.info(f"The period to report: {self.dates_to_collect}")
6870
dynamodb_items = []
6971
for date in self.dates_to_collect:
70-
response = self.dynamo_service.query_all_fields(
72+
dynamodb_items_for_date = self.dynamo_service.query_with_pagination(
7173
table_name=self.statistic_table,
7274
search_key="Date",
7375
search_condition=date,
7476
)
75-
dynamodb_items.extend(response["Items"])
77+
dynamodb_items.extend(dynamodb_items_for_date)
7678

7779
loaded_data = load_from_dynamodb_items(dynamodb_items)
7880

@@ -159,7 +161,9 @@ def summarise_application_data(
159161
pl.concat_list("active_user_ids_hashed")
160162
.flatten()
161163
.unique()
162-
.map_elements(lambda col: str(col.sort().to_list()), return_dtype=pl.Utf8)
164+
.map_elements(
165+
lambda col: str(col.sort().to_list()), return_dtype=pl.Utf8
166+
)
163167
.alias("unique_active_user_ids_hashed"),
164168
pl.concat_list("active_user_ids_hashed")
165169
.flatten()
@@ -179,7 +183,7 @@ def join_dataframes_by_ods_code(
179183

180184
for other_dataframe in data_to_report[1:]:
181185
joined_dataframe = joined_dataframe.join(
182-
other_dataframe, on="ods_code", how='full', coalesce=True
186+
other_dataframe, on="ods_code", how="full", coalesce=True
183187
)
184188

185189
return joined_dataframe

lambdas/tests/unit/helpers/data/statistic/mock_statistic_data.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -299,8 +299,8 @@
299299
)
300300

301301
MOCK_DYNAMODB_QUERY_RESPONSE = [
302-
{"Items": [item for item in MOCK_DYNAMODB_ITEMS if item["Date"] == "20240510"]},
303-
{"Items": [item for item in MOCK_DYNAMODB_ITEMS if item["Date"] == "20240511"]},
302+
[item for item in MOCK_DYNAMODB_ITEMS if item["Date"] == "20240510"],
303+
[item for item in MOCK_DYNAMODB_ITEMS if item["Date"] == "20240511"],
304304
]
305305

306306
EXPECTED_WEEKLY_SUMMARY = pl.DataFrame(

lambdas/tests/unit/services/base/test_dynamo_service.py

Lines changed: 44 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
1-
from typing import Optional
1+
import copy
22
from unittest.mock import call
33

44
import pytest
55
from boto3.dynamodb.conditions import Attr, Key
66
from botocore.exceptions import ClientError
7+
78
from enums.dynamo_filter import AttributeOperator
89
from enums.metadata_field_names import DocumentReferenceMetadataFields
910
from services.base.dynamo_service import DynamoDBService
@@ -25,6 +26,7 @@ def mock_service(set_env, mocker):
2526
mocker.patch("boto3.resource")
2627
service = DynamoDBService()
2728
yield service
29+
DynamoDBService.instance = None
2830

2931

3032
@pytest.fixture
@@ -55,6 +57,18 @@ def mock_filter_expression():
5557
).build()
5658
yield filter_expression
5759

60+
def mock_scan_implementation(
61+
**kwargs
62+
):
63+
key = kwargs.get("ExclusiveStartKey")
64+
if not key:
65+
return copy.deepcopy(MOCK_PAGINATED_RESPONSE_1)
66+
elif key.get("ID") == "id_token_for_page_2":
67+
return copy.deepcopy(MOCK_PAGINATED_RESPONSE_2)
68+
elif key.get("ID") == "id_token_for_page_3":
69+
return copy.deepcopy(MOCK_PAGINATED_RESPONSE_3)
70+
return None
71+
5872

5973
def test_query_with_requested_fields_returns_items_from_dynamo(
6074
mock_service, mock_table
@@ -492,16 +506,6 @@ def test_scan_whole_table_return_items_in_response(
492506
def test_scan_whole_table_handles_pagination(
493507
mock_service, mock_scan_method, mock_filter_expression
494508
):
495-
def mock_scan_implementation(
496-
ExclusiveStartKey: Optional[dict[str, str]] = None, **_kwargs
497-
):
498-
if not ExclusiveStartKey:
499-
return MOCK_PAGINATED_RESPONSE_1
500-
elif ExclusiveStartKey.get("ID") == "id_token_for_page_2":
501-
return MOCK_PAGINATED_RESPONSE_2
502-
elif ExclusiveStartKey.get("ID") == "id_token_for_page_3":
503-
return MOCK_PAGINATED_RESPONSE_3
504-
505509
mock_project_expression = "mock_project_expression"
506510
mock_scan_method.side_effect = mock_scan_implementation
507511

@@ -577,3 +581,32 @@ def test_dynamo_service_singleton_instance(mocker):
577581
instance_2 = DynamoDBService()
578582

579583
assert instance_1 is instance_2
584+
585+
586+
def test_query_with_pagination(mock_service, mock_table):
587+
mock_table.return_value.query.side_effect = mock_scan_implementation
588+
expected_result = EXPECTED_ITEMS_FOR_PAGINATED_RESULTS
589+
search_key_obj = Key("NhsNumber").eq(TEST_NHS_NUMBER)
590+
591+
expected_calls = [
592+
call(
593+
KeyConditionExpression=search_key_obj,
594+
),
595+
call(
596+
KeyConditionExpression=search_key_obj,
597+
ExclusiveStartKey={"ID": "id_token_for_page_2"},
598+
),
599+
call(
600+
KeyConditionExpression=search_key_obj,
601+
ExclusiveStartKey={"ID": "id_token_for_page_3"},
602+
),
603+
]
604+
605+
actual = mock_service.query_with_pagination(
606+
table_name=MOCK_TABLE_NAME,
607+
search_key="NhsNumber",
608+
search_condition=TEST_NHS_NUMBER,
609+
)
610+
assert expected_result == actual
611+
mock_table.assert_called_with(MOCK_TABLE_NAME)
612+
mock_table.return_value.query.assert_has_calls(expected_calls)

lambdas/tests/unit/services/test_statistical_report_service.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,9 @@ def test_make_weekly_summary(set_env, mocker):
103103

104104
def test_get_statistic_data(mock_dynamodb_service, mock_service):
105105
mock_service.dates_to_collect = ["20240510", "20240511"]
106-
mock_dynamodb_service.query_all_fields.side_effect = MOCK_DYNAMODB_QUERY_RESPONSE
106+
mock_dynamodb_service.query_with_pagination.side_effect = (
107+
MOCK_DYNAMODB_QUERY_RESPONSE
108+
)
107109

108110
actual = mock_service.get_statistic_data()
109111
expected = ALL_MOCKED_STATISTIC_DATA
@@ -123,7 +125,7 @@ def test_get_statistic_data(mock_dynamodb_service, mock_service):
123125
),
124126
]
125127

126-
mock_dynamodb_service.query_all_fields.assert_has_calls(expected_calls)
128+
mock_dynamodb_service.query_with_pagination.assert_has_calls(expected_calls)
127129

128130

129131
def test_get_statistic_data_raise_error_if_all_data_are_empty(

0 commit comments

Comments
 (0)