diff --git a/src/eligibility_signposting_api/model/eligibility.py b/src/eligibility_signposting_api/model/eligibility.py index e33b2b43..e0ba3122 100644 --- a/src/eligibility_signposting_api/model/eligibility.py +++ b/src/eligibility_signposting_api/model/eligibility.py @@ -1,3 +1,5 @@ +from __future__ import annotations + from dataclasses import dataclass from datetime import date from enum import Enum, StrEnum, auto @@ -30,6 +32,29 @@ def __lt__(self, other: Self) -> bool: return self.value < other.value return NotImplemented + @property + def is_exclusion(self) -> bool: + return self is not Status.actionable + + @staticmethod + def worst(*statuses: Status) -> Status: + """Pick the worst status from those given. + + Here "worst" means furthest from being able to access vaccination, so not-eligible is "worse" than + not-actionable, and not-actionable is "worse" than actionable. + """ + return min(statuses) + + @staticmethod + def best(*statuses: Status) -> Status: + """Pick the best status between the existing status, and the status implied by + the rule excluding the person from vaccination. + + Here "best" means closest to being able to access vaccination, so not-actionable is "better" than + not-eligible, and actionable is "better" than not-actionable. + """ + return max(statuses) + @dataclass class Reason: diff --git a/src/eligibility_signposting_api/services/calculators/eligibility_calculator.py b/src/eligibility_signposting_api/services/calculators/eligibility_calculator.py index 5cab648a..f8f58aea 100644 --- a/src/eligibility_signposting_api/services/calculators/eligibility_calculator.py +++ b/src/eligibility_signposting_api/services/calculators/eligibility_calculator.py @@ -8,11 +8,10 @@ from itertools import groupby from typing import Any -from hamcrest.core.string_description import StringDescription from wireup import service from eligibility_signposting_api.model import eligibility, rules -from eligibility_signposting_api.services.rules.operators import OperatorRegistry +from eligibility_signposting_api.services.calculators.rule_calculator import RuleCalculator Row = Collection[Mapping[str, Any]] @@ -153,45 +152,22 @@ def evaluate_priority_group( eligibility.Status.not_eligible if exclude_capable_rules else eligibility.Status.actionable ) for iteration_rule in exclude_capable_rules: - exclusion, reason = self.evaluate_exclusion(iteration_rule) - if exclusion: - best_status_so_far_for_priority_group = self.best_status( - iteration_rule.type, best_status_so_far_for_priority_group + rule_calculator = RuleCalculator(person_data=self.person_data, rule=iteration_rule) + status, reason = rule_calculator.evaluate_exclusion() + if status.is_exclusion: + best_status_so_far_for_priority_group = eligibility.Status.best( + status, best_status_so_far_for_priority_group ) exclusion_reasons.append(reason) else: best_status_so_far_for_priority_group = eligibility.Status.actionable actionable_reasons.append(reason) return ( - self.worst_status(best_status_so_far_for_priority_group, worst_status_so_far_for_condition), + eligibility.Status.worst(best_status_so_far_for_priority_group, worst_status_so_far_for_condition), actionable_reasons, exclusion_reasons, ) - @staticmethod - def worst_status(*statuses: eligibility.Status) -> eligibility.Status: - """Pick the worst status from those given. - - Here "worst" means furthest from being able to access vaccination, so not-eligible is "worse" than - not-actionable, and not-actionable is "worse" than actionable. - """ - return min(statuses) - - @staticmethod - def best_status(rule_type: rules.RuleType, status: eligibility.Status) -> eligibility.Status: - """Pick the best status between the existing status, and the status implied by - the rule excluding the person from vaccination. - - Here "best" means closest to being able to access vaccination, so not-actionable is "better" than - not-eligible, and actionable is "better" than not-actionable. - """ - return max( - status, - eligibility.Status.not_eligible - if rule_type == rules.RuleType.filter - else eligibility.Status.not_actionable, - ) - def get_base_eligible_conditions( self, base_eligible_evaluations: Mapping[ @@ -204,48 +180,7 @@ def get_base_eligible_conditions( # for each condition for which the person is base eligible: # what is the "best" status, i.e. closest to actionable? Add the condition to the result with that status. for condition_name, reasons_by_status in base_eligible_evaluations.items(): - best_status = max(reasons_by_status.keys()) + best_status = eligibility.Status.best(*list(reasons_by_status.keys())) self.results[condition_name] = eligibility.Condition( condition_name=condition_name, status=best_status, reasons=reasons_by_status[best_status] ) - - def evaluate_exclusion(self, iteration_rule: rules.IterationRule) -> tuple[bool, eligibility.Reason]: - """Evaluate if a particular rule excludes this person. Return the result, and the reason for the result.""" - attribute_value = self.get_attribute_value(iteration_rule) - exclusion, reason = self.evaluate_rule(iteration_rule, attribute_value) - reason = eligibility.Reason( - rule_name=eligibility.RuleName(iteration_rule.name), - rule_type=eligibility.RuleType(iteration_rule.type), - rule_result=eligibility.RuleResult( - f"Rule {iteration_rule.name!r} ({iteration_rule.description!r}) " - f"{'' if exclusion else 'not '}excluding - " - f"{iteration_rule.attribute_name!r} {iteration_rule.comparator!r} {reason}" - ), - ) - return exclusion, reason - - def get_attribute_value(self, iteration_rule: rules.IterationRule) -> str | None: - """Pull out the correct attribute for a rule from the person's data.""" - match iteration_rule.attribute_level: - case rules.RuleAttributeLevel.PERSON: - person: Mapping[str, str | None] | None = next( - (r for r in self.person_data if r.get("ATTRIBUTE_TYPE", "") == "PERSON"), None - ) - attribute_value = person.get(iteration_rule.attribute_name) if person else None - case _: # pragma: no cover - msg = f"{iteration_rule.attribute_level} not implemented" - raise NotImplementedError(msg) - return attribute_value - - @staticmethod - def evaluate_rule(iteration_rule: rules.IterationRule, attribute_value: str | None) -> tuple[bool, str]: - """Evaluate a rule against a person data attribute. Return the result, and the reason for the result.""" - matcher_class = OperatorRegistry.get(iteration_rule.operator) - matcher = matcher_class(rule_value=iteration_rule.comparator) - - reason = StringDescription() - if matcher.matches(attribute_value): - matcher.describe_match(attribute_value, reason) - return True, str(reason) - matcher.describe_mismatch(attribute_value, reason) - return False, str(reason) diff --git a/src/eligibility_signposting_api/services/calculators/rule_calculator.py b/src/eligibility_signposting_api/services/calculators/rule_calculator.py new file mode 100644 index 00000000..14d9db70 --- /dev/null +++ b/src/eligibility_signposting_api/services/calculators/rule_calculator.py @@ -0,0 +1,62 @@ +from __future__ import annotations + +from collections.abc import Collection, Mapping +from dataclasses import dataclass +from typing import Any + +from hamcrest.core.string_description import StringDescription + +from eligibility_signposting_api.model import eligibility, rules +from eligibility_signposting_api.services.rules.operators import OperatorRegistry + +Row = Collection[Mapping[str, Any]] + + +@dataclass +class RuleCalculator: + person_data: Row + rule: rules.IterationRule + + def evaluate_exclusion(self) -> tuple[eligibility.Status, eligibility.Reason]: + """Evaluate if a particular rule excludes this person. Return the result, and the reason for the result.""" + attribute_value = self.get_attribute_value() + status, reason = self.evaluate_rule(attribute_value) + reason = eligibility.Reason( + rule_name=eligibility.RuleName(self.rule.name), + rule_type=eligibility.RuleType(self.rule.type), + rule_result=eligibility.RuleResult( + f"Rule {self.rule.name!r} ({self.rule.description!r}) " + f"{'' if status.is_exclusion else 'not '}excluding - " + f"{self.rule.attribute_name!r} {self.rule.comparator!r} {reason}" + ), + ) + return status, reason + + def get_attribute_value(self) -> str | None: + """Pull out the correct attribute for a rule from the person's data.""" + match self.rule.attribute_level: + case rules.RuleAttributeLevel.PERSON: + person: Mapping[str, str | None] | None = next( + (r for r in self.person_data if r.get("ATTRIBUTE_TYPE", "") == "PERSON"), None + ) + attribute_value = person.get(self.rule.attribute_name) if person else None + case _: # pragma: no cover + msg = f"{self.rule.attribute_level} not implemented" + raise NotImplementedError(msg) + return attribute_value + + def evaluate_rule(self, attribute_value: str | None) -> tuple[eligibility.Status, str]: + """Evaluate a rule against a person data attribute. Return the result, and the reason for the result.""" + matcher_class = OperatorRegistry.get(self.rule.operator) + matcher = matcher_class(rule_value=self.rule.comparator) + + reason = StringDescription() + if matcher.matches(attribute_value): + matcher.describe_match(attribute_value, reason) + status = { + rules.RuleType.filter: eligibility.Status.not_eligible, + rules.RuleType.suppression: eligibility.Status.not_actionable, + }[self.rule.type] + return status, str(reason) + matcher.describe_mismatch(attribute_value, reason) + return eligibility.Status.actionable, str(reason)