Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions src/eligibility_signposting_api/model/eligibility.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from __future__ import annotations

from dataclasses import dataclass
from datetime import date
from enum import Enum, StrEnum, auto
Expand Down Expand Up @@ -30,6 +32,29 @@ def __lt__(self, other: Self) -> bool:
return self.value < other.value
return NotImplemented

@property
def is_exclusion(self) -> bool:
return self is not Status.actionable

@staticmethod
def worst(*statuses: Status) -> Status:
"""Pick the worst status from those given.

Here "worst" means furthest from being able to access vaccination, so not-eligible is "worse" than
not-actionable, and not-actionable is "worse" than actionable.
"""
return min(statuses)

@staticmethod
def best(*statuses: Status) -> Status:
"""Pick the best status between the existing status, and the status implied by
the rule excluding the person from vaccination.

Here "best" means closest to being able to access vaccination, so not-actionable is "better" than
not-eligible, and actionable is "better" than not-actionable.
"""
return max(statuses)


@dataclass
class Reason:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,10 @@
from itertools import groupby
from typing import Any

from hamcrest.core.string_description import StringDescription
from wireup import service

from eligibility_signposting_api.model import eligibility, rules
from eligibility_signposting_api.services.rules.operators import OperatorRegistry
from eligibility_signposting_api.services.calculators.rule_calculator import RuleCalculator

Row = Collection[Mapping[str, Any]]

Expand Down Expand Up @@ -153,45 +152,22 @@ def evaluate_priority_group(
eligibility.Status.not_eligible if exclude_capable_rules else eligibility.Status.actionable
)
for iteration_rule in exclude_capable_rules:
exclusion, reason = self.evaluate_exclusion(iteration_rule)
if exclusion:
best_status_so_far_for_priority_group = self.best_status(
iteration_rule.type, best_status_so_far_for_priority_group
rule_calculator = RuleCalculator(person_data=self.person_data, rule=iteration_rule)
status, reason = rule_calculator.evaluate_exclusion()
if status.is_exclusion:
best_status_so_far_for_priority_group = eligibility.Status.best(
status, best_status_so_far_for_priority_group
)
exclusion_reasons.append(reason)
else:
best_status_so_far_for_priority_group = eligibility.Status.actionable
actionable_reasons.append(reason)
return (
self.worst_status(best_status_so_far_for_priority_group, worst_status_so_far_for_condition),
eligibility.Status.worst(best_status_so_far_for_priority_group, worst_status_so_far_for_condition),
actionable_reasons,
exclusion_reasons,
)

@staticmethod
def worst_status(*statuses: eligibility.Status) -> eligibility.Status:
"""Pick the worst status from those given.

Here "worst" means furthest from being able to access vaccination, so not-eligible is "worse" than
not-actionable, and not-actionable is "worse" than actionable.
"""
return min(statuses)

@staticmethod
def best_status(rule_type: rules.RuleType, status: eligibility.Status) -> eligibility.Status:
"""Pick the best status between the existing status, and the status implied by
the rule excluding the person from vaccination.

Here "best" means closest to being able to access vaccination, so not-actionable is "better" than
not-eligible, and actionable is "better" than not-actionable.
"""
return max(
status,
eligibility.Status.not_eligible
if rule_type == rules.RuleType.filter
else eligibility.Status.not_actionable,
)

def get_base_eligible_conditions(
self,
base_eligible_evaluations: Mapping[
Expand All @@ -204,48 +180,7 @@ def get_base_eligible_conditions(
# for each condition for which the person is base eligible:
# what is the "best" status, i.e. closest to actionable? Add the condition to the result with that status.
for condition_name, reasons_by_status in base_eligible_evaluations.items():
best_status = max(reasons_by_status.keys())
best_status = eligibility.Status.best(*list(reasons_by_status.keys()))
self.results[condition_name] = eligibility.Condition(
condition_name=condition_name, status=best_status, reasons=reasons_by_status[best_status]
)

def evaluate_exclusion(self, iteration_rule: rules.IterationRule) -> tuple[bool, eligibility.Reason]:
"""Evaluate if a particular rule excludes this person. Return the result, and the reason for the result."""
attribute_value = self.get_attribute_value(iteration_rule)
exclusion, reason = self.evaluate_rule(iteration_rule, attribute_value)
reason = eligibility.Reason(
rule_name=eligibility.RuleName(iteration_rule.name),
rule_type=eligibility.RuleType(iteration_rule.type),
rule_result=eligibility.RuleResult(
f"Rule {iteration_rule.name!r} ({iteration_rule.description!r}) "
f"{'' if exclusion else 'not '}excluding - "
f"{iteration_rule.attribute_name!r} {iteration_rule.comparator!r} {reason}"
),
)
return exclusion, reason

def get_attribute_value(self, iteration_rule: rules.IterationRule) -> str | None:
"""Pull out the correct attribute for a rule from the person's data."""
match iteration_rule.attribute_level:
case rules.RuleAttributeLevel.PERSON:
person: Mapping[str, str | None] | None = next(
(r for r in self.person_data if r.get("ATTRIBUTE_TYPE", "") == "PERSON"), None
)
attribute_value = person.get(iteration_rule.attribute_name) if person else None
case _: # pragma: no cover
msg = f"{iteration_rule.attribute_level} not implemented"
raise NotImplementedError(msg)
return attribute_value

@staticmethod
def evaluate_rule(iteration_rule: rules.IterationRule, attribute_value: str | None) -> tuple[bool, str]:
"""Evaluate a rule against a person data attribute. Return the result, and the reason for the result."""
matcher_class = OperatorRegistry.get(iteration_rule.operator)
matcher = matcher_class(rule_value=iteration_rule.comparator)

reason = StringDescription()
if matcher.matches(attribute_value):
matcher.describe_match(attribute_value, reason)
return True, str(reason)
matcher.describe_mismatch(attribute_value, reason)
return False, str(reason)
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
from __future__ import annotations

from collections.abc import Collection, Mapping
from dataclasses import dataclass
from typing import Any

from hamcrest.core.string_description import StringDescription

from eligibility_signposting_api.model import eligibility, rules
from eligibility_signposting_api.services.rules.operators import OperatorRegistry

Row = Collection[Mapping[str, Any]]


@dataclass
class RuleCalculator:
person_data: Row
rule: rules.IterationRule

def evaluate_exclusion(self) -> tuple[eligibility.Status, eligibility.Reason]:
"""Evaluate if a particular rule excludes this person. Return the result, and the reason for the result."""
attribute_value = self.get_attribute_value()
status, reason = self.evaluate_rule(attribute_value)
reason = eligibility.Reason(
rule_name=eligibility.RuleName(self.rule.name),
rule_type=eligibility.RuleType(self.rule.type),
rule_result=eligibility.RuleResult(
f"Rule {self.rule.name!r} ({self.rule.description!r}) "
f"{'' if status.is_exclusion else 'not '}excluding - "
f"{self.rule.attribute_name!r} {self.rule.comparator!r} {reason}"
),
)
return status, reason

def get_attribute_value(self) -> str | None:
"""Pull out the correct attribute for a rule from the person's data."""
match self.rule.attribute_level:
case rules.RuleAttributeLevel.PERSON:
person: Mapping[str, str | None] | None = next(
(r for r in self.person_data if r.get("ATTRIBUTE_TYPE", "") == "PERSON"), None
)
attribute_value = person.get(self.rule.attribute_name) if person else None
case _: # pragma: no cover
msg = f"{self.rule.attribute_level} not implemented"
raise NotImplementedError(msg)
return attribute_value

def evaluate_rule(self, attribute_value: str | None) -> tuple[eligibility.Status, str]:
"""Evaluate a rule against a person data attribute. Return the result, and the reason for the result."""
matcher_class = OperatorRegistry.get(self.rule.operator)
matcher = matcher_class(rule_value=self.rule.comparator)

reason = StringDescription()
if matcher.matches(attribute_value):
matcher.describe_match(attribute_value, reason)
status = {
rules.RuleType.filter: eligibility.Status.not_eligible,
rules.RuleType.suppression: eligibility.Status.not_actionable,
}[self.rule.type]
return status, str(reason)
matcher.describe_mismatch(attribute_value, reason)
return eligibility.Status.actionable, str(reason)
Loading