Skip to content

Commit b08c49f

Browse files
committed
add more expression rules and fields
1 parent 9d3ea26 commit b08c49f

File tree

8 files changed

+196
-78
lines changed

8 files changed

+196
-78
lines changed

backend/src/models/fhir_immunization_pre_validators.py

Lines changed: 40 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,9 @@ def validate(self):
9696
self.pre_validate_value_codeable_concept,
9797
self.pre_validate_extension_length,
9898
self.pre_validate_vaccination_procedure_code,
99+
self.pre_validate_vaccination_procedure_display,
99100
self.pre_validate_vaccine_code,
101+
self.pre_validate_vaccine_display,
100102
]
101103

102104
for method in validation_methods:
@@ -590,7 +592,7 @@ def pre_validate_vaccination_procedure_code(self, values: dict) -> dict:
590592
(legacy CSV field name: VACCINATION_PROCEDURE_CODE) exists, then it is a non-empty string
591593
"""
592594
url = "https://fhir.hl7.org.uk/StructureDefinition/Extension-UKCore-" + "VaccinationProcedure"
593-
system = "http://snomed.info/sct"
595+
system = Urls.snomed
594596
field_type = "code"
595597
field_location = generate_field_location_for_extension(url, system, field_type)
596598
try:
@@ -600,14 +602,30 @@ def pre_validate_vaccination_procedure_code(self, values: dict) -> dict:
600602
except (KeyError, IndexError):
601603
pass
602604

605+
def pre_validate_vaccination_procedure_display(self, values: dict) -> dict:
606+
"""
607+
Pre-validate that, if extension[?(@.url=='https://fhir.hl7.org.uk/StructureDefinition/Extension-UKCore-
608+
VaccinationProcedure')].valueCodeableConcept.coding[?(@.system=='http://snomed.info/sct')].display
609+
(legacy CSV field name: VACCINATION_PROCEDURE_TERM) exists, then it is a non-empty string
610+
"""
611+
url = "https://fhir.hl7.org.uk/StructureDefinition/Extension-UKCore-" + "VaccinationProcedure"
612+
system = Urls.snomed
613+
field_type = "display"
614+
field_location = generate_field_location_for_extension(url, system, field_type)
615+
try:
616+
field_value = get_generic_extension_value(values, url, system, field_type)
617+
PreValidation.for_string(field_value, field_location)
618+
except (KeyError, IndexError):
619+
pass
620+
603621
def pre_validate_vaccination_situation_code(self, values: dict) -> dict:
604622
"""
605623
Pre-validate that, if extension[?(@.url=='https://fhir.hl7.org.uk/StructureDefinition/Extension-UKCore-
606624
VaccinationSituation')].valueCodeableConcept.coding[?(@.system=='http://snomed.info/sct')].code
607625
(legacy CSV field name: VACCINATION_SITUATION_CODE) exists, then it is a non-empty string
608626
"""
609627
url = "https://fhir.hl7.org.uk/StructureDefinition/Extension-UKCore-VaccinationSituation"
610-
system = "http://snomed.info/sct"
628+
system = Urls.snomed
611629
field_type = "code"
612630
field_location = generate_field_location_for_extension(url, system, field_type)
613631
try:
@@ -623,7 +641,7 @@ def pre_validate_vaccination_situation_display(self, values: dict) -> dict:
623641
(legacy CSV field name: VACCINATION_SITUATION_TERM) exists, then it is a non-empty string
624642
"""
625643
url = "https://fhir.hl7.org.uk/StructureDefinition/Extension-UKCore-VaccinationSituation"
626-
system = "http://snomed.info/sct"
644+
system = Urls.snomed
627645
field_type = "display"
628646
field_location = generate_field_location_for_extension(url, system, field_type)
629647
try:
@@ -702,7 +720,7 @@ def pre_validate_disease_type_coding_codes(self, values: dict) -> dict:
702720
Pre-validate that, if protocolApplied[0].targetDisease[{i}].coding[?(@.system=='http://snomed.info/sct')].code
703721
exists, then it is a non-empty string
704722
"""
705-
url = "http://snomed.info/sct"
723+
url = Urls.snomed
706724
try:
707725
for i in range(len(values["protocolApplied"][0]["targetDisease"])):
708726
field_location = f"protocolApplied[0].targetDisease[{i}].coding[?(@.system=='{url}')].code"
@@ -761,7 +779,7 @@ def pre_validate_site_coding_code(self, values: dict) -> dict:
761779
Pre-validate that, if site.coding[?(@.system=='http://snomed.info/sct')].code
762780
(legacy CSV field name: SITE_OF_VACCINATION_CODE) exists, then it is a non-empty string
763781
"""
764-
url = "http://snomed.info/sct"
782+
url = Urls.snomed
765783
field_location = f"site.coding[?(@.system=='{url}')].code"
766784
try:
767785
site_coding_code = [x for x in values["site"]["coding"] if x.get("system") == url][0]["code"]
@@ -774,7 +792,7 @@ def pre_validate_site_coding_display(self, values: dict) -> dict:
774792
Pre-validate that, if site.coding[?(@.system=='http://snomed.info/sct')].display
775793
(legacy CSV field name: SITE_OF_VACCINATION_TERM) exists, then it is a non-empty string
776794
"""
777-
url = "http://snomed.info/sct"
795+
url = Urls.snomed
778796
field_location = f"site.coding[?(@.system=='{url}')].display"
779797
try:
780798
field_value = [x for x in values["site"]["coding"] if x.get("system") == url][0]["display"]
@@ -795,7 +813,7 @@ def pre_validate_route_coding_code(self, values: dict) -> dict:
795813
Pre-validate that, if route.coding[?(@.system=='http://snomed.info/sct')].code
796814
(legacy CSV field name: ROUTE_OF_VACCINATION_CODE) exists, then it is a non-empty string
797815
"""
798-
url = "http://snomed.info/sct"
816+
url = Urls.snomed
799817
field_location = f"route.coding[?(@.system=='{url}')].code"
800818
try:
801819
field_value = [x for x in values["route"]["coding"] if x.get("system") == url][0]["code"]
@@ -808,7 +826,7 @@ def pre_validate_route_coding_display(self, values: dict) -> dict:
808826
Pre-validate that, if route.coding[?(@.system=='http://snomed.info/sct')].display
809827
(legacy CSV field name: ROUTE_OF_VACCINATION_TERM) exists, then it is a non-empty string
810828
"""
811-
url = "http://snomed.info/sct"
829+
url = Urls.snomed
812830
field_location = f"route.coding[?(@.system=='{url}')].display"
813831
try:
814832
field_value = [x for x in values["route"]["coding"] if x.get("system") == url][0]["display"]
@@ -951,11 +969,24 @@ def pre_validate_vaccine_code(self, values: dict) -> dict:
951969
NOTE: vaccineCode is a mandatory FHIR field. A value of None will be rejected by the
952970
FHIR model before pre-validators are run.
953971
"""
954-
url = "http://snomed.info/sct"
972+
url = Urls.snomed
955973
field_location = f"vaccineCode.coding[?(@.system=='{url}')].code"
956974
try:
957975
field_value = [x for x in values["vaccineCode"]["coding"] if x.get("system") == url][0]["code"]
958976
PreValidation.for_string(field_value, field_location)
959977
PreValidation.for_snomed_code(field_value, field_location)
960978
except (KeyError, IndexError):
961979
pass
980+
981+
def pre_validate_vaccine_display(self, values: dict) -> dict:
982+
"""
983+
Pre-validate that, if vaccineCode.coding[?(@.system=='http://snomed.info/sct')].display
984+
(legacy CSV field : VACCINE_PRODUCT_TERM) exists, then it is a non-empty string
985+
"""
986+
url = Urls.snomed
987+
field_location = f"vaccineCode.coding[?(@.system=='{url}')].display"
988+
try:
989+
field_value = [x for x in values["vaccineCode"]["coding"] if x.get("system") == url][0]["display"]
990+
PreValidation.for_string(field_value, field_location)
991+
except (KeyError, IndexError):
992+
pass

lambdas/shared/src/common/validator/constants/constants.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,3 +2,28 @@ class Constants:
22
NHS_NUMBER_LENGTH = 10
33
PERSON_NAME_ELEMENT_MAX_LENGTH = 35
44
GENDERS = ["male", "female", "other", "unknown"]
5+
DATETIME_FORMAT = "%Y-%m-%dT%H:%M"
6+
DATETIME_FORMAT = [
7+
"%Y-%m-%d",
8+
"%Y-%m-%dT%H:%M:%S%z",
9+
"%Y-%m-%dT%H:%M:%S.%f%z",
10+
]
11+
ALLOWED_SUFFIXES = {
12+
"+00:00",
13+
"+01:00",
14+
"+0000",
15+
"+0100",
16+
}
17+
field_name = "FIELD_TO_REPLACE"
18+
19+
DATETIME_ERROR_MESSAGE = (
20+
f"{field_name} must be a valid datetime in one of the following formats:"
21+
"- 'YYYY-MM-DD' — Full date only"
22+
"- 'YYYY-MM-DDThh:mm:ss%z' — Full date and time with timezone (e.g. +00:00 or +01:00)"
23+
"- 'YYYY-MM-DDThh:mm:ss.f%z' — Full date and time with milliseconds and timezone"
24+
"- Date must not be in the future."
25+
)
26+
STRICT_DATETIME_ERROR_MESSAGE = (
27+
"Only '+00:00' and '+01:00' are accepted as valid timezone offsets.\n"
28+
f"Note that partial dates are not allowed for {field_name} in this service.\n"
29+
)

lambdas/shared/src/common/validator/error_report/record_error.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@ def __init__(
2121
self.id = None
2222
self.error_level = error_level
2323

24+
def __repr__(self):
25+
return f"<ErrorReport code={self.code}, field={self.field}, message={self.message!r}, details={self.details!r}>"
26+
2427
# function to return the object as a dictionary
2528
def to_dict(self):
2629
ret = {"code": self.code, "message": self.message}

lambdas/shared/src/common/validator/expression_checker.py

Lines changed: 41 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from common.validator.lookup_expressions.key_data import KeyData
1010
from common.validator.lookup_expressions.lookup_data import LookUpData
1111
from common.validator.validation_utils import check_if_future_date
12+
from src.common.validator.constants.constants import Constants
1213

1314

1415
class ExpressionChecker:
@@ -51,7 +52,7 @@ def validate_expression(
5152
return "Schema expression not found! Check your expression type : " + expression_type
5253

5354
# ISO 8601 date/datetime validate (currently date-only)
54-
def validation_for_date(self, _expression_rule, field_name, field_value):
55+
def validation_for_date(self, _expression_rule, field_name, field_value) -> ErrorReport:
5556
"""
5657
Apply pre-validation to a date field to ensure that it is a string (JSON dates must be
5758
written as strings) containing a valid date in the format "YYYY-MM-DD"
@@ -201,63 +202,50 @@ def validation_for_list(self, expression_rule: str, field_name: str, field_value
201202
message = MESSAGES[ExceptionLevels.UNEXPECTED_EXCEPTION] % (e.__class__.__name__, e)
202203
return ErrorReport(ExceptionLevels.UNEXPECTED_EXCEPTION, message, None, field_name, "")
203204

204-
def validation_for_date_time(
205-
self, expression_rule: str, field_name: str, field_value: str, row: dict, strict_timezone: bool = True
206-
):
205+
def validation_for_date_time(self, expression_rule: str, field_name: str, field_value: str) -> ErrorReport:
207206
"""
208207
Apply pre-validation to a datetime field to ensure that it is a string (JSON dates must be written as strings)
209208
containing a valid datetime. Note that partial dates are valid for FHIR, but are not allowed for this API.
210-
Valid formats are any of the following:
211-
* 'YYYY-MM-DD' - Full date only
212-
* 'YYYY-MM-DDThh:mm:ss%z' - Full date, time without milliseconds, timezone
209+
Valid formats are any of the following: * 'YYYY-MM-DD' - Full date only * 'YYYY-MM-DDThh:mm:ss%z' - Full date, time without milliseconds, timezone
213210
* 'YYYY-MM-DDThh:mm:ss.f%z' - Full date, time with milliseconds (any level of precision), timezone
214211
"""
212+
rules = expression_rule_per_field(expression_rule) if expression_rule else {}
213+
strict_timezone = rules.get("strict_time_zone", False)
214+
try:
215+
if not isinstance(field_value, str):
216+
raise TypeError(f"{field_name} must be a string")
215217

216-
if not isinstance(field_value, str):
217-
raise TypeError(f"{field_name} must be a string")
218-
219-
error_message = (
220-
f"{field_name} must be a valid datetime in one of the following formats:"
221-
"- 'YYYY-MM-DD' — Full date only"
222-
"- 'YYYY-MM-DDThh:mm:ss%z' — Full date and time with timezone (e.g. +00:00 or +01:00)"
223-
"- 'YYYY-MM-DDThh:mm:ss.f%z' — Full date and time with milliseconds and timezone"
224-
"- Date must not be in the future."
225-
)
226-
if strict_timezone:
227-
error_message += (
228-
"Only '+00:00' and '+01:00' are accepted as valid timezone offsets.\n"
229-
f"Note that partial dates are not allowed for {field_name} in this service.\n"
230-
)
231-
232-
allowed_suffixes = {
233-
"+00:00",
234-
"+01:00",
235-
"+0000",
236-
"+0100",
237-
}
238-
239-
# List of accepted strict formats
240-
formats = [
241-
"%Y-%m-%d",
242-
"%Y-%m-%dT%H:%M:%S%z",
243-
"%Y-%m-%dT%H:%M:%S.%f%z",
244-
]
245-
246-
for fmt in formats:
247-
try:
248-
fhir_date = datetime.strptime(field_value, fmt)
249-
# Enforce future-date rule using central checker after successful parse
250-
if check_if_future_date(fhir_date):
251-
raise ValueError(f"{field_name} must not be in the future")
252-
# After successful parse, enforce timezone and future-date rules
253-
if strict_timezone and fhir_date.tzinfo is not None:
254-
if not any(field_value.endswith(suffix) for suffix in allowed_suffixes):
255-
raise ValueError(error_message)
256-
return fhir_date.isoformat()
257-
except ValueError:
258-
continue
259-
260-
raise ValueError(error_message)
218+
error_message = Constants.DATETIME_ERROR_MESSAGE.replace("FIELD_TO_REPLACE", field_name)
219+
if strict_timezone:
220+
error_message += Constants.STRICT_DATETIME_ERROR_MESSAGE.replace("FIELD_TO_REPLACE", field_name)
221+
222+
# List of accepted strict formats and suffixes
223+
allowed_suffixes = Constants.ALLOWED_SUFFIXES
224+
formats = Constants.DATETIME_FORMAT
225+
226+
for fmt in formats:
227+
try:
228+
fhir_date = datetime.strptime(field_value, fmt)
229+
# Enforce future-date rule using central checker after successful parse
230+
if check_if_future_date(fhir_date):
231+
raise ValueError(f"{field_name} must not be in the future")
232+
# After successful parse, enforce timezone and future-date rules
233+
if strict_timezone and fhir_date.tzinfo is not None:
234+
if not any(field_value.endswith(suffix) for suffix in allowed_suffixes):
235+
raise ValueError(error_message)
236+
return None
237+
except ValueError:
238+
continue
239+
raise ValueError(error_message)
240+
except (TypeError, ValueError) as e:
241+
code = ExceptionLevels.RECORD_CHECK_FAILED
242+
message = MESSAGES[ExceptionLevels.RECORD_CHECK_FAILED]
243+
details = str(e)
244+
return ErrorReport(code, message, None, field_name, details)
245+
except Exception as e:
246+
if self.report_unexpected_exception:
247+
message = MESSAGES[ExceptionLevels.UNEXPECTED_EXCEPTION] % (e.__class__.__name__, e)
248+
return ErrorReport(ExceptionLevels.UNEXPECTED_EXCEPTION, message, None, field_name)
261249

262250
# Not Equal Validate
263251
def _validate_not_equal(self, expression_rule: str, field_name: str, field_value: str, row: dict) -> ErrorReport:
@@ -365,7 +353,7 @@ def validation_for_string_values(self, expression_rule: str, field_name: str, fi
365353
defined_length = rules.get("defined_length", None)
366354
max_length = rules.get("max_length", None)
367355
predefined_values = rules.get("predefined_values", None)
368-
spaces_allowed = rules.get("spaces_allowed", None)
356+
spaces_allowed = rules.get("spaces_allowed", True)
369357

370358
try:
371359
if not isinstance(field_value, str):

lambdas/shared/src/common/validator/expression_rule.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,5 +15,7 @@ def expression_rule_per_field(expression_type: str) -> dict:
1515
return {"max_length": Constants.PERSON_NAME_ELEMENT_MAX_LENGTH}
1616
case "GENDER":
1717
return {"predefined_values": Constants.GENDERS}
18+
case "DATETIME":
19+
return {"strict_time_zone": True}
1820
case _:
1921
raise ValueError(f"Expression rule not found for type: {expression_type}")

lambdas/shared/tests/test_common/validator/test_expression_checker.py

Lines changed: 57 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -75,24 +75,67 @@ def test_list_valid_and_invalid(self):
7575
self.assertIsInstance(checker.validate_expression("LIST", "PERSON_NAME", "PERSON_FORENAME", []), ErrorReport)
7676
self.assertIsInstance(checker.validate_expression("LIST", "", "PERSON_FORENAME", "Alice"), ErrorReport)
7777

78-
# # DATE
79-
# def test_date_valid_and_invalid(self):
80-
# checker = self.make_checker()
81-
# self.assertIsNone(checker.validate_expression("DATE", "", "contained|#:Patient|name|#:official|given|0", "2025-01-01"))
82-
# self.assertIsInstance(checker.validate_expression("DATE", "", "date_field", "2025-13-01"), ErrorReport)
83-
78+
# DATE
79+
def test_date_valid_and_invalid(self):
80+
checker = self.make_checker()
81+
self.assertIsNone(checker.validate_expression("DATE", "", "contained|#:Patient|birthDate", "2025-01-01"))
82+
self.assertIsNone(checker.validate_expression("DATE", "", "PERSON_DOB", "2025-01-01"))
83+
self.assertIsInstance(
84+
checker.validate_expression("DATE", "", "contained|#:Patient|birthDate", "2025-13-01"), ErrorReport
85+
)
86+
self.assertIsInstance(checker.validate_expression("DATE", "", "PERSON_DOB", "2025-02-30"), ErrorReport)
8487

85-
# # DATETIME
86-
# def test_datetime_valid_and_invalid(self):
87-
# checker = self.make_checker()
88-
# # Full date only allowed
89-
# self.assertIsNone(checker.validate_expression("DATETIME", "", "dt_field", "2025-01-01", 1))
90-
# # Bad format should raise
91-
# with self.assertRaises(Exception):
92-
# checker.validate_expression("DATETIME", "", "dt_field", "2025-01-01T10:00:00Z", 1)
88+
# DATETIME
89+
def test_datetime_valid_and_invalid(self):
90+
checker = self.make_checker()
91+
# Full date only allowed
92+
self.assertIsNone(
93+
checker.validate_expression("DATETIME", "DATETIME", "occurrenceDateTime", "2025-01-01T05:00:00+00:00")
94+
)
95+
self.assertIsNone(
96+
checker.validate_expression("DATETIME", "DATETIME", "DATE_AND_TIME", "2025-01-01T05:00:00+00:00")
97+
)
98+
# Bad format should raise
99+
self.assertIsInstance(
100+
checker.validate_expression("DATETIME", "", "occurrenceDateTime", "2026-01-01T10:00:00Z"), ErrorReport
101+
)
102+
self.assertIsInstance(
103+
checker.validate_expression("DATETIME", "", "DATE_AND_TIME", "2026-01-01T10:00:00Z"), ErrorReport
104+
)
93105

94106

95107
# # BOOLEAN
108+
109+
# # STRING with GENDER rule on real field
110+
# def test_gender_string_rule_valid_and_invalid(self):
111+
# checker = self.make_checker()
112+
# field_path = "contained|#:Patient|gender"
113+
# # Valid genders per schema constants (male, female, other, unknown)
114+
# self.assertIsNone(checker.validate_expression("STRING", "GENDER", field_path, "male"))
115+
# self.assertIsNone(checker.validate_expression("STRING", "GENDER", field_path, "female"))
116+
# # Invalid values should error
117+
# self.assertIsInstance(
118+
# checker.validate_expression("STRING", "GENDER", field_path, "M"),
119+
# ErrorReport,
120+
# )
121+
122+
# # STRING with no rule for PERSON_POSTCODE on real field
123+
# def test_postcode_string_rule_valid_and_invalid(self):
124+
# checker = self.make_checker()
125+
# field_path = "contained|#:Patient|address|#:postalCode|postalCode"
126+
# # With empty rule, generic string constraints apply: non-empty and no spaces
127+
# self.assertIsNone(checker.validate_expression("STRING", "", field_path, "SW1A1AA"))
128+
# # Real-world postcode with a space should fail as spaces are not allowed without a rule override
129+
# field_path = "POST_CODE"
130+
# self.assertIsInstance(
131+
# checker.validate_expression("STRING", "", field_path, "AB12 3CD"),
132+
# ErrorReport,
133+
# )
134+
# # Empty should also fail
135+
# self.assertIsInstance(
136+
# checker.validate_expression("STRING", "", field_path, ""),
137+
# ErrorReport,
138+
# )
96139
# def test_boolean_valid_and_invalid(self):
97140
# checker = self.make_checker()
98141
# self.assertIsNone(checker.validate_expression("BOOLEAN", "", "bool_field", True, 1))

0 commit comments

Comments
 (0)