Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
from collections import defaultdict
from collections.abc import Collection, Iterator, Mapping
from dataclasses import dataclass, field
from functools import cached_property
from itertools import groupby
from typing import Any

Expand All @@ -28,159 +27,120 @@ class EligibilityCalculator:
person_data: Row
campaign_configs: Collection[rules.CampaignConfig]

results: dict[eligibility.ConditionName, eligibility.Condition] = field(default_factory=dict)
results: list[eligibility.Condition] = field(default_factory=list)

@cached_property
def condition_names(self) -> set[eligibility.ConditionName]:
return {
eligibility.ConditionName(cc.target)
for cc in self.campaign_configs
if cc.campaign_live and cc.current_iteration
}

def evaluate_eligibility(self) -> eligibility.EligibilityStatus:
"""Calculate a person's eligibility for vaccination."""
@property
def active_campaigns(self) -> list[rules.CampaignConfig]:
return [cc for cc in self.campaign_configs if cc.campaign_live and cc.current_iteration]

# Get all iterations for which the person is base eligible, i.e. those which *might* provide eligibility
# due to cohort membership.
base_eligible_campaigns = self.get_base_eligible_campaigns()
@property
def campaigns_grouped_by_condition_name(
self,
) -> Iterator[tuple[eligibility.ConditionName, list[rules.CampaignConfig]]]:
"""Generator function to iterate over campaign groups by condition name."""

# Evaluate iteration rules to see if the person is actionable, not actionable (due to "F" rules),
# or not eligible (due to "S" rules")
evaluations = self.evaluate_for_base_eligible_campaigns(base_eligible_campaigns)
for condition_name, campaign_group in groupby(
sorted(self.active_campaigns, key=attrgetter("target")), key=attrgetter("target")
):
yield condition_name, list(campaign_group)

# Add all not base eligible conditions to result set.
self.get_not_base_eligible_conditions(base_eligible_campaigns)
# Add all base eligible conditions to result set.
self.get_base_eligible_conditions(evaluations)
def evaluate_eligibility(self) -> eligibility.EligibilityStatus:
"""Iterates over campaign groups, evaluates eligibility, and returns a consolidated status."""

return eligibility.EligibilityStatus(conditions=list(self.results.values()))
for condition_name, campaign_group in self.campaigns_grouped_by_condition_name:
if base_eligible_campaigns := self.get_the_base_eligible_campaigns(campaign_group):
status, reasons = self.evaluate_eligibility_by_iteration_rules(base_eligible_campaigns)
# Append the evaluation result for this condition to the results list
self.results.append(eligibility.Condition(condition_name, status, reasons))
else:
# Create and append the evaluation result, as no campaign config is base eligible
self.results.append(eligibility.Condition(condition_name, eligibility.Status.not_eligible, []))

def get_base_eligible_campaigns(self) -> list[rules.CampaignConfig]:
"""Get all campaigns for which the person is base eligible, i.e. those which *might* provide eligibility.
# Return the overall eligibility status, constructed from the list of condition results
return eligibility.EligibilityStatus(conditions=list(self.results))

Build and return a collection of campaigns for which the person is base eligible (using cohorts).
Also build and return a set of conditions in the campaigns while we are here.
"""
base_eligible_campaigns: list[rules.CampaignConfig] = []
def get_the_base_eligible_campaigns(self, campaign_group: list[rules.CampaignConfig]) -> list[rules.CampaignConfig]:
"""Return campaigns for which the person is base eligible via cohorts."""

for campaign_config in (cc for cc in self.campaign_configs if cc.campaign_live and cc.current_iteration):
base_eligible = self.evaluate_base_eligibility(campaign_config.current_iteration)
if base_eligible:
base_eligible_campaigns.append(campaign_config)
base_eligible_campaigns: list[rules.CampaignConfig] = [
campaign
for campaign in campaign_group
if campaign.campaign_live
and campaign.current_iteration
and self.check_base_eligibility(campaign.current_iteration)
]

return base_eligible_campaigns
if base_eligible_campaigns:
return base_eligible_campaigns
return []

def evaluate_base_eligibility(self, iteration: rules.Iteration | None) -> set[str]:
def check_base_eligibility(self, iteration: rules.Iteration | None) -> set[str]:
"""Return cohorts for which person is base eligible."""

if not iteration:
return set()
iteration_cohorts: set[str] = {
cohort.cohort_label for cohort in iteration.iteration_cohorts if cohort.cohort_label
}

cohorts_row: Mapping[str, dict[str, dict[str, dict[str, Any]]]] = next(
(r for r in self.person_data if r.get("ATTRIBUTE_TYPE", "") == "COHORTS"), {}
(row for row in self.person_data if row.get("ATTRIBUTE_TYPE") == "COHORTS"), {}
)
person_cohorts = set(cohorts_row.get("COHORT_MAP", {}).get("cohorts", {}).get("M", {}).keys())
person_cohorts: set[str] = set(cohorts_row.get("COHORT_MAP", {}).get("cohorts", {}).get("M", {}).keys())
return iteration_cohorts & person_cohorts

return iteration_cohorts.intersection(person_cohorts)

def get_not_base_eligible_conditions(
self,
base_eligible_campaigns: Collection[rules.CampaignConfig],
) -> None:
"""Get conditions where the person is not base eligible,
i.e. is not is the cohort for any campaign iteration."""

# for each condition:
# if the person isn't base eligible for any iteration,
# the person is not (base) eligible for the condition
for condition_name in self.condition_names:
if condition_name not in {eligibility.ConditionName(cc.target) for cc in base_eligible_campaigns}:
self.results[condition_name] = eligibility.Condition(
condition_name=condition_name, status=eligibility.Status.not_eligible, reasons=[]
)

def evaluate_for_base_eligible_campaigns(
self, base_eligible_campaigns: Collection[rules.CampaignConfig]
) -> dict[eligibility.ConditionName, dict[eligibility.Status, list[eligibility.Reason]]]:
def evaluate_eligibility_by_iteration_rules(
self, campaign_group: list[rules.CampaignConfig]
) -> tuple[eligibility.Status, list[eligibility.Reason]]:
"""Evaluate iteration rules to see if the person is actionable, not actionable (due to "F" rules),
or not eligible (due to "S" rules").

For each condition, evaluate all iterations for inclusion or exclusion."""

priority_getter = attrgetter("priority")

base_eligible_evaluations: dict[
eligibility.ConditionName, dict[eligibility.Status, list[eligibility.Reason]]
] = defaultdict(dict)
for condition_name, iteration in [
(eligibility.ConditionName(cc.target), cc.current_iteration)
for cc in base_eligible_campaigns
if cc.current_iteration
]:
status_with_reasons: dict[eligibility.Status, list[eligibility.Reason]] = defaultdict()

for iteration in [cc.current_iteration for cc in campaign_group if cc.current_iteration]:
# Until we see a worse status, we assume someone is actionable for this iteration.
worst_status_so_far_for_condition = eligibility.Status.actionable
worst_status = eligibility.Status.actionable
exclusion_reasons, actionable_reasons = [], []
for _priority, iteration_rule_group in groupby(
sorted(iteration.iteration_rules, key=priority_getter), key=priority_getter
):
worst_status_so_far_for_condition, group_actionable_reasons, group_exclusion_reasons = (
self.evaluate_priority_group(iteration_rule_group, worst_status_so_far_for_condition)
)
actionable_reasons.extend(group_actionable_reasons)
exclusion_reasons.extend(group_exclusion_reasons)
condition_entry = base_eligible_evaluations.setdefault(condition_name, {})
condition_status_entry = condition_entry.setdefault(worst_status_so_far_for_condition, [])
by_priority = sorted(iteration.iteration_rules, key=priority_getter)
for _, rule_group in groupby(by_priority, key=priority_getter):
status, group_actionable, group_exclusions = self.evaluate_priority_group(rule_group, worst_status)
# Merge results
worst_status = status
actionable_reasons.extend(group_actionable)
exclusion_reasons.extend(group_exclusions)
condition_status_entry = status_with_reasons.setdefault(worst_status, [])
condition_status_entry.extend(
actionable_reasons
if worst_status_so_far_for_condition is eligibility.Status.actionable
else exclusion_reasons
actionable_reasons if worst_status is eligibility.Status.actionable else exclusion_reasons
)
return base_eligible_evaluations

best_status = eligibility.Status.best(*list(status_with_reasons.keys()))

return best_status, status_with_reasons[best_status]

def evaluate_priority_group(
self,
iteration_rule_group: Iterator[rules.IterationRule],
worst_status_so_far_for_condition: eligibility.Status,
) -> tuple[eligibility.Status, list[eligibility.Reason], list[eligibility.Reason]]:
actionable_reasons, exclusion_reasons = [], []
exclusion_reasons, actionable_reasons = [], []
exclude_capable_rules = [
ir for ir in iteration_rule_group if ir.type in (rules.RuleType.filter, rules.RuleType.suppression)
]
best_status_so_far_for_priority_group = (
eligibility.Status.not_eligible if exclude_capable_rules else eligibility.Status.actionable
)
for iteration_rule in exclude_capable_rules:
rule_calculator = RuleCalculator(person_data=self.person_data, rule=iteration_rule)

best_status = eligibility.Status.not_eligible if exclude_capable_rules else eligibility.Status.actionable

for rule in exclude_capable_rules:
rule_calculator = RuleCalculator(person_data=self.person_data, rule=rule)
status, reason = rule_calculator.evaluate_exclusion()
if status.is_exclusion:
best_status_so_far_for_priority_group = eligibility.Status.best(
status, best_status_so_far_for_priority_group
)
best_status = eligibility.Status.best(status, best_status)
exclusion_reasons.append(reason)
else:
best_status_so_far_for_priority_group = eligibility.Status.actionable
best_status = eligibility.Status.actionable
actionable_reasons.append(reason)
return (
eligibility.Status.worst(best_status_so_far_for_priority_group, worst_status_so_far_for_condition),
actionable_reasons,
exclusion_reasons,
)

def get_base_eligible_conditions(
self,
base_eligible_evaluations: Mapping[
eligibility.ConditionName, Mapping[eligibility.Status, list[eligibility.Reason]]
],
) -> None:
"""Get conditions where the person is base eligible, but may be either actionable, not actionable,
or not eligible."""

# for each condition for which the person is base eligible:
# what is the "best" status, i.e. closest to actionable? Add the condition to the result with that status.
for condition_name, reasons_by_status in base_eligible_evaluations.items():
best_status = eligibility.Status.best(*list(reasons_by_status.keys()))
self.results[condition_name] = eligibility.Condition(
condition_name=condition_name, status=best_status, reasons=reasons_by_status[best_status]
)
worst_group_status = eligibility.Status.worst(best_status, worst_status_so_far_for_condition)
return worst_group_status, actionable_reasons, exclusion_reasons
56 changes: 55 additions & 1 deletion tests/unit/services/calculators/test_eligibility_calculator.py
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ def test_rules_with_same_priority_must_all_match_to_exclude(
)


def test_multiple_conditions(faker: Faker):
def test_multiple_conditions_where_both_are_actionable(faker: Faker):
# Given
nhs_number = NHSNumber(f"5{faker.random_int(max=999999999):09d}")
date_of_birth = DateOfBirth(faker.date_of_birth(minimum_age=76, maximum_age=78))
Expand Down Expand Up @@ -446,6 +446,60 @@ def test_multiple_conditions(faker: Faker):
)


def test_multiple_conditions_where_all_give_unique_statuses(faker: Faker):
# Given
nhs_number = NHSNumber(f"5{faker.random_int(max=999999999):09d}")
date_of_birth = DateOfBirth(faker.date_of_birth(minimum_age=76, maximum_age=78))

person_rows = person_rows_builder(nhs_number, date_of_birth=date_of_birth, cohorts=["cohort1"])
campaign_configs = [
rule_builder.CampaignConfigFactory.build(
target="RSV",
iterations=[
rule_builder.IterationFactory.build(
iteration_cohorts=[rule_builder.IterationCohortFactory.build(cohort_label="cohort1")],
iteration_rules=[rule_builder.PersonAgeSuppressionRuleFactory.build()],
)
],
),
rule_builder.CampaignConfigFactory.build(
target="COVID",
iterations=[
rule_builder.IterationFactory.build(
iteration_cohorts=[rule_builder.IterationCohortFactory.build(cohort_label="cohort1")],
iteration_rules=[rule_builder.PersonAgeSuppressionRuleFactory.build(comparator="-85")],
)
],
),
rule_builder.CampaignConfigFactory.build(
target="FLU",
iterations=[
rule_builder.IterationFactory.build(
iteration_cohorts=[rule_builder.IterationCohortFactory.build(cohort_label="cohort2")],
iteration_rules=[rule_builder.PersonAgeSuppressionRuleFactory.build(comparator="-85")],
)
],
),
]

calculator = EligibilityCalculator(person_rows, campaign_configs)

# When
actual = calculator.evaluate_eligibility()

# Then
assert_that(
actual,
is_eligibility_status().with_conditions(
has_items(
is_condition().with_condition_name(ConditionName("RSV")).and_status(Status.actionable),
is_condition().with_condition_name(ConditionName("COVID")).and_status(Status.not_actionable),
is_condition().with_condition_name(ConditionName("FLU")).and_status(Status.not_eligible),
)
),
)


@pytest.mark.parametrize(
("test_comment", "campaign1", "campaign2"),
[
Expand Down
Loading