33from pathlib import Path
44import heapq
55import statistics
6-
7- import common
8-
9-
10- # Simple median calculation
11- class SimpleMedian :
12-
6+ from common import Validate , SanitizedConfig
7+ from abc import ABC , abstractmethod
8+
9+
class Aggregator(ABC):
    """
    Aggregator classes used to "aggregate" a pool of elements, and produce an
    "average" (precisely, some "measure of central tendency") from the
    elements.
    """

    @staticmethod
    @abstractmethod
    def get_type() -> str:
        """
        Return a string indicating the type of average this aggregator
        produces (e.g. "median"); used to name the output .csv file.
        """

    @abstractmethod
    def add(self, n: float):
        """
        Add/aggregate an element to the pool of elements used by this
        aggregator to produce an average calculation.
        """

    @abstractmethod
    def get_median(self) -> float:
        """
        Produce an average from the pool of elements aggregated using add().

        NOTE(review): this was previously declared abstract as get_avg(),
        but every concrete subclass and the call site in Aggregate.hist_avg()
        use get_median() — the mismatch left all subclasses abstract and made
        instantiating them raise TypeError.
        """

    def get_avg(self) -> float:
        """Backward-compatible alias for get_median()."""
        return self.get_median()
38+
39+
class SimpleMedian(Aggregator):
    """
    Simple median calculation: if the number of samples being generated is
    low, this is the fastest median method.
    """

    def __init__(self):
        # Raw pool of samples; the median is computed lazily in get_median().
        self.elements = []

    @staticmethod
    def get_type() -> str:
        return "median"

    def add(self, n: float):
        """Add a sample to the pool."""
        self.elements.append(n)

    def get_median(self) -> float:
        """Return the median of all samples added so far."""
        return statistics.median(self.elements)

    def get_avg(self) -> float:
        """
        Satisfy Aggregator's abstract get_avg(); without this override the
        class cannot be instantiated (get_avg was otherwise left abstract,
        making SimpleMedian() raise TypeError).
        """
        return self.get_median()
2157
2258
23- # Calculate medians incrementally using a heap: Useful for when dealing with
24- # large number of samples.
25- #
26- # TODO how many samples are we going to realistically get? I had written this
27- # with precommit in mind, but if this only runs nightly, it would actually be
28- # faster to do a normal median calculation .
29- class StreamingMedian :
59+ class StreamingMedian ( Aggregator ):
60+ """
61+ Calculate medians incrementally using heaps: Theoretically the fastest way
62+ to calculate a median from a stream of elements, but realistically is only
63+ faster when dealing with huge numbers of samples that would be generated by
64+ i.e. enabling this workflow in precommit and using longer periods of time .
65+ """
3066
3167 def __init__ (self ):
3268 # Gist: we keep a minheap and a maxheap, and store the median as the top
@@ -36,6 +72,10 @@ def __init__(self):
3672 self .minheap_larger = []
3773 self .maxheap_smaller = []
3874
    @staticmethod
    def get_type() -> str:
        # Names the kind of average this aggregator produces; used by
        # Aggregate.hist_avg() to build the output filename
        # "<benchmark>-<type>.csv".
        return "median"
78+
3979 # Note: numbers on maxheap should be negative, as heapq
4080 # is minheap by default
4181
@@ -63,64 +103,87 @@ def get_median(self) -> float:
63103 return - self .maxheap_smaller [0 ]
64104
65105
66- def aggregate_median (test_name : str , test_dir : str , cutoff : str ):
67-
68- # Get all .csv samples for the requested test folder
69- def csv_samples () -> list [str ]:
70- # TODO check that the path below is valid directory
71- cache_dir = Path (f"{ test_dir } " )
72- # TODO check for time range; What time range do I want?
73- return filter (
74- lambda f : f .is_file ()
75- and common .valid_timestamp (str (f )[- 19 :- 4 ])
76- and str (f )[- 19 :- 4 ] > cutoff ,
77- cache_dir .glob (f"{ test_name } -*_*.csv" ),
78- )
79-
80- # Calculate median of every desired metric:
81- aggregate_s = dict ()
82- for sample_path in csv_samples ():
83- with open (sample_path , "r" ) as sample_file :
84- for s in csv .DictReader (sample_file ):
85- test_case = s ["TestCase" ]
86- # Construct entry in aggregate_s for test case if it does not
87- # exist already:
88- if test_case not in aggregate_s :
89- aggregate_s [test_case ] = {
90- metric : SimpleMedian () for metric in common .metrics_variance
91- }
92-
93- for metric in common .metrics_variance :
94- aggregate_s [test_case ][metric ].add (common .sanitize (s [metric ]))
class Aggregate:
    """
    Static class providing methods for aggregating data.
    """

    @staticmethod
    def hist_avg(benchmark_name: str, res_dir: str, cutoff: str,
                 aggregator=SimpleMedian):
        """
        Aggregate all historical .csv samples for benchmark_name found in
        res_dir with timestamps newer than cutoff (YYYYMMDD_HHMMSS), and
        write the per-test, per-metric averages to
        <res_dir>/<benchmark_name>-<aggregator type>.csv.

        aggregator is an Aggregator subclass (the class itself, not an
        instance); defaults to SimpleMedian.

        Exits the process with status 1 if res_dir is not a directory or a
        sample contains a malformatted statistic.
        """
        if not os.path.isdir(res_dir):
            print(f"Not a directory: {res_dir}.", file=sys.stderr)
            exit(1)

        def csv_samples() -> list[Path]:
            """Get all valid .csv samples from the results folder."""
            cache_dir = Path(res_dir)
            # Filter all benchmark .csv files in the result directory:
            # keep only real files whose embedded timestamp — characters
            # [-19:-4] of the filename — is well-formed and newer than the
            # cutoff. (Was annotated -> list[str] while returning a filter
            # of Path objects; materialize a list of Paths instead.)
            return [
                f
                for f in cache_dir.glob(f"{benchmark_name}-*_*.csv")
                if f.is_file()
                and Validate.timestamp(str(f)[-19:-4])
                and str(f)[-19:-4] > cutoff
            ]

        # Calculate median of every desired metric:
        samples_aggregate = dict()
        for sample_path in csv_samples():
            with open(sample_path, "r") as sample_file:
                for sample in csv.DictReader(sample_file):
                    test = sample["TestCase"]
                    # Construct entry in aggregator for test if it doesn't
                    # exist already:
                    if test not in samples_aggregate:
                        samples_aggregate[test] = {
                            metric: aggregator()
                            for metric in SanitizedConfig.METRICS_TOLERANCES
                        }

                    # For each metric of concern, add to aggregator:
                    for metric in SanitizedConfig.METRICS_TOLERANCES:
                        sample_value = Validate.sanitize_stat(sample[metric])
                        if not isinstance(sample_value, float):
                            # Error path: report on stderr like the other
                            # failures in this file (was stdout).
                            print(f"Malformatted statistic in {str(sample_path)}: " +
                                  f"'{sample[metric]}' for {test}.",
                                  file=sys.stderr)
                            exit(1)
                        # Add metric from sample for current test to
                        # aggregate:
                        samples_aggregate[test][metric].add(sample_value)

        # Calculate + write new average (from samples_aggregate) in a new
        # .csv file named after the aggregator's average type:
        with open(
            f"{res_dir}/{benchmark_name}-{aggregator.get_type()}.csv", "w"
        ) as output_csv:
            writer = csv.DictWriter(
                output_csv,
                fieldnames=["TestCase", *SanitizedConfig.METRICS_TOLERANCES.keys()],
            )
            writer.writeheader()
            for test in samples_aggregate:
                writer.writerow(
                    # BUG FIX: was {"TestCase": test_case} — `test_case` is
                    # undefined in this scope (NameError at runtime); the
                    # loop variable is `test`.
                    {"TestCase": test}
                    | {
                        metric: samples_aggregate[test][metric].get_median()
                        for metric in SanitizedConfig.METRICS_TOLERANCES
                    }
                )
112172
113173
if __name__ == "__main__":
    # argv layout: [1] path to /devops (config root), [2] benchmark name,
    # [3] absolute path to benchmark results dir, [4] cutoff timestamp.
    if len(sys.argv) != 5:
        print(
            f"Usage: {sys.argv[0]} <path to /devops> <benchmark name> <absolute path to benchmark results> <cutoff timestamp YYYYMMDD_HHMMSS>",
            file=sys.stderr,
        )
        exit(1)
    if not Validate.timestamp(sys.argv[4]):
        # Plain string literal: the original used an f-string with no
        # placeholders (lint F541).
        print("Bad cutoff timestamp, please use YYYYMMDD_HHMMSS.", file=sys.stderr)
        exit(1)
    if not Validate.filepath(sys.argv[1]):
        print(f"Not a valid filepath: {sys.argv[1]}", file=sys.stderr)
        exit(1)
    # If the filepath provided passed filepath validation, then it is clean
    SanitizedConfig.load(sys.argv[1])

    Aggregate.hist_avg(sys.argv[2], sys.argv[3], sys.argv[4])
0 commit comments