Skip to content

Commit 4437a1b

Browse files
committed
transact write
1 parent 38053ca commit 4437a1b

File tree

5 files changed

+325
-263
lines changed

5 files changed

+325
-263
lines changed

lambdas/id_sync/src/ieds_db_operations.py

Lines changed: 100 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,13 @@ def get_ieds_table():
1818

1919
def ieds_check_exist(id: str) -> bool:
2020
"""Check if a record exists in the IEDS table for the given ID."""
21+
logger.info(f"ieds_check_exist. Get ID: {id}")
2122
search_patient_pk = f"Patient#{id}"
2223

23-
# Only fetch 1 record to check existence
24-
response = get_ieds_table().query(
25-
KeyConditionExpression=Key("PK").eq(search_patient_pk),
26-
Limit=1
27-
)
28-
return response.get("Count", 0) > 0
24+
response = get_ieds_table().get_item(Key={'PatientPk': search_patient_pk})
25+
found = 'Item' in response
26+
logger.info(f"ieds_check_exist. Record found: {found} for ID: {id}")
27+
return found
2928

3029

3130
def ieds_update_patient_id(old_id: str, new_id: str) -> dict:
@@ -38,23 +37,72 @@ def ieds_update_patient_id(old_id: str, new_id: str) -> dict:
3837
return {"status": "success", "message": f"No change in patient ID: {old_id}"}
3938

4039
try:
41-
# Update the table with new id
42-
response = get_ieds_table().update_item(
43-
Key={"PK": f"Patient#{old_id}"},
44-
UpdateExpression="SET PK = :new_id",
45-
ExpressionAttributeValues={":new_id": f"Patient#{new_id}"}
46-
)
40+
logger.info(f"Updating patient ID in IEDS from {old_id} to {new_id}")
41+
ieds_table = get_ieds_table()
42+
new_patient_pk = f"Patient#{new_id}"
43+
old_patient_pk = f"Patient#{old_id}"
4744

48-
# Check update successful
49-
if response['ResponseMetadata']['HTTPStatusCode'] == 200:
45+
logger.info("Getting items to update in IEDS table...")
46+
items_to_update = get_items_to_update(old_patient_pk)
47+
48+
if not items_to_update:
49+
logger.warning(f"No items found to update for patient ID: {old_id}")
5050
return {
5151
"status": "success",
52-
"message": f"Updated IEDS, patient ID: {old_id} to {new_id}"
52+
"message": f"No items found to update for patient ID: {old_id}"
53+
}
54+
55+
transact_items = []
56+
57+
logger.info("loop through items to update...")
58+
logger.info(f"Items to update: {len(items_to_update)}")
59+
for item in items_to_update:
60+
logger.info("Update item")
61+
logger.info(f"Updating item: {item['PatientPK']}")
62+
transact_items.append({
63+
'Update': {
64+
'TableName': get_ieds_table_name(),
65+
'Key': {
66+
'PK': {'S': item['PK']},
67+
},
68+
'UpdateExpression': 'SET PatientPK = :new_val',
69+
'ExpressionAttributeValues': {
70+
':new_val': {'S': new_patient_pk}
71+
}
72+
}
73+
})
74+
75+
logger.info("Transacting items in IEDS table...")
76+
# ✅ Fix: Initialize success tracking
77+
all_batches_successful = True
78+
total_batches = 0
79+
80+
# Batch transact in chunks of 25
81+
for i in range(0, len(transact_items), 25):
82+
batch = transact_items[i:i+25]
83+
total_batches += 1
84+
logger.info(f"Transacting batch {total_batches} of size: {len(batch)}")
85+
86+
response = ieds_table.transact_write_items(TransactItems=batch)
87+
logger.info("Batch update complete. Response: %s", response)
88+
89+
# ✅ Fix: Check each batch response
90+
if response['ResponseMetadata']['HTTPStatusCode'] != 200:
91+
all_batches_successful = False
92+
logger.error(f"Batch {total_batches} failed with status: {response['ResponseMetadata']['HTTPStatusCode']}")
93+
94+
# ✅ Fix: Consolidated response handling outside the loop
95+
logger.info(f"All batches complete. Total batches: {total_batches}, All successful: {all_batches_successful}")
96+
97+
if all_batches_successful:
98+
return {
99+
"status": "success",
100+
"message": f"Updated IEDS, patient ID: {old_id} to {new_id}. {len(items_to_update)} items updated in {total_batches} batches."
53101
}
54102
else:
55103
return {
56104
"status": "error",
57-
"message": f"Failed to update patient ID: {old_id}"
105+
"message": f"Failed to update some batches for patient ID: {old_id}"
58106
}
59107

60108
except Exception as e:
@@ -64,24 +112,39 @@ def ieds_update_patient_id(old_id: str, new_id: str) -> dict:
64112
nhs_numbers=[old_id, new_id],
65113
exception=e
66114
)
67-
raise e
68-
69-
70-
def test_ieds_insert_patient(patient_id: str) -> dict: # NOSONAR
71-
"""Test function for inserting patient ID."""
72-
logger.info("insert to db...")
73-
# write the patient id to table
74-
res = '{"resourceType": "Immunization"}'
75-
result = get_ieds_table().put_item(Item={
76-
"PK": f"Patient#{patient_id}",
77-
"PatientPK": f"Patient#{patient_id}",
78-
"PatientSK": f"Patient#{patient_id}",
79-
"Resource": res,
80-
"IdentifierPK": "https://www.ieds.england.nhs.uk/#a7e06f66-339f-4b81-b2f6-016b88bfc422",
81-
"Operation": "CREATE",
82-
"Version": "1",
83-
"SupplierSystem": "RAVS",
84-
})
85-
86-
logger.info(f"Test result: {result}")
87-
return result
115+
# ✅ Fix: Remove duplicate raise
116+
# raise e # Remove this line
117+
118+
# def test_ieds_insert_patient(patient_id: str) -> dict: # NOSONAR
119+
# """Test function for inserting patient ID."""
120+
# logger.info("insert to db...")
121+
# # write the patient id to table
122+
# res = '{"resourceType": "Immunization"}'
123+
# result = get_ieds_table().put_item(Item={
124+
# "PK": f"Patient#{patient_id}",
125+
# "PatientPK": f"Patient#{patient_id}",
126+
# "PatientSK": f"Patient#{patient_id}",
127+
# "Resource": res,
128+
# "IdentifierPK": "https://www.ieds.england.nhs.uk/#a7e06f66-339f-4b81-b2f6-016b88bfc422",
129+
# "Operation": "CREATE",
130+
# "Version": "1",
131+
# "SupplierSystem": "RAVS",
132+
# })
133+
134+
# logger.info(f"Test result: {result}")
135+
# return result
136+
137+
138+
def get_items_to_update(old_patient_pk: str) -> list:
139+
"""Get items that need to be updated in the IEDS table."""
140+
logger.info(f"Getting items to update for old patient PK: {old_patient_pk}")
141+
response = get_ieds_table().query(
142+
KeyConditionExpression=Key('PatientPK').eq(old_patient_pk),
143+
Limit=25 # Adjust limit as needed
144+
)
145+
146+
if 'Items' not in response or not response['Items']:
147+
logger.warning(f"No items found for old patient PK: {old_patient_pk}")
148+
return []
149+
150+
return response['Items']

lambdas/id_sync/src/junk.json

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
{
2+
"resourceType": "Immunization",
3+
"contained": [
4+
{
5+
"resourceType": "Practitioner",
6+
"id": "Pract1",
7+
"name": [{ "family": "iucds", "given": ["Russell"] }]
8+
},
9+
{
10+
"resourceType": "Patient",
11+
"id": "Pat1",
12+
"identifier": [
13+
{ "system": "https://fhir.nhs.uk/Id/nhs-number", "value": "9726629101" }
14+
],
15+
"name": [{ "family": "VAREY", "given": ["DIANA"] }],
16+
"gender": "unknown",
17+
"birthDate": "2021-10-24",
18+
"address": [{ "postalCode": "DN15 8EZ" }]
19+
}
20+
],
21+
"extension": [
22+
{
23+
"url": "https://fhir.hl7.org.uk/StructureDefinition/Extension-UKCore-VaccinationProcedure",
24+
"valueCodeableConcept": {
25+
"coding": [
26+
{
27+
"system": "http://snomed.info/sct",
28+
"code": "1303503001",
29+
"display": "Administration of vaccine product containing only Human orthopneumovirus antigen (procedure)"
30+
}
31+
]
32+
}
33+
}
34+
],
35+
"identifier": [
36+
{
37+
"system": "https://www.ieds.england.nhs.uk/",
38+
"value": "a7e06f66-339f-4b81-b2f6-016b88bfc422"
39+
}
40+
],
41+
"status": "completed",
42+
"vaccineCode": {
43+
"coding": [
44+
{
45+
"system": "http://snomed.info/sct",
46+
"code": "42605811000001100",
47+
"display": "Abrysvo vaccine powder and solvent for solution for injection 0.5ml vials (Pfizer Ltd) (product)"
48+
}
49+
]
50+
},
51+
"patient": { "reference": "#Pat1" },
52+
"occurrenceDateTime": "2025-05-27T14:53:26.271+00:00",
53+
"recorded": "2025-07-10T11:25:56.000+00:00",
54+
"primarySource": true,
55+
"manufacturer": { "display": "Pfizer" },
56+
"location": {
57+
"identifier": {
58+
"value": "X8E5B",
59+
"system": "https://fhir.nhs.uk/Id/ods-organization-code"
60+
}
61+
},
62+
"lotNumber": "RSVAPITEST",
63+
"expirationDate": "2025-06-01",
64+
"site": {
65+
"coding": [
66+
{
67+
"system": "http://snomed.info/sct",
68+
"code": "368208006",
69+
"display": "Left upper arm structure (body structure)"
70+
}
71+
]
72+
},
73+
"route": {
74+
"coding": [
75+
{
76+
"system": "http://snomed.info/sct",
77+
"code": "78421000",
78+
"display": "Intramuscular route (qualifier value)"
79+
}
80+
]
81+
},
82+
"doseQuantity": {
83+
"value": 0.5,
84+
"unit": "milliliter",
85+
"system": "http://unitsofmeasure.org",
86+
"code": "ml"
87+
},
88+
"performer": [
89+
{ "actor": { "reference": "#Pract1" } },
90+
{
91+
"actor": {
92+
"type": "Organization",
93+
"display": "UNIVERSITY HOSPITAL OF WALES",
94+
"identifier": {
95+
"system": "https://fhir.nhs.uk/Id/ods-organization-code",
96+
"value": "B0C4P"
97+
}
98+
}
99+
}
100+
],
101+
"reasonCode": [
102+
{
103+
"coding": [
104+
{
105+
"code": "443684005",
106+
"display": "Disease outbreak (event)",
107+
"system": "http://snomed.info/sct"
108+
}
109+
]
110+
}
111+
],
112+
"protocolApplied": [
113+
{
114+
"targetDisease": [
115+
{
116+
"coding": [
117+
{
118+
"system": "http://snomed.info/sct",
119+
"code": "55735004",
120+
"display": "Disease caused by severe acute respiratory syndrome coronavirus 2 (disorder)"
121+
}
122+
]
123+
}
124+
],
125+
"doseNumberPositiveInt": 11
126+
}
127+
],
128+
"id": "088414ed-7d3d-40a5-9968-b3d2b9a266b2"
129+
}

lambdas/id_sync/src/record_processor.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
from common.clients import logger
55
from typing import Optional
66
from pds_details import pds_get_patient_id
7-
from ieds_db_operations import ieds_check_exist, ieds_update_patient_id, test_ieds_insert_patient
7+
from ieds_db_operations import ieds_check_exist, ieds_update_patient_id
88
import json
99
import ast
1010

@@ -47,8 +47,9 @@ def process_nhs_number(nhs_number: str) -> Optional[str]:
4747
# if patient NHS != id, update patient index of vax events to new number
4848
if patient_details_id != nhs_number and patient_details_id:
4949
logger.info(f"process_nhs_number. Update patient ID from {nhs_number} to {patient_details_id}")
50-
test_ieds_insert_patient(patient_details_id) # NOSONAR
50+
# test_ieds_insert_patient(patient_details_id) # NOSONAR
5151
if ieds_check_exist(patient_details_id):
52+
logger.info("process_nhs_number. IEDS record found, updating patient ID")
5253
response = ieds_update_patient_id(patient_details_id, nhs_number)
5354
else:
5455
logger.info("process_nhs_number. No ieds record found for: %s", nhs_number)

0 commit comments

Comments
 (0)