Skip to content

Commit 3fe53ad

Browse files
ELI-223 Provide date of last vaccination (#300)
* WIP: basic find replace, with one value, one person data.
* ELI-223: Nested token support added
* ELI-223: Replaces token with given valid format
* ELI-223: Fixes tests
* ELI-223: Date format replacement for person and target
* WIP: Added test for invalid formatter.
* ELI-223: Handle error scenarios
* ELI-223: Handles case-insensitive token replacement
* ELI-223: Supports token replacement in audit
* ELI-223: Token Parser
* ELI-223: Integrate token parser in calculator
* ELI-223: Adds tests
* ELI-223: Moves token logic to token_processor
* ELI-223: Adds audit token replacement and tests
* ELI-223: Adds all valid TARGET fields
* Added test case for clarity.
* Formatting.
* WIP: linting.
* Fixed integration test.
* ELI-223: Fixed linting
* ELI-223: Adds error integration tests
* ELI-223: Adds more tests and linting
* ELI-223: Renames test
* ELI-223: Extracts constants
* ELI-223: Fixes review comments

Co-authored-by: Shweta <[email protected]>
1 parent 2760489 commit 3fe53ad

File tree

10 files changed

+1209
-147
lines changed

10 files changed

+1209
-147
lines changed
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
from typing import Literal
2+
13
MAGIC_COHORT_LABEL = "elid_all_people"
24
RULE_STOP_DEFAULT = False
35
NHS_NUMBER_HEADER = "nhs-login-nhs-number"
6+
ALLOWED_CONDITIONS = Literal["COVID", "FLU", "MMR", "RSV"]

src/eligibility_signposting_api/model/campaign_config.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111

1212
from pydantic import BaseModel, Field, HttpUrl, RootModel, field_serializer, field_validator, model_validator
1313

14-
from eligibility_signposting_api.config.contants import MAGIC_COHORT_LABEL, RULE_STOP_DEFAULT
14+
from eligibility_signposting_api.config.contants import ALLOWED_CONDITIONS, MAGIC_COHORT_LABEL, RULE_STOP_DEFAULT
1515

1616
if typing.TYPE_CHECKING: # pragma: no cover
1717
from pydantic import SerializationInfo
@@ -184,7 +184,7 @@ class CampaignConfig(BaseModel):
184184
version: CampaignVersion = Field(..., alias="Version")
185185
name: CampaignName = Field(..., alias="Name")
186186
type: Literal["V", "S"] = Field(..., alias="Type")
187-
target: Literal["COVID", "FLU", "MMR", "RSV"] = Field(..., alias="Target")
187+
target: ALLOWED_CONDITIONS = Field(..., alias="Target")
188188
manager: list[str] | None = Field(None, alias="Manager")
189189
approver: list[str] | None = Field(None, alias="Approver")
190190
reviewer: list[str] | None = Field(None, alias="Reviewer")

src/eligibility_signposting_api/services/calculators/eligibility_calculator.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
from eligibility_signposting_api.services.processors.action_rule_handler import ActionRuleHandler
2424
from eligibility_signposting_api.services.processors.campaign_evaluator import CampaignEvaluator
2525
from eligibility_signposting_api.services.processors.rule_processor import RuleProcessor
26+
from eligibility_signposting_api.services.processors.token_processor import TokenProcessor
2627

2728
if TYPE_CHECKING:
2829
from collections.abc import Collection
@@ -34,6 +35,7 @@
3435
)
3536
from eligibility_signposting_api.model.person import Person
3637

38+
3739
logger = logging.getLogger(__name__)
3840

3941

@@ -99,12 +101,16 @@ def get_eligibility_status(self, include_actions: str, conditions: list[str], ca
99101
include_actions_flag=include_actions_flag,
100102
)
101103

104+
best_iteration_result = TokenProcessor.find_and_replace_tokens(self.person, best_iteration_result)
105+
matched_action_detail = TokenProcessor.find_and_replace_tokens(self.person, matched_action_detail)
106+
102107
condition_results[condition_name] = best_iteration_result.iteration_result
103108
condition_results[condition_name].actions = matched_action_detail.actions
104109

105110
condition: Condition = self.build_condition(
106111
iteration_result=condition_results[condition_name], condition_name=condition_name
107112
)
113+
108114
final_result.append(condition)
109115

110116
AuditContext.append_audit_condition(
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
import re
2+
from dataclasses import dataclass
3+
4+
5+
@dataclass
6+
class ParsedToken:
7+
"""
8+
A class to represent a parsed token.
9+
...
10+
Attributes
11+
----------
12+
attribute_level : str
13+
Example: "PERSON" or "TARGET"
14+
attribute_name : str
15+
Example: "POSTCODE" or "RSV"
16+
attribute_value : int
17+
Example: "LAST_SUCCESSFUL_DATE" if attribute_level is TARGET
18+
format : str
19+
Example: "%d %B %Y" if DATE formatting is used
20+
"""
21+
22+
attribute_level: str
23+
attribute_name: str
24+
attribute_value: str | None
25+
format: str | None
26+
27+
28+
class TokenParser:
29+
MIN_TOKEN_PARTS = 2
30+
31+
@staticmethod
32+
def parse(token: str) -> ParsedToken:
33+
"""Parses a token into its parts.
34+
Steps:
35+
Strip the surrounding [[ ]]
36+
Check for empty body after stripping, e.g., '[[]]'
37+
Check for empty parts created by leading/trailing dots or tokens with no dot
38+
Check if the name contains a date format
39+
Return a ParsedToken object
40+
"""
41+
42+
token_body = token[2:-2]
43+
if not token_body:
44+
message = "Invalid token."
45+
raise ValueError(message)
46+
47+
token_parts = token_body.split(".")
48+
49+
if len(token_parts) < TokenParser.MIN_TOKEN_PARTS or not all(token_parts):
50+
message = "Invalid token."
51+
raise ValueError(message)
52+
53+
token_level = token_parts[0].upper()
54+
token_name = token_parts[-1]
55+
56+
format_match = re.search(r":DATE\(([^()]*)\)", token_name, re.IGNORECASE)
57+
if not format_match and len(token_name.split(":")) > 1:
58+
message = "Invalid token format."
59+
raise ValueError(message)
60+
61+
format_str = format_match.group(1) if format_match else None
62+
63+
last_part = re.sub(r":DATE\(.*?\)", "", token_name, flags=re.IGNORECASE)
64+
65+
if len(token_parts) == TokenParser.MIN_TOKEN_PARTS:
66+
name = last_part.upper()
67+
value = None
68+
else:
69+
name = token_parts[1].upper()
70+
value = last_part.upper()
71+
72+
return ParsedToken(attribute_level=token_level, attribute_name=name, attribute_value=value, format=format_str)
Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
import re
from dataclasses import Field, fields, is_dataclass
from datetime import UTC, datetime
from typing import Any, Never, TypeVar

from wireup import service

from eligibility_signposting_api.config.contants import ALLOWED_CONDITIONS
from eligibility_signposting_api.model.person import Person
from eligibility_signposting_api.services.processors.token_parser import ParsedToken, TokenParser

T = TypeVar("T")


TARGET_ATTRIBUTE_LEVEL = "TARGET"
PERSON_ATTRIBUTE_LEVEL = "PERSON"

# Target-level fields a "[[TARGET.<condition>.<field>]]" token may reference.
ALLOWED_TARGET_ATTRIBUTES = {
    "ATTRIBUTE_TYPE",
    "VALID_DOSES_COUNT",
    "INVALID_DOSES_COUNT",
    "LAST_SUCCESSFUL_DATE",
    "LAST_VALID_DOSE_DATE",
    "BOOKED_APPOINTMENT_DATE",
    "BOOKED_APPOINTMENT_PROVIDER",
    "LAST_INVITE_DATE",
    "LAST_INVITE_STATUS",
}


@service
class TokenProcessor:
    """Walks dataclass trees and substitutes "[[...]]" tokens using person data."""

    @staticmethod
    def find_and_replace_tokens(person: Person, data_class: T) -> T:
        """Recursively replace tokens in every string reachable from *data_class*.

        Non-dataclass inputs are returned unchanged; dataclass instances are
        mutated in place and also returned for convenience.
        """
        if not is_dataclass(data_class):
            return data_class
        for class_field in fields(data_class):
            current = getattr(data_class, class_field.name)
            if isinstance(current, str):
                setattr(data_class, class_field.name, TokenProcessor.replace_token(current, person))
            elif isinstance(current, list):
                TokenProcessor.process_list(class_field, data_class, person, current)
            elif isinstance(current, dict):
                TokenProcessor.process_dict(class_field, data_class, person, current)
            elif is_dataclass(current):
                setattr(data_class, class_field.name, TokenProcessor.find_and_replace_tokens(person, current))
        return data_class

    @staticmethod
    def process_dict(class_field: Field, data_class: object, person: Person, value: dict[Any, Any]) -> None:
        """Replace tokens in a dict field's string values and nested dataclasses (in place)."""
        for key, entry in value.items():
            if isinstance(entry, str):
                value[key] = TokenProcessor.replace_token(entry, person)
            elif is_dataclass(entry):
                value[key] = TokenProcessor.find_and_replace_tokens(person, entry)
        setattr(data_class, class_field.name, value)

    @staticmethod
    def process_list(class_field: Field, data_class: object, person: Person, value: list[Any]) -> None:
        """Replace tokens in a list field's string items and nested dataclasses (in place)."""
        for index, item in enumerate(value):
            if isinstance(item, str):
                value[index] = TokenProcessor.replace_token(item, person)
            elif is_dataclass(item):
                value[index] = TokenProcessor.find_and_replace_tokens(person, item)
        setattr(data_class, class_field.name, value)

    @staticmethod
    def replace_token(text: str, person: Person) -> str:
        """Substitute every "[[...]]" token in *text* with its resolved value.

        Raises ValueError (via handle_token_not_found) when a token cannot be
        resolved against the person's data rows.
        """
        if not isinstance(text, str):
            return text

        tokens = re.findall(r"\[\[.*?\]\]", text, re.IGNORECASE)
        attribute_types = [row.get("ATTRIBUTE_TYPE") for row in person.data]

        for raw_token in tokens:
            parsed = TokenParser.parse(raw_token)

            # PERSON tokens look up the attribute name itself; TARGET tokens look
            # up the field named by the token's value part.
            lookup_key = {
                TARGET_ATTRIBUTE_LEVEL: parsed.attribute_value,
                PERSON_ATTRIBUTE_LEVEL: parsed.attribute_name,
            }.get(parsed.attribute_level)

            # A well-formed TARGET token for a known condition with no matching
            # data row renders as an empty string rather than raising.
            condition_has_no_row = (
                parsed.attribute_level == TARGET_ATTRIBUTE_LEVEL
                and parsed.attribute_name in ALLOWED_CONDITIONS.__args__
                and parsed.attribute_value in ALLOWED_TARGET_ATTRIBUTES
                and parsed.attribute_name not in attribute_types
            )

            if condition_has_no_row:
                replacement: str = ""
            else:
                is_known_condition = parsed.attribute_name.upper() in ALLOWED_CONDITIONS.__args__
                matched_row = None
                for row in person.data:
                    row_is_person = row.get("ATTRIBUTE_TYPE") == PERSON_ATTRIBUTE_LEVEL
                    # NOTE(review): for TARGET tokens this takes the first row that
                    # contains the field, without comparing the row's ATTRIBUTE_TYPE
                    # to the token's condition name — confirm rows are keyed per
                    # condition so a COVID row cannot satisfy an RSV token.
                    if (is_known_condition or row_is_person) and lookup_key in row:
                        matched_row = row
                        break
                if matched_row is None or lookup_key is None:
                    TokenProcessor.handle_token_not_found(parsed, raw_token)
                replacement = TokenProcessor.apply_formatting(matched_row, lookup_key, parsed.format)

            text = text.replace(raw_token, str(replacement))
        return text

    @staticmethod
    def handle_token_not_found(parsed_token: ParsedToken, token: str) -> Never:
        """Raise a ValueError describing why *token* could not be resolved."""
        if parsed_token.attribute_level == TARGET_ATTRIBUTE_LEVEL:
            raise ValueError(f"Invalid attribute name '{parsed_token.attribute_value}' in token '{token}'.")
        if parsed_token.attribute_level == PERSON_ATTRIBUTE_LEVEL:
            raise ValueError(f"Invalid attribute name '{parsed_token.attribute_name}' in token '{token}'.")
        raise ValueError(f"Invalid attribute level '{parsed_token.attribute_level}' in token '{token}'.")

    @staticmethod
    def apply_formatting(attribute: dict[str, T], attribute_value: str, date_format: str | None) -> str:
        """Return the attribute's value as a string, optionally strftime-formatted.

        Date values are parsed as YYYYMMDD strings (per the strptime pattern below);
        a missing or falsy value renders as "". Any non-None *date_format*,
        including "", triggers date formatting.
        """
        try:
            raw = attribute.get(attribute_value)
            if date_format is not None and raw:
                parsed_date = datetime.strptime(str(raw), "%Y%m%d").replace(tzinfo=UTC)
                return parsed_date.strftime(str(date_format))
            return str(raw) if raw else ""
        except AttributeError as error:
            message = "Invalid token format"
            raise AttributeError(message) from error

0 commit comments

Comments (0)