Skip to content

Commit be6950a

Browse files
[PRMP-781] Fixed Values Metadata Processor (#908)
Co-authored-by: Robert Gaskin <[email protected]>
1 parent 97b3e58 commit be6950a

File tree

6 files changed

+457
-12
lines changed

6 files changed

+457
-12
lines changed

lambdas/handlers/bulk_upload_metadata_processor_handler.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,13 @@
33
BulkUploadMetadataProcessorService,
44
get_formatter_service,
55
)
6+
from services.metadata_mapping_validator_service import MetadataMappingValidatorService
67
from utils.audit_logging_setup import LoggingService
78
from utils.decorators.ensure_env_var import ensure_environment_variables
89
from utils.decorators.handle_lambda_exceptions import handle_lambda_exceptions
910
from utils.decorators.override_error_check import override_error_check
1011
from utils.decorators.set_audit_arg import set_request_context_for_logging
12+
from utils.exceptions import BulkUploadMetadataException
1113

1214
logger = LoggingService(__name__)
1315

@@ -48,4 +50,16 @@ def lambda_handler(event, _context):
4850
f"Starting metadata processing for practice directory: {practice_directory}"
4951
)
5052

53+
fixed_values = event.get("fixedValues", {})
54+
55+
validator_service = MetadataMappingValidatorService()
56+
validator_service.validate_fixed_values(
57+
fixed_values, remappings)
58+
59+
metadata_formatter_service = formatter_service_class(practice_directory)
60+
metadata_service = BulkUploadMetadataProcessorService(
61+
metadata_formatter_service=metadata_formatter_service,
62+
metadata_heading_remap=remappings,
63+
fixed_values=fixed_values
64+
)
5165
metadata_service.process_metadata()

lambdas/services/bulk_upload_metadata_processor_service.py

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ def __init__(
5656
self,
5757
metadata_formatter_service: MetadataPreprocessorService,
5858
metadata_heading_remap: dict,
59+
fixed_values: dict = None,
5960
):
6061
self.staging_bucket_name = os.getenv("STAGING_STORE_BUCKET_NAME")
6162
self.metadata_queue_url = os.getenv("METADATA_SQS_QUEUE_URL")
@@ -67,6 +68,7 @@ def __init__(
6768
self.virus_scan_service = get_virus_scan_service()
6869

6970
self.metadata_heading_remap = metadata_heading_remap
71+
self.fixed_values = fixed_values or {}
7072

7173
self.temp_download_dir = tempfile.mkdtemp()
7274
self.practice_directory = metadata_formatter_service.practice_directory
@@ -148,10 +150,10 @@ def csv_to_sqs_metadata(self, csv_file_path: str) -> list[StagingSqsMetadata]:
148150

149151
validated_rows, rejected_rows, rejected_reasons = (
150152
self.metadata_mapping_validator_service.validate_and_normalize_metadata(
151-
records, self.metadata_heading_remap
153+
records, self.fixed_values, self.metadata_heading_remap
152154
)
153155
)
154-
if rejected_reasons:
156+
if rejected_reasons:
155157
for reason in rejected_reasons:
156158
logger.warning(f"Rejected due to: {reason['REASON']}")
157159

@@ -177,6 +179,10 @@ def process_metadata_row(
177179
) -> None:
178180
"""Validate individual file metadata and attach to patient group."""
179181
file_metadata = MetadataFile.model_validate(row)
182+
183+
if self.fixed_values:
184+
file_metadata = self.apply_fixed_values(file_metadata)
185+
180186
nhs_number, ods_code = self.extract_patient_info(file_metadata)
181187

182188
try:
@@ -191,6 +197,17 @@ def process_metadata_row(
191197
sqs_metadata.file_path.lstrip("/"))
192198
patients[(nhs_number, ods_code)].append(sqs_metadata)
193199

200+
def apply_fixed_values(self, file_metadata: MetadataFile) -> MetadataFile:
201+
202+
metadata_dict = file_metadata.model_dump(by_alias=True)
203+
204+
for field_name, fixed_value in self.fixed_values.items():
205+
metadata_dict[field_name] = fixed_value
206+
logger.info(
207+
f"Applied fixed value for field '{field_name}': '{fixed_value}'")
208+
209+
return MetadataFile.model_validate(metadata_dict)
210+
194211
@staticmethod
195212
def convert_to_sqs_metadata(
196213
file: MetadataFile, stored_file_name: str

lambdas/services/bulk_upload_service.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,9 @@ def process_message_queue(self, records: list):
7474
logger.info(
7575
"Cannot validate patient due to PDS responded with Too Many Requests"
7676
)
77-
logger.info("Cannot process for now due to PDS rate limit reached.")
77+
logger.info(
78+
"Cannot process for now due to PDS rate limit reached."
79+
)
7880
logger.info(
7981
"All remaining messages in this batch will be returned to sqs queue to retry later."
8082
)

lambdas/services/metadata_mapping_validator_service.py

Lines changed: 93 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,15 @@
33
from models.staging_metadata import MetadataFile
44
from pydantic import BaseModel, ConfigDict, Field, create_model
55
from utils.audit_logging_setup import LoggingService
6+
from utils.exceptions import BulkUploadMetadataException
67

78
logger = LoggingService(__name__)
89

910

1011
class MetadataMappingValidatorService:
1112
model_aliases = {f.alias for f in MetadataFile.model_fields.values() if f.alias}
13+
# Fields that should never be set as fixed by the user
14+
PROTECTED_FIELDS = {"FILEPATH", "NHS-NO"}
1215

1316
def create_metadata_model(self, alias_map) -> Type[BaseModel]:
1417
if not alias_map:
@@ -42,7 +45,7 @@ def create_metadata_model(self, alias_map) -> Type[BaseModel]:
4245
)
4346
return dynamic_model
4447

45-
def validate_and_normalize_metadata(self, records: list[dict], remappings: dict):
48+
def validate_and_normalize_metadata(self, records: list[dict], fixed_values: dict, remappings: dict):
4649
model = self.create_metadata_model(remappings)
4750
validated_rows, rejected_rows, rejected_reasons = [], [], []
4851
required_fields = [
@@ -53,8 +56,12 @@ def validate_and_normalize_metadata(self, records: list[dict], remappings: dict)
5356

5457
for row in records:
5558
try:
56-
instance = model.model_validate(row)
59+
# Merge fixed values into the row before validation so they are included in the required-field checks.
60+
row_with_fixed = {**row, **fixed_values} if fixed_values else row
61+
62+
instance = model.model_validate(row_with_fixed)
5763
data = instance.model_dump(by_alias=False)
64+
5865
empty_required_fields = self.get_empty_required_fields(
5966
data, required_fields
6067
)
@@ -82,3 +89,87 @@ def get_empty_required_fields(
8289
if data.get(field) is None
8390
or (isinstance(data.get(field), str) and not data.get(field).strip())
8491
]
92+
93+
def validate_fixed_values(
94+
self, fixed_values: dict, remappings: dict
95+
) -> tuple[bool, list[str]]:
96+
97+
if not fixed_values:
98+
logger.info("No fixed values provided")
99+
return True, []
100+
101+
errors = []
102+
103+
errors.extend(self.check_for_protected_fields(fixed_values))
104+
105+
errors.extend(self.check_for_remapped_field_names(fixed_values, remappings))
106+
107+
errors.extend(self.check_for_remapping_conflicts(fixed_values, remappings))
108+
109+
errors.extend(self.check_for_valid_aliases(fixed_values, remappings))
110+
111+
if errors:
112+
logger.error(f"Fixed values validation failed: {errors}")
113+
raise BulkUploadMetadataException(errors)
114+
else:
115+
logger.info("Fixed values validation passed")
116+
117+
return len(errors) == 0, errors
118+
119+
def check_for_protected_fields(self, fixed_values: dict) -> list[str]:
120+
errors = []
121+
protected_in_fixed = set(fixed_values.keys()) & self.PROTECTED_FIELDS
122+
123+
if protected_in_fixed:
124+
protected_list = ", ".join(sorted(protected_in_fixed))
125+
errors.append(
126+
f"Protected fields cannot have fixed values: {protected_list}. "
127+
f"These are critical identifiers that must come from the source data."
128+
)
129+
return errors
130+
131+
def check_for_remapped_field_names(
132+
self, fixed_values: dict, remappings: dict
133+
) -> list[str]:
134+
errors = []
135+
remapped_values = set(remappings.values())
136+
137+
for field_name in fixed_values.keys():
138+
if field_name in remapped_values:
139+
errors.append(
140+
f"Fixed value field '{field_name}' is a remapped value. "
141+
f"Use the original field name instead."
142+
)
143+
return errors
144+
145+
def check_for_remapping_conflicts(
146+
self, fixed_values: dict, remappings: dict
147+
) -> list[str]:
148+
errors = []
149+
conflicting_fields = set(fixed_values.keys()) & set(remappings.keys())
150+
151+
if conflicting_fields:
152+
conflicting_list = ", ".join(sorted(conflicting_fields))
153+
errors.append(
154+
f"Fixed values cannot be applied to remapped fields: {conflicting_list}"
155+
)
156+
return errors
157+
158+
def check_for_valid_aliases(
159+
self, fixed_values: dict, remappings: dict
160+
) -> list[str]:
161+
errors = []
162+
remapped_values = set(remappings.values())
163+
164+
for field_name in fixed_values.keys():
165+
if field_name not in remapped_values and field_name not in self.model_aliases:
166+
errors.append(
167+
f"Fixed value field '{field_name}' is not a valid metadata field alias. "
168+
f"Valid aliases are: {', '.join(sorted(self.model_aliases))}"
169+
)
170+
return errors
171+
172+
173+
174+
175+

lambdas/tests/unit/services/test_bulk_upload_metadata_processor_service.py

Lines changed: 100 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -377,7 +377,7 @@ def test_duplicates_csv_to_sqs_metadata(mocker, bulk_upload_service):
377377
mocker.patch.object(
378378
bulk_upload_service.metadata_mapping_validator_service,
379379
"validate_and_normalize_metadata",
380-
side_effect=lambda records, source_name: (records, [], []),
380+
side_effect=lambda records, fixed_values, remappings: (records, [], []),
381381
)
382382

383383
actual = bulk_upload_service.csv_to_sqs_metadata("fake/path.csv")
@@ -725,7 +725,7 @@ def test_csv_to_sqs_metadata_happy_path(mocker, bulk_upload_service, mock_csv_co
725725
mocker.patch.object(
726726
bulk_upload_service.metadata_mapping_validator_service,
727727
"validate_and_normalize_metadata",
728-
side_effect=lambda records, src: (records, [], []),
728+
side_effect=lambda records, fixed_values, remappings: (records, [], []),
729729
)
730730

731731
mock_process_metadata_row = mocker.patch.object(
@@ -784,7 +784,7 @@ def test_csv_to_sqs_metadata_groups_patients_correctly(mocker, bulk_upload_servi
784784
mocker.patch.object(
785785
bulk_upload_service.metadata_mapping_validator_service,
786786
"validate_and_normalize_metadata",
787-
side_effect=lambda records, src: (records, [], []),
787+
side_effect=lambda records, fixed_values, remappings: (records, [], []),
788788
)
789789

790790
mocker.patch.object(
@@ -956,7 +956,6 @@ def test_no_remapping_logic(
956956
)
957957
]
958958

959-
960959
@freeze_time("2025-02-03T10:00:00")
961960
def test_validate_expedite_file_happy_path_returns_expected_tuple(test_service):
962961
ods_code = "A12345"
@@ -1162,3 +1161,100 @@ def test_check_file_status_logs_issue_when_not_clean(mocker, test_service, caplo
11621161
f"Found an issue with the file {file_key}." in record.msg
11631162
for record in caplog.records
11641163
)
1164+
def test_apply_fixed_values_no_fixed_values(test_service, base_metadata_file):
1165+
result = test_service.apply_fixed_values(base_metadata_file)
1166+
1167+
assert result == base_metadata_file
1168+
1169+
1170+
def test_apply_fixed_values_single_field(mocker, base_metadata_file):
1171+
service = BulkUploadMetadataProcessorService(
1172+
metadata_formatter_service=MockMetadataPreprocessorService(
1173+
practice_directory="test_practice_directory"
1174+
),
1175+
metadata_heading_remap={},
1176+
fixed_values={"SECTION": "AR"},
1177+
)
1178+
mocker.patch.object(service, "s3_service")
1179+
1180+
result = service.apply_fixed_values(base_metadata_file)
1181+
1182+
assert result.section == "AR"
1183+
assert result.nhs_number == base_metadata_file.nhs_number
1184+
assert result.gp_practice_code == base_metadata_file.gp_practice_code
1185+
1186+
1187+
def test_apply_fixed_values_multiple_fields(mocker, base_metadata_file):
1188+
service = BulkUploadMetadataProcessorService(
1189+
metadata_formatter_service=MockMetadataPreprocessorService(
1190+
practice_directory="test_practice_directory"
1191+
),
1192+
metadata_heading_remap={},
1193+
fixed_values={
1194+
"SECTION": "AR",
1195+
"SUB-SECTION": "Mental Health",
1196+
"SCAN-ID": "FIXED_SCAN_ID",
1197+
},
1198+
)
1199+
mocker.patch.object(service, "s3_service")
1200+
1201+
result = service.apply_fixed_values(base_metadata_file)
1202+
1203+
assert result.section == "AR"
1204+
assert result.sub_section == "Mental Health"
1205+
assert result.scan_id == "FIXED_SCAN_ID"
1206+
assert result.nhs_number == base_metadata_file.nhs_number
1207+
1208+
1209+
def test_apply_fixed_values_overwrites_existing_value(mocker, base_metadata_file):
1210+
original_section = base_metadata_file.section
1211+
assert original_section == "LG"
1212+
1213+
service = BulkUploadMetadataProcessorService(
1214+
metadata_formatter_service=MockMetadataPreprocessorService(
1215+
practice_directory="test_practice_directory"
1216+
),
1217+
metadata_heading_remap={},
1218+
fixed_values={"SECTION": "AR"},
1219+
)
1220+
mocker.patch.object(service, "s3_service")
1221+
1222+
result = service.apply_fixed_values(base_metadata_file)
1223+
1224+
assert result.section == "AR"
1225+
assert result.section != original_section
1226+
1227+
1228+
def test_apply_fixed_values_logs_applied_values(mocker, base_metadata_file, caplog):
1229+
service = BulkUploadMetadataProcessorService(
1230+
metadata_formatter_service=MockMetadataPreprocessorService(
1231+
practice_directory="test_practice_directory"
1232+
),
1233+
metadata_heading_remap={},
1234+
fixed_values={"SECTION": "AR", "SCAN-ID": "TEST_ID"},
1235+
)
1236+
mocker.patch.object(service, "s3_service")
1237+
1238+
service.apply_fixed_values(base_metadata_file)
1239+
1240+
log_messages = [record.message for record in caplog.records]
1241+
assert any("Applied fixed value for field 'SECTION': 'AR'" in msg for msg in log_messages)
1242+
assert any("Applied fixed value for field 'SCAN-ID': 'TEST_ID'" in msg for msg in log_messages)
1243+
1244+
1245+
def test_apply_fixed_values_returns_valid_metadata_file(mocker, base_metadata_file):
1246+
service = BulkUploadMetadataProcessorService(
1247+
metadata_formatter_service=MockMetadataPreprocessorService(
1248+
practice_directory="test_practice_directory"
1249+
),
1250+
metadata_heading_remap={},
1251+
fixed_values={"SECTION": "AR"},
1252+
)
1253+
mocker.patch.object(service, "s3_service")
1254+
1255+
result = service.apply_fixed_values(base_metadata_file)
1256+
1257+
assert isinstance(result, MetadataFile)
1258+
# Ensure it can be validated again
1259+
validated = MetadataFile.model_validate(result.model_dump(by_alias=True))
1260+
assert validated.section == "AR"

0 commit comments

Comments
 (0)