@@ -24,9 +24,90 @@ class JobMetrics:
2424 status : int
2525 created_at_ns : int
2626 workflow_id : int
27+ workflow_name : str
2728
2829
29- def get_metrics (github_repo : github .Repository , workflows_to_track : dict [str , int ]):
@dataclass
class GaugeMetric:
    """A single point-in-time gauge sample to be shipped to Grafana.

    Represents one "name value timestamp" line in the Grafana/Graphite
    line protocol used by upload_metrics.
    """

    # Metric name as it will appear in Grafana (lowercased/underscored later).
    name: str
    # The sampled gauge value (e.g. a queue length or a heartbeat of 1).
    value: int
    # Sample timestamp in nanoseconds since the epoch (Grafana expects ns).
    time_ns: int
36+
def get_sampled_workflow_metrics(github_repo: github.Repository):
    """Gets global statistics about the Github workflow queue.

    Counts, per job name, how many jobs belonging to tracked workflows are
    currently queued and how many are running, and emits one gauge per bucket
    plus an unconditional heartbeat gauge.

    Args:
      github_repo: A github repo object to use to query the relevant information.

    Returns:
      Returns a list of GaugeMetric objects, containing the relevant metrics
      about the workflow.
    """
    queued_job_counts = {}
    running_job_counts = {}

    # Other states are available (pending, waiting, etc), but the meaning
    # is not documented (See #70540).
    # "queued" seems to be the info we want.
    for queued_workflow in github_repo.get_workflow_runs(status="queued"):
        if queued_workflow.name not in WORKFLOWS_TO_TRACK:
            continue
        for queued_workflow_job in queued_workflow.jobs():
            job_name = queued_workflow_job.name
            # Workflows marked as queued can potentially only have some jobs
            # queued, so make sure to also count jobs currently in progress.
            if queued_workflow_job.status == "queued":
                queued_job_counts[job_name] = queued_job_counts.get(job_name, 0) + 1
            elif queued_workflow_job.status == "in_progress":
                running_job_counts[job_name] = running_job_counts.get(job_name, 0) + 1

    for running_workflow in github_repo.get_workflow_runs(status="in_progress"):
        if running_workflow.name not in WORKFLOWS_TO_TRACK:
            continue
        for running_workflow_job in running_workflow.jobs():
            job_name = running_workflow_job.name
            if running_workflow_job.status != "in_progress":
                continue
            running_job_counts[job_name] = running_job_counts.get(job_name, 0) + 1

    workflow_metrics = []
    for job_name, count in queued_job_counts.items():
        workflow_metrics.append(
            GaugeMetric(
                f"workflow_queue_size_{job_name}",
                count,
                time.time_ns(),
            )
        )
    for job_name, count in running_job_counts.items():
        workflow_metrics.append(
            GaugeMetric(
                f"running_workflow_count_{job_name}",
                count,
                time.time_ns(),
            )
        )
    # Always send a heartbeat metric so we can monitor whether this container
    # is still able to log to Grafana.
    workflow_metrics.append(
        GaugeMetric("metrics_container_heartbeat", 1, time.time_ns())
    )
    return workflow_metrics
106+
107+
108+ def get_per_workflow_metrics (
109+ github_repo : github .Repository , workflows_to_track : dict [str , int ]
110+ ):
30111 """Gets the metrics for specified Github workflows.
31112
32113 This function takes in a list of workflows to track, and optionally the
@@ -43,14 +124,14 @@ def get_metrics(github_repo: github.Repository, workflows_to_track: dict[str, in
43124 Returns a list of JobMetrics objects, containing the relevant metrics about
44125 the workflow.
45126 """
46- workflow_runs = iter (github_repo .get_workflow_runs ())
47-
48127 workflow_metrics = []
49128
50129 workflows_to_include = set (workflows_to_track .keys ())
51130
52- while len (workflows_to_include ) > 0 :
53- workflow_run = next (workflow_runs )
131+ for workflow_run in iter (github_repo .get_workflow_runs ()):
132+ if len (workflows_to_include ) == 0 :
133+ break
134+
54135 if workflow_run .status != "completed" :
55136 continue
56137
@@ -70,34 +151,6 @@ def get_metrics(github_repo: github.Repository, workflows_to_track: dict[str, in
70151 workflow_jobs = workflow_run .jobs ()
71152 if workflow_jobs .totalCount == 0 :
72153 continue
73- if workflow_jobs .totalCount > 1 :
74- raise ValueError (
75- f"Encountered an unexpected number of jobs: { workflow_jobs .totalCount } "
76- )
77-
78- created_at = workflow_jobs [0 ].created_at
79- started_at = workflow_jobs [0 ].started_at
80- completed_at = workflow_jobs [0 ].completed_at
81-
82- job_result = int (workflow_jobs [0 ].conclusion == "success" )
83- if job_result :
84- # We still might want to mark the job as a failure if one of the steps
85- # failed. This is required due to use setting continue-on-error in
86- # the premerge pipeline to prevent sending emails while we are
87- # testing the infrastructure.
88- # TODO(boomanaiden154): Remove this once the premerge pipeline is no
89- # longer in a testing state and we can directly assert the workflow
90- # result.
91- for step in workflow_jobs [0 ].steps :
92- if step .conclusion != "success" :
93- job_result = 0
94- break
95-
96- queue_time = started_at - created_at
97- run_time = completed_at - started_at
98-
99- if run_time .seconds == 0 :
100- continue
101154
102155 if (
103156 workflows_to_track [workflow_run .name ] is None
@@ -110,20 +163,46 @@ def get_metrics(github_repo: github.Repository, workflows_to_track: dict[str, in
110163 ):
111164 break
112165
113- # The timestamp associated with the event is expected by Grafana to be
114- # in nanoseconds.
115- created_at_ns = int (created_at .timestamp ()) * 10 ** 9
116-
117- workflow_metrics .append (
118- JobMetrics (
119- workflow_run .name ,
120- queue_time .seconds ,
121- run_time .seconds ,
122- job_result ,
123- created_at_ns ,
124- workflow_run .id ,
166+ for workflow_job in workflow_jobs :
167+ created_at = workflow_job .created_at
168+ started_at = workflow_job .started_at
169+ completed_at = workflow_job .completed_at
170+
171+ job_result = int (workflow_job .conclusion == "success" )
172+ if job_result :
173+ # We still might want to mark the job as a failure if one of the steps
174+ # failed. This is required due to use setting continue-on-error in
175+ # the premerge pipeline to prevent sending emails while we are
176+ # testing the infrastructure.
177+ # TODO(boomanaiden154): Remove this once the premerge pipeline is no
178+ # longer in a testing state and we can directly assert the workflow
179+ # result.
180+ for step in workflow_job .steps :
181+ if step .conclusion != "success" and step .conclusion != "skipped" :
182+ job_result = 0
183+ break
184+
185+ queue_time = started_at - created_at
186+ run_time = completed_at - started_at
187+
188+ if run_time .seconds == 0 :
189+ continue
190+
191+ # The timestamp associated with the event is expected by Grafana to be
192+ # in nanoseconds.
193+ created_at_ns = int (created_at .timestamp ()) * 10 ** 9
194+
195+ workflow_metrics .append (
196+ JobMetrics (
197+ workflow_run .name + "-" + workflow_job .name ,
198+ queue_time .seconds ,
199+ run_time .seconds ,
200+ job_result ,
201+ created_at_ns ,
202+ workflow_run .id ,
203+ workflow_run .name ,
204+ )
125205 )
126- )
127206
128207 return workflow_metrics
129208
@@ -139,12 +218,27 @@ def upload_metrics(workflow_metrics, metrics_userid, api_key):
139218 metrics_userid: The userid to use for the upload.
140219 api_key: The API key to use for the upload.
141220 """
221+
222+ if len (workflow_metrics ) == 0 :
223+ print ("No metrics found to upload." , file = sys .stderr )
224+ return
225+
142226 metrics_batch = []
143227 for workflow_metric in workflow_metrics :
144- workflow_formatted_name = workflow_metric .job_name .lower ().replace (" " , "_" )
145- metrics_batch .append (
146- f"{ workflow_formatted_name } queue_time={ workflow_metric .queue_time } ,run_time={ workflow_metric .run_time } ,status={ workflow_metric .status } { workflow_metric .created_at_ns } "
147- )
228+ if isinstance (workflow_metric , GaugeMetric ):
229+ name = workflow_metric .name .lower ().replace (" " , "_" )
230+ metrics_batch .append (
231+ f"{ name } value={ workflow_metric .value } { workflow_metric .time_ns } "
232+ )
233+ elif isinstance (workflow_metric , JobMetrics ):
234+ name = workflow_metric .job_name .lower ().replace (" " , "_" )
235+ metrics_batch .append (
236+ f"{ name } queue_time={ workflow_metric .queue_time } ,run_time={ workflow_metric .run_time } ,status={ workflow_metric .status } { workflow_metric .created_at_ns } "
237+ )
238+ else :
239+ raise ValueError (
240+ f"Unsupported object type { type (workflow_metric )} : { str (workflow_metric )} "
241+ )
148242
149243 request_data = "\n " .join (metrics_batch )
150244 response = requests .post (
@@ -163,8 +257,6 @@ def upload_metrics(workflow_metrics, metrics_userid, api_key):
163257def main ():
164258 # Authenticate with Github
165259 auth = Auth .Token (os .environ ["GITHUB_TOKEN" ])
166- github_object = Github (auth = auth )
167- github_repo = github_object .get_repo ("llvm/llvm-project" )
168260
169261 grafana_api_key = os .environ ["GRAFANA_API_KEY" ]
170262 grafana_metrics_userid = os .environ ["GRAFANA_METRICS_USERID" ]
@@ -176,16 +268,20 @@ def main():
176268 # Enter the main loop. Every five minutes we wake up and dump metrics for
177269 # the relevant jobs.
178270 while True :
179- current_metrics = get_metrics (github_repo , workflows_to_track )
180- if len (current_metrics ) == 0 :
181- print ("No metrics found to upload." , file = sys .stderr )
182- continue
271+ github_object = Github (auth = auth )
272+ github_repo = github_object .get_repo ("llvm/llvm-project" )
273+
274+ current_metrics = get_per_workflow_metrics (github_repo , workflows_to_track )
275+ current_metrics += get_sampled_workflow_metrics (github_repo )
183276
184277 upload_metrics (current_metrics , grafana_metrics_userid , grafana_api_key )
185278 print (f"Uploaded { len (current_metrics )} metrics" , file = sys .stderr )
186279
187280 for workflow_metric in reversed (current_metrics ):
188- workflows_to_track [workflow_metric .job_name ] = workflow_metric .workflow_id
281+ if isinstance (workflow_metric , JobMetrics ):
282+ workflows_to_track [
283+ workflow_metric .workflow_name
284+ ] = workflow_metric .workflow_id
189285
190286 time .sleep (SCRAPE_INTERVAL_SECONDS )
191287
0 commit comments