Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/eligibility_signposting_api/model/eligibility.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ class RuleType(StrEnum):

@total_ordering
class Status(Enum):
nothing = auto()
not_eligible = auto()
not_actionable = auto()
actionable = auto()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from wireup import service

from eligibility_signposting_api.model import eligibility, rules
from eligibility_signposting_api.model.eligibility import Reason, Status
from eligibility_signposting_api.services.calculators.rule_calculator import RuleCalculator

Row = Collection[Mapping[str, Any]]
Expand All @@ -28,7 +29,7 @@ class EligibilityCalculator:
person_data: Row
campaign_configs: Collection[rules.CampaignConfig]

results: dict[eligibility.ConditionName, eligibility.Condition] = field(default_factory=dict)
results: list[eligibility.Condition] = field(default_factory=list)

@cached_property
def condition_names(self) -> set[eligibility.ConditionName]:
Expand All @@ -39,105 +40,105 @@ def condition_names(self) -> set[eligibility.ConditionName]:
}

def evaluate_eligibility(self) -> eligibility.EligibilityStatus:
"""Calculate a person's eligibility for vaccination."""
# Group campaign configs by their 'target' attribute and sort each group by 'target'
campaign_configs_grouped_by_condition_name = {
key: sorted(campaign_group, key=attrgetter("target"))
for key, campaign_group in groupby(self.campaign_configs, key=attrgetter("target"))
}

# Iterate over each group of campaign configs
for condition_name, campaign_group in campaign_configs_grouped_by_condition_name.items():
# Skip processing if the condition name is not in the set of valid condition names
if condition_name not in self.condition_names:
continue

# Get all iterations for which the person is base eligible, i.e. those which *might* provide eligibility
# due to cohort membership.
base_eligible_campaigns = self.get_base_eligible_campaigns()
# Get the base eligible campaigns or base ineligibility reasons for the current group
base_eligible_campaigns, status, reasons = (
self.get_the_base_eligible_campaigns_or_base_ineligibility_reasons(campaign_group)
)

# Evaluate iteration rules to see if the person is actionable, not actionable (due to "F" rules),
# or not eligible (due to "S" rules")
evaluations = self.evaluate_for_base_eligible_campaigns(base_eligible_campaigns)
# If there are base eligible campaigns, further evaluate them by iteration rules
if base_eligible_campaigns:
status, reasons = self.evaluate_eligibility_by_iteration_rules(base_eligible_campaigns)

# Add all not base eligible conditions to result set.
self.get_not_base_eligible_conditions(base_eligible_campaigns)
# Add all base eligible conditions to result set.
self.get_base_eligible_conditions(evaluations)
# Append the evaluation result for this condition to the results list
self.results.append(eligibility.Condition(condition_name, status, reasons))

return eligibility.EligibilityStatus(conditions=list(self.results.values()))
# Return the overall eligibility status, constructed from the list of condition results
return eligibility.EligibilityStatus(conditions=list(self.results))

def get_base_eligible_campaigns(self) -> list[rules.CampaignConfig]:
"""Get all campaigns for which the person is base eligible, i.e. those which *might* provide eligibility.
def get_the_base_eligible_campaigns_or_base_ineligibility_reasons(
self, campaign_group: list[rules.CampaignConfig]
) -> tuple[list[rules.CampaignConfig], Status, list[Reason]]:
"""Get all campaigns in the group for which the person is base eligible,
i.e. those which *might* provide eligibility.

Build and return a collection of campaigns for which the person is base eligible (using cohorts).
Also build and return a set of conditions in the campaigns while we are here.
Build and return a collection of campaigns for which the person is base eligible (using cohorts),
Otherwise, build and return the in-eligibility status and reasons
"""
base_eligible_campaigns: list[rules.CampaignConfig] = []

for campaign_config in (cc for cc in self.campaign_configs if cc.campaign_live and cc.current_iteration):
base_eligible = self.evaluate_base_eligibility(campaign_config.current_iteration)
for campaign_config in (cc for cc in campaign_group if cc.campaign_live and cc.current_iteration):
base_eligible = self.check_base_eligibility(campaign_config.current_iteration)
if base_eligible:
base_eligible_campaigns.append(campaign_config)

return base_eligible_campaigns
if base_eligible_campaigns:
return base_eligible_campaigns, Status.nothing, []
return [], eligibility.Status.not_eligible, []

def evaluate_base_eligibility(self, iteration: rules.Iteration | None) -> set[str]:
def check_base_eligibility(self, iteration: rules.Iteration | None) -> set[str]:
"""Return cohorts for which person is base eligible."""
if not iteration:
if not iteration or not iteration.iteration_cohorts:
return set()
# Extract iteration cohorts efficiently
iteration_cohorts: set[str] = {
cohort.cohort_label for cohort in iteration.iteration_cohorts if cohort.cohort_label
}

# Locate person's cohorts safely
cohorts_row: Mapping[str, dict[str, dict[str, dict[str, Any]]]] = next(
(r for r in self.person_data if r.get("ATTRIBUTE_TYPE", "") == "COHORTS"), {}
(r for r in self.person_data if r.get("ATTRIBUTE_TYPE") == "COHORTS"), {}
)
person_cohorts = set(cohorts_row.get("COHORT_MAP", {}).get("cohorts", {}).get("M", {}).keys())

return iteration_cohorts.intersection(person_cohorts)
return iteration_cohorts & person_cohorts

def get_not_base_eligible_conditions(
self,
base_eligible_campaigns: Collection[rules.CampaignConfig],
) -> None:
"""Get conditions where the person is not base eligible,
i.e. is not is the cohort for any campaign iteration."""

# for each condition:
# if the person isn't base eligible for any iteration,
# the person is not (base) eligible for the condition
for condition_name in self.condition_names:
if condition_name not in {eligibility.ConditionName(cc.target) for cc in base_eligible_campaigns}:
self.results[condition_name] = eligibility.Condition(
condition_name=condition_name, status=eligibility.Status.not_eligible, reasons=[]
)

def evaluate_for_base_eligible_campaigns(
self, base_eligible_campaigns: Collection[rules.CampaignConfig]
) -> dict[eligibility.ConditionName, dict[eligibility.Status, list[eligibility.Reason]]]:
def evaluate_eligibility_by_iteration_rules(
self, campaign_group: list[rules.CampaignConfig]
) -> tuple[Status, list[Reason]]:
"""Evaluate iteration rules to see if the person is actionable, not actionable (due to "F" rules),
or not eligible (due to "S" rules").

For each condition, evaluate all iterations for inclusion or exclusion."""

priority_getter = attrgetter("priority")

base_eligible_evaluations: dict[
eligibility.ConditionName, dict[eligibility.Status, list[eligibility.Reason]]
] = defaultdict(dict)
for condition_name, iteration in [
(eligibility.ConditionName(cc.target), cc.current_iteration)
for cc in base_eligible_campaigns
if cc.current_iteration
]:
status_reason_dict: dict[Status, list[Reason]] = defaultdict()

for iteration in [cc.current_iteration for cc in campaign_group if cc.current_iteration]:
# Until we see a worse status, we assume someone is actionable for this iteration.
worst_status_so_far_for_condition = eligibility.Status.actionable
exclusion_reasons, actionable_reasons = [], []
for _priority, iteration_rule_group in groupby(
sorted(iteration.iteration_rules, key=priority_getter), key=priority_getter
):
worst_status_so_far_for_condition, group_actionable_reasons, group_exclusion_reasons = (
self.evaluate_priority_group(iteration_rule_group, worst_status_so_far_for_condition)
)
actionable_reasons.extend(group_actionable_reasons)
exclusion_reasons.extend(group_exclusion_reasons)
condition_entry = base_eligible_evaluations.setdefault(condition_name, {})
condition_status_entry = condition_entry.setdefault(worst_status_so_far_for_condition, [])
(
worst_status_so_far_for_condition,
campaign_group_actionable_reasons,
campaign_group_exclusion_reasons,
) = self.evaluate_priority_group(iteration_rule_group, worst_status_so_far_for_condition)
actionable_reasons.extend(campaign_group_actionable_reasons)
exclusion_reasons.extend(campaign_group_exclusion_reasons)
condition_status_entry = status_reason_dict.setdefault(worst_status_so_far_for_condition, [])
condition_status_entry.extend(
actionable_reasons
if worst_status_so_far_for_condition is eligibility.Status.actionable
else exclusion_reasons
)
return base_eligible_evaluations

best_status = eligibility.Status.best(*list(status_reason_dict.keys()))

return best_status, status_reason_dict[best_status]

def evaluate_priority_group(
self,
Expand Down Expand Up @@ -167,20 +168,3 @@ def evaluate_priority_group(
actionable_reasons,
exclusion_reasons,
)

def get_base_eligible_conditions(
self,
base_eligible_evaluations: Mapping[
eligibility.ConditionName, Mapping[eligibility.Status, list[eligibility.Reason]]
],
) -> None:
"""Get conditions where the person is base eligible, but may be either actionable, not actionable,
or not eligible."""

# for each condition for which the person is base eligible:
# what is the "best" status, i.e. closest to actionable? Add the condition to the result with that status.
for condition_name, reasons_by_status in base_eligible_evaluations.items():
best_status = eligibility.Status.best(*list(reasons_by_status.keys()))
self.results[condition_name] = eligibility.Condition(
condition_name=condition_name, status=best_status, reasons=reasons_by_status[best_status]
)
Loading