@@ -92,14 +92,57 @@ def attach_state(project, groups, rules, event_counts, user_counts):
9292 }
9393
9494
95+ class Pipeline (object ):
96+ def __init__ (self ):
97+ self .operations = []
98+
99+ def __call__ (self , sequence ):
100+ return reduce (lambda x , operation : operation (x ), self .operations , sequence )
101+
102+ def apply (self , function ):
103+ def operation (sequence ):
104+ result = function (sequence )
105+ logger .debug ('%r applied to %s items.' , function , len (sequence ))
106+ return result
107+ self .operations .append (operation )
108+ return self
109+
110+ def filter (self , function ):
111+ def operation (sequence ):
112+ result = filter (function , sequence )
113+ logger .debug ('%r filtered %s items to %s.' , function , len (sequence ), len (result ))
114+ return result
115+ self .operations .append (operation )
116+ return self
117+
118+ def map (self , function ):
119+ def operation (sequence ):
120+ result = map (function , sequence )
121+ logger .debug ('%r applied to %s items.' , function , len (sequence ))
122+ return result
123+ self .operations .append (operation )
124+ return self
125+
126+ def reduce (self , function , initializer ):
127+ def operation (sequence ):
128+ result = reduce (function , sequence , initializer (sequence ))
129+ logger .debug ('%r reduced %s items to %s.' , function , len (sequence ), len (result ))
130+ return result
131+ self .operations .append (operation )
132+ return self
133+
134+
95135def rewrite_record (record , project , groups , rules ):
96136 event = record .value .event
137+
138+ # Reattach the group to the event.
97139 group = groups .get (event .group_id )
98- if group is None :
140+ if group is not None :
141+ event .group = group
142+ else :
143+ logger .debug ('%r could not be associated with a group.' , record )
99144 return
100145
101- event .group = group
102-
103146 return Record (
104147 record .key ,
105148 Notification (
@@ -110,36 +153,38 @@ def rewrite_record(record, project, groups, rules):
110153 )
111154
112155
113- def group_records (records ):
114- results = defaultdict (lambda : defaultdict (list ))
115- for record in records :
116- group = record .value .event .group
117- for rule in record .value .rules :
118- results [rule ][group ].append (record )
156+ def group_records (groups , record ):
157+ group = record .value .event .group
158+ rules = record .value .rules
159+ if not rules :
160+ logger .debug ('%r has no associated rules, and will not be added to any groups.' , record )
161+
162+ for rule in rules :
163+ groups [rule ][group ].append (record )
119164
120- return results
165+ return groups
121166
122167
123- def sort_groups ( grouped ):
124- def sort_by_events ( groups ):
125- return OrderedDict (
168+ def sort_group_contents ( rules ):
169+ for key , groups in rules . iteritems ( ):
170+ rules [ key ] = OrderedDict (
126171 sorted (
127172 groups .items (),
128173 key = lambda (group , records ): (group .event_count , group .user_count ),
129174 reverse = True ,
130- ),
175+ )
131176 )
177+ return rules
132178
133- def sort_by_groups (rules ):
134- return OrderedDict (
135- sorted (
136- rules .items (),
137- key = lambda (rule , groups ): len (groups ),
138- reverse = True ,
139- ),
140- )
141179
142- return sort_by_groups ({rule : sort_by_events (groups ) for rule , groups in grouped .iteritems ()})
180+ def sort_rule_groups (rules ):
181+ return OrderedDict (
182+ sorted (
183+ rules .items (),
184+ key = lambda (rule , groups ): len (groups ),
185+ reverse = True ,
186+ ),
187+ )
143188
144189
145190def build_digest (project , records , state = None ):
@@ -153,6 +198,16 @@ def build_digest(project, records, state=None):
153198 state = fetch_state (project , records )
154199
155200 state = attach_state (** state )
156- records = filter (None , map (functools .partial (rewrite_record , ** state ), records ))
157- records = filter (lambda record : record .value .event .group .get_status () is GroupStatus .UNRESOLVED , records )
158- return sort_groups (group_records (records ))
201+
202+ def check_group_state (record ):
203+ return record .value .event .group .get_status () is GroupStatus .UNRESOLVED
204+
205+ pipeline = Pipeline (). \
206+ map (functools .partial (rewrite_record , ** state )). \
207+ filter (bool ). \
208+ filter (check_group_state ). \
209+ reduce (group_records , lambda sequence : defaultdict (lambda : defaultdict (list ))). \
210+ apply (sort_group_contents ). \
211+ apply (sort_rule_groups )
212+
213+ return pipeline (records )
0 commit comments