diff --git a/src/eligibility_signposting_api/config/contants.py b/src/eligibility_signposting_api/config/contants.py index 3ac35987..6d34f650 100644 --- a/src/eligibility_signposting_api/config/contants.py +++ b/src/eligibility_signposting_api/config/contants.py @@ -1,3 +1,6 @@ +from typing import Literal + MAGIC_COHORT_LABEL = "elid_all_people" RULE_STOP_DEFAULT = False NHS_NUMBER_HEADER = "nhs-login-nhs-number" +ALLOWED_CONDITIONS = Literal["COVID", "FLU", "MMR", "RSV"] diff --git a/src/eligibility_signposting_api/model/campaign_config.py b/src/eligibility_signposting_api/model/campaign_config.py index 989f2e53..1fe137d8 100644 --- a/src/eligibility_signposting_api/model/campaign_config.py +++ b/src/eligibility_signposting_api/model/campaign_config.py @@ -11,7 +11,7 @@ from pydantic import BaseModel, Field, HttpUrl, RootModel, field_serializer, field_validator, model_validator -from eligibility_signposting_api.config.contants import MAGIC_COHORT_LABEL, RULE_STOP_DEFAULT +from eligibility_signposting_api.config.contants import ALLOWED_CONDITIONS, MAGIC_COHORT_LABEL, RULE_STOP_DEFAULT if typing.TYPE_CHECKING: # pragma: no cover from pydantic import SerializationInfo @@ -184,7 +184,7 @@ class CampaignConfig(BaseModel): version: CampaignVersion = Field(..., alias="Version") name: CampaignName = Field(..., alias="Name") type: Literal["V", "S"] = Field(..., alias="Type") - target: Literal["COVID", "FLU", "MMR", "RSV"] = Field(..., alias="Target") + target: ALLOWED_CONDITIONS = Field(..., alias="Target") manager: list[str] | None = Field(None, alias="Manager") approver: list[str] | None = Field(None, alias="Approver") reviewer: list[str] | None = Field(None, alias="Reviewer") diff --git a/src/eligibility_signposting_api/services/calculators/eligibility_calculator.py b/src/eligibility_signposting_api/services/calculators/eligibility_calculator.py index 66cba466..ab312295 100644 --- a/src/eligibility_signposting_api/services/calculators/eligibility_calculator.py +++ b/src/eligibility_signposting_api/services/calculators/eligibility_calculator.py @@ -23,6 +23,7 @@ from eligibility_signposting_api.services.processors.action_rule_handler import ActionRuleHandler from eligibility_signposting_api.services.processors.campaign_evaluator import CampaignEvaluator from eligibility_signposting_api.services.processors.rule_processor import RuleProcessor +from eligibility_signposting_api.services.processors.token_processor import TokenProcessor if TYPE_CHECKING: from collections.abc import Collection @@ -34,6 +35,7 @@ ) from eligibility_signposting_api.model.person import Person + logger = logging.getLogger(__name__) @@ -99,12 +101,16 @@ def get_eligibility_status(self, include_actions: str, conditions: list[str], ca include_actions_flag=include_actions_flag, ) + best_iteration_result = TokenProcessor.find_and_replace_tokens(self.person, best_iteration_result) + matched_action_detail = TokenProcessor.find_and_replace_tokens(self.person, matched_action_detail) + condition_results[condition_name] = best_iteration_result.iteration_result condition_results[condition_name].actions = matched_action_detail.actions condition: Condition = self.build_condition( iteration_result=condition_results[condition_name], condition_name=condition_name ) + final_result.append(condition) AuditContext.append_audit_condition( diff --git a/src/eligibility_signposting_api/services/processors/token_parser.py b/src/eligibility_signposting_api/services/processors/token_parser.py new file mode 100644 index 00000000..3f873ba3 --- /dev/null +++ b/src/eligibility_signposting_api/services/processors/token_parser.py @@ -0,0 +1,72 @@ +import re +from dataclasses import dataclass + + +@dataclass +class ParsedToken: + """ + A class to represent a parsed token. + ... + Attributes + ---------- + attribute_level : str + Example: "PERSON" or "TARGET" + attribute_name : str + Example: "POSTCODE" or "RSV" + attribute_value : int + Example: "LAST_SUCCESSFUL_DATE" if attribute_level is TARGET + format : str + Example: "%d %B %Y" if DATE formatting is used + """ + + attribute_level: str + attribute_name: str + attribute_value: str | None + format: str | None + + +class TokenParser: + MIN_TOKEN_PARTS = 2 + + @staticmethod + def parse(token: str) -> ParsedToken: + """Parses a token into its parts. + Steps: + Strip the surrounding [[ ]] + Check for empty body after stripping, e.g., '[[]]' + Check for empty parts created by leading/trailing dots or tokens with no dot + Check if the name contains a date format + Return a ParsedToken object + """ + + token_body = token[2:-2] + if not token_body: + message = "Invalid token." + raise ValueError(message) + + token_parts = token_body.split(".") + + if len(token_parts) < TokenParser.MIN_TOKEN_PARTS or not all(token_parts): + message = "Invalid token." + raise ValueError(message) + + token_level = token_parts[0].upper() + token_name = token_parts[-1] + + format_match = re.search(r":DATE\(([^()]*)\)", token_name, re.IGNORECASE) + if not format_match and len(token_name.split(":")) > 1: + message = "Invalid token format." + raise ValueError(message) + + format_str = format_match.group(1) if format_match else None + + last_part = re.sub(r":DATE\(.*?\)", "", token_name, flags=re.IGNORECASE) + + if len(token_parts) == TokenParser.MIN_TOKEN_PARTS: + name = last_part.upper() + value = None + else: + name = token_parts[1].upper() + value = last_part.upper() + + return ParsedToken(attribute_level=token_level, attribute_name=name, attribute_value=value, format=format_str) diff --git a/src/eligibility_signposting_api/services/processors/token_processor.py b/src/eligibility_signposting_api/services/processors/token_processor.py new file mode 100644 index 00000000..de509daf --- /dev/null +++ b/src/eligibility_signposting_api/services/processors/token_processor.py @@ -0,0 +1,134 @@ +import re +from dataclasses import Field, fields, is_dataclass +from datetime import UTC, datetime +from typing import Any, Never, TypeVar + +from wireup import service + +from eligibility_signposting_api.config.contants import ALLOWED_CONDITIONS +from eligibility_signposting_api.model.person import Person +from eligibility_signposting_api.services.processors.token_parser import ParsedToken, TokenParser + +T = TypeVar("T") + + +TARGET_ATTRIBUTE_LEVEL = "TARGET" +PERSON_ATTRIBUTE_LEVEL = "PERSON" +ALLOWED_TARGET_ATTRIBUTES = { + "ATTRIBUTE_TYPE", + "VALID_DOSES_COUNT", + "INVALID_DOSES_COUNT", + "LAST_SUCCESSFUL_DATE", + "LAST_VALID_DOSE_DATE", + "BOOKED_APPOINTMENT_DATE", + "BOOKED_APPOINTMENT_PROVIDER", + "LAST_INVITE_DATE", + "LAST_INVITE_STATUS", +} + + +@service +class TokenProcessor: + @staticmethod + def find_and_replace_tokens(person: Person, data_class: T) -> T: + if not is_dataclass(data_class): + return data_class + for class_field in fields(data_class): + value = getattr(data_class, class_field.name) + if isinstance(value, str): + setattr(data_class, class_field.name, TokenProcessor.replace_token(value, person)) + elif isinstance(value, list): + TokenProcessor.process_list(class_field, data_class, person, value) + elif isinstance(value, dict): + TokenProcessor.process_dict(class_field, data_class, person, value) + elif is_dataclass(value): + setattr(data_class, class_field.name, TokenProcessor.find_and_replace_tokens(person, value)) + return data_class + + @staticmethod + def process_dict(class_field: Field, data_class: object, person: Person, value: dict[Any, Any]) -> None: + for key, dict_value in value.items(): + if isinstance(dict_value, str): + value[key] = TokenProcessor.replace_token(dict_value, person) + elif is_dataclass(dict_value): + value[key] = TokenProcessor.find_and_replace_tokens(person, dict_value) + setattr(data_class, class_field.name, value) + + @staticmethod + def process_list(class_field: Field, data_class: object, person: Person, value: list[Any]) -> None: + for i, item in enumerate(value): + if is_dataclass(item): + value[i] = TokenProcessor.find_and_replace_tokens(person, item) + elif isinstance(item, str): + value[i] = TokenProcessor.replace_token(item, person) + setattr(data_class, class_field.name, value) + + @staticmethod + def replace_token(text: str, person: Person) -> str: + if not isinstance(text, str): + return text + + pattern = r"\[\[.*?\]\]" + all_tokens = re.findall(pattern, text, re.IGNORECASE) + present_attributes = [attribute.get("ATTRIBUTE_TYPE") for attribute in person.data] + + for token in all_tokens: + parsed_token = TokenParser.parse(token) + found_attribute, key_to_replace, replace_with = None, None, None + + attribute_level_map = { + TARGET_ATTRIBUTE_LEVEL: parsed_token.attribute_value, + PERSON_ATTRIBUTE_LEVEL: parsed_token.attribute_name, + } + + key_to_find = attribute_level_map.get(parsed_token.attribute_level) + + if ( + parsed_token.attribute_level == TARGET_ATTRIBUTE_LEVEL + and parsed_token.attribute_name in ALLOWED_CONDITIONS.__args__ + and parsed_token.attribute_value in ALLOWED_TARGET_ATTRIBUTES + and parsed_token.attribute_name not in present_attributes + ): + replace_with = "" + + if replace_with != "": + for attribute in person.data: + is_person_attribute = attribute.get("ATTRIBUTE_TYPE") == PERSON_ATTRIBUTE_LEVEL + is_allowed_target = parsed_token.attribute_name.upper() in ALLOWED_CONDITIONS.__args__ + + if (is_allowed_target or is_person_attribute) and key_to_find in attribute: + found_attribute = attribute + key_to_replace = key_to_find + break + + if not found_attribute or key_to_replace is None: + TokenProcessor.handle_token_not_found(parsed_token, token) + + replace_with = TokenProcessor.apply_formatting(found_attribute, key_to_replace, parsed_token.format) + text = text.replace(token, str(replace_with)) + return text + + @staticmethod + def handle_token_not_found(parsed_token: ParsedToken, token: str) -> Never: + if parsed_token.attribute_level == TARGET_ATTRIBUTE_LEVEL: + message = f"Invalid attribute name '{parsed_token.attribute_value}' in token '{token}'." + raise ValueError(message) + if parsed_token.attribute_level == PERSON_ATTRIBUTE_LEVEL: + message = f"Invalid attribute name '{parsed_token.attribute_name}' in token '{token}'." + raise ValueError(message) + message = f"Invalid attribute level '{parsed_token.attribute_level}' in token '{token}'." + raise ValueError(message) + + @staticmethod + def apply_formatting(attribute: dict[str, T], attribute_value: str, date_format: str | None) -> str: + try: + attribute_data = attribute.get(attribute_value) + if (date_format or date_format == "") and attribute_data: + replace_with_date_object = datetime.strptime(str(attribute_data), "%Y%m%d").replace(tzinfo=UTC) + replace_with = replace_with_date_object.strftime(str(date_format)) + else: + replace_with = attribute_data if attribute_data else "" + return str(replace_with) + except AttributeError as error: + message = "Invalid token format" + raise AttributeError(message) from error diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index b7ee1826..24efd3fa 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -19,6 +19,7 @@ from eligibility_signposting_api.model import eligibility_status from eligibility_signposting_api.model.campaign_config import ( + AvailableAction, CampaignConfig, EndDate, RuleType, @@ -395,6 +396,38 @@ def persisted_person_all_cohorts(person_table: Any, faker: Faker) -> Generator[e person_table.delete_item(Key={"NHS_NUMBER": row["NHS_NUMBER"], "ATTRIBUTE_TYPE": row["ATTRIBUTE_TYPE"]}) +@pytest.fixture +def person_with_all_data(person_table: Any, faker: Faker) -> Generator[eligibility_status.NHSNumber]: + nhs_number = eligibility_status.NHSNumber(faker.nhs_number()) + date_of_birth = eligibility_status.DateOfBirth(datetime.date(1990, 2, 28)) + + for row in ( + rows := person_rows_builder( + nhs_number=nhs_number, + date_of_birth=date_of_birth, + gender="0", + postcode="SW18", + cohorts=["cohort_label1", "cohort_label2"], + vaccines=[("RSV", None)], + icb="QE1", + gp_practice="C81002", + pcn="U78207", + comissioning_region="Y60", + thirteen_q=True, + care_home=True, + de=False, + msoa="E02001562", + lsoa="E01030316", + ).data + ): + person_table.put_item(Item=row) + + yield nhs_number + + for row in rows: + person_table.delete_item(Key={"NHS_NUMBER": row["NHS_NUMBER"], "ATTRIBUTE_TYPE": row["ATTRIBUTE_TYPE"]}) + + @pytest.fixture def persisted_person_no_cohorts(person_table: Any, faker: Faker) -> Generator[eligibility_status.NHSNumber]: nhs_number = eligibility_status.NHSNumber(faker.nhs_number()) @@ -585,6 +618,108 @@ def campaign_config_with_and_rule(s3_client: BaseClient, rules_bucket: BucketNam s3_client.delete_object(Bucket=rules_bucket, Key=f"{campaign.name}.json") +@pytest.fixture(scope="class") +def campaign_config_with_tokens(s3_client: BaseClient, rules_bucket: BucketName) -> Generator[CampaignConfig]: + campaign: CampaignConfig = rule.CampaignConfigFactory.build( + target="RSV", + iterations=[ + rule.IterationFactory.build( + actions_mapper=rule.ActionsMapperFactory.build( + root={ + "TOKEN_TEST": AvailableAction( + ActionType="ButtonAuthLink", + ExternalRoutingCode="BookNBS", + ActionDescription="## Token - PERSON.POSTCODE: [[PERSON.POSTCODE]].", + UrlLabel=( + "Token - PERSON.DATE_OF_BIRTH:DATE(%d %B %Y): [[PERSON.DATE_OF_BIRTH:DATE(%d %B %Y)]]." + ), + ), + "TOKEN_TEST2": AvailableAction( + ActionType="ButtonAuthLink", + ExternalRoutingCode="BookNBS", + ActionDescription="## Token - PERSON.GENDER: [[PERSON.GENDER]].", + UrlLabel="Token - PERSON.DATE_OF_BIRTH: [[PERSON.DATE_OF_BIRTH]].", + ), + } + ), + iteration_rules=[ + rule.PostcodeSuppressionRuleFactory.build(), + rule.PersonAgeSuppressionRuleFactory.build(), + rule.ICBNonEligibleActionRuleFactory.build(comms_routing="TOKEN_TEST|TOKEN_TEST2"), + rule.ICBNonActionableActionRuleFactory.build(comms_routing="TOKEN_TEST"), + ], + iteration_cohorts=[ + rule.IterationCohortFactory.build( + cohort_label="cohort1", + cohort_group="cohort_group1", + positive_description="Positive Description", + negative_description=( + "Token - TARGET.RSV.LAST_SUCCESSFUL_DATE: [[TARGET.RSV.LAST_SUCCESSFUL_DATE]]" + ), + ), + rule.IterationCohortFactory.build( + cohort_label="cohort2", + cohort_group="cohort_group2", + positive_description="Positive Description", + negative_description=( + "Token - TARGET.RSV.LAST_SUCCESSFUL_DATE:DATE(%d %B %Y): " + "[[TARGET.RSV.LAST_SUCCESSFUL_DATE:DATE(%d %B %Y)]]" + ), + ), + ], + ) + ], + ) + campaign_data = {"CampaignConfig": campaign.model_dump(by_alias=True)} + s3_client.put_object( + Bucket=rules_bucket, Key=f"{campaign.name}.json", Body=json.dumps(campaign_data), ContentType="application/json" + ) + yield campaign + s3_client.delete_object(Bucket=rules_bucket, Key=f"{campaign.name}.json") + + +@pytest.fixture(scope="class") +def campaign_config_with_invalid_tokens(s3_client: BaseClient, rules_bucket: BucketName) -> Generator[CampaignConfig]: + campaign: CampaignConfig = rule.CampaignConfigFactory.build( + target="RSV", + iterations=[ + rule.IterationFactory.build( + actions_mapper=rule.ActionsMapperFactory.build( + root={ + "TOKEN_TEST": AvailableAction( + ActionType="ButtonAuthLink", + ExternalRoutingCode="BookNBS", + ActionDescription="## Token - PERSON.ICECREAM: [[PERSON.ICECREAM]].", + UrlLabel=( + "Token - PERSON.DATE_OF_BIRTH:DATE(%d %B %Y): [[PERSON.DATE_OF_BIRTH:DATE(%d %B %Y)]]." + ), + ) + } + ), + iteration_rules=[ + rule.ICBNonEligibleActionRuleFactory.build(comms_routing="TOKEN_TEST"), + ], + iteration_cohorts=[ + rule.IterationCohortFactory.build( + cohort_label="cohort1", + cohort_group="cohort_group1", + positive_description="Positive Description", + negative_description=( + "Token - TARGET.RSV.LAST_SUCCESSFUL_DATE: [[TARGET.RSV.LAST_SUCCESSFUL_DATE]]" + ), + ) + ], + ) + ], + ) + campaign_data = {"CampaignConfig": campaign.model_dump(by_alias=True)} + s3_client.put_object( + Bucket=rules_bucket, Key=f"{campaign.name}.json", Body=json.dumps(campaign_data), ContentType="application/json" + ) + yield campaign + s3_client.delete_object(Bucket=rules_bucket, Key=f"{campaign.name}.json") + + @pytest.fixture(scope="class") def multiple_campaign_configs(s3_client: BaseClient, rules_bucket: BucketName) -> Generator[list[CampaignConfig]]: """Create and upload multiple campaign configs to S3, then clean up after tests.""" @@ -597,8 +732,10 @@ def multiple_campaign_configs(s3_client: BaseClient, rules_bucket: BucketName) - rule.PostcodeSuppressionRuleFactory.build(type=RuleType.filter, priority=8, cohort_label="cohort_label4"), ], targets[1]: [ - rule.PersonAgeSuppressionRuleFactory.build(description="TOO YOUNG"), - rule.PostcodeSuppressionRuleFactory.build(priority=12, cohort_label="cohort_label2"), + rule.PersonAgeSuppressionRuleFactory.build(description="TOO YOUNG, your icb is: [[PERSON.ICB]]"), + rule.PostcodeSuppressionRuleFactory.build( + priority=12, cohort_label="cohort_label2", description="Your postcode is: [[PERSON.POSTCODE]]" + ), ], targets[2]: [rule.ICBRedirectRuleFactory.build()], } diff --git a/tests/integration/lambda/test_app_running_as_lambda.py b/tests/integration/lambda/test_app_running_as_lambda.py index b53ce381..166afd26 100644 --- a/tests/integration/lambda/test_app_running_as_lambda.py +++ b/tests/integration/lambda/test_app_running_as_lambda.py @@ -495,8 +495,16 @@ def test_given_person_has_unique_status_for_different_conditions_with_audit( # ], "filterRules": None, "suitabilityRules": [ - {"rulePriority": "10", "ruleName": "Exclude too young less than 75", "ruleMessage": "TOO YOUNG"}, - {"rulePriority": "12", "ruleName": "Excluded postcode In SW19", "ruleMessage": "In SW19"}, + { + "rulePriority": "10", + "ruleName": "Exclude too young less than 75", + "ruleMessage": "TOO YOUNG, your icb is: QE1", + }, + { + "rulePriority": "12", + "ruleName": "Excluded postcode In SW19", + "ruleMessage": "Your postcode is: SW19", + }, ], "actionRule": None, "actions": [], @@ -577,3 +585,109 @@ def test_no_active_iteration_returns_empty_processed_suggestions( has_entries("condition", "FLU"), ), ) + + +def test_token_formatting_in_eligibility_response_and_audit( # noqa: PLR0913 + lambda_client: BaseClient, # noqa:ARG001 + person_with_all_data: NHSNumber, + campaign_config_with_tokens: CampaignConfig, # noqa:ARG001 + s3_client: BaseClient, + audit_bucket: BucketName, + api_gateway_endpoint: URL, + flask_function: str, # noqa:ARG001 + logs_client: BaseClient, # noqa:ARG001 +): + invoke_url = f"{api_gateway_endpoint}/patient-check/{person_with_all_data}" + response = httpx.get( + invoke_url, + headers={"nhs-login-nhs-number": str(person_with_all_data)}, + timeout=10, + ) + + assert_that( + response, + is_response().with_status_code(HTTPStatus.OK).and_body(is_json_that(has_key("processedSuggestions"))), + ) + + processed_suggestions = response.json()["processedSuggestions"][0] + response_actions = processed_suggestions["actions"] + response_eligibility_cohorts = processed_suggestions["eligibilityCohorts"] + + assert response_actions[0]["description"] == "## Token - PERSON.POSTCODE: SW18." + assert response_actions[0]["urlLabel"] == "Token - PERSON.DATE_OF_BIRTH:DATE(%d %B %Y): 28 February 1990." + assert response_actions[1]["description"] == "## Token - PERSON.GENDER: 0." + assert response_actions[1]["urlLabel"] == "Token - PERSON.DATE_OF_BIRTH: 19900228." + assert response_eligibility_cohorts[0]["cohortText"] == "Token - TARGET.RSV.LAST_SUCCESSFUL_DATE: " + assert response_eligibility_cohorts[1]["cohortText"] == "Token - TARGET.RSV.LAST_SUCCESSFUL_DATE:DATE(%d %B %Y): " + + objects = s3_client.list_objects_v2(Bucket=audit_bucket).get("Contents", []) + object_keys = [obj["Key"] for obj in objects] + latest_key = sorted(object_keys)[-1] + audit_data = json.loads(s3_client.get_object(Bucket=audit_bucket, Key=latest_key)["Body"].read()) + + audit_condition = audit_data["response"]["condition"][0] + audit_actions = audit_condition["actions"] + audit_eligibility_cohorts = audit_condition["eligibilityCohortGroups"] + + assert audit_actions[0]["actionDescription"] == "## Token - PERSON.POSTCODE: SW18." + assert audit_actions[0]["actionUrlLabel"] == "Token - PERSON.DATE_OF_BIRTH:DATE(%d %B %Y): 28 February 1990." + assert audit_actions[1]["actionDescription"] == "## Token - PERSON.GENDER: 0." + assert audit_actions[1]["actionUrlLabel"] == "Token - PERSON.DATE_OF_BIRTH: 19900228." + assert audit_eligibility_cohorts[0]["cohortText"] == "Token - TARGET.RSV.LAST_SUCCESSFUL_DATE: " + assert audit_eligibility_cohorts[1]["cohortText"] == "Token - TARGET.RSV.LAST_SUCCESSFUL_DATE:DATE(%d %B %Y): " + + +def test_incorrect_token_causes_internal_server_error( # noqa: PLR0913 + lambda_client: BaseClient, # noqa:ARG001 + person_with_all_data: NHSNumber, + campaign_config_with_invalid_tokens: CampaignConfig, # noqa:ARG001 + s3_client: BaseClient, + audit_bucket: BucketName, + api_gateway_endpoint: URL, + flask_function: str, + logs_client: BaseClient, +): + invoke_url = f"{api_gateway_endpoint}/patient-check/{person_with_all_data}" + response = httpx.get( + invoke_url, + headers={"nhs-login-nhs-number": str(person_with_all_data)}, + timeout=10, + ) + + assert_that( + response, + is_response() + .with_status_code(HTTPStatus.INTERNAL_SERVER_ERROR) + .with_headers(has_entries({"Content-Type": "application/fhir+json"})) + .and_body( + is_json_that( + has_entries( + resourceType="OperationOutcome", + issue=contains_exactly( + has_entries( + severity="error", + code="processing", + diagnostics="An unexpected error occurred.", + details={ + "coding": [ + { + "system": "https://fhir.nhs.uk/STU3/ValueSet/Spine-ErrorOrWarningCode-1", + "code": "INTERNAL_SERVER_ERROR", + "display": "An unexpected internal server error occurred.", + } + ] + }, + ) + ), + ) + ), + ), + ) + + objects = s3_client.list_objects_v2(Bucket=audit_bucket).get("Contents", []) + assert len(objects) == 0 # Check there are no audit logs + + assert_that( + get_log_messages(flask_function, logs_client), + has_item(contains_string("Invalid attribute name 'ICECREAM' in token '[[PERSON.ICECREAM]]'.")), + ) diff --git a/tests/unit/services/calculators/test_eligibility_calculator.py b/tests/unit/services/calculators/test_eligibility_calculator.py index 37a13e8f..74c6fb1d 100644 --- a/tests/unit/services/calculators/test_eligibility_calculator.py +++ b/tests/unit/services/calculators/test_eligibility_calculator.py @@ -4,13 +4,15 @@ import pytest from faker import Faker -from flask import Flask +from flask import Flask, g from freezegun import freeze_time from hamcrest import assert_that, contains_exactly, contains_inanyorder, has_item, has_items, is_, is_in +from pydantic import HttpUrl from eligibility_signposting_api.model import campaign_config as rules_model from eligibility_signposting_api.model import eligibility_status from eligibility_signposting_api.model.campaign_config import ( + AvailableAction, CohortLabel, Description, RuleAttributeLevel, @@ -676,6 +678,148 @@ def test_no_active_campaign(faker: Faker): assert_that(actual, is_eligibility_status().with_conditions([])) +def test_eligibility_status_replaces_tokens_with_attribute_data(faker: Faker): + # Given + nhs_number = NHSNumber(faker.nhs_number()) + date_of_birth = DateOfBirth(datetime.date(2025, 5, 10)) + + person_rows = person_rows_builder( + nhs_number, + date_of_birth=date_of_birth, + cohorts=["cohort_1", "cohort_2", "cohort_3"], + vaccines=[("RSV", datetime.date(2024, 1, 3))], + icb="QE1", + gp_practice=None, + ) + + person_attribute_token = "DOB: [[PERSON.DATE_OF_BIRTH]]" # noqa: S105 + target_attribute_token = "LAST_SUCCESSFUL_DATE: [[TARGET.RSV.LAST_SUCCESSFUL_DATE:DATE(%d %B %Y)]]" # noqa: S105 + available_action = AvailableAction( + ActionType="ButtonAuthLink", + ExternalRoutingCode="BookNBS", + ActionDescription="## Get vaccinated at your GP surgery in [[PERSON.ICB]].", + UrlLink=HttpUrl("https://www.nhs.uk/book-rsv"), + UrlLabel="Your GP practice code is [[PERSON.GP_PRACTICE]].", + ) + + campaign_configs = [ + rule_builder.CampaignConfigFactory.build( + target="RSV", + iterations=[ + rule_builder.IterationFactory.build( + iteration_cohorts=[ + rule_builder.IterationCohortFactory.build( + cohort_label="cohort_1", positive_description=person_attribute_token + ), + rule_builder.IterationCohortFactory.build( + cohort_label="cohort_2", positive_description=target_attribute_token + ), + ], + iteration_rules=[ + rule_builder.PersonAgeSuppressionRuleFactory.build(), + rule_builder.ICBNonActionableActionRuleFactory.build(comms_routing="TOKEN_TEST"), + ], + actions_mapper=rule_builder.ActionsMapperFactory.build(root={"TOKEN_TEST": available_action}), + ) + ], + ) + ] + + calculator = EligibilityCalculator(person_rows, campaign_configs) + + # When + actual = calculator.get_eligibility_status("Y", ["ALL"], "ALL") + + # Then + assert_that( + actual, + is_eligibility_status().with_conditions( + has_item(is_condition().with_condition_name(ConditionName("RSV")).and_status(Status.not_actionable)) + ), + ) + + assert actual.conditions[0].cohort_results[0].description == "DOB: 20250510" + assert actual.conditions[0].cohort_results[1].description == "LAST_SUCCESSFUL_DATE: 03 January 2024" + assert actual.conditions[0].actions[0].action_description == "## Get vaccinated at your GP surgery in QE1." + assert actual.conditions[0].actions[0].url_label == "Your GP practice code is ." + + audit_condition = g.audit_log.response.condition[0] + assert audit_condition.eligibility_cohort_groups[0].cohort_text in [ + "DOB: 20250510", + "LAST_SUCCESSFUL_DATE: 03 January 2024", + ] + assert audit_condition.eligibility_cohort_groups[1].cohort_text in [ + "DOB: 20250510", + "LAST_SUCCESSFUL_DATE: 03 January 2024", + ] + assert audit_condition.actions[0].action_description == "## Get vaccinated at your GP surgery in QE1." + assert audit_condition.actions[0].action_url_label == "Your GP practice code is ." + + +def test_eligibility_status_with_invalid_tokens_raises_attribute_error(faker: Faker): + # Given + nhs_number = NHSNumber(faker.nhs_number()) + date_of_birth = DateOfBirth(datetime.date(2025, 5, 10)) + + person_rows = person_rows_builder( + nhs_number, date_of_birth=date_of_birth, cohorts=["cohort_1"], vaccines=[("RSV", datetime.date(2024, 1, 3))] + ) + + target_attribute_token = "LAST_SUCCESSFUL_DATE: [[TARGET.RSV.LAST_SUCCESSFUL_DATE:INVALID_DATE_FORMAT(%d %B %Y)]]" # noqa: S105 + campaign_configs = [ + rule_builder.CampaignConfigFactory.build( + target="RSV", + iterations=[ + rule_builder.IterationFactory.build( + iteration_cohorts=[ + rule_builder.IterationCohortFactory.build( + cohort_label="cohort_1", positive_description=target_attribute_token + ), + ], + iteration_rules=[rule_builder.PersonAgeSuppressionRuleFactory.build()], + ) + ], + ) + ] + + calculator = EligibilityCalculator(person_rows, campaign_configs) + + with pytest.raises(ValueError, match="Invalid token."): + calculator.get_eligibility_status("Y", ["ALL"], "ALL") + + +def test_eligibility_status_with_invalid_person_attribute_name_raises_value_error(faker: Faker): + # Given + nhs_number = NHSNumber(faker.nhs_number()) + date_of_birth = DateOfBirth(datetime.date(2025, 5, 10)) + + person_rows = person_rows_builder( + nhs_number, date_of_birth=date_of_birth, cohorts=["cohort_1"], vaccines=[("RSV", datetime.date(2024, 1, 3))] + ) + + target_attribute_token = "LAST_SUCCESSFUL_DATE: [[TARGET.RSV.ICECREAM]]" # noqa: S105 + campaign_configs = [ + rule_builder.CampaignConfigFactory.build( + target="RSV", + iterations=[ + rule_builder.IterationFactory.build( + iteration_cohorts=[ + rule_builder.IterationCohortFactory.build( + cohort_label="cohort_1", positive_description=target_attribute_token + ), + ], + iteration_rules=[rule_builder.PersonAgeSuppressionRuleFactory.build()], + ) + ], + ) + ] + + calculator = EligibilityCalculator(person_rows, campaign_configs) + + with pytest.raises(ValueError): # noqa: PT011 + calculator.get_eligibility_status("Y", ["ALL"], "ALL") + + class TestEligibilityResultBuilder: def test_build_condition_results_single_condition_single_cohort_actionable(self): cohort_group_results = [CohortGroupResult("COHORT_A", Status.actionable, [], "Cohort A Description", [])] @@ -851,131 +995,124 @@ def test_build_condition_results_cohorts_status_not_matching_iteration_status(se assert_that(result.cohort_results[0].cohort_code, is_("COHORT_X")) assert_that(result.cohort_results[0].status, is_(Status.not_eligible)) - -@pytest.mark.parametrize( - ("reason_1", "reason_2", "reason_3", "expected_reasons"), - [ - # Same rule name, type, and priority, different description - ( - ReasonFactory.build(rule_description="description1", matcher_matched=True), - ReasonFactory.build(rule_description="description2", matcher_matched=True), - ReasonFactory.build(rule_description="description3", matcher_matched=True), - [ReasonFactory.build(rule_description="description1", matcher_matched=True)], - ), - # Different rule name, same type, same priority - ( - ReasonFactory.build(rule_name="Supress Rule 1", rule_description="description1", matcher_matched=True), - ReasonFactory.build(rule_name="Supress Rule 2", rule_description="description2", matcher_matched=True), - ReasonFactory.build(rule_name="Supress Rule 1", rule_description="description3", matcher_matched=True), - [ReasonFactory.build(rule_name="Supress Rule 1", rule_description="description1", matcher_matched=True)], - ), - # Same rule name, same type, different priority - ( - ReasonFactory.build(rule_priority="1", rule_description="description1", matcher_matched=True), - ReasonFactory.build(rule_priority="2", rule_description="description2", matcher_matched=True), - ReasonFactory.build(rule_priority="1", rule_description="description3", matcher_matched=True), - [ + @pytest.mark.parametrize( + ("reason_1", "reason_2", "reason_3", "expected_reasons"), + [ + # Same rule name, type, and priority, different description + ( + ReasonFactory.build(rule_description="description1", matcher_matched=True), + ReasonFactory.build(rule_description="description2", matcher_matched=True), + ReasonFactory.build(rule_description="description3", matcher_matched=True), + [ReasonFactory.build(rule_description="description1", matcher_matched=True)], + ), + # Different rule name, same type, same priority + ( + ReasonFactory.build(rule_name="Supress Rule 1", rule_description="description1", matcher_matched=True), + ReasonFactory.build(rule_name="Supress Rule 2", rule_description="description2", matcher_matched=True), + ReasonFactory.build(rule_name="Supress Rule 1", rule_description="description3", matcher_matched=True), + [ + ReasonFactory.build( + rule_name="Supress Rule 1", rule_description="description1", matcher_matched=True + ) + ], + ), + # Same rule name, same type, different priority + ( ReasonFactory.build(rule_priority="1", rule_description="description1", matcher_matched=True), ReasonFactory.build(rule_priority="2", rule_description="description2", matcher_matched=True), - ], - ), - # Same rule name, same priority, different type - ( - ReasonFactory.build(rule_type=RuleType.suppression, rule_description="description1", matcher_matched=True), - ReasonFactory.build(rule_type=RuleType.filter, rule_description="description2", matcher_matched=True), - ReasonFactory.build(rule_type=RuleType.suppression, rule_description="description3", matcher_matched=True), - [ + ReasonFactory.build(rule_priority="1", rule_description="description3", matcher_matched=True), + [ + ReasonFactory.build(rule_priority="1", rule_description="description1", matcher_matched=True), + ReasonFactory.build(rule_priority="2", rule_description="description2", matcher_matched=True), + ], + ), + # Same rule name, same priority, different type + ( ReasonFactory.build( rule_type=RuleType.suppression, rule_description="description1", matcher_matched=True ), ReasonFactory.build(rule_type=RuleType.filter, rule_description="description2", matcher_matched=True), - ], - ), - ], -) -def test_build_condition_results_grouping_reasons(reason_1, reason_2, reason_3, expected_reasons): - cohort_group_results = [ - CohortGroupResult( - "COHORT_X", - Status.not_actionable, - [reason_1, reason_3], - "Cohort X Description", - [], - ), - CohortGroupResult( - "COHORT_Y", - Status.not_actionable, - [reason_2, reason_3], - "Cohort Y Description", - [], - ), - ] - - iteration_result = IterationResult(Status.not_actionable, cohort_group_results, []) + ReasonFactory.build( + rule_type=RuleType.suppression, rule_description="description3", matcher_matched=True + ), + [ + ReasonFactory.build( + rule_type=RuleType.suppression, rule_description="description1", matcher_matched=True + ), + ReasonFactory.build( + rule_type=RuleType.filter, rule_description="description2", matcher_matched=True + ), + ], + ), + ], + ) + def test_build_condition_results_grouping_reasons(self, reason_1, reason_2, reason_3, expected_reasons): + cohort_group_results = [ + CohortGroupResult( + "COHORT_X", + Status.not_actionable, + [reason_1, reason_3], + "Cohort X Description", + [], + ), + CohortGroupResult( + "COHORT_Y", + Status.not_actionable, + [reason_2, reason_3], + "Cohort Y Description", + [], + ), + ] - result: Condition = EligibilityCalculator.build_condition(iteration_result, ConditionName("RSV")) + iteration_result = IterationResult(Status.not_actionable, cohort_group_results, []) - assert_that(result.suitability_rules, contains_inanyorder(*expected_reasons)) + result: Condition = EligibilityCalculator.build_condition(iteration_result, ConditionName("RSV")) + assert_that(result.suitability_rules, contains_inanyorder(*expected_reasons)) -@pytest.mark.parametrize( - ("reason_2", "expected_reasons"), - [ - # Same rule name, type, and priority, different description - ( - ReasonFactory.build( - rule_type=RuleType.suppression, - rule_description="Matching", - rule_name="Supress Rule 1", - rule_priority="1", - matcher_matched=True, - ), - [ - ReasonFactory.build( - rule_type=RuleType.suppression, - rule_description="Not matching", - rule_name="Supress Rule 1", - rule_priority="1", - matcher_matched=True, - ) - ], - ), - # Different rule name - ( - ReasonFactory.build( - rule_type=RuleType.suppression, - rule_description="Matching", - rule_name="Supress Rule 2", - rule_priority="1", - matcher_matched=True, - ), - [ + @pytest.mark.parametrize( + ("reason_2", "expected_reasons"), + [ + # Same rule name, type, and priority, different description + ( ReasonFactory.build( rule_type=RuleType.suppression, - rule_description="Not matching", + rule_description="Matching", rule_name="Supress Rule 1", rule_priority="1", matcher_matched=True, - ) - ], - ), - # Different priority - ( - ReasonFactory.build( - rule_type=RuleType.suppression, - rule_description="Matching", - rule_name="Supress Rule 1", - rule_priority="2", - matcher_matched=True, + ), + [ + ReasonFactory.build( + rule_type=RuleType.suppression, + rule_description="Not matching", + rule_name="Supress Rule 1", + rule_priority="1", + matcher_matched=True, + ) + ], ), - [ + # Different rule name + ( ReasonFactory.build( rule_type=RuleType.suppression, - rule_description="Not matching", - rule_name="Supress Rule 1", + rule_description="Matching", + rule_name="Supress Rule 2", rule_priority="1", matcher_matched=True, ), + [ + ReasonFactory.build( + rule_type=RuleType.suppression, + rule_description="Not matching", + rule_name="Supress Rule 1", + rule_priority="1", + matcher_matched=True, + ) + ], + ), + # Different priority + ( ReasonFactory.build( rule_type=RuleType.suppression, rule_description="Matching", @@ -983,25 +1120,25 @@ def test_build_condition_results_grouping_reasons(reason_1, reason_2, reason_3, rule_priority="2", matcher_matched=True, ), - ], - ), - # Different type - ( - ReasonFactory.build( - rule_type=RuleType.filter, - rule_description="Matching", - rule_name="Supress Rule 1", - rule_priority="2", - matcher_matched=True, + [ + ReasonFactory.build( + rule_type=RuleType.suppression, + rule_description="Not matching", + rule_name="Supress Rule 1", + rule_priority="1", + matcher_matched=True, + ), + ReasonFactory.build( + rule_type=RuleType.suppression, + rule_description="Matching", + rule_name="Supress Rule 1", + rule_priority="2", + matcher_matched=True, + ), + ], ), - [ - ReasonFactory.build( - rule_type=RuleType.suppression, - rule_description="Not matching", - rule_name="Supress Rule 1", - rule_priority="1", - matcher_matched=True, - ), + # Different type + ( ReasonFactory.build( rule_type=RuleType.filter, rule_description="Matching", @@ -1009,25 +1146,40 @@ def test_build_condition_results_grouping_reasons(reason_1, reason_2, reason_3, rule_priority="2", matcher_matched=True, ), - ], - ), - ], -) -def test_build_condition_results_single_cohort(reason_2, expected_reasons): - reason_1 = ReasonFactory.build( - rule_type=RuleType.suppression, - rule_description="Not matching", - rule_name="Supress Rule 1", - rule_priority="1", - matcher_matched=True, + [ + ReasonFactory.build( + rule_type=RuleType.suppression, + rule_description="Not matching", + rule_name="Supress Rule 1", + rule_priority="1", + matcher_matched=True, + ), + ReasonFactory.build( + rule_type=RuleType.filter, + rule_description="Matching", + rule_name="Supress Rule 1", + rule_priority="2", + matcher_matched=True, + ), + ], + ), + ], ) + def test_build_condition_results_single_cohort(self, reason_2, expected_reasons): + reason_1 = ReasonFactory.build( + rule_type=RuleType.suppression, + rule_description="Not matching", + rule_name="Supress Rule 1", + rule_priority="1", + matcher_matched=True, + ) - cohort_group_results = [ - CohortGroupResult("COHORT_Y", Status.not_actionable, [reason_1, reason_2], "Cohort Y Description", []) - ] + cohort_group_results = [ + CohortGroupResult("COHORT_Y", Status.not_actionable, [reason_1, reason_2], "Cohort Y Description", []) + ] - iteration_result = IterationResult(Status.not_actionable, cohort_group_results, []) - result = EligibilityCalculator.build_condition(iteration_result, ConditionName("RSV")) + iteration_result = IterationResult(Status.not_actionable, cohort_group_results, []) + result = EligibilityCalculator.build_condition(iteration_result, ConditionName("RSV")) - assert_that(len(result.cohort_results), is_(1)) - assert_that(result.cohort_results[0].reasons, contains_inanyorder(*expected_reasons)) + assert_that(len(result.cohort_results), is_(1)) + assert_that(result.cohort_results[0].reasons, contains_inanyorder(*expected_reasons)) diff --git a/tests/unit/services/processors/test_token_parser.py b/tests/unit/services/processors/test_token_parser.py new file mode 100644 index 00000000..d81465b3 --- /dev/null +++ b/tests/unit/services/processors/test_token_parser.py @@ -0,0 +1,57 @@ +import pytest + +from eligibility_signposting_api.services.processors.token_parser import TokenParser + + +class TestTokenParser: + @pytest.mark.parametrize( + ("token", "expected_level", "expected_name", "expected_value", "expected_format"), + [ + ("[[PERSON.AGE]]", "PERSON", "AGE", None, None), + ("[[TARGET.RSV.LAST_SUCCESSFUL_DATE]]", "TARGET", "RSV", "LAST_SUCCESSFUL_DATE", None), + ("[[PERSON.DATE_OF_BIRTH:DATE(%Y-%m-%d)]]", "PERSON", "DATE_OF_BIRTH", None, "%Y-%m-%d"), + ("[[TARGET.RSV.LAST_SUCCESSFUL_DATE:DATE(%d %B %Y)]]", "TARGET", "RSV", "LAST_SUCCESSFUL_DATE", "%d %B %Y"), + ("[[PERSON.DATE_OF_BIRTH:DATE()]]", "PERSON", "DATE_OF_BIRTH", None, ""), + ("[[person.age]]", "PERSON", "AGE", None, None), + ("[[PERSON.age]]", "PERSON", "AGE", None, None), + ("[[TARGET.RSV.last_successful_date]]", "TARGET", "RSV", "LAST_SUCCESSFUL_DATE", None), + ("[[PERSON.DATE_OF_BIRTH:date(%Y-%m-%d)]]", "PERSON", "DATE_OF_BIRTH", None, "%Y-%m-%d"), + ("[[PERSON.AGE.EXTRA]]", "PERSON", "AGE", "EXTRA", None), + ], + ) + def test_parse_valid_tokens(self, token, expected_level, expected_name, expected_value, expected_format): + parsed_token = TokenParser.parse(token) + assert parsed_token.attribute_level == expected_level + assert parsed_token.attribute_name == expected_name + assert parsed_token.attribute_value == expected_value + assert parsed_token.format == expected_format + + @pytest.mark.parametrize( + "token", + [ + "[[.AGE]]", + "[[PERSON.]]", + "[[]]", + "[[PERSON]]", + "[[.PERSON.AGE]]", + "[[PERSON.AGE.]]", + ], + ) + def test_parse_invalid_tokens_raises_error(self, token): + with pytest.raises(ValueError, match="Invalid token."): + TokenParser.parse(token) + + @pytest.mark.parametrize( + "token", + [ + "[[PERSON.DATE_OF_BIRTH:DATE(]]", + "[[PERSON.DATE_OF_BIRTH:DATE)]]", + "[[PERSON.DATE_OF_BIRTH:DATE]]", + "[[PERSON.DATE_OF_BIRTH:INVALID_FORMAT(abc)]]", + "[[PERSON.DATE_OF_BIRTH:INVALID_FORMAT(a (b) c)]]", + "[[PERSON.DATE_OF_BIRTH:DATE(a (b) c)]]", + ], + ) + def test_parse_invalid_token_format_raises_error(self, token): + with pytest.raises(ValueError, match="Invalid token format."): + TokenParser.parse(token) diff --git a/tests/unit/services/processors/test_token_processor.py b/tests/unit/services/processors/test_token_processor.py new file mode 100644 index 00000000..78dbbeff --- /dev/null +++ b/tests/unit/services/processors/test_token_processor.py @@ -0,0 +1,387 @@ +import re + +import pytest + +from eligibility_signposting_api.model import eligibility_status +from eligibility_signposting_api.model.eligibility_status import ( + CohortGroupResult, + Condition, + ConditionName, + Reason, + RuleDescription, + RulePriority, + RuleType, + Status, + StatusText, +) +from eligibility_signposting_api.model.person import Person +from eligibility_signposting_api.services.processors.token_processor import TokenProcessor + + +class TestTokenProcessor: + def test_simple_token_replacement(self): + person = Person([{"ATTRIBUTE_TYPE": "PERSON", "AGE": "30"}]) + + condition = Condition( + condition_name=ConditionName("RSV"), + status=Status.actionable, + status_text=StatusText("Your age is [[PERSON.AGE]]."), + cohort_results=[], + suitability_rules=[], + actions=[], + ) + + expected = Condition( + condition_name=ConditionName("RSV"), + status=Status.actionable, + status_text=StatusText("Your age is 30."), + cohort_results=[], + suitability_rules=[], + actions=[], + ) + + actual = TokenProcessor.find_and_replace_tokens(person, condition) + + assert actual == expected + + def test_deep_nesting_token_replacement(self): + person = Person([{"ATTRIBUTE_TYPE": "PERSON", "AGE": "30", "DEGREE": "DOCTOR", "QUALITY": "NICE"}]) + + reason1 = Reason( + RuleType.suppression, + eligibility_status.RuleName("Rule1"), + RulePriority("1"), + RuleDescription("This is a rule."), + matcher_matched=False, + ) + reason2 = Reason( + RuleType.filter, + eligibility_status.RuleName("Rule2"), + RulePriority("1"), + RuleDescription("Rule [[PERSON.AGE]] here."), + matcher_matched=True, + ) + + cohort_result = CohortGroupResult( + cohort_code="CohortCode", + status=Status.actionable, + reasons=[reason1, reason2], + description="Results for cohort [[PERSON.AGE]].", + audit_rules=[], + ) + + condition = Condition( + condition_name=ConditionName("ConditionName"), + status=Status.not_actionable, + status_text=StatusText("Everything is [[PERSON.QUALITY]]."), + cohort_results=[cohort_result], + suitability_rules=[], + actions=[], + ) + + actual = TokenProcessor.find_and_replace_tokens(person, condition) + + assert actual.cohort_results[0].description == "Results for cohort 30." + assert actual.cohort_results[0].reasons[1].rule_description == "Rule 30 here." + assert actual.status_text == StatusText("Everything is NICE.") + + def test_invalid_token_on_person_attribute_should_raise_error(self): + person = Person([{"ATTRIBUTE_TYPE": "PERSON", "AGE": "30"}]) + + condition = Condition( + condition_name=ConditionName("RSV"), + status=Status.actionable, + status_text=StatusText("Your age is [[PERSON.ICECREAM]]."), + cohort_results=[], + suitability_rules=[], + actions=[], + ) + + expected_error = re.escape("Invalid attribute name 'ICECREAM' in token '[[PERSON.ICECREAM]]'.") + + with pytest.raises(ValueError, match=expected_error): + TokenProcessor.find_and_replace_tokens(person, condition) + + def test_invalid_token_should_raise_error(self): + person = Person([{"ATTRIBUTE_TYPE": "PERSON", "AGE": "30"}]) + + condition = Condition( + condition_name=ConditionName("RSV"), + status=Status.actionable, + status_text=StatusText("Your favourite flavor is: [[ICECREAM.FLAVOR]]."), + cohort_results=[], + suitability_rules=[], + actions=[], + ) + + expected_error = re.escape("Invalid attribute level 'ICECREAM' in token '[[ICECREAM.FLAVOR]]'.") + with pytest.raises(ValueError, match=expected_error): + TokenProcessor.find_and_replace_tokens(person, condition) + + def test_invalid_token_on_target_attribute_should_raise_error(self): + person = Person([{"ATTRIBUTE_TYPE": "RSV", "LAST_SUCCESSFUL_DATE": "20250101"}]) + + condition = Condition( + condition_name=ConditionName("Condition name is [[TARGET.RSV.ICECREAM]]"), + status=Status.actionable, + status_text=StatusText("Some status"), + cohort_results=[], + suitability_rules=[], + actions=[], + ) + + expected_error = re.escape("Invalid attribute name 'ICECREAM' in token '[[TARGET.RSV.ICECREAM]]'.") + with pytest.raises(ValueError, match=expected_error): + TokenProcessor.find_and_replace_tokens(person, condition) + + def test_missing_target_attribute_and_invalid_token_should_raise_error(self): + person = Person([{"ATTRIBUTE_TYPE": "PERSON", "AGE": "30"}]) + + condition = Condition( + condition_name=ConditionName("Condition name is [[TARGET.RSV.ICECREAM]]"), + status=Status.actionable, + status_text=StatusText("Some status"), + cohort_results=[], + suitability_rules=[], + actions=[], + ) + + expected_error = re.escape("Invalid attribute name 'ICECREAM' in token '[[TARGET.RSV.ICECREAM]]'.") + with pytest.raises(ValueError, match=expected_error): + TokenProcessor.find_and_replace_tokens(person, condition) + + def test_missing_patient_vaccine_data_on_target_attribute_should_replace_with_empty(self): + person = Person([{"ATTRIBUTE_TYPE": "PERSON", "AGE": "30"}]) + + condition = Condition( + condition_name=ConditionName("Last successful date: [[TARGET.RSV.LAST_SUCCESSFUL_DATE]]"), + status=Status.actionable, + status_text=StatusText("Some status"), + cohort_results=[], + suitability_rules=[], + actions=[], + ) + + actual = TokenProcessor.find_and_replace_tokens(person, condition) + + assert actual.condition_name == "Last successful date: " + + def test_not_allowed_target_conditions_token_should_raise_error(self): + person = Person( + [ + {"ATTRIBUTE_TYPE": "PERSON", "AGE": "30"}, + {"ATTRIBUTE_TYPE": "YELLOW_FEVER", "CONDITION_NAME": "COVID", "LAST_SUCCESSFUL_DATE": "20250101"}, + ] + ) + + condition = Condition( + condition_name=ConditionName("Last successful date: [[TARGET.YELLOW_FEVER.LAST_SUCCESSFUL_DATE]]"), + status=Status.actionable, + status_text=StatusText("Some status"), + cohort_results=[], + suitability_rules=[], + actions=[], + ) + + expected_error = re.escape( + "Invalid attribute name 'LAST_SUCCESSFUL_DATE' in token '[[TARGET.YELLOW_FEVER.LAST_SUCCESSFUL_DATE]]'." + ) + with pytest.raises(ValueError, match=expected_error): + TokenProcessor.find_and_replace_tokens(person, condition) + + def test_valid_token_but_missing_attribute_data_to_replace(self): + person = Person( + [ + {"ATTRIBUTE_TYPE": "PERSON", "AGE": "30", "POSTCODE": None}, + {"ATTRIBUTE_TYPE": "RSV", "CONDITION_NAME": "RSV", "LAST_SUCCESSFUL_DATE": None}, + {"ATTRIBUTE_TYPE": "COVID", "CONDITION_NAME": "COVID", "LAST_SUCCESSFUL_DATE": "20250101"}, + ] + ) + + condition = Condition( + condition_name=ConditionName( + "You had your RSV vaccine on [[TARGET.RSV.LAST_SUCCESSFUL_DATE:DATE(%d %B %Y)]]" + ), + status=Status.actionable, + status_text=StatusText("You are from [[PERSON.POSTCODE]]."), + cohort_results=[], + suitability_rules=[], + actions=[], + ) + + expected = Condition( + condition_name=ConditionName("You had your RSV vaccine on "), + status=Status.actionable, + status_text=StatusText("You are from ."), + cohort_results=[], + suitability_rules=[], + actions=[], + ) + + actual = TokenProcessor.find_and_replace_tokens(person, condition) + + assert actual.status_text == expected.status_text + assert actual.condition_name == expected.condition_name + + def test_simple_string_with_multiple_tokens(self): + person = Person( + [ + {"ATTRIBUTE_TYPE": "PERSON", "AGE": "30", "DEGREE": "DOCTOR", "QUALITY": "NICE"}, + ] + ) + + condition = Condition( + condition_name=ConditionName("RSV"), + status=Status.actionable, + status_text=StatusText( + "You are a [[PERSON.QUALITY]] [[person.QUALITY]] " + "[[TARGET.RSV.LAST_SUCCESSFUL_DATE]] and your age is [[PERSON.AGE]]." + ), + cohort_results=[], + suitability_rules=[], + actions=[], + ) + + expected = Condition( + condition_name=ConditionName("RSV"), + status=Status.actionable, + status_text=StatusText("You are a NICE NICE and your age is 30."), + cohort_results=[], + suitability_rules=[], + actions=[], + ) + + actual = TokenProcessor.find_and_replace_tokens(person, condition) + + assert actual == expected + + def test_valid_token_valid_format_should_replace_with_date_formatting(self): + person = Person( + [ + {"ATTRIBUTE_TYPE": "PERSON", "AGE": "30", "DATE_OF_BIRTH": "19900327"}, + {"ATTRIBUTE_TYPE": "RSV", "CONDITION_NAME": "RSV", "LAST_SUCCESSFUL_DATE": "20250101"}, + {"ATTRIBUTE_TYPE": "COVID", "CONDITION_NAME": "COVID", "LAST_SUCCESSFUL_DATE": "20250101"}, + ] + ) + + condition = Condition( + condition_name=ConditionName( + "You had your COVID vaccine on [[TARGET.COVID.LAST_SUCCESSFUL_DATE:DATE(%d %B %Y)]]" + ), + status=Status.actionable, + status_text=StatusText("Your birthday is on [[PERSON.DATE_OF_BIRTH:DATE(%-d %B %Y)]]"), + cohort_results=[], + suitability_rules=[], + actions=[], + ) + + expected = Condition( + condition_name=ConditionName("You had your COVID vaccine on 01 January 2025"), + status=Status.actionable, + status_text=StatusText("Your birthday is on 27 March 1990"), + cohort_results=[], + suitability_rules=[], + actions=[], + ) + + actual = TokenProcessor.find_and_replace_tokens(person, condition) + + assert actual.condition_name == expected.condition_name + assert actual.status_text == expected.status_text + + @pytest.mark.parametrize( + "token_format", + [ + ":INVALID_DATE_FORMATTER(%ABC)", + ":INVALID_DATE_FORMATTER(19900327)", + ":()", + ":FORMAT(DATE)", + ":FORMAT(BLAH)", + ":DATE[%d %B %Y]", + ":DATE(%A, (%d) %B %Y)", + ], + ) + def test_valid_token_invalid_format_should_raise_error(self, token_format: str): + person = Person([{"ATTRIBUTE_TYPE": "PERSON", "AGE": "30", "DATE_OF_BIRTH": "19900327"}]) + + condition = Condition( + condition_name=ConditionName("You had your RSV vaccine"), + status=Status.actionable, + status_text=StatusText(f"Your birthday is on [[PERSON.DATE_OF_BIRTH{token_format}]]"), + cohort_results=[], + suitability_rules=[], + actions=[], + ) + + with pytest.raises(ValueError, match="Invalid token format."): + TokenProcessor.find_and_replace_tokens(person, condition) + + @pytest.mark.parametrize( + ("token_format", "expected"), + [ + (":DATE(%d %b %Y)", "27 Mar 1990"), + (":DATE()", ""), + ("", "19900327"), + (":DATE(random_value)", "random_value"), + (":DATE(random_value %Y)", "random_value 1990"), + (":DATE(%d %B %Y)", "27 March 1990"), + (":DATE(%A, %d %B %Y)", "Tuesday, 27 March 1990"), + (":DATE(%A, {%d} %B %Y)", "Tuesday, {27} March 1990"), + (":dATE(%A, {%d} %B %Y)", "Tuesday, {27} March 1990"), + (":date(%A, {%d} %B %Y)", "Tuesday, {27} March 1990"), + ], + ) + def test_valid_date_format(self, token_format: str, expected: str): + person = Person( + [ + {"ATTRIBUTE_TYPE": "MMR", "CONDITION_NAME": "MMR", "LAST_SUCCESSFUL_DATE": "19900327"}, + ] + ) + + condition = Condition( + condition_name=ConditionName(f"Date: [[TARGET.MMR.LAST_SUCCESSFUL_DATE{token_format}]]"), + status=Status.actionable, + status_text=StatusText("Some text"), + cohort_results=[], + suitability_rules=[], + actions=[], + ) + + actual = TokenProcessor.find_and_replace_tokens(person, condition) + + assert actual.condition_name == f"Date: {expected}" + + @pytest.mark.parametrize( + ("token", "expected"), + [ + ("[[person.DATE_OF_BIRTH:DATE(%d %B %Y)]]", "27 March 1990"), + ("[[PERSON.date_of_birth:DATE(%d %B %Y)]]", "27 March 1990"), + ("[[PERSON.DATE_OF_BIRTH:date(%d %B %Y)]]", "27 March 1990"), + ("[[pErSoN.DATE_OF_BIRTH:DATE(%d %B %Y)]]", "27 March 1990"), + ("[[target.FLU.LAST_SUCCESSFUL_DATE:DATE(%-d %B %Y)]]", "1 January 2025"), + ("[[TARGET.FLU.LAST_SUCCESSFUL_DATE:DATE(%-d %B %Y)]]", "1 January 2025"), + ("[[TARGET.FLU.last_successful_date:DATE(%-d %B %Y)]]", "1 January 2025"), + ("[[TARGET.FLU.last_successful_date:date(%-d %B %Y)]]", "1 January 2025"), + ], + ) + def test_token_replace_is_case_insensitive(self, token: str, expected: str): + person = Person( + [ + {"ATTRIBUTE_TYPE": "PERSON", "AGE": "30", "DATE_OF_BIRTH": "19900327"}, + {"ATTRIBUTE_TYPE": "FLU", "CONDITION_NAME": "FLU", "LAST_SUCCESSFUL_DATE": "20250101"}, + ] + ) + + condition = Condition( + condition_name=ConditionName(f"FLU vaccine on: {token}."), + status=Status.actionable, + status_text=StatusText(f"Your DOB is: {token}."), + cohort_results=[], + suitability_rules=[], + actions=[], + ) + + result = TokenProcessor.find_and_replace_tokens(person, condition) + + assert result.status_text == f"Your DOB is: {expected}." + assert result.condition_name == f"FLU vaccine on: {expected}."