Skip to content

Commit dd413ee

Browse files
NogaNHS and robg-test authored
[PRMP-175] Implementing consistent pagination when querying DynamoDB (#779)
Co-authored-by: Robert Gaskin <[email protected]>
1 parent 7287cdf commit dd413ee

16 files changed

+167
-212
lines changed

lambdas/services/base/dynamo_service.py

Lines changed: 21 additions & 62 deletions
Original file line number | Diff line number | Diff line change
@@ -36,16 +36,15 @@ def get_table(self, table_name):
3636
logger.error(str(e), {"Result": "Unable to connect to DB"})
3737
raise e
3838

39-
def query_table_by_index(
40-
self,
41-
table_name,
42-
index_name,
43-
search_key,
44-
search_condition: str,
45-
requested_fields: list[str] = None,
46-
query_filter: Attr | ConditionBase = None,
47-
exclusive_start_key: dict = None,
48-
):
39+
def query_table(
40+
self,
41+
table_name,
42+
search_key,
43+
search_condition: str,
44+
index_name: str = None,
45+
requested_fields: list[str] = None,
46+
query_filter: Attr | ConditionBase = None,
47+
) -> list[dict]:
4948
try:
5049
table = self.get_table(table_name)
5150

@@ -62,61 +61,21 @@ def query_table_by_index(
6261

6362
if query_filter:
6463
query_params["FilterExpression"] = query_filter
65-
if exclusive_start_key:
66-
query_params["ExclusiveStartKey"] = exclusive_start_key
67-
68-
results = table.query(**query_params)
69-
70-
if results is None or "Items" not in results:
71-
logger.error(f"Unusable results in DynamoDB: {results!r}")
72-
raise DynamoServiceException("Unrecognised response from DynamoDB")
73-
74-
return results
75-
except ClientError as e:
76-
logger.error(str(e), {"Result": f"Unable to query table: {table_name}"})
77-
raise e
78-
79-
def query_with_pagination(
80-
self, table_name: str, search_key: str, search_condition: str
81-
):
82-
83-
try:
84-
table = self.get_table(table_name)
85-
results = table.query(
86-
KeyConditionExpression=Key(search_key).eq(search_condition)
87-
)
88-
if results is None or "Items" not in results:
89-
logger.error(f"Unusable results in DynamoDB: {results!r}")
90-
raise DynamoServiceException("Unrecognised response from DynamoDB")
64+
items = []
65+
while True:
66+
results = table.query(**query_params)
9167

92-
dynamodb_scan_result = results["Items"]
68+
if results is None or "Items" not in results:
69+
logger.error(f"Unusable results in DynamoDB: {results!r}")
70+
raise DynamoServiceException("Unrecognised response from DynamoDB")
9371

94-
while "LastEvaluatedKey" in results:
95-
start_key_for_next_page = results["LastEvaluatedKey"]
96-
results = table.query(
97-
KeyConditionExpression=Key(search_key).eq(search_condition),
98-
ExclusiveStartKey=start_key_for_next_page,
99-
)
100-
dynamodb_scan_result.extend(results["Items"])
101-
return dynamodb_scan_result
102-
103-
except ClientError as e:
104-
logger.error(str(e), {"Result": f"Unable to query table: {table_name}"})
105-
raise e
72+
items += results["Items"]
10673

107-
def query_all_fields(self, table_name: str, search_key: str, search_condition: str):
108-
"""
109-
Allow querying dynamodb table without explicitly defining the fields to retrieve.
110-
"""
111-
try:
112-
table = self.get_table(table_name)
113-
results = table.query(
114-
KeyConditionExpression=Key(search_key).eq(search_condition)
115-
)
116-
if results is None or "Items" not in results:
117-
logger.error(f"Unusable results in DynamoDB: {results!r}")
118-
raise DynamoServiceException("Unrecognised response from DynamoDB")
119-
return results
74+
if "LastEvaluatedKey" in results:
75+
query_params["ExclusiveStartKey"] = results["LastEvaluatedKey"]
76+
else:
77+
break
78+
return items
12079
except ClientError as e:
12180
logger.error(str(e), {"Result": f"Unable to query table: {table_name}"})
12281
raise e

lambdas/services/document_manifest_job_service.py

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -170,7 +170,7 @@ def query_zip_trace(self, job_id: str, nhs_number: str) -> DocumentManifestZipTr
170170
attr_operator=AttributeOperator.EQUAL,
171171
filter_value=nhs_number,
172172
).build()
173-
response = self.dynamo_service.query_table_by_index(
173+
response = self.dynamo_service.query_table(
174174
table_name=self.zip_trace_table,
175175
index_name="JobIdIndex",
176176
search_key="JobId",
@@ -180,7 +180,7 @@ def query_zip_trace(self, job_id: str, nhs_number: str) -> DocumentManifestZipTr
180180
)
181181

182182
try:
183-
zip_trace = DocumentManifestZipTrace.model_validate(response["Items"][0])
183+
zip_trace = DocumentManifestZipTrace.model_validate(response[0])
184184
return zip_trace
185185
except (KeyError, IndexError, ValidationError) as e:
186186
logger.error(

lambdas/services/document_service.py

Lines changed: 15 additions & 23 deletions
Original file line number | Diff line number | Diff line change
@@ -59,30 +59,22 @@ def fetch_documents_from_table(
5959
query_filter: Attr | ConditionBase = None,
6060
) -> list[DocumentReference]:
6161
documents = []
62-
exclusive_start_key = None
63-
64-
while True:
65-
response = self.dynamo_service.query_table_by_index(
66-
table_name=table,
67-
index_name=index_name,
68-
search_key=search_key,
69-
search_condition=search_condition,
70-
query_filter=query_filter,
71-
exclusive_start_key=exclusive_start_key,
72-
)
7362

74-
for item in response["Items"]:
75-
try:
76-
document = DocumentReference.model_validate(item)
77-
documents.append(document)
78-
except ValidationError as e:
79-
logger.error(f"Validation error on document: {item}")
80-
logger.error(f"{e}")
81-
continue
82-
if "LastEvaluatedKey" in response:
83-
exclusive_start_key = response["LastEvaluatedKey"]
84-
else:
85-
break
63+
response = self.dynamo_service.query_table(
64+
table_name=table,
65+
index_name=index_name,
66+
search_key=search_key,
67+
search_condition=search_condition,
68+
query_filter=query_filter,
69+
)
70+
for item in response:
71+
try:
72+
document = DocumentReference.model_validate(item)
73+
documents.append(document)
74+
except ValidationError as e:
75+
logger.error(f"Validation error on document: {item}")
76+
logger.error(f"{e}")
77+
continue
8678
return documents
8779

8880
def get_nhs_numbers_based_on_ods_code(self, ods_code: str) -> list[str]:

lambdas/services/im_alerting_service.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -235,7 +235,7 @@ def get_alarm_history(self, alarm_name: str):
235235
logger.info(f"Checking if {alarm_name} already exists on alarm table")
236236

237237
try:
238-
results = self.dynamo_service.query_all_fields(
238+
results = self.dynamo_service.query_table(
239239
table_name=self.table_name,
240240
search_key="AlarmNameMetric",
241241
search_condition=alarm_name,

lambdas/services/lloyd_george_stitch_job_service.py

Lines changed: 4 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -115,9 +115,10 @@ def process_stitch_trace_response(self, stitch_trace: StitchTrace):
115115
totalFileSizeInBytes=stitch_trace.total_file_size_in_bytes,
116116
)
117117

118-
def validate_stitch_trace(self, response: dict) -> list[StitchTrace] | None:
118+
def validate_stitch_trace(
119+
self, stitch_trace_dynamo_response: list
120+
) -> list[StitchTrace] | None:
119121
try:
120-
stitch_trace_dynamo_response = response.get("Items", [])
121122
if not stitch_trace_dynamo_response:
122123
return None
123124
return [
@@ -149,7 +150,7 @@ def query_stitch_trace_with_nhs_number(self, nhs_number: str) -> list[StitchTrac
149150
attr_operator=AttributeOperator.EQUAL,
150151
filter_value=False,
151152
).build()
152-
response = self.dynamo_service.query_table_by_index(
153+
response = self.dynamo_service.query_table(
153154
table_name=self.stitch_trace_table,
154155
index_name="NhsNumberIndex",
155156
search_key="NhsNumber",

lambdas/services/login_service.py

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -140,11 +140,11 @@ def generate_session(self, state, auth_code) -> dict:
140140
def have_matching_state_value_in_record(self, state: str) -> bool:
141141
state_table_name = os.environ["AUTH_STATE_TABLE_NAME"]
142142

143-
query_response = self.db_service.query_all_fields(
143+
query_response = self.db_service.query_table(
144144
table_name=state_table_name, search_key="State", search_condition=state
145145
)
146146

147-
state_match = "Count" in query_response and query_response["Count"] == 1
147+
state_match = len(query_response) == 1
148148

149149
if state_match:
150150
try:

lambdas/services/manage_user_session_access.py

Lines changed: 6 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -22,19 +22,18 @@ def find_login_session(self, ndr_session_id: str):
2222
logger.info(
2323
f"Retrieving session for session ID ending in: {redact_id_to_last_4_chars(ndr_session_id)}"
2424
)
25-
query_response = self.db_service.query_all_fields(
25+
query_response = self.db_service.query_table(
2626
table_name=self.session_table_name,
2727
search_key="NDRSessionId",
2828
search_condition=ndr_session_id,
2929
)
3030

31-
try:
32-
current_session = query_response["Items"][0]
31+
if query_response and len(query_response) > 0:
32+
current_session = query_response[0]
3333
return current_session
34-
except (KeyError, IndexError):
35-
raise AuthorisationException(
36-
f"Unable to find session for session ID ending in: {redact_id_to_last_4_chars(ndr_session_id)}"
37-
)
34+
raise AuthorisationException(
35+
f"Unable to find session for session ID ending in: {redact_id_to_last_4_chars(ndr_session_id)}"
36+
)
3837

3938
def update_auth_session_with_permitted_search(
4039
self,

lambdas/services/ods_report_service.py

Lines changed: 2 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -101,25 +101,14 @@ def query_table_by_index(self, ods_code: str):
101101
]
102102
results = []
103103
for ods_code in ods_codes:
104-
response = self.dynamo_service.query_table_by_index(
104+
response = self.dynamo_service.query_table(
105105
table_name=self.table_name,
106106
index_name="OdsCodeIndex",
107107
search_key=DocumentReferenceMetadataFields.CURRENT_GP_ODS.value,
108108
search_condition=ods_code,
109109
query_filter=NotDeleted,
110110
)
111-
results += response["Items"]
112-
113-
while "LastEvaluatedKey" in response:
114-
response = self.dynamo_service.query_table_by_index(
115-
table_name=self.table_name,
116-
index_name="OdsCodeIndex",
117-
exclusive_start_key=response["LastEvaluatedKey"],
118-
search_key=DocumentReferenceMetadataFields.CURRENT_GP_ODS.value,
119-
search_condition=ods_code,
120-
query_filter=NotDeleted,
121-
)
122-
results += response["Items"]
111+
results += response
123112

124113
if not results:
125114
logger.info("No records found for ODS code {}".format(ods_code))

lambdas/services/statistical_report_service.py

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -68,12 +68,12 @@ def get_statistic_data(self) -> LoadedStatisticData:
6868
logger.info(f"The period to report: {self.dates_to_collect}")
6969
dynamodb_items = []
7070
for date in self.dates_to_collect:
71-
dynamodb_items_for_date = self.dynamo_service.query_with_pagination(
71+
response = self.dynamo_service.query_table(
7272
table_name=self.statistic_table,
7373
search_key="Date",
7474
search_condition=date,
7575
)
76-
dynamodb_items.extend(dynamodb_items_for_date)
76+
dynamodb_items.extend(response)
7777

7878
loaded_data = load_from_dynamodb_items(dynamodb_items)
7979

0 commit comments

Comments
 (0)