33import os
44from dataclasses import dataclass
55import sys
6+ import logging
67
78import github
89from github import Github
@@ -24,6 +25,7 @@ class JobMetrics:
2425 status : int
2526 created_at_ns : int
2627 workflow_id : int
28+ workflow_name : str
2729
2830
2931@dataclass
@@ -43,40 +45,60 @@ def get_sampled_workflow_metrics(github_repo: github.Repository):
4345 Returns a list of GaugeMetric objects, containing the relevant metrics about
4446 the workflow
4547 """
48+ queued_job_counts = {}
49+ running_job_counts = {}
4650
4751 # Other states are available (pending, waiting, etc), but the meaning
4852 # is not documented (See #70540).
4953 # "queued" seems to be the info we want.
50- queued_workflow_count = len (
51- [
52- x
53- for x in github_repo .get_workflow_runs (status = "queued" )
54- if x .name in WORKFLOWS_TO_TRACK
55- ]
56- )
57- running_workflow_count = len (
58- [
59- x
60- for x in github_repo .get_workflow_runs (status = "in_progress" )
61- if x .name in WORKFLOWS_TO_TRACK
62- ]
63- )
54+ for queued_workflow in github_repo .get_workflow_runs (status = "queued" ):
55+ if queued_workflow .name not in WORKFLOWS_TO_TRACK :
56+ continue
57+ for queued_workflow_job in queued_workflow .jobs ():
58+ job_name = queued_workflow_job .name
59+ # Workflows marked as queued can potentially only have some jobs
60+ # queued, so make sure to also count jobs currently in progress.
61+ if queued_workflow_job .status == "queued" :
62+ if job_name not in queued_job_counts :
63+ queued_job_counts [job_name ] = 1
64+ else :
65+ queued_job_counts [job_name ] += 1
66+ elif queued_workflow_job .status == "in_progress" :
67+ if job_name not in running_job_counts :
68+ running_job_counts [job_name ] = 1
69+ else :
70+ running_job_counts [job_name ] += 1
71+
72+ for running_workflow in github_repo .get_workflow_runs (status = "in_progress" ):
73+ if running_workflow .name not in WORKFLOWS_TO_TRACK :
74+ continue
75+ for running_workflow_job in running_workflow .jobs ():
76+ job_name = running_workflow_job .name
77+ if running_workflow_job .status != "in_progress" :
78+ continue
79+
80+ if job_name not in running_job_counts :
81+ running_job_counts [job_name ] = 1
82+ else :
83+ running_job_counts [job_name ] += 1
6484
6585 workflow_metrics = []
66- workflow_metrics .append (
67- GaugeMetric (
68- "workflow_queue_size" ,
69- queued_workflow_count ,
70- time .time_ns (),
86+ for queued_job in queued_job_counts :
87+ workflow_metrics .append (
88+ GaugeMetric (
89+ f"workflow_queue_size_{ queued_job } " ,
90+ queued_job_counts [queued_job ],
91+ time .time_ns (),
92+ )
7193 )
72- )
73- workflow_metrics .append (
74- GaugeMetric (
75- "running_workflow_count" ,
76- running_workflow_count ,
77- time .time_ns (),
94+ for running_job in running_job_counts :
95+ workflow_metrics .append (
96+ GaugeMetric (
97+ f"running_workflow_count_{ running_job } " ,
98+ running_job_counts [running_job ],
99+ time .time_ns (),
100+ )
78101 )
79- )
80102 # Always send a heartbeat metric so we can monitor if this container is still able to log to Grafana.
81103 workflow_metrics .append (
82104 GaugeMetric ("metrics_container_heartbeat" , 1 , time .time_ns ())
@@ -130,34 +152,6 @@ def get_per_workflow_metrics(
130152 workflow_jobs = workflow_run .jobs ()
131153 if workflow_jobs .totalCount == 0 :
132154 continue
133- if workflow_jobs .totalCount > 1 :
134- raise ValueError (
135- f"Encountered an unexpected number of jobs: { workflow_jobs .totalCount } "
136- )
137-
138- created_at = workflow_jobs [0 ].created_at
139- started_at = workflow_jobs [0 ].started_at
140- completed_at = workflow_jobs [0 ].completed_at
141-
142- job_result = int (workflow_jobs [0 ].conclusion == "success" )
143- if job_result :
144- # We still might want to mark the job as a failure if one of the steps
145- # failed. This is required due to use setting continue-on-error in
146- # the premerge pipeline to prevent sending emails while we are
147- # testing the infrastructure.
148- # TODO(boomanaiden154): Remove this once the premerge pipeline is no
149- # longer in a testing state and we can directly assert the workflow
150- # result.
151- for step in workflow_jobs [0 ].steps :
152- if step .conclusion != "success" :
153- job_result = 0
154- break
155-
156- queue_time = started_at - created_at
157- run_time = completed_at - started_at
158-
159- if run_time .seconds == 0 :
160- continue
161155
162156 if (
163157 workflows_to_track [workflow_run .name ] is None
@@ -170,20 +164,50 @@ def get_per_workflow_metrics(
170164 ):
171165 break
172166
173- # The timestamp associated with the event is expected by Grafana to be
174- # in nanoseconds.
175- created_at_ns = int (created_at .timestamp ()) * 10 ** 9
167+ for workflow_job in workflow_jobs :
168+ created_at = workflow_job .created_at
169+ started_at = workflow_job .started_at
170+ completed_at = workflow_job .completed_at
171+
172+ job_result = int (workflow_job .conclusion == "success" )
173+ if job_result :
174+ # We still might want to mark the job as a failure if one of the steps
175+ # failed. This is required due to us setting continue-on-error in
176+ # the premerge pipeline to prevent sending emails while we are
177+ # testing the infrastructure.
178+ # TODO(boomanaiden154): Remove this once the premerge pipeline is no
179+ # longer in a testing state and we can directly assert the workflow
180+ # result.
181+ for step in workflow_job .steps :
182+ if step .conclusion != "success" and step .conclusion != "skipped" :
183+ job_result = 0
184+ break
185+
186+ queue_time = started_at - created_at
187+ run_time = completed_at - started_at
188+
189+ if run_time .seconds == 0 :
190+ continue
191+
192+ # The timestamp associated with the event is expected by Grafana to be
193+ # in nanoseconds.
194+ created_at_ns = int (created_at .timestamp ()) * 10 ** 9
195+
196+ logging .info (
197+ f"Adding a job metric for job { workflow_job .id } in workflow { workflow_run .id } "
198+ )
176199
177- workflow_metrics .append (
178- JobMetrics (
179- workflow_run .name ,
180- queue_time .seconds ,
181- run_time .seconds ,
182- job_result ,
183- created_at_ns ,
184- workflow_run .id ,
200+ workflow_metrics .append (
201+ JobMetrics (
202+ workflow_run .name + "-" + workflow_job .name ,
203+ queue_time .seconds ,
204+ run_time .seconds ,
205+ job_result ,
206+ created_at_ns ,
207+ workflow_run .id ,
208+ workflow_run .name ,
209+ )
185210 )
186- )
187211
188212 return workflow_metrics
189213
@@ -201,7 +225,7 @@ def upload_metrics(workflow_metrics, metrics_userid, api_key):
201225 """
202226
203227 if len (workflow_metrics ) == 0 :
204- print ("No metrics found to upload." , file = sys . stderr )
228+ logging . info ("No metrics found to upload." )
205229 return
206230
207231 metrics_batch = []
@@ -230,16 +254,12 @@ def upload_metrics(workflow_metrics, metrics_userid, api_key):
230254 )
231255
232256 if response .status_code < 200 or response .status_code >= 300 :
233- print (
234- f"Failed to submit data to Grafana: { response .status_code } " , file = sys .stderr
235- )
257+ logging .info (f"Failed to submit data to Grafana: { response .status_code } " )
236258
237259
238260def main ():
239261 # Authenticate with Github
240262 auth = Auth .Token (os .environ ["GITHUB_TOKEN" ])
241- github_object = Github (auth = auth )
242- github_repo = github_object .get_repo ("llvm/llvm-project" )
243263
244264 grafana_api_key = os .environ ["GRAFANA_API_KEY" ]
245265 grafana_metrics_userid = os .environ ["GRAFANA_METRICS_USERID" ]
@@ -251,24 +271,24 @@ def main():
251271 # Enter the main loop. Every five minutes we wake up and dump metrics for
252272 # the relevant jobs.
253273 while True :
274+ github_object = Github (auth = auth )
275+ github_repo = github_object .get_repo ("llvm/llvm-project" )
276+
254277 current_metrics = get_per_workflow_metrics (github_repo , workflows_to_track )
255278 current_metrics += get_sampled_workflow_metrics (github_repo )
256- # Always send a hearbeat metric so we can monitor is this container is still able to log to Grafana.
257- current_metrics .append (
258- GaugeMetric ("metrics_container_heartbeat" , 1 , time .time_ns ())
259- )
260279
261280 upload_metrics (current_metrics , grafana_metrics_userid , grafana_api_key )
262- print (f"Uploaded { len (current_metrics )} metrics" , file = sys . stderr )
281+ logging . info (f"Uploaded { len (current_metrics )} metrics" )
263282
264283 for workflow_metric in reversed (current_metrics ):
265284 if isinstance (workflow_metric , JobMetrics ):
266285 workflows_to_track [
267- workflow_metric .job_name
286+ workflow_metric .workflow_name
268287 ] = workflow_metric .workflow_id
269288
270289 time .sleep (SCRAPE_INTERVAL_SECONDS )
271290
272291
273292if __name__ == "__main__" :
293+ logging .basicConfig (level = logging .INFO )
274294 main ()