11from __future__ import annotations
22
33from _operator import attrgetter
4- from collections import defaultdict
5- from collections .abc import Collection , Iterator , Mapping
4+ from collections .abc import Collection , Iterable , Iterator , Mapping
65from dataclasses import dataclass , field
7- from functools import cached_property
86from itertools import groupby
9- from typing import Any
7+ from typing import TYPE_CHECKING , Any
8+
9+ if TYPE_CHECKING :
10+ from eligibility_signposting_api .model .rules import Iteration , IterationCohort
1011
1112from wireup import service
1213
1314from eligibility_signposting_api .model import eligibility , rules
15+ from eligibility_signposting_api .model .eligibility import (
16+ CohortResult ,
17+ Condition ,
18+ ConditionName ,
19+ IterationResult ,
20+ Status ,
21+ )
1422from eligibility_signposting_api .services .calculators .rule_calculator import RuleCalculator
1523
1624Row = Collection [Mapping [str , Any ]]
@@ -40,107 +48,180 @@ def campaigns_grouped_by_condition_name(
4048 self ,
4149 ) -> Iterator [tuple [eligibility .ConditionName , list [rules .CampaignConfig ]]]:
4250 """Generator function to iterate over campaign groups by condition name."""
43-
4451 for condition_name , campaign_group in groupby (
4552 sorted (self .active_campaigns , key = attrgetter ("target" )), key = attrgetter ("target" )
4653 ):
4754 yield condition_name , list (campaign_group )
4855
49- @cached_property
56+ @property
5057 def person_cohorts (self ) -> set [str ]:
5158 cohorts_row : Mapping [str , dict [str , dict [str , dict [str , Any ]]]] = next (
5259 (row for row in self .person_data if row .get ("ATTRIBUTE_TYPE" ) == "COHORTS" ), {}
5360 )
5461 return set (cohorts_row .get ("COHORT_MAP" , {}).get ("cohorts" , {}).get ("M" , {}).keys ())
5562
63+ @staticmethod
64+ def get_best_cohort (cohort_results : dict [str , CohortResult ]) -> tuple [Status , list [CohortResult ]]:
65+ if not cohort_results :
66+ return eligibility .Status .not_eligible , []
67+ best_status = eligibility .Status .best (* [result .status for result in cohort_results .values ()])
68+ best_cohorts = [result for result in cohort_results .values () if result .status == best_status ]
69+ return best_status , best_cohorts
70+
71+ @staticmethod
72+ def get_exclusion_rules (
73+ cohort : IterationCohort , rules_filter : Iterable [rules .IterationRule ]
74+ ) -> Iterator [rules .IterationRule ]:
75+ return (
76+ ir
77+ for ir in rules_filter
78+ if ir .cohort_label is None
79+ or cohort .cohort_label == ir .cohort_label
80+ or (isinstance (ir .cohort_label , (list , set , tuple )) and cohort .cohort_label in ir .cohort_label )
81+ )
82+
83+ @staticmethod
84+ def get_rules_by_type (
85+ active_iteration : Iteration ,
86+ ) -> tuple [tuple [rules .IterationRule , ...], tuple [rules .IterationRule , ...]]:
87+ rules_by_type = {
88+ rule_type : tuple (rule for rule in active_iteration .iteration_rules if attrgetter ("type" )(rule ) == rule_type )
89+ for rule_type in (rules .RuleType .filter , rules .RuleType .suppression , rules .RuleType .redirect )
90+ }
91+ rules_filter = rules_by_type [rules .RuleType .filter ]
92+ rules_suppression = rules_by_type [rules .RuleType .suppression ]
93+ return rules_filter , rules_suppression
94+
5695 def evaluate_eligibility (self ) -> eligibility .EligibilityStatus :
5796 """Iterates over campaign groups, evaluates eligibility, and returns a consolidated status."""
97+ results : dict [ConditionName , IterationResult ] = {}
5898
5999 for condition_name , campaign_group in self .campaigns_grouped_by_condition_name :
60- if base_eligible_campaigns := self .get_the_base_eligible_campaigns (campaign_group ):
61- status , reasons = self .evaluate_eligibility_by_iteration_rules (base_eligible_campaigns )
62- # Append the evaluation result for this condition to the results list
63- self .results .append (eligibility .Condition (condition_name , status , reasons ))
100+ iteration_results : dict [str , IterationResult ] = {}
101+
102+ for active_iteration in [cc .current_iteration for cc in campaign_group ]:
103+ cohort_results : dict [str , CohortResult ] = {}
104+
105+ rules_filter , rules_suppression = self .get_rules_by_type (active_iteration )
106+ for cohort in sorted (active_iteration .iteration_cohorts , key = attrgetter ("priority" )):
107+ # Check Base Eligibility
108+ if cohort .cohort_label in self .person_cohorts or cohort .cohort_label == magic_cohort :
109+ is_eligible : bool = True
110+ is_eligible = self .evaluate_filter_rules (
111+ cohort ,
112+ cohort_results ,
113+ rules_filter ,
114+ is_eligible = is_eligible ,
115+ )
116+
117+ if is_eligible :
118+ is_actionable : bool = True
119+ suppression_reasons , is_actionable = self .evaluate_suppression_rules (
120+ cohort ,
121+ rules_suppression ,
122+ is_actionable = is_actionable ,
123+ )
124+ if cohort .cohort_label is not None :
125+ key = cohort .cohort_label
126+ if is_actionable :
127+ cohort_results [key ] = CohortResult (
128+ cohort .cohort_group if cohort .cohort_group else key ,
129+ Status .actionable ,
130+ [],
131+ str (cohort .positive_description ),
132+ )
133+ else :
134+ cohort_results [key ] = CohortResult (
135+ cohort .cohort_group if cohort .cohort_group else key ,
136+ Status .not_actionable ,
137+ suppression_reasons ,
138+ str (cohort .positive_description ),
139+ )
140+
141+ # Not base eligible
142+ elif cohort .cohort_label is not None :
143+ cohort_results [cohort .cohort_label ] = CohortResult (
144+ cohort .cohort_group if cohort .cohort_group else cohort .cohort_label ,
145+ Status .not_eligible ,
146+ [],
147+ str (cohort .negative_description ),
148+ )
149+
150+ # Determine Result between cohorts - get the best
151+ status , best_cohorts = self .get_best_cohort (cohort_results )
152+ iteration_results [active_iteration .name ] = IterationResult (status , best_cohorts )
153+
154+ # Determine results between iterations - get the best
155+ if iteration_results :
156+ best_candidate = max (iteration_results .values (), key = lambda r : r .status .value )
64157 else :
65- # Create and append the evaluation result, as no campaign config is base eligible
66- self . results . append ( eligibility . Condition ( condition_name , eligibility . Status . not_eligible , []))
67-
68- # Return the overall eligibility status, constructed from the list of condition results
69- return eligibility . EligibilityStatus ( conditions = list ( self . results ))
70-
71- def get_the_base_eligible_campaigns ( self , campaign_group : list [ rules . CampaignConfig ]) -> list [ rules . CampaignConfig ]:
72- """Return campaigns for which the person is base eligible via cohorts."""
73-
74- base_eligible_campaigns : list [ rules . CampaignConfig ] = [
75- campaign for campaign in campaign_group if self . check_base_eligibility ( campaign . current_iteration )
158+ best_candidate = IterationResult ( eligibility . Status . not_eligible , [])
159+ results [ condition_name ] = best_candidate
160+
161+ # Consolidate all the results and return
162+ final_result = [
163+ Condition (
164+ condition_name = condition_name ,
165+ status = active_iteration_result . status ,
166+ cohort_results = active_iteration_result . cohort_results ,
167+ )
168+ for condition_name , active_iteration_result in results . items ( )
76169 ]
170+ return eligibility .EligibilityStatus (conditions = final_result )
77171
78- if base_eligible_campaigns :
79- return base_eligible_campaigns
80- return []
81-
82- def check_base_eligibility (self , iteration : rules .Iteration ) -> bool :
83- """Return cohorts for which person is base eligible."""
84- iteration_cohorts : set [str ] = {
85- cohort .cohort_label for cohort in iteration .iteration_cohorts if cohort .cohort_label
86- }
87- if magic_cohort in iteration_cohorts :
88- return True
89- return bool (iteration_cohorts & self .person_cohorts )
90-
91- def evaluate_eligibility_by_iteration_rules (
92- self , campaign_group : list [rules .CampaignConfig ]
93- ) -> tuple [eligibility .Status , list [eligibility .Reason ]]:
94- """Evaluate iteration rules to see if the person is actionable, not actionable (due to "S" rules),
95- or not eligible (due to "F" rules").
96-
97- For each condition, evaluate all iterations for inclusion or exclusion."""
98-
172+ def evaluate_filter_rules (
173+ self ,
174+ cohort : IterationCohort ,
175+ cohort_results : dict [str , CohortResult ],
176+ rules_filter : Iterable [rules .IterationRule ],
177+ * ,
178+ is_eligible : bool ,
179+ ) -> bool :
99180 priority_getter = attrgetter ("priority" )
181+ sorted_rules_by_priority = sorted (self .get_exclusion_rules (cohort , rules_filter ), key = priority_getter )
100182
101- status_with_reasons : dict [eligibility .Status , list [eligibility .Reason ]] = defaultdict ()
102-
103- for iteration in [cc .current_iteration for cc in campaign_group ]:
104- # Until we see a worse status, we assume someone is actionable for this iteration.
105- worst_status = eligibility .Status .actionable
106- exclusion_reasons , actionable_reasons = [], []
107- by_priority = sorted (iteration .iteration_rules , key = priority_getter )
108- for _ , rule_group in groupby (by_priority , key = priority_getter ):
109- status , group_actionable , group_exclusions , is_rule_stop = self .evaluate_priority_group (
110- rule_group , worst_status
111- )
112- # Merge results
113- worst_status = status
114- actionable_reasons .extend (group_actionable )
115- exclusion_reasons .extend (group_exclusions )
116- if is_rule_stop :
183+ for _ , rule_group in groupby (sorted_rules_by_priority , key = priority_getter ):
184+ status , group_actionable , group_exclusions , rule_stop = self .evaluate_rules_priority_group (rule_group )
185+ if status .is_exclusion :
186+ if cohort .cohort_label is not None :
187+ cohort_results [str (cohort .cohort_label )] = CohortResult (
188+ cohort .cohort_group if cohort .cohort_group else cohort .cohort_label ,
189+ Status .not_eligible ,
190+ [],
191+ str (cohort .negative_description ),
192+ )
193+ is_eligible = False
194+ break
195+ return is_eligible
196+
197+ def evaluate_suppression_rules (
198+ self ,
199+ cohort : IterationCohort ,
200+ rules_suppression : Iterable [rules .IterationRule ],
201+ * ,
202+ is_actionable : bool ,
203+ ) -> tuple [list , bool ]:
204+ priority_getter = attrgetter ("priority" )
205+ suppression_reasons = []
206+ sorted_rules_by_priority = sorted (self .get_exclusion_rules (cohort , rules_suppression ), key = priority_getter )
207+ for _ , rule_group in groupby (sorted_rules_by_priority , key = priority_getter ):
208+ status , group_actionable , group_exclusions , rule_stop = self .evaluate_rules_priority_group (rule_group )
209+ if status .is_exclusion :
210+ is_actionable = False
211+ suppression_reasons .extend (group_exclusions )
212+ if rule_stop :
117213 break
118- condition_status_entry = status_with_reasons .setdefault (worst_status , [])
119- condition_status_entry .extend (
120- actionable_reasons if worst_status is eligibility .Status .actionable else exclusion_reasons
121- )
122-
123- best_status = eligibility .Status .best (* list (status_with_reasons .keys ()))
124-
125- return best_status , status_with_reasons [best_status ]
214+ return suppression_reasons , is_actionable
126215
127- def evaluate_priority_group (
128- self ,
129- iteration_rule_group : Iterator [rules .IterationRule ],
130- worst_status_so_far_for_condition : eligibility .Status ,
216+ def evaluate_rules_priority_group (
217+ self , rules_group : Iterator [rules .IterationRule ]
131218 ) -> tuple [eligibility .Status , list [eligibility .Reason ], list [eligibility .Reason ], bool ]:
132219 is_rule_stop = False
133- exclusion_reasons , actionable_reasons = [], []
134- exclude_capable_rules = [
135- ir
136- for ir in iteration_rule_group
137- if ir .type in (rules .RuleType .filter , rules .RuleType .suppression )
138- and (ir .cohort_label is None or (ir .cohort_label in self .person_cohorts ))
139- ]
140-
141- best_status = eligibility .Status .not_eligible if exclude_capable_rules else eligibility .Status .actionable
220+ actionable_reasons , exclusion_reasons = [], []
221+ best_status = eligibility .Status .not_eligible
142222
143- for rule in exclude_capable_rules :
223+ for rule in rules_group :
224+ is_rule_stop = rule .rule_stop or is_rule_stop
144225 rule_calculator = RuleCalculator (person_data = self .person_data , rule = rule )
145226 status , reason = rule_calculator .evaluate_exclusion ()
146227 if status .is_exclusion :
@@ -150,7 +231,4 @@ def evaluate_priority_group(
150231 best_status = eligibility .Status .actionable
151232 actionable_reasons .append (reason )
152233
153- worst_group_status = eligibility .Status .worst (best_status , worst_status_so_far_for_condition )
154- if worst_group_status .is_exclusion :
155- is_rule_stop = any (rule .rule_stop for rule in exclude_capable_rules )
156- return worst_group_status , actionable_reasons , exclusion_reasons , is_rule_stop
234+ return best_status , actionable_reasons , exclusion_reasons , is_rule_stop
0 commit comments