Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
32a4567
WIP: basic find replace, with one value, one person data.
ayeshalshukri1-nhs Aug 14, 2025
b2fbcec
ELI-223: Nested token support added
shweta-nhs Aug 14, 2025
9828e65
ELI-223: Replaces token with given valid format
shweta-nhs Aug 15, 2025
304c6a4
ELI-223: Fixes tests
shweta-nhs Aug 18, 2025
fb973dc
ELI-223: Date format replacement for person and target
shweta-nhs Aug 18, 2025
97a109f
WIP: Added test for invalid formatter.
ayeshalshukri1-nhs Aug 18, 2025
19c15fa
Merge branch 'main' into eli-223-provide-date-last-vaccination
ayeshalshukri1-nhs Aug 18, 2025
5553cf7
ELI-223: Handle error scenarios
shweta-nhs Aug 19, 2025
a18667a
ELI-223: Handles case insensitive token replacement
shweta-nhs Aug 19, 2025
0a2a6d7
ELI-223: Supports token replacement in audit
shweta-nhs Aug 19, 2025
c08c356
Merge branch 'main' into eli-223-provide-date-last-vaccination
ayeshalshukri1-nhs Aug 20, 2025
a960ab2
Merge branch 'main' into eli-223-provide-date-last-vaccination
ayeshalshukri1-nhs Aug 20, 2025
5ba36a6
ELI-223: Token Parser
shweta-nhs Aug 19, 2025
27f9918
ELI-223: Integrate token parser in calculator
shweta-nhs Aug 20, 2025
ebe82e7
ELI-223: Adds tests
shweta-nhs Aug 20, 2025
0231a27
ELI-223: Moves token logic to token_processor
shweta-nhs Aug 21, 2025
b33606f
ELI-223: Adds audit token replacement and tests
shweta-nhs Aug 21, 2025
cfd8325
ELI-223: Adds all valid TARGET fields
shweta-nhs Aug 21, 2025
23dff19
Merge branch 'main' into eli-223-provide-date-last-vaccination
ayeshalshukri1-nhs Aug 22, 2025
554d6c7
Merge branch 'main' into eli-223-provide-date-last-vaccination
ayeshalshukri1-nhs Aug 22, 2025
ef1aa47
Added test case for clarity.
ayeshalshukri1-nhs Aug 22, 2025
730fca8
Formatting.
ayeshalshukri1-nhs Aug 22, 2025
119917a
WIP: linting.
ayeshalshukri1-nhs Aug 22, 2025
67551ac
Fixed integration test.
ayeshalshukri1-nhs Aug 22, 2025
6279adf
Merge branch 'main' into eli-223-provide-date-last-vaccination
ayeshalshukri1-nhs Aug 27, 2025
4ce0080
ELI-223: Fixed linting
shweta-nhs Aug 27, 2025
5d1bda8
ELI-223: Fixed linting
shweta-nhs Aug 27, 2025
767ddf3
ELI-223: Adds error integration tests
shweta-nhs Aug 27, 2025
92ea92e
Merge branch 'main' into eli-223-provide-date-last-vaccination
shweta-nhs Aug 27, 2025
1bb5e7f
ELI-223: Adds more tests
shweta-nhs Aug 27, 2025
99a553c
ELI-223: Adds more tests and linting
shweta-nhs Aug 27, 2025
c0816f0
ELI-223: Adds tests
shweta-nhs Aug 27, 2025
c873c62
ELI-223: Renames test
shweta-nhs Aug 27, 2025
90202e4
ELI-223: Extracts constants
shweta-nhs Aug 28, 2025
4519ee6
ELI-223: Fixes review comments
shweta-nhs Aug 28, 2025
6fa24bc
Merge branch 'main' into eli-223-provide-date-last-vaccination
shweta-nhs Aug 28, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/eligibility_signposting_api/config/contants.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
from typing import Literal

MAGIC_COHORT_LABEL = "elid_all_people"
RULE_STOP_DEFAULT = False
NHS_NUMBER_HEADER = "nhs-login-nhs-number"
ALLOWED_CONDITIONS = Literal["COVID", "FLU", "MMR", "RSV"]
4 changes: 2 additions & 2 deletions src/eligibility_signposting_api/model/campaign_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

from pydantic import BaseModel, Field, HttpUrl, RootModel, field_serializer, field_validator, model_validator

from eligibility_signposting_api.config.contants import MAGIC_COHORT_LABEL, RULE_STOP_DEFAULT
from eligibility_signposting_api.config.contants import ALLOWED_CONDITIONS, MAGIC_COHORT_LABEL, RULE_STOP_DEFAULT

if typing.TYPE_CHECKING: # pragma: no cover
from pydantic import SerializationInfo
Expand Down Expand Up @@ -184,7 +184,7 @@ class CampaignConfig(BaseModel):
version: CampaignVersion = Field(..., alias="Version")
name: CampaignName = Field(..., alias="Name")
type: Literal["V", "S"] = Field(..., alias="Type")
target: Literal["COVID", "FLU", "MMR", "RSV"] = Field(..., alias="Target")
target: ALLOWED_CONDITIONS = Field(..., alias="Target")
manager: list[str] | None = Field(None, alias="Manager")
approver: list[str] | None = Field(None, alias="Approver")
reviewer: list[str] | None = Field(None, alias="Reviewer")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from eligibility_signposting_api.services.processors.action_rule_handler import ActionRuleHandler
from eligibility_signposting_api.services.processors.campaign_evaluator import CampaignEvaluator
from eligibility_signposting_api.services.processors.rule_processor import RuleProcessor
from eligibility_signposting_api.services.processors.token_processor import TokenProcessor

if TYPE_CHECKING:
from collections.abc import Collection
Expand All @@ -34,6 +35,7 @@
)
from eligibility_signposting_api.model.person import Person


logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -99,12 +101,16 @@ def get_eligibility_status(self, include_actions: str, conditions: list[str], ca
include_actions_flag=include_actions_flag,
)

best_iteration_result = TokenProcessor.find_and_replace_tokens(self.person, best_iteration_result)
matched_action_detail = TokenProcessor.find_and_replace_tokens(self.person, matched_action_detail)

condition_results[condition_name] = best_iteration_result.iteration_result
condition_results[condition_name].actions = matched_action_detail.actions

condition: Condition = self.build_condition(
iteration_result=condition_results[condition_name], condition_name=condition_name
)

final_result.append(condition)

AuditContext.append_audit_condition(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import re
from dataclasses import dataclass


@dataclass
class ParsedToken:
"""
A class to represent a parsed token.
...
Attributes
----------
attribute_level : str
Example: "PERSON" or "TARGET"
attribute_name : str
Example: "POSTCODE" or "RSV"
attribute_value : int
Example: "LAST_SUCCESSFUL_DATE" if attribute_level is TARGET
format : str
Example: "%d %B %Y" if DATE formatting is used
"""

attribute_level: str
attribute_name: str
attribute_value: str | None
format: str | None


class TokenParser:
MIN_TOKEN_PARTS = 2

@staticmethod
def parse(token: str) -> ParsedToken:
"""Parses a token into its parts.
Steps:
Strip the surrounding [[ ]]
Check for empty body after stripping, e.g., '[[]]'
Check for empty parts created by leading/trailing dots or tokens with no dot
Check if the name contains a date format
Return a ParsedToken object
"""

token_body = token[2:-2]
if not token_body:
message = "Invalid token."
raise ValueError(message)

token_parts = token_body.split(".")

if len(token_parts) < TokenParser.MIN_TOKEN_PARTS or not all(token_parts):
message = "Invalid token."
raise ValueError(message)

token_level = token_parts[0].upper()
token_name = token_parts[-1]

format_match = re.search(r":DATE\(([^()]*)\)", token_name, re.IGNORECASE)
if not format_match and len(token_name.split(":")) > 1:
message = "Invalid token format."
raise ValueError(message)

format_str = format_match.group(1) if format_match else None

last_part = re.sub(r":DATE\(.*?\)", "", token_name, flags=re.IGNORECASE)

if len(token_parts) == TokenParser.MIN_TOKEN_PARTS:
name = last_part.upper()
value = None
else:
name = token_parts[1].upper()
value = last_part.upper()

return ParsedToken(attribute_level=token_level, attribute_name=name, attribute_value=value, format=format_str)
134 changes: 134 additions & 0 deletions src/eligibility_signposting_api/services/processors/token_processor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
import re
from dataclasses import Field, fields, is_dataclass
from datetime import UTC, datetime
from typing import Any, Never, TypeVar

from wireup import service

from eligibility_signposting_api.config.contants import ALLOWED_CONDITIONS
from eligibility_signposting_api.model.person import Person
from eligibility_signposting_api.services.processors.token_parser import ParsedToken, TokenParser

T = TypeVar("T")


TARGET_ATTRIBUTE_LEVEL = "TARGET"
PERSON_ATTRIBUTE_LEVEL = "PERSON"
ALLOWED_TARGET_ATTRIBUTES = {
"ATTRIBUTE_TYPE",
"VALID_DOSES_COUNT",
"INVALID_DOSES_COUNT",
"LAST_SUCCESSFUL_DATE",
"LAST_VALID_DOSE_DATE",
"BOOKED_APPOINTMENT_DATE",
"BOOKED_APPOINTMENT_PROVIDER",
"LAST_INVITE_DATE",
"LAST_INVITE_STATUS",
}


@service
class TokenProcessor:
@staticmethod
def find_and_replace_tokens(person: Person, data_class: T) -> T:
if not is_dataclass(data_class):
return data_class
for class_field in fields(data_class):
value = getattr(data_class, class_field.name)
if isinstance(value, str):
setattr(data_class, class_field.name, TokenProcessor.replace_token(value, person))
elif isinstance(value, list):
TokenProcessor.process_list(class_field, data_class, person, value)
elif isinstance(value, dict):
TokenProcessor.process_dict(class_field, data_class, person, value)
elif is_dataclass(value):
setattr(data_class, class_field.name, TokenProcessor.find_and_replace_tokens(person, value))
return data_class

@staticmethod
def process_dict(class_field: Field, data_class: object, person: Person, value: dict[Any, Any]) -> None:
for key, dict_value in value.items():
if isinstance(dict_value, str):
value[key] = TokenProcessor.replace_token(dict_value, person)
elif is_dataclass(dict_value):
value[key] = TokenProcessor.find_and_replace_tokens(person, dict_value)
setattr(data_class, class_field.name, value)

@staticmethod
def process_list(class_field: Field, data_class: object, person: Person, value: list[Any]) -> None:
for i, item in enumerate(value):
if is_dataclass(item):
value[i] = TokenProcessor.find_and_replace_tokens(person, item)
elif isinstance(item, str):
value[i] = TokenProcessor.replace_token(item, person)
setattr(data_class, class_field.name, value)

@staticmethod
def replace_token(text: str, person: Person) -> str:
if not isinstance(text, str):
return text

pattern = r"\[\[.*?\]\]"
all_tokens = re.findall(pattern, text, re.IGNORECASE)
present_attributes = [attribute.get("ATTRIBUTE_TYPE") for attribute in person.data]

for token in all_tokens:
parsed_token = TokenParser.parse(token)
found_attribute, key_to_replace, replace_with = None, None, None

attribute_level_map = {
TARGET_ATTRIBUTE_LEVEL: parsed_token.attribute_value,
PERSON_ATTRIBUTE_LEVEL: parsed_token.attribute_name,
}

key_to_find = attribute_level_map.get(parsed_token.attribute_level)

if (
parsed_token.attribute_level == TARGET_ATTRIBUTE_LEVEL
and parsed_token.attribute_name in ALLOWED_CONDITIONS.__args__
and parsed_token.attribute_value in ALLOWED_TARGET_ATTRIBUTES
and parsed_token.attribute_name not in present_attributes
):
replace_with = ""

if replace_with != "":
for attribute in person.data:
is_person_attribute = attribute.get("ATTRIBUTE_TYPE") == PERSON_ATTRIBUTE_LEVEL
is_allowed_target = parsed_token.attribute_name.upper() in ALLOWED_CONDITIONS.__args__

if (is_allowed_target or is_person_attribute) and key_to_find in attribute:
found_attribute = attribute
key_to_replace = key_to_find
break

if not found_attribute or key_to_replace is None:
TokenProcessor.handle_token_not_found(parsed_token, token)

replace_with = TokenProcessor.apply_formatting(found_attribute, key_to_replace, parsed_token.format)
text = text.replace(token, str(replace_with))
return text

@staticmethod
def handle_token_not_found(parsed_token: ParsedToken, token: str) -> Never:
if parsed_token.attribute_level == TARGET_ATTRIBUTE_LEVEL:
message = f"Invalid attribute name '{parsed_token.attribute_value}' in token '{token}'."
raise ValueError(message)
if parsed_token.attribute_level == PERSON_ATTRIBUTE_LEVEL:
message = f"Invalid attribute name '{parsed_token.attribute_name}' in token '{token}'."
raise ValueError(message)
message = f"Invalid attribute level '{parsed_token.attribute_level}' in token '{token}'."
raise ValueError(message)

@staticmethod
def apply_formatting(attribute: dict[str, T], attribute_value: str, date_format: str | None) -> str:
try:
attribute_data = attribute.get(attribute_value)
if (date_format or date_format == "") and attribute_data:
replace_with_date_object = datetime.strptime(str(attribute_data), "%Y%m%d").replace(tzinfo=UTC)
replace_with = replace_with_date_object.strftime(str(date_format))
else:
replace_with = attribute_data if attribute_data else ""
return str(replace_with)
except AttributeError as error:
message = "Invalid token format"
raise AttributeError(message) from error
Loading