diff --git a/src/eligibility_signposting_api/services/calculators/eligibility_calculator.py b/src/eligibility_signposting_api/services/calculators/eligibility_calculator.py index f8f58aea..9a86e2ef 100644 --- a/src/eligibility_signposting_api/services/calculators/eligibility_calculator.py +++ b/src/eligibility_signposting_api/services/calculators/eligibility_calculator.py @@ -4,7 +4,6 @@ from collections import defaultdict from collections.abc import Collection, Iterator, Mapping from dataclasses import dataclass, field -from functools import cached_property from itertools import groupby from typing import Any @@ -28,159 +27,120 @@ class EligibilityCalculator: person_data: Row campaign_configs: Collection[rules.CampaignConfig] - results: dict[eligibility.ConditionName, eligibility.Condition] = field(default_factory=dict) + results: list[eligibility.Condition] = field(default_factory=list) - @cached_property - def condition_names(self) -> set[eligibility.ConditionName]: - return { - eligibility.ConditionName(cc.target) - for cc in self.campaign_configs - if cc.campaign_live and cc.current_iteration - } - - def evaluate_eligibility(self) -> eligibility.EligibilityStatus: - """Calculate a person's eligibility for vaccination.""" + @property + def active_campaigns(self) -> list[rules.CampaignConfig]: + return [cc for cc in self.campaign_configs if cc.campaign_live and cc.current_iteration] - # Get all iterations for which the person is base eligible, i.e. those which *might* provide eligibility - # due to cohort membership. - base_eligible_campaigns = self.get_base_eligible_campaigns() + @property + def campaigns_grouped_by_condition_name( + self, + ) -> Iterator[tuple[eligibility.ConditionName, list[rules.CampaignConfig]]]: + """Generator function to iterate over campaign groups by condition name.""" - # Evaluate iteration rules to see if the person is actionable, not actionable (due to "F" rules), - # or not eligible (due to "S" rules") - evaluations = self.evaluate_for_base_eligible_campaigns(base_eligible_campaigns) + for condition_name, campaign_group in groupby( + sorted(self.active_campaigns, key=attrgetter("target")), key=attrgetter("target") + ): + yield condition_name, list(campaign_group) - # Add all not base eligible conditions to result set. - self.get_not_base_eligible_conditions(base_eligible_campaigns) - # Add all base eligible conditions to result set. - self.get_base_eligible_conditions(evaluations) + def evaluate_eligibility(self) -> eligibility.EligibilityStatus: + """Iterates over campaign groups, evaluates eligibility, and returns a consolidated status.""" - return eligibility.EligibilityStatus(conditions=list(self.results.values())) + for condition_name, campaign_group in self.campaigns_grouped_by_condition_name: + if base_eligible_campaigns := self.get_the_base_eligible_campaigns(campaign_group): + status, reasons = self.evaluate_eligibility_by_iteration_rules(base_eligible_campaigns) + # Append the evaluation result for this condition to the results list + self.results.append(eligibility.Condition(condition_name, status, reasons)) + else: + # Create and append the evaluation result, as no campaign config is base eligible + self.results.append(eligibility.Condition(condition_name, eligibility.Status.not_eligible, [])) - def get_base_eligible_campaigns(self) -> list[rules.CampaignConfig]: - """Get all campaigns for which the person is base eligible, i.e. those which *might* provide eligibility. + # Return the overall eligibility status, constructed from the list of condition results + return eligibility.EligibilityStatus(conditions=list(self.results)) - Build and return a collection of campaigns for which the person is base eligible (using cohorts). - Also build and return a set of conditions in the campaigns while we are here. - """ - base_eligible_campaigns: list[rules.CampaignConfig] = [] + def get_the_base_eligible_campaigns(self, campaign_group: list[rules.CampaignConfig]) -> list[rules.CampaignConfig]: + """Return campaigns for which the person is base eligible via cohorts.""" - for campaign_config in (cc for cc in self.campaign_configs if cc.campaign_live and cc.current_iteration): - base_eligible = self.evaluate_base_eligibility(campaign_config.current_iteration) - if base_eligible: - base_eligible_campaigns.append(campaign_config) + base_eligible_campaigns: list[rules.CampaignConfig] = [ + campaign + for campaign in campaign_group + if campaign.campaign_live + and campaign.current_iteration + and self.check_base_eligibility(campaign.current_iteration) + ] - return base_eligible_campaigns + if base_eligible_campaigns: + return base_eligible_campaigns + return [] - def evaluate_base_eligibility(self, iteration: rules.Iteration | None) -> set[str]: + def check_base_eligibility(self, iteration: rules.Iteration | None) -> set[str]: """Return cohorts for which person is base eligible.""" + if not iteration: return set() iteration_cohorts: set[str] = { cohort.cohort_label for cohort in iteration.iteration_cohorts if cohort.cohort_label } - cohorts_row: Mapping[str, dict[str, dict[str, dict[str, Any]]]] = next( - (r for r in self.person_data if r.get("ATTRIBUTE_TYPE", "") == "COHORTS"), {} + (row for row in self.person_data if row.get("ATTRIBUTE_TYPE") == "COHORTS"), {} ) - person_cohorts = set(cohorts_row.get("COHORT_MAP", {}).get("cohorts", {}).get("M", {}).keys()) + person_cohorts: set[str] = set(cohorts_row.get("COHORT_MAP", {}).get("cohorts", {}).get("M", {}).keys()) + return iteration_cohorts & person_cohorts - return iteration_cohorts.intersection(person_cohorts) - - def get_not_base_eligible_conditions( - self, - base_eligible_campaigns: Collection[rules.CampaignConfig], - ) -> None: - """Get conditions where the person is not base eligible, - i.e. is not is the cohort for any campaign iteration.""" - - # for each condition: - # if the person isn't base eligible for any iteration, - # the person is not (base) eligible for the condition - for condition_name in self.condition_names: - if condition_name not in {eligibility.ConditionName(cc.target) for cc in base_eligible_campaigns}: - self.results[condition_name] = eligibility.Condition( - condition_name=condition_name, status=eligibility.Status.not_eligible, reasons=[] - ) - - def evaluate_for_base_eligible_campaigns( - self, base_eligible_campaigns: Collection[rules.CampaignConfig] - ) -> dict[eligibility.ConditionName, dict[eligibility.Status, list[eligibility.Reason]]]: + def evaluate_eligibility_by_iteration_rules( + self, campaign_group: list[rules.CampaignConfig] + ) -> tuple[eligibility.Status, list[eligibility.Reason]]: """Evaluate iteration rules to see if the person is actionable, not actionable (due to "F" rules), or not eligible (due to "S" rules"). For each condition, evaluate all iterations for inclusion or exclusion.""" + priority_getter = attrgetter("priority") - base_eligible_evaluations: dict[ - eligibility.ConditionName, dict[eligibility.Status, list[eligibility.Reason]] - ] = defaultdict(dict) - for condition_name, iteration in [ - (eligibility.ConditionName(cc.target), cc.current_iteration) - for cc in base_eligible_campaigns - if cc.current_iteration - ]: + status_with_reasons: dict[eligibility.Status, list[eligibility.Reason]] = defaultdict() + + for iteration in [cc.current_iteration for cc in campaign_group if cc.current_iteration]: # Until we see a worse status, we assume someone is actionable for this iteration. - worst_status_so_far_for_condition = eligibility.Status.actionable + worst_status = eligibility.Status.actionable exclusion_reasons, actionable_reasons = [], [] - for _priority, iteration_rule_group in groupby( - sorted(iteration.iteration_rules, key=priority_getter), key=priority_getter - ): - worst_status_so_far_for_condition, group_actionable_reasons, group_exclusion_reasons = ( - self.evaluate_priority_group(iteration_rule_group, worst_status_so_far_for_condition) - ) - actionable_reasons.extend(group_actionable_reasons) - exclusion_reasons.extend(group_exclusion_reasons) - condition_entry = base_eligible_evaluations.setdefault(condition_name, {}) - condition_status_entry = condition_entry.setdefault(worst_status_so_far_for_condition, []) + by_priority = sorted(iteration.iteration_rules, key=priority_getter) + for _, rule_group in groupby(by_priority, key=priority_getter): + status, group_actionable, group_exclusions = self.evaluate_priority_group(rule_group, worst_status) + # Merge results + worst_status = status + actionable_reasons.extend(group_actionable) + exclusion_reasons.extend(group_exclusions) + condition_status_entry = status_with_reasons.setdefault(worst_status, []) condition_status_entry.extend( - actionable_reasons - if worst_status_so_far_for_condition is eligibility.Status.actionable - else exclusion_reasons + actionable_reasons if worst_status is eligibility.Status.actionable else exclusion_reasons ) - return base_eligible_evaluations + + best_status = eligibility.Status.best(*list(status_with_reasons.keys())) + + return best_status, status_with_reasons[best_status] def evaluate_priority_group( self, iteration_rule_group: Iterator[rules.IterationRule], worst_status_so_far_for_condition: eligibility.Status, ) -> tuple[eligibility.Status, list[eligibility.Reason], list[eligibility.Reason]]: - actionable_reasons, exclusion_reasons = [], [] + exclusion_reasons, actionable_reasons = [], [] exclude_capable_rules = [ ir for ir in iteration_rule_group if ir.type in (rules.RuleType.filter, rules.RuleType.suppression) ] - best_status_so_far_for_priority_group = ( - eligibility.Status.not_eligible if exclude_capable_rules else eligibility.Status.actionable - ) - for iteration_rule in exclude_capable_rules: - rule_calculator = RuleCalculator(person_data=self.person_data, rule=iteration_rule) + + best_status = eligibility.Status.not_eligible if exclude_capable_rules else eligibility.Status.actionable + + for rule in exclude_capable_rules: + rule_calculator = RuleCalculator(person_data=self.person_data, rule=rule) status, reason = rule_calculator.evaluate_exclusion() if status.is_exclusion: - best_status_so_far_for_priority_group = eligibility.Status.best( - status, best_status_so_far_for_priority_group - ) + best_status = eligibility.Status.best(status, best_status) exclusion_reasons.append(reason) else: - best_status_so_far_for_priority_group = eligibility.Status.actionable + best_status = eligibility.Status.actionable actionable_reasons.append(reason) - return ( - eligibility.Status.worst(best_status_so_far_for_priority_group, worst_status_so_far_for_condition), - actionable_reasons, - exclusion_reasons, - ) - def get_base_eligible_conditions( - self, - base_eligible_evaluations: Mapping[ - eligibility.ConditionName, Mapping[eligibility.Status, list[eligibility.Reason]] - ], - ) -> None: - """Get conditions where the person is base eligible, but may be either actionable, not actionable, - or not eligible.""" - - # for each condition for which the person is base eligible: - # what is the "best" status, i.e. closest to actionable? Add the condition to the result with that status. - for condition_name, reasons_by_status in base_eligible_evaluations.items(): - best_status = eligibility.Status.best(*list(reasons_by_status.keys())) - self.results[condition_name] = eligibility.Condition( - condition_name=condition_name, status=best_status, reasons=reasons_by_status[best_status] - ) + worst_group_status = eligibility.Status.worst(best_status, worst_status_so_far_for_condition) + return worst_group_status, actionable_reasons, exclusion_reasons diff --git a/tests/unit/services/calculators/test_eligibility_calculator.py b/tests/unit/services/calculators/test_eligibility_calculator.py index 627de8c7..cf831366 100644 --- a/tests/unit/services/calculators/test_eligibility_calculator.py +++ b/tests/unit/services/calculators/test_eligibility_calculator.py @@ -402,7 +402,7 @@ def test_rules_with_same_priority_must_all_match_to_exclude( ) -def test_multiple_conditions(faker: Faker): +def test_multiple_conditions_where_both_are_actionable(faker: Faker): # Given nhs_number = NHSNumber(f"5{faker.random_int(max=999999999):09d}") date_of_birth = DateOfBirth(faker.date_of_birth(minimum_age=76, maximum_age=78)) @@ -446,6 +446,60 @@ def test_multiple_conditions(faker: Faker): ) +def test_multiple_conditions_where_all_give_unique_statuses(faker: Faker): + # Given + nhs_number = NHSNumber(f"5{faker.random_int(max=999999999):09d}") + date_of_birth = DateOfBirth(faker.date_of_birth(minimum_age=76, maximum_age=78)) + + person_rows = person_rows_builder(nhs_number, date_of_birth=date_of_birth, cohorts=["cohort1"]) + campaign_configs = [ + rule_builder.CampaignConfigFactory.build( + target="RSV", + iterations=[ + rule_builder.IterationFactory.build( + iteration_cohorts=[rule_builder.IterationCohortFactory.build(cohort_label="cohort1")], + iteration_rules=[rule_builder.PersonAgeSuppressionRuleFactory.build()], + ) + ], + ), + rule_builder.CampaignConfigFactory.build( + target="COVID", + iterations=[ + rule_builder.IterationFactory.build( + iteration_cohorts=[rule_builder.IterationCohortFactory.build(cohort_label="cohort1")], + iteration_rules=[rule_builder.PersonAgeSuppressionRuleFactory.build(comparator="-85")], + ) + ], + ), + rule_builder.CampaignConfigFactory.build( + target="FLU", + iterations=[ + rule_builder.IterationFactory.build( + iteration_cohorts=[rule_builder.IterationCohortFactory.build(cohort_label="cohort2")], + iteration_rules=[rule_builder.PersonAgeSuppressionRuleFactory.build(comparator="-85")], + ) + ], + ), + ] + + calculator = EligibilityCalculator(person_rows, campaign_configs) + + # When + actual = calculator.evaluate_eligibility() + + # Then + assert_that( + actual, + is_eligibility_status().with_conditions( + has_items( + is_condition().with_condition_name(ConditionName("RSV")).and_status(Status.actionable), + is_condition().with_condition_name(ConditionName("COVID")).and_status(Status.not_actionable), + is_condition().with_condition_name(ConditionName("FLU")).and_status(Status.not_eligible), + ) + ), + ) + + @pytest.mark.parametrize( ("test_comment", "campaign1", "campaign2"), [