44from collections import defaultdict
55from collections .abc import Collection , Iterator , Mapping
66from dataclasses import dataclass , field
7- from functools import cached_property
87from itertools import groupby
98from typing import Any
109
@@ -28,159 +27,120 @@ class EligibilityCalculator:
2827 person_data : Row
2928 campaign_configs : Collection [rules .CampaignConfig ]
3029
31- results : dict [eligibility .ConditionName , eligibility . Condition ] = field (default_factory = dict )
30+ results : list [eligibility .Condition ] = field (default_factory = list )
3231
33- @cached_property
34- def condition_names (self ) -> set [eligibility .ConditionName ]:
35- return {
36- eligibility .ConditionName (cc .target )
37- for cc in self .campaign_configs
38- if cc .campaign_live and cc .current_iteration
39- }
40-
41- def evaluate_eligibility (self ) -> eligibility .EligibilityStatus :
42- """Calculate a person's eligibility for vaccination."""
32+ @property
33+ def active_campaigns (self ) -> list [rules .CampaignConfig ]:
34+ return [cc for cc in self .campaign_configs if cc .campaign_live and cc .current_iteration ]
4335
44- # Get all iterations for which the person is base eligible, i.e. those which *might* provide eligibility
45- # due to cohort membership.
46- base_eligible_campaigns = self .get_base_eligible_campaigns ()
36+ @property
37+ def campaigns_grouped_by_condition_name (
38+ self ,
39+ ) -> Iterator [tuple [eligibility .ConditionName , list [rules .CampaignConfig ]]]:
40+ """Generator function to iterate over campaign groups by condition name."""
4741
48- # Evaluate iteration rules to see if the person is actionable, not actionable (due to "F" rules),
49- # or not eligible (due to "S" rules")
50- evaluations = self .evaluate_for_base_eligible_campaigns (base_eligible_campaigns )
42+ for condition_name , campaign_group in groupby (
43+ sorted (self .active_campaigns , key = attrgetter ("target" )), key = attrgetter ("target" )
44+ ):
45+ yield condition_name , list (campaign_group )
5146
52- # Add all not base eligible conditions to result set.
53- self .get_not_base_eligible_conditions (base_eligible_campaigns )
54- # Add all base eligible conditions to result set.
55- self .get_base_eligible_conditions (evaluations )
47+ def evaluate_eligibility (self ) -> eligibility .EligibilityStatus :
48+ """Iterates over campaign groups, evaluates eligibility, and returns a consolidated status."""
5649
57- return eligibility .EligibilityStatus (conditions = list (self .results .values ()))
50+ for condition_name , campaign_group in self .campaigns_grouped_by_condition_name :
51+ if base_eligible_campaigns := self .get_the_base_eligible_campaigns (campaign_group ):
52+ status , reasons = self .evaluate_eligibility_by_iteration_rules (base_eligible_campaigns )
53+ # Append the evaluation result for this condition to the results list
54+ self .results .append (eligibility .Condition (condition_name , status , reasons ))
55+ else :
56+ # Create and append the evaluation result, as no campaign config is base eligible
57+ self .results .append (eligibility .Condition (condition_name , eligibility .Status .not_eligible , []))
5858
59- def get_base_eligible_campaigns ( self ) -> list [ rules . CampaignConfig ]:
60- """Get all campaigns for which the person is base eligible, i.e. those which *might* provide eligibility.
59+ # Return the overall eligibility status, constructed from the list of condition results
60+ return eligibility .EligibilityStatus ( conditions = list ( self . results ))
6161
62- Build and return a collection of campaigns for which the person is base eligible (using cohorts).
63- Also build and return a set of conditions in the campaigns while we are here.
64- """
65- base_eligible_campaigns : list [rules .CampaignConfig ] = []
62+ def get_the_base_eligible_campaigns (self , campaign_group : list [rules .CampaignConfig ]) -> list [rules .CampaignConfig ]:
63+ """Return campaigns for which the person is base eligible via cohorts."""
6664
67- for campaign_config in (cc for cc in self .campaign_configs if cc .campaign_live and cc .current_iteration ):
68- base_eligible = self .evaluate_base_eligibility (campaign_config .current_iteration )
69- if base_eligible :
70- base_eligible_campaigns .append (campaign_config )
65+ base_eligible_campaigns : list [rules .CampaignConfig ] = [
66+ campaign
67+ for campaign in campaign_group
68+ if campaign .campaign_live
69+ and campaign .current_iteration
70+ and self .check_base_eligibility (campaign .current_iteration )
71+ ]
7172
72- return base_eligible_campaigns
73+ if base_eligible_campaigns :
74+ return base_eligible_campaigns
75+ return []
7376
74- def evaluate_base_eligibility (self , iteration : rules .Iteration | None ) -> set [str ]:
77+ def check_base_eligibility (self , iteration : rules .Iteration | None ) -> set [str ]:
7578 """Return cohorts for which person is base eligible."""
79+
7680 if not iteration :
7781 return set ()
7882 iteration_cohorts : set [str ] = {
7983 cohort .cohort_label for cohort in iteration .iteration_cohorts if cohort .cohort_label
8084 }
81-
8285 cohorts_row : Mapping [str , dict [str , dict [str , dict [str , Any ]]]] = next (
83- (r for r in self .person_data if r .get ("ATTRIBUTE_TYPE" , " " ) == "COHORTS" ), {}
86+ (row for row in self .person_data if row .get ("ATTRIBUTE_TYPE" ) == "COHORTS" ), {}
8487 )
85- person_cohorts = set (cohorts_row .get ("COHORT_MAP" , {}).get ("cohorts" , {}).get ("M" , {}).keys ())
88+ person_cohorts : set [str ] = set (cohorts_row .get ("COHORT_MAP" , {}).get ("cohorts" , {}).get ("M" , {}).keys ())
89+ return iteration_cohorts & person_cohorts
8690
87- return iteration_cohorts .intersection (person_cohorts )
88-
89- def get_not_base_eligible_conditions (
90- self ,
91- base_eligible_campaigns : Collection [rules .CampaignConfig ],
92- ) -> None :
93- """Get conditions where the person is not base eligible,
94- i.e. is not is the cohort for any campaign iteration."""
95-
96- # for each condition:
97- # if the person isn't base eligible for any iteration,
98- # the person is not (base) eligible for the condition
99- for condition_name in self .condition_names :
100- if condition_name not in {eligibility .ConditionName (cc .target ) for cc in base_eligible_campaigns }:
101- self .results [condition_name ] = eligibility .Condition (
102- condition_name = condition_name , status = eligibility .Status .not_eligible , reasons = []
103- )
104-
105- def evaluate_for_base_eligible_campaigns (
106- self , base_eligible_campaigns : Collection [rules .CampaignConfig ]
107- ) -> dict [eligibility .ConditionName , dict [eligibility .Status , list [eligibility .Reason ]]]:
91+ def evaluate_eligibility_by_iteration_rules (
92+ self , campaign_group : list [rules .CampaignConfig ]
93+ ) -> tuple [eligibility .Status , list [eligibility .Reason ]]:
10894 """Evaluate iteration rules to see if the person is actionable, not actionable (due to "F" rules),
10995 or not eligible (due to "S" rules").
11096
11197 For each condition, evaluate all iterations for inclusion or exclusion."""
98+
11299 priority_getter = attrgetter ("priority" )
113100
114- base_eligible_evaluations : dict [
115- eligibility .ConditionName , dict [eligibility .Status , list [eligibility .Reason ]]
116- ] = defaultdict (dict )
117- for condition_name , iteration in [
118- (eligibility .ConditionName (cc .target ), cc .current_iteration )
119- for cc in base_eligible_campaigns
120- if cc .current_iteration
121- ]:
101+ status_with_reasons : dict [eligibility .Status , list [eligibility .Reason ]] = defaultdict ()
102+
103+ for iteration in [cc .current_iteration for cc in campaign_group if cc .current_iteration ]:
122104 # Until we see a worse status, we assume someone is actionable for this iteration.
123- worst_status_so_far_for_condition = eligibility .Status .actionable
105+ worst_status = eligibility .Status .actionable
124106 exclusion_reasons , actionable_reasons = [], []
125- for _priority , iteration_rule_group in groupby (
126- sorted (iteration .iteration_rules , key = priority_getter ), key = priority_getter
127- ):
128- worst_status_so_far_for_condition , group_actionable_reasons , group_exclusion_reasons = (
129- self .evaluate_priority_group (iteration_rule_group , worst_status_so_far_for_condition )
130- )
131- actionable_reasons .extend (group_actionable_reasons )
132- exclusion_reasons .extend (group_exclusion_reasons )
133- condition_entry = base_eligible_evaluations .setdefault (condition_name , {})
134- condition_status_entry = condition_entry .setdefault (worst_status_so_far_for_condition , [])
107+ by_priority = sorted (iteration .iteration_rules , key = priority_getter )
108+ for _ , rule_group in groupby (by_priority , key = priority_getter ):
109+ status , group_actionable , group_exclusions = self .evaluate_priority_group (rule_group , worst_status )
110+ # Merge results
111+ worst_status = status
112+ actionable_reasons .extend (group_actionable )
113+ exclusion_reasons .extend (group_exclusions )
114+ condition_status_entry = status_with_reasons .setdefault (worst_status , [])
135115 condition_status_entry .extend (
136- actionable_reasons
137- if worst_status_so_far_for_condition is eligibility .Status .actionable
138- else exclusion_reasons
116+ actionable_reasons if worst_status is eligibility .Status .actionable else exclusion_reasons
139117 )
140- return base_eligible_evaluations
118+
119+ best_status = eligibility .Status .best (* list (status_with_reasons .keys ()))
120+
121+ return best_status , status_with_reasons [best_status ]
141122
142123 def evaluate_priority_group (
143124 self ,
144125 iteration_rule_group : Iterator [rules .IterationRule ],
145126 worst_status_so_far_for_condition : eligibility .Status ,
146127 ) -> tuple [eligibility .Status , list [eligibility .Reason ], list [eligibility .Reason ]]:
147- actionable_reasons , exclusion_reasons = [], []
128+ exclusion_reasons , actionable_reasons = [], []
148129 exclude_capable_rules = [
149130 ir for ir in iteration_rule_group if ir .type in (rules .RuleType .filter , rules .RuleType .suppression )
150131 ]
151- best_status_so_far_for_priority_group = (
152- eligibility .Status .not_eligible if exclude_capable_rules else eligibility .Status .actionable
153- )
154- for iteration_rule in exclude_capable_rules :
155- rule_calculator = RuleCalculator (person_data = self .person_data , rule = iteration_rule )
132+
133+ best_status = eligibility .Status .not_eligible if exclude_capable_rules else eligibility .Status .actionable
134+
135+ for rule in exclude_capable_rules :
136+ rule_calculator = RuleCalculator (person_data = self .person_data , rule = rule )
156137 status , reason = rule_calculator .evaluate_exclusion ()
157138 if status .is_exclusion :
158- best_status_so_far_for_priority_group = eligibility .Status .best (
159- status , best_status_so_far_for_priority_group
160- )
139+ best_status = eligibility .Status .best (status , best_status )
161140 exclusion_reasons .append (reason )
162141 else :
163- best_status_so_far_for_priority_group = eligibility .Status .actionable
142+ best_status = eligibility .Status .actionable
164143 actionable_reasons .append (reason )
165- return (
166- eligibility .Status .worst (best_status_so_far_for_priority_group , worst_status_so_far_for_condition ),
167- actionable_reasons ,
168- exclusion_reasons ,
169- )
170144
171- def get_base_eligible_conditions (
172- self ,
173- base_eligible_evaluations : Mapping [
174- eligibility .ConditionName , Mapping [eligibility .Status , list [eligibility .Reason ]]
175- ],
176- ) -> None :
177- """Get conditions where the person is base eligible, but may be either actionable, not actionable,
178- or not eligible."""
179-
180- # for each condition for which the person is base eligible:
181- # what is the "best" status, i.e. closest to actionable? Add the condition to the result with that status.
182- for condition_name , reasons_by_status in base_eligible_evaluations .items ():
183- best_status = eligibility .Status .best (* list (reasons_by_status .keys ()))
184- self .results [condition_name ] = eligibility .Condition (
185- condition_name = condition_name , status = best_status , reasons = reasons_by_status [best_status ]
186- )
145+ worst_group_status = eligibility .Status .worst (best_status , worst_status_so_far_for_condition )
146+ return worst_group_status , actionable_reasons , exclusion_reasons
0 commit comments