Skip to content

Commit 21d0ea6

Browse files
committed
wip
1 parent 08bd14c commit 21d0ea6

21 files changed

+832
-175
lines changed

delta_backend/src/delta.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
from log_firehose import FirehoseLogger
1616

1717
failure_queue_url = os.environ["AWS_SQS_QUEUE_URL"]
18+
ieds_table_name = os.environ["IEDS_TABLE_NAME"]
1819
delta_table_name = os.environ["DELTA_TABLE_NAME"]
1920
delta_source = os.environ["SOURCE"]
2021
region_name = "eu-west-2"

lambdas/id_sync/src/clients.py

Lines changed: 0 additions & 4 deletions
This file was deleted.

lambdas/id_sync/src/id_updater.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
from os_vars import get_ieds_table_name, get_delta_table_name
2+
3+
4+
ieds_table_name = get_ieds_table_name()
5+
delta_table_name = get_delta_table_name()
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
from boto3.dynamodb.conditions import Key
2+
from os_vars import get_ieds_table_name
3+
from common.clients import get_delta_table
4+
5+
ieds_table = None
6+
7+
8+
def get_ieds_table():
9+
"""Get the IEDS table."""
10+
global ieds_table
11+
if ieds_table is None:
12+
ieds_tablename = get_ieds_table_name()
13+
ieds_table = get_delta_table(ieds_tablename)
14+
return ieds_table
15+
16+
17+
def check_record_exist_in_IEDS(id: str) -> bool:
18+
"""Check if a record exists in the IEDS table for the given ID."""
19+
search_patient_pk = f"Patient#{id}"
20+
21+
# Only fetch 1 record to check existence
22+
response = get_ieds_table().query(
23+
KeyConditionExpression=Key("PK").eq(search_patient_pk),
24+
Limit=1
25+
)
26+
return response.get("Count", 0) > 0
27+
28+
29+
def update_patient_id_in_IEDS(old_id: str, new_id: str) -> dict:
30+
"""Update the patient ID in the IEDS table."""
31+
# check if old_id and new_id are not empty
32+
if not old_id or not new_id:
33+
return {"status": "error", "message": "Old ID and New ID cannot be empty"}
34+
else:
35+
# update the table with new id
36+
get_ieds_table().update_item(
37+
Key={"PK": f"Patient#{old_id}"},
38+
UpdateExpression="SET PK = :new_id",
39+
ExpressionAttributeValues={":new_id": f"Patient#{new_id}"}
40+
)
41+
return {"status": "success", "message": f"Updated IEDS, patient ID: {old_id} to {new_id}"}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
'''
2+
nhs number Processor
3+
'''
4+
from common.clients import logger
5+
from pds_details import get_pds_patient_details
6+
from typing import Optional
7+
from ieds_db_operations import check_record_exist_in_IEDS, update_patient_id_in_IEDS
8+
9+
10+
def process_nhs_number(nhs_number: str) -> Optional[str]:
11+
# get patient details from PDS
12+
patient_details = get_pds_patient_details(nhs_number)
13+
if not patient_details:
14+
return {"status": "error", "message": f"No records returned for ID: {nhs_number}"}
15+
16+
patient_details_id = patient_details.get("id")
17+
18+
# if patient NHS != id, update patient index of vax events to new number
19+
if patient_details_id != nhs_number:
20+
if check_record_exist_in_IEDS(patient_details_id):
21+
logger.info("Updating patient ID from %s to %s", id, patient_details_id)
22+
return update_patient_id_in_IEDS(nhs_number, patient_details_id)
23+
else:
24+
return {"status": "error", "message": f"No records returned for ID: {nhs_number}"}
25+
else:
26+
return {"status": "success", "message": "No update required"}

lambdas/id_sync/src/os_vars.py

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
# os_vars.py - Improved approach (✅ Better)
2+
import os
3+
from typing import Optional
4+
5+
6+
def get_ieds_table_name() -> str:
7+
"""Get the IEDS table name from environment variables."""
8+
return os.environ["IEDS_TABLE_NAME"]
9+
10+
11+
def get_delta_table_name() -> str:
12+
"""Get the Delta table name from environment variables."""
13+
return os.environ["DELTA_TABLE_NAME"]
14+
15+
16+
def get_pds_env() -> str:
17+
"""Get the PDS environment from environment variables."""
18+
return os.getenv("PDS_ENV", "int")
19+
20+
21+
# Optional: Cached versions for performance
22+
_ieds_table_name: Optional[str] = None
23+
_delta_table_name: Optional[str] = None
24+
_pds_env: Optional[str] = None
25+
26+
27+
def get_ieds_table_name_cached() -> str:
28+
"""Get the IEDS table name (cached version)."""
29+
global _ieds_table_name
30+
if _ieds_table_name is None:
31+
_ieds_table_name = os.environ["IEDS_TABLE_NAME"]
32+
return _ieds_table_name
33+
34+
35+
def get_delta_table_name_cached() -> str:
36+
"""Get the Delta table name (cached version)."""
37+
global _delta_table_name
38+
if _delta_table_name is None:
39+
_delta_table_name = os.environ["DELTA_TABLE_NAME"]
40+
return _delta_table_name
41+
42+
43+
def get_pds_env_cached() -> str:
44+
"""Get the PDS environment (cached version)."""
45+
global _pds_env
46+
if _pds_env is None:
47+
_pds_env = os.getenv("PDS_ENV", "int")
48+
return _pds_env

lambdas/id_sync/src/pds_details.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,12 @@
33
'''
44
from common.clients import logger, secrets_manager_client
55
from common.cache import Cache
6-
from clients import pds_env
6+
from os_vars import get_pds_env
77
from common.pds_service import PdsService
88
from common.authentication import AppRestrictedAuth, Service
99

10+
pds_env = get_pds_env()
11+
1012

1113
def get_pds_patient_details(nhs_number: str) -> dict:
1214
try:

lambdas/id_sync/src/record_processor.py

Lines changed: 8 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,41 +1,21 @@
11
'''
22
record Processor
33
'''
4-
from common.aws_lambda_sqs_event_record import AwsLambdaSqsEventRecord
54
from common.clients import logger
6-
from pds_details import get_pds_patient_details
75
import json
86
from typing import Optional
9-
from to_do_code import check_records_exist, update_patient_index
7+
from nhs_number_processor import process_nhs_number
108

119

12-
def process_record(event_record: AwsLambdaSqsEventRecord):
13-
record = AwsLambdaSqsEventRecord(event_record) if isinstance(event_record, dict) else event_record
14-
logger.info("Processing record: %s", record)
10+
def process_record(event_record):
1511

16-
id = get_id(event_record.body)
17-
18-
if id:
19-
# TODO This code is a placeholder for checking if records exist in the database - defaulting to True for now
20-
exists = check_records_exist(id)
21-
22-
if exists:
23-
# get patient details from PDS
24-
patient_details = get_pds_patient_details(id)
25-
if not patient_details:
26-
return {"status": "error", "message": f"No records returned for ID: {id}"}
27-
28-
patient_details_id = patient_details.get("id")
29-
30-
# if patient NHS != id, update patient index of vax events to new number
31-
if patient_details_id != id:
32-
return update_patient_index(id, patient_details_id)
33-
else:
34-
return {"status": "success", "message": "No update required"}
35-
else:
36-
return {"status": "error", "message": f"No records found for ID: {id}"}
12+
logger.info("Processing record: %s", event_record)
13+
body = event_record.get('body', {})
14+
nhs_number = get_id(body)
15+
if nhs_number:
16+
return process_nhs_number(nhs_number)
3717
else:
38-
return {"status": "error", "message": "No ID found in event record"}
18+
return {"status": "error", "message": "No NHS number found in event record"}
3919

4020

4121
def get_id(event_body) -> Optional[str]:

lambdas/id_sync/src/to_do_code.py

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,28 @@
11
'''
22
record Processor
33
'''
4+
# from common.clients import get_delta_table
5+
from boto3.dynamodb.conditions import Key
6+
from os_vars import get_ieds_table_name, get_delta_table_name
7+
from common.clients import get_delta_table
48

9+
ieds_tablename = get_ieds_table_name()
10+
delta_tablename = get_delta_table_name()
511

6-
def check_records_exist(id: str) -> bool:
7-
# TODO: Implement logic to check if records exist in the database
8-
return True
912

13+
def check_records_exist(dynamodb_table, id: str) -> bool:
14+
# dynamodb query to check that records exist for the given ID
15+
search_patient_pk = f"Patient#{id}"
16+
response = dynamodb_table.query(
17+
KeyConditionExpression=Key("PK").eq(search_patient_pk)
18+
)
19+
return response.get("Count", 0) > 0
1020

11-
def update_patient_index(old_id: str, new_id: str):
12-
# TODO: Implement logic to update patient index in Redis or other data store
13-
return {"status": "success", "message": f"Updated patient idx from {old_id} to {new_id}", "TODO": "Implement logic"}
21+
22+
def update_patient_id(old_id: str, new_id: str):
23+
24+
# check if old_id and new_id are not empty
25+
if not old_id or not new_id:
26+
return {"status": "error", "message": "Old ID and New ID cannot be empty"}
27+
else:
28+
return {"status": "success", "message": f"Updated patient {old_id} to {new_id}", "TODO": "Implement logic"}

0 commit comments

Comments
 (0)