11import datetime
2- import json
3- from collections import defaultdict
4- from functools import lru_cache
52from typing import Any
63from urllib .parse import quote
74
5+ import requests
86from torchci .test_insights .file_report_generator import FileReportGenerator
97from torchci .test_insights .weekly_notification import send_to_aws_alerting_lambda
108
@@ -88,44 +86,24 @@ class RegressionNotification:
    # S3 key (under ossci-raw-job-status) recording metadata about the last
    # regression report that was sent.
    # NOTE(review): the helpers that read/uploaded this key were removed in this
    # change; confirm whether the attribute is still needed.
    previous_regression_sha_key = (
        "additional_info/weekly_file_report/regression_metadata.json.gz"
    )
    # Per-file metrics that are aggregated and diffed between the previous and
    # current reporting windows.
    keys = [
        "cost",
        "time",
        "skipped",
        "success",
        "failure",
        "flaky",
    ]

    def __init__(self):
        # Report-generation helper; constructed in dry-run mode (presumably to
        # avoid writes — confirm against FileReportGenerator).
        self.file_report_generator = FileReportGenerator(dry_run=True)
94100
95- @lru_cache
96- def _previous_regression_sha (self ) -> str :
97- text = self .file_report_generator ._fetch_from_s3 (
98- "ossci-raw-job-status" ,
99- self .previous_regression_sha_key ,
100- )
101- try :
102- return json .loads (text )[0 ]["sha" ]
103- except (json .JSONDecodeError , IndexError , KeyError ):
104- return ""
105-
106- def upload_new_regression_sha (self , sha : str ) -> None :
107- body = [{"sha" : sha }]
108- self .file_report_generator .upload_to_s3 (
109- body ,
110- "ossci-raw-job-status" ,
111- self .previous_regression_sha_key ,
112- )
113-
    def gen_regression_for_team(
        self,
        team: dict[str, Any],
        prev_invoking_file_info: list[dict[str, Any]],
        curr_invoking_file_info: list[dict[str, Any]],
    ) -> dict[str, Any]:
        """Aggregate the team's per-file metrics for both windows and diff them.

        Returns a mapping of each metric in ``self.keys`` to
        ``{"previous": <prev total>, "current": <curr total>}``.
        """
        # Invoking_file_info diff
        # Keep only rows that belong to this team, per the team's predicate.
        relevant_curr_invoking_file_info = [
            info for info in curr_invoking_file_info if team["condition"](info)
        ]
        # NOTE(review): the construction below falls inside a hidden hunk gap in
        # this view; it is reconstructed to mirror the curr-filter above (its
        # name and usage are visible further down) — confirm against the file.
        relevant_prev_invoking_file_info = [
            info for info in prev_invoking_file_info if team["condition"](info)
        ]

        def _sum_invoking_file_info(data: list[dict[str, Any]]) -> dict[str, Any]:
            # Column-wise totals for every tracked metric.
            info = {}
            for key in self.keys:
                info[key] = sum(item[key] for item in data)
            return info

        agg_prev_file_info = _sum_invoking_file_info(relevant_prev_invoking_file_info)
        agg_curr_file_info = _sum_invoking_file_info(relevant_curr_invoking_file_info)

        # Pair up previous/current totals per metric, e.g.
        # {"cost": {"previous": 1.0, "current": 2.0}, ...}
        invoking_file_info_diff = {}
        for key in self.keys:
            invoking_file_info_diff[key] = {
                "previous": agg_prev_file_info[key],
                "current": agg_curr_file_info[key],
            }

        return invoking_file_info_diff
178138
179139 def filter_thresholds (self , regression : dict [str , Any ]) -> bool :
180140 def _exceeds_threshold (value : float , total : float ) -> bool :
@@ -184,36 +144,20 @@ def _exceeds_threshold(value: float, total: float) -> bool:
184144
185145 return (value / total ) >= percent_threshold
186146
187- def _status_change_exceeds_threshold (field : str , total_field : str ) -> bool :
188- return _exceeds_threshold (
189- regression ["status_changes" ].get (field , 0 ),
190- regression ["invoking_file_info" ][total_field ]["previous" ],
191- )
192-
193147 def _diff_exceeds_threshold (field : str ) -> bool :
194148 return _exceeds_threshold (
195- abs (
196- regression ["invoking_file_info" ][field ]["current" ]
197- - regression ["invoking_file_info" ][field ]["previous" ]
198- ),
199- regression ["invoking_file_info" ][field ]["previous" ],
149+ abs (regression [field ]["current" ] - regression [field ]["previous" ]),
150+ regression [field ]["previous" ],
200151 )
201152
202- return (
203- _status_change_exceeds_threshold ("removed" , "count" )
204- or _status_change_exceeds_threshold ("added" , "count" )
205- or _status_change_exceeds_threshold ("started_skipping" , "skipped" )
206- or _status_change_exceeds_threshold ("stopped_skipping" , "skipped" )
207- or any (
208- _diff_exceeds_threshold (key )
209- for key in ["cost" , "count" , "skipped" , "time" ]
210- )
211- )
153+ keys = self .keys .copy ()
154+ keys .remove ("flaky" )
155+ return any (_diff_exceeds_threshold (key ) for key in keys )
212156
213157 def format_regression_string (self , team , regression : dict [str , Any ]) -> str :
214158 def _get_change (field : str , additional_processing ) -> str :
215- current = regression ["invoking_file_info" ][ field ]["current" ]
216- previous = regression ["invoking_file_info" ][ field ]["previous" ]
159+ current = regression [field ]["current" ]
160+ previous = regression [field ]["previous" ]
217161 change = current - previous
218162 percent_change = (change / previous ) * 100 if previous != 0 else 0
219163 percent_change = round (percent_change , 2 )
@@ -226,14 +170,12 @@ def _get_change(field: str, additional_processing) -> str:
226170 return (
227171 f"Regression detected for Team:{ team ['team' ]} :\n "
228172 + f"Link: { team ['link' ]} \n "
229- + f"New tests: { regression ['status_changes' ].get ('added' , 0 )} \n "
230- + f"Removed tests: { regression ['status_changes' ].get ('removed' , 0 )} \n "
231- + f"Started skipping: { regression ['status_changes' ].get ('started_skipping' , 0 )} \n "
232- + f"Stopped skipping: { regression ['status_changes' ].get ('stopped_skipping' , 0 )} \n "
233173 + f"Cost ($) change: { _get_change ('cost' , additional_processing = lambda x : round (x , 2 ))} \n "
234174 + f"Time (min) change: { _get_change ('time' , additional_processing = lambda x : round (x / 60 , 2 ))} \n "
235- + f"Test count change: { _get_change ('count' , additional_processing = lambda x : round (x , 2 ))} \n "
236175 + f"\\ # skipped change: { _get_change ('skipped' , additional_processing = lambda x : round (x , 2 ))} \n "
176+ + f"\\ # success change: { _get_change ('success' , additional_processing = lambda x : round (x , 2 ))} \n "
177+ + f"\\ # failure change: { _get_change ('failure' , additional_processing = lambda x : round (x , 2 ))} \n "
178+ # + f"\\# flaky change: {_get_change('flaky', additional_processing=lambda x: round(x, 2))}\n"
237179 )
238180
239181 def generate_alert_json (
@@ -265,50 +207,80 @@ def generate_alert_json(
265207 },
266208 }
267209
210+ def get_representative_data_for_time (
211+ self , start_date , stop_date
212+ ) -> list [dict [str , Any ]]:
213+ response = requests .get (
214+ f"https://hud.pytorch.org/api/flaky-tests/fileReport?startDate={ start_date } &endDate={ stop_date } "
215+ )
216+
217+ if response .status_code != 200 :
218+ raise RuntimeError (
219+ f"Failed to fetch file report data: { response .status_code } { response .text } "
220+ )
221+ data = response .json ()
222+ results = data ["results" ]
223+ costInfo = data ["costInfo" ]
224+ shas = data ["shas" ]
225+ testOwnerLabels = data ["testOwnerLabels" ]
226+
227+ for row in results :
228+ costMatch = next ((r for r in costInfo if r ["label" ] == row ["label" ]), None )
229+ ownerLabels = next (
230+ (r for r in testOwnerLabels if r ["file" ] == f"{ row ['file' ]} .py" ), None
231+ )
232+ commit = next ((s for s in shas if s ["sha" ] == row ["sha" ]), None )
233+ row ["cost" ] = (
234+ row ["time" ] * (costMatch ["price_per_hour" ] if costMatch else 0 )
235+ ) / 3600
236+ row ["short_job_name" ] = f"{ row ['workflow_name' ]} / { row ['job_name' ]} "
237+ row ["labels" ] = ownerLabels ["owner_labels" ] if ownerLabels else ["unknown" ]
238+ row ["push_date" ] = commit ["push_date" ] if commit else 0
239+ row ["sha" ] = commit ["sha" ] if commit else "unknown"
240+
241+ # choose a commit with the median number of rows
242+ if not results :
243+ raise RuntimeError ("No data found for the given time range." )
244+
245+ # group by job name, file
246+ grouped_data : dict [str , list [dict [str , Any ]]] = {}
247+ for row in results :
248+ key = f"{ row ['short_job_name' ]} |{ row ['file' ]} "
249+ if key not in grouped_data :
250+ grouped_data [key ] = []
251+ grouped_data [key ].append (row )
252+
253+ # get median for each job name, file
254+ representative_data : list [dict [str , Any ]] = []
255+ for key , rows in grouped_data .items ():
256+ median_row = sorted (
257+ rows ,
258+ key = lambda x : (x ["failure" ], x ["flaky" ], x ["skipped" ], x ["success" ]),
259+ )[len (rows ) // 2 ]
260+ representative_data .append (median_row )
261+ return representative_data
262+
268263 def determine_regressions (self ) -> None :
269264 """
270265 Determine regressions in the test data based on the provided filter.
271266 Returns a list of regression entries.
272267 """
273- previous_regression_sha = self ._previous_regression_sha ()
274- metadata = self .file_report_generator .fetch_existing_metadata ()
275- curr_sha = metadata [- 1 ]
276-
277- current_sha = curr_sha ["sha" ]
278- if previous_regression_sha == current_sha :
279- print (f"No new reports since last report: { previous_regression_sha } " )
280- return
281- prev_sha = metadata [- 2 ]["sha" ]
282-
283- status_changes = self .file_report_generator .get_status_changes (
284- sha1 = prev_sha ,
285- sha2 = current_sha ,
286- sha2_push_date = curr_sha ["push_date" ],
268+ # Choose 5 commits between 5 hours ago and 1d5h ago
269+ current_sha = self .get_representative_data_for_time (
270+ datetime .datetime .now (datetime .timezone .utc ).timestamp () - 3600 * 29 ,
271+ datetime .datetime .now (datetime .timezone .utc ).timestamp () - 3600 * 5 ,
287272 )
288273
289- def _s3_to_json (bucket : str , key : str ) -> Any :
290- text = self .file_report_generator ._fetch_from_s3 (bucket , key )
291- data = []
292- for line in text .splitlines ():
293- data .append (json .loads (line ))
294-
295- return data
296-
297- previous_sha_invoking_file_info = _s3_to_json (
298- "ossci-raw-job-status" ,
299- f"additional_info/weekly_file_report/data_{ prev_sha } .json.gz" ,
300- )
301- current_sha_invoking_file_info = _s3_to_json (
302- "ossci-raw-job-status" ,
303- f"additional_info/weekly_file_report/data_{ current_sha } .json.gz" ,
274+ yesterday_sha = self .get_representative_data_for_time (
275+ datetime .datetime .now (datetime .timezone .utc ).timestamp () - 3600 * 53 ,
276+ datetime .datetime .now (datetime .timezone .utc ).timestamp () - 3600 * 29 ,
304277 )
305278
306279 for team in CONFIG :
307280 change = self .gen_regression_for_team (
308281 team = team ,
309- prev_invoking_file_info = previous_sha_invoking_file_info ,
310- curr_invoking_file_info = current_sha_invoking_file_info ,
311- status_changes = status_changes ,
282+ prev_invoking_file_info = yesterday_sha ,
283+ curr_invoking_file_info = current_sha ,
312284 )
313285 if self .filter_thresholds (change ):
314286 print (f"Regression detected for team: { team ['team' ]} " )
@@ -322,7 +294,6 @@ def _s3_to_json(bucket: str, key: str) -> Any:
322294 send_to_aws_alerting_lambda (alert )
323295 else :
324296 print (f"No significant regression for team: { team ['team' ]} " )
325- self .upload_new_regression_sha (current_sha )
326297
327298
328299if __name__ == "__main__" :