Skip to content

Commit eff28f9

Browse files
[PRMP-1079] Dynamodb query limit doesn't work as expected. (#958)
1 parent d6c10ee commit eff28f9

File tree

11 files changed

+562
-345
lines changed

11 files changed

+562
-345
lines changed

lambdas/handlers/generate_document_manifest_handler.py

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
from boto3.dynamodb.types import TypeDeserializer
21
from enums.lambda_error import LambdaError
32
from enums.logging_app_interaction import LoggingAppInteraction
43
from models.zip_trace import DocumentManifestZipTrace
@@ -9,6 +8,7 @@
98
from utils.decorators.handle_lambda_exceptions import handle_lambda_exceptions
109
from utils.decorators.override_error_check import override_error_check
1110
from utils.decorators.set_audit_arg import set_request_context_for_logging
11+
from utils.dynamo_utils import deserialize_dynamodb_object
1212
from utils.lambda_exceptions import GenerateManifestZipException
1313
from utils.lambda_response import ApiGatewayResponse
1414
from utils.request_context import request_context
@@ -71,9 +71,4 @@ def manifest_zip_handler(zip_trace_item):
7171

7272

7373
def prepare_zip_trace_data(new_zip_trace: dict) -> dict:
74-
deserialize = TypeDeserializer().deserialize
75-
parsed_dynamodb_items = {
76-
key: deserialize(dynamodb_value)
77-
for key, dynamodb_value in new_zip_trace.items()
78-
}
79-
return parsed_dynamodb_items
74+
return deserialize_dynamodb_object(new_zip_trace)

lambdas/services/base/dynamo_service.py

Lines changed: 58 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111
create_expression_attribute_values,
1212
create_expressions,
1313
create_update_expression,
14+
deserialize_dynamodb_object,
15+
serialize_dict_to_dynamodb_object,
1416
)
1517
from utils.exceptions import DynamoServiceException
1618

@@ -29,6 +31,7 @@ def __new__(cls):
2931
def __init__(self):
3032
if not self.initialised:
3133
self.dynamodb = boto3.resource("dynamodb", region_name="eu-west-2")
34+
self.client = boto3.client("dynamodb")
3235
self.initialised = True
3336

3437
def get_table(self, table_name: str):
@@ -196,7 +199,7 @@ def update_item(
196199
table_name: str,
197200
key_pair: dict[str, str],
198201
updated_fields: dict,
199-
condition_expression: str | ConditionBase| None = None,
202+
condition_expression: str | ConditionBase | None = None,
200203
expression_attribute_values: dict | None = None,
201204
):
202205
table = self.get_table(table_name)
@@ -429,3 +432,57 @@ def build_update_transaction_item(
429432
},
430433
}
431434
}
435+
436+
def query_table_with_paginator(
437+
self,
438+
table_name: str,
439+
index_name: str,
440+
key: str,
441+
condition: str,
442+
filter_expression: str | None = None,
443+
expression_attribute_names: str | None = None,
444+
expression_attribute_values: dict | None = None,
445+
limit: int = 20,
446+
page_size: int = 1,
447+
start_key: str | None = None,
448+
) -> dict:
449+
450+
try:
451+
query_params = {
452+
"TableName": table_name,
453+
"IndexName": index_name,
454+
"KeyConditionExpression": f"{key}=:i",
455+
"PaginationConfig": {
456+
"MaxItems": limit,
457+
"PageSize": page_size,
458+
"StartingToken": start_key,
459+
},
460+
}
461+
462+
if expression_attribute_values is None:
463+
expression_attribute_values = {}
464+
465+
expression_attribute_values[":i"] = condition
466+
467+
if filter_expression:
468+
query_params["FilterExpression"] = filter_expression
469+
470+
if expression_attribute_names:
471+
query_params["ExpressionAttributeNames"] = expression_attribute_names
472+
473+
if expression_attribute_values:
474+
query_params["ExpressionAttributeValues"] = (
475+
serialize_dict_to_dynamodb_object(expression_attribute_values)
476+
)
477+
478+
paginator = self.client.get_paginator("query")
479+
response = paginator.paginate(**query_params).build_full_result()
480+
481+
response["Items"] = [
482+
deserialize_dynamodb_object(item) for item in response["Items"]
483+
]
484+
return response
485+
486+
except Exception as e:
487+
logger.error("Failed to query DynamoDB")
488+
raise e

lambdas/services/document_service.py

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -294,3 +294,45 @@ def create_dynamo_entry(
294294
except (ValidationError, ClientError) as e:
295295
logger.error(e)
296296
raise e
297+
298+
def query_table_with_paginator(
299+
self,
300+
index_name: str,
301+
search_key: str,
302+
search_condition: str,
303+
table_name: str | None = None,
304+
filter_expression: str | None = None,
305+
expression_attribute_names: dict | None = None,
306+
expression_attribute_values: dict | None = None,
307+
limit: int | None = None,
308+
page_size: int = 1,
309+
start_key: str | None = None,
310+
model_class: BaseModel | None = None,
311+
) -> tuple[list[BaseModel], str | None]:
312+
313+
try:
314+
table_name = table_name or self.table_name
315+
model_class = model_class or self.model_class
316+
317+
response = self.dynamo_service.query_table_with_paginator(
318+
table_name=table_name,
319+
index_name=index_name,
320+
key=search_key,
321+
condition=search_condition,
322+
filter_expression=filter_expression,
323+
expression_attribute_names=expression_attribute_names,
324+
expression_attribute_values=expression_attribute_values,
325+
limit=limit,
326+
page_size=page_size,
327+
start_key=start_key,
328+
)
329+
330+
references = [
331+
model_class.model_validate(item) for item in response["Items"]
332+
]
333+
334+
return references, response.get("NextToken")
335+
336+
except (ValidationError, ClientError) as e:
337+
logger.error(e)
338+
raise e

lambdas/services/document_upload_review_service.py

Lines changed: 46 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
from utils.audit_logging_setup import LoggingService
1414
from utils.aws_transient_error_check import is_transient_error
1515
from utils.dynamo_query_filter_builder import DynamoQueryFilterBuilder
16-
from utils.dynamo_utils import build_transaction_item
16+
from utils.dynamo_utils import build_mixed_condition_expression, build_transaction_item
1717
from utils.exceptions import DocumentReviewException
1818

1919
logger = LoggingService(__name__)
@@ -41,35 +41,34 @@ def model_class(self) -> type:
4141
def s3_bucket(self) -> str:
4242
return self._s3_bucket
4343

44-
def query_docs_pending_review_by_custodian_with_limit(
44+
def query_docs_pending_review_with_paginator(
4545
self,
4646
ods_code: str,
4747
limit: int = DEFAULT_QUERY_LIMIT,
48-
start_key: dict | None = None,
48+
start_key: str | None = None,
4949
nhs_number: str | None = None,
5050
uploader: str | None = None,
51-
) -> tuple[list[DocumentUploadReviewReference], dict | None]:
52-
logger.info(f"Getting review document references for custodian: {ods_code}")
53-
54-
filter_expression = self.build_review_dynamo_filter(
55-
nhs_number=nhs_number, uploader=uploader
56-
)
51+
) -> tuple[list[DocumentUploadReviewReference], str | None]:
5752

5853
try:
59-
response = self.dynamo_service.query_table_single(
60-
table_name=self.table_name,
54+
logger.info(f"Getting review document references for custodian: {ods_code}")
55+
56+
filter_expression, condition_attribute_names, condition_attribute_values = (
57+
self.build_paginator_query_filter(
58+
nhs_number=nhs_number, uploader=uploader
59+
)
60+
)
61+
references, last_evaluated_key = self.query_table_with_paginator(
62+
index_name="CustodianIndex",
6163
search_key="Custodian",
6264
search_condition=ods_code,
63-
index_name="CustodianIndex",
65+
filter_expression=filter_expression,
66+
expression_attribute_names=condition_attribute_names,
67+
expression_attribute_values=condition_attribute_values,
6468
limit=limit,
6569
start_key=start_key,
66-
query_filter=filter_expression,
6770
)
6871

69-
references = self._validate_review_references(response["Items"])
70-
71-
last_evaluated_key = response.get("LastEvaluatedKey", None)
72-
7372
return references, last_evaluated_key
7473

7574
except ClientError as e:
@@ -89,6 +88,36 @@ def _validate_review_references(
8988
logger.error(e)
9089
raise DocumentReviewException(ErrorMessage.FAILED_TO_VALIDATE.value)
9190

91+
def build_paginator_query_filter(
92+
self, nhs_number: str | None = None, uploader: str | None = None
93+
):
94+
conditions = [
95+
{
96+
"field": "ReviewStatus",
97+
"operator": "=",
98+
"value": DocumentReviewStatus.PENDING_REVIEW.value,
99+
}
100+
]
101+
if nhs_number:
102+
conditions.append(
103+
{
104+
"field": "NhsNumber",
105+
"operator": "=",
106+
"value": nhs_number,
107+
}
108+
)
109+
110+
if uploader:
111+
conditions.append(
112+
{
113+
"field": "Author",
114+
"operator": "=",
115+
"value": uploader,
116+
}
117+
)
118+
119+
return build_mixed_condition_expression(conditions)
120+
92121
def get_document(
93122
self, document_id: str, version: int | None
94123
) -> DocumentUploadReviewReference | None:

lambdas/services/search_document_review_service.py

Lines changed: 5 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,3 @@
1-
import base64
2-
import decimal
3-
import json
4-
51
from enums.lambda_error import LambdaError
62
from pydantic import ValidationError
73
from services.document_upload_review_service import DocumentUploadReviewService
@@ -20,14 +16,13 @@ def process_request(
2016
self, ods_code: str, params: dict
2117
) -> tuple[list[str], str | None]:
2218
try:
23-
24-
decoded_start_key = self.decode_start_key(params.get("nextPageToken", None))
19+
start_key = params.get("nextPageToken", None)
2520

2621
str_limit = params.get("limit", self.document_service.DEFAULT_QUERY_LIMIT)
2722
limit = int(str_limit)
2823

2924
references, last_evaluated_key = self.get_review_document_references(
30-
start_key=decoded_start_key,
25+
start_key=start_key,
3126
ods_code=ods_code,
3227
limit=limit,
3328
nhs_number=params.get("nhsNumber", None),
@@ -50,9 +45,7 @@ def process_request(
5045
for reference in references
5146
]
5247

53-
encoded_exclusive_start_key = self.encode_start_key(last_evaluated_key)
54-
55-
return output_refs, encoded_exclusive_start_key
48+
return output_refs, last_evaluated_key
5649

5750
except ValidationError as e:
5851
logger.error(e)
@@ -69,33 +62,14 @@ def get_review_document_references(
6962
self,
7063
ods_code: str,
7164
limit: int | None = None,
72-
start_key: dict | None = None,
65+
start_key: str | None = None,
7366
nhs_number: str | None = None,
7467
uploader: str | None = None,
7568
):
76-
return self.document_service.query_docs_pending_review_by_custodian_with_limit(
69+
return self.document_service.query_docs_pending_review_with_paginator(
7770
ods_code=ods_code,
7871
limit=limit,
7972
start_key=start_key,
8073
nhs_number=nhs_number,
8174
uploader=uploader,
8275
)
83-
84-
def decode_start_key(self, encoded_start_key: str | None) -> dict[str, str] | None:
85-
return (
86-
json.loads(
87-
base64.b64decode(encoded_start_key.encode("ascii")).decode("utf-8")
88-
)
89-
if encoded_start_key
90-
else None
91-
)
92-
93-
def encode_start_key(self, start_key: dict) -> str | None:
94-
if start_key:
95-
for key, value in start_key.items():
96-
if isinstance(value, decimal.Decimal):
97-
start_key[key] = int(value)
98-
return base64.b64encode(json.dumps(start_key).encode("ascii")).decode(
99-
"utf-8"
100-
)
101-
return None

0 commit comments

Comments
 (0)