Skip to content

Commit 86f0a0c

Browse files
authored
VED-755: Handling NHS Number confusions (#816)
1 parent 13262ce commit 86f0a0c

File tree

8 files changed

+702
-323
lines changed

8 files changed

+702
-323
lines changed

lambdas/id_sync/README.md

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,4 @@
2828

2929
- Code is located in the `lambdas/id_sync/src/` directory.
3030
- Unit tests are in the `lambdas/id_sync/tests/` directory.
31-
- Use the provided Makefile and Dockerfile for building, testing, and packaging.
32-
33-
## License
34-
35-
This project is maintained by NHS. See [LICENSE](../LICENSE) for details.
31+
- Use the provided Makefile and Dockerfile for building, testing, and packaging.

lambdas/id_sync/src/id_sync.py

Lines changed: 35 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,46 +1,50 @@
1-
from common.clients import logger
2-
from common.clients import STREAM_NAME
1+
"""
2+
- Parses the incoming AWS event into `AwsLambdaEvent` and iterate its `records`.
3+
- Delegate each record to `process_record` and collect `nhs_number` from each result.
4+
- If any record has status == "error" raise `IdSyncException` with aggregated nhs_numbers.
5+
- Any unexpected error is wrapped into `IdSyncException(message="Error processing id_sync event")`.
6+
"""
7+
8+
from typing import Any, Dict
9+
from common.clients import logger, STREAM_NAME
310
from common.log_decorator import logging_decorator
411
from common.aws_lambda_event import AwsLambdaEvent
512
from exceptions.id_sync_exception import IdSyncException
613
from record_processor import process_record
7-
'''
8-
Lambda function handler for processing SQS events.Lambda for ID Sync. Fired by SQS
9-
'''
1014

1115

1216
@logging_decorator(prefix="id_sync", stream_name=STREAM_NAME)
13-
def handler(event_data, _):
14-
17+
def handler(event_data: Dict[str, Any], _context) -> Dict[str, Any]:
1518
try:
16-
logger.info("id_sync handler invoked")
1719
event = AwsLambdaEvent(event_data)
18-
record_count = len(event.records)
19-
if record_count > 0:
20-
logger.info("id_sync processing event with %d records", record_count)
21-
error_count = 0
22-
nhs_numbers = []
23-
for record in event.records:
24-
record_result = process_record(record)
25-
nhs_numbers.append(record_result["nhs_number"])
26-
if record_result["status"] == "error":
27-
error_count += 1
28-
if error_count > 0:
29-
raise IdSyncException(message=f"Processed {record_count} records with {error_count} errors",
30-
nhs_numbers=nhs_numbers)
31-
32-
else:
33-
response = {"status": "success",
34-
"message": f"Successfully processed {record_count} records",
35-
"nhs_numbers": nhs_numbers}
36-
else:
37-
response = {"status": "success", "message": "No records found in event"}
20+
records = event.records
21+
22+
if not records:
23+
return {"status": "success", "message": "No records found in event"}
24+
25+
logger.info("id_sync processing event with %d records", len(records))
26+
27+
results = [process_record(record) for record in records]
28+
nhs_numbers = [result["nhs_number"] for result in results]
29+
error_count = sum(1 for result in results if result.get("status") == "error")
30+
31+
if error_count:
32+
raise IdSyncException(message=f"Processed {len(records)} records with {error_count} errors",
33+
nhs_numbers=nhs_numbers)
34+
35+
response = {
36+
"status": "success",
37+
"message": f"Successfully processed {len(records)} records",
38+
"nhs_numbers": nhs_numbers
39+
}
40+
3841
logger.info("id_sync handler completed: %s", response)
3942
return response
43+
4044
except IdSyncException as e:
4145
logger.exception(f"id_sync error: {e.message}")
42-
raise e
43-
except Exception as e:
46+
raise
47+
except Exception:
4448
msg = "Error processing id_sync event"
4549
logger.exception(msg)
46-
raise IdSyncException(message=msg, exception=e)
50+
raise IdSyncException(message=msg)

lambdas/id_sync/src/ieds_db_operations.py

Lines changed: 143 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,11 @@
22
from os_vars import get_ieds_table_name
33
from common.aws_dynamodb import get_dynamodb_table
44
from common.clients import logger, dynamodb_client
5+
from utils import make_status
56
from exceptions.id_sync_exception import IdSyncException
67

78
ieds_table = None
9+
BATCH_SIZE = 25 # DynamoDB TransactWriteItems max batch size
810

911

1012
def get_ieds_table():
@@ -16,97 +18,47 @@ def get_ieds_table():
1618
return ieds_table
1719

1820

19-
def ieds_check_exist(id: str) -> bool:
20-
"""Check if a record exists in the IEDS table for the given ID."""
21-
logger.info(f"Check Id exists ID: {id}")
22-
items = get_items_from_patient_id(id, 1)
23-
24-
if items or len(items) > 0:
25-
logger.info(f"Found patient ID: {id}")
26-
return True
27-
return False
28-
29-
30-
BATCH_SIZE = 25
31-
32-
33-
def ieds_update_patient_id(old_id: str, new_id: str) -> dict:
21+
def ieds_update_patient_id(old_id: str, new_id: str, items_to_update: list | None = None) -> dict:
3422
"""Update the patient ID in the IEDS table."""
3523
logger.info(f"ieds_update_patient_id. Update patient ID from {old_id} to {new_id}")
3624
if not old_id or not new_id or not old_id.strip() or not new_id.strip():
37-
return {"status": "error", "message": "Old ID and New ID cannot be empty"}
25+
return make_status("Old ID and New ID cannot be empty", old_id, "error")
3826

3927
if old_id == new_id:
40-
return {"status": "success", "message": f"No change in patient ID: {old_id}"}
28+
return make_status(f"No change in patient ID: {old_id}", old_id)
4129

4230
try:
4331
logger.info(f"Updating patient ID in IEDS from {old_id} to {new_id}")
4432

45-
new_patient_pk = f"Patient#{new_id}"
46-
47-
logger.info("Getting items to update in IEDS table...")
48-
items_to_update = get_items_from_patient_id(old_id)
33+
if items_to_update is None:
34+
logger.info("Getting items to update in IEDS table...")
35+
items_to_update = get_items_from_patient_id(old_id)
36+
else:
37+
logger.info("Using provided items_to_update list, size=%d", len(items_to_update))
4938

5039
if not items_to_update:
5140
logger.warning(f"No items found to update for patient ID: {old_id}")
52-
return {
53-
"status": "success",
54-
"message": f"No items found to update for patient ID: {old_id}"
55-
}
56-
57-
transact_items = []
41+
return make_status(f"No items found to update for patient ID: {old_id}", old_id)
5842

5943
logger.info(f"Items to update: {len(items_to_update)}")
60-
ieds_table_name = get_ieds_table_name()
61-
for item in items_to_update:
62-
transact_items.append({
63-
'Update': {
64-
'TableName': ieds_table_name,
65-
'Key': {
66-
'PK': {'S': item['PK']},
67-
},
68-
'UpdateExpression': 'SET PatientPK = :new_val',
69-
'ExpressionAttributeValues': {
70-
':new_val': {'S': new_patient_pk}
71-
}
72-
}
73-
})
74-
75-
logger.info("Transacting items in IEDS table...")
76-
# success tracking
77-
all_batches_successful = True
78-
total_batches = 0
79-
80-
# Batch transact in chunks of BATCH_SIZE
81-
for i in range(0, len(transact_items), BATCH_SIZE):
82-
batch = transact_items[i:i+BATCH_SIZE]
83-
total_batches += 1
84-
logger.info(f"Transacting batch {total_batches} of size: {len(batch)}")
8544

86-
response = dynamodb_client.transact_write_items(TransactItems=batch)
87-
logger.info("Batch update complete. Response: %s", response)
45+
# Build transact items and execute them in batches via helpers to keep
46+
# the top-level function easy to read and test.
47+
transact_items = build_transact_items(old_id, new_id, items_to_update)
8848

89-
# Check each batch response
90-
if response['ResponseMetadata']['HTTPStatusCode'] != 200:
91-
all_batches_successful = False
92-
logger.error(
93-
f"Batch {total_batches} failed with status: {response['ResponseMetadata']['HTTPStatusCode']}")
49+
all_batches_successful, total_batches = execute_transaction_in_batches(transact_items)
9450

9551
# Consolidated response handling
9652
logger.info(
9753
f"All batches complete. Total batches: {total_batches}, All successful: {all_batches_successful}")
9854

9955
if all_batches_successful:
100-
return {
101-
"status": "success",
102-
"message":
103-
f"IEDS update, patient ID: {old_id}=>{new_id}. {len(items_to_update)} updated {total_batches}."
104-
}
56+
return make_status(
57+
f"IEDS update, patient ID: {old_id}=>{new_id}. {len(items_to_update)} updated {total_batches}.",
58+
old_id,
59+
)
10560
else:
106-
return {
107-
"status": "error",
108-
"message": f"Failed to update some batches for patient ID: {old_id}"
109-
}
61+
return make_status(f"Failed to update some batches for patient ID: {old_id}", old_id, "error")
11062

11163
except Exception as e:
11264
logger.exception("Error updating patient ID")
@@ -118,26 +70,133 @@ def ieds_update_patient_id(old_id: str, new_id: str) -> dict:
11870
)
11971

12072

121-
def get_items_from_patient_id(id: str, limit=BATCH_SIZE) -> list:
122-
"""Get all items for patient ID."""
123-
logger.info(f"Getting items for patient id: {id}")
73+
def get_items_from_patient_id(id: str) -> list:
74+
"""Public wrapper: build PatientPK and return all matching items.
75+
76+
Delegates actual paging to the internal helper `_paginate_items_for_patient_pk`.
77+
Raises IdSyncException on error.
78+
"""
79+
logger.info("Getting items for patient id: %s", id)
12480
patient_pk = f"Patient#{id}"
12581
try:
126-
response = get_ieds_table().query(
127-
IndexName='PatientGSI', # query the GSI
128-
KeyConditionExpression=Key('PatientPK').eq(patient_pk),
129-
Limit=limit
130-
)
131-
132-
if 'Items' not in response or not response['Items']:
133-
logger.warning(f"No items found for patient PK: {patient_pk}")
134-
return []
135-
136-
return response['Items']
82+
return paginate_items_for_patient_pk(patient_pk)
83+
except IdSyncException:
84+
raise
13785
except Exception as e:
138-
logger.exception(f"Error querying items for patient PK: {patient_pk}")
86+
logger.exception("Error querying items for patient PK: %s", patient_pk)
13987
raise IdSyncException(
14088
message=f"Error querying items for patient PK: {patient_pk}",
14189
nhs_numbers=[patient_pk],
142-
exception=e
90+
exception=e,
14391
)
92+
93+
94+
def paginate_items_for_patient_pk(patient_pk: str) -> list:
95+
"""Internal helper that pages through the PatientGSI and returns all items.
96+
97+
Raises IdSyncException when the DynamoDB response is malformed.
98+
"""
99+
all_items: list = []
100+
last_evaluated_key = None
101+
while True:
102+
query_args = {
103+
"IndexName": "PatientGSI",
104+
"KeyConditionExpression": Key('PatientPK').eq(patient_pk),
105+
}
106+
if last_evaluated_key:
107+
query_args["ExclusiveStartKey"] = last_evaluated_key
108+
109+
response = get_ieds_table().query(**query_args)
110+
111+
if "Items" not in response:
112+
# Unexpected DynamoDB response shape - surface as IdSyncException
113+
logger.exception("Unexpected DynamoDB response: missing 'Items'")
114+
raise IdSyncException(
115+
message="No Items in DynamoDB response",
116+
nhs_numbers=[patient_pk],
117+
exception=response,
118+
)
119+
120+
items = response.get("Items", [])
121+
all_items.extend(items)
122+
123+
last_evaluated_key = response.get("LastEvaluatedKey")
124+
if not last_evaluated_key:
125+
break
126+
127+
if not all_items:
128+
logger.warning("No items found for patient PK: %s", patient_pk)
129+
return []
130+
131+
return all_items
132+
133+
134+
def extract_patient_resource_from_item(item: dict) -> dict | None:
135+
"""
136+
Extract a Patient resource dict from an IEDS database.
137+
"""
138+
patient_resource = item.get("Resource", None)
139+
if not isinstance(patient_resource, dict):
140+
return None
141+
142+
for response in patient_resource.get("contained", []):
143+
if isinstance(response, dict) and response.get("resourceType") == "Patient":
144+
return response
145+
146+
return None
147+
148+
149+
def build_transact_items(old_id: str, new_id: str, items_to_update: list) -> list:
150+
"""Construct the list of TransactItems for DynamoDB TransactWriteItems.
151+
152+
Each item uses a conditional expression to ensure PatientPK hasn't changed
153+
since it was read.
154+
"""
155+
transact_items = []
156+
ieds_table_name = get_ieds_table_name()
157+
new_patient_pk = f"Patient#{new_id}"
158+
159+
for item in items_to_update:
160+
old_patient_pk = item.get('PatientPK', f"Patient#{old_id}")
161+
162+
transact_items.append({
163+
'Update': {
164+
'TableName': ieds_table_name,
165+
'Key': {
166+
'PK': {'S': item['PK']},
167+
},
168+
'UpdateExpression': 'SET PatientPK = :new_val',
169+
"ConditionExpression": "PatientPK = :expected_old",
170+
'ExpressionAttributeValues': {
171+
':new_val': {'S': new_patient_pk},
172+
':expected_old': {'S': old_patient_pk}
173+
}
174+
}
175+
})
176+
177+
return transact_items
178+
179+
180+
def execute_transaction_in_batches(transact_items: list) -> tuple:
181+
"""Execute transact write items in batches of BATCH_SIZE.
182+
183+
Returns (all_batches_successful: bool, total_batches: int).
184+
"""
185+
all_batches_successful = True
186+
total_batches = 0
187+
188+
for i in range(0, len(transact_items), BATCH_SIZE):
189+
batch = transact_items[i:i+BATCH_SIZE]
190+
total_batches += 1
191+
logger.info(f"Transacting batch {total_batches} of size: {len(batch)}")
192+
193+
response = dynamodb_client.transact_write_items(TransactItems=batch)
194+
logger.info("Batch update complete. Response: %s", response)
195+
196+
# Check each batch response
197+
if response['ResponseMetadata']['HTTPStatusCode'] != 200:
198+
all_batches_successful = False
199+
logger.error(
200+
f"Batch {total_batches} failed with status: {response['ResponseMetadata']['HTTPStatusCode']}")
201+
202+
return all_batches_successful, total_batches

lambdas/id_sync/src/pds_details.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
safe_tmp_dir = tempfile.mkdtemp(dir="/tmp") # NOSONAR
1414

1515

16+
# Get Patient details from external service PDS using NHS number from MNS notification
1617
def pds_get_patient_details(nhs_number: str) -> dict:
1718
try:
1819
logger.info(f"get patient details. nhs_number: {nhs_number}")
@@ -34,6 +35,7 @@ def pds_get_patient_details(nhs_number: str) -> dict:
3435
raise IdSyncException(message=msg, exception=e)
3536

3637

38+
# Extract Patient identifier value from PDS patient details
3739
def pds_get_patient_id(nhs_number: str) -> str:
3840
"""
3941
Get PDS patient ID from NHS number.
@@ -48,7 +50,6 @@ def pds_get_patient_id(nhs_number: str) -> str:
4850

4951
return patient_details["identifier"][0]["value"]
5052

51-
# ✅ Remove the IdSyncException catch since you're just re-raising
5253
except Exception as e:
5354
msg = f"Error getting PDS patient ID for {nhs_number}"
5455
logger.exception(msg)

0 commit comments

Comments
 (0)