@@ -24,6 +24,7 @@ class JobMetrics:
     status: int
     created_at_ns: int
     workflow_id: int
+    workflow_name: str


 @dataclass
@@ -43,40 +44,60 @@ def get_sampled_workflow_metrics(github_repo: github.Repository):
     Returns a list of GaugeMetric objects, containing the relevant metrics about
     the workflow
     """
+    queued_job_counts = {}
+    running_job_counts = {}

     # Other states are available (pending, waiting, etc), but the meaning
     # is not documented (See #70540).
     # "queued" seems to be the info we want.
-    queued_workflow_count = len(
-        [
-            x
-            for x in github_repo.get_workflow_runs(status="queued")
-            if x.name in WORKFLOWS_TO_TRACK
-        ]
-    )
-    running_workflow_count = len(
-        [
-            x
-            for x in github_repo.get_workflow_runs(status="in_progress")
-            if x.name in WORKFLOWS_TO_TRACK
-        ]
-    )
+    for queued_workflow in github_repo.get_workflow_runs(status="queued"):
+        if queued_workflow.name not in WORKFLOWS_TO_TRACK:
+            continue
+        for queued_workflow_job in queued_workflow.jobs():
+            job_name = queued_workflow_job.name
+            # Workflows marked as queued can potentially only have some jobs
+            # queued, so make sure to also count jobs currently in progress.
+            if queued_workflow_job.status == "queued":
+                if job_name not in queued_job_counts:
+                    queued_job_counts[job_name] = 1
+                else:
+                    queued_job_counts[job_name] += 1
+            elif queued_workflow_job.status == "in_progress":
+                if job_name not in running_job_counts:
+                    running_job_counts[job_name] = 1
+                else:
+                    running_job_counts[job_name] += 1
+
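+    # Runs already reported as "in_progress" have to be queried separately;
+    # only their jobs that are themselves in progress are counted here.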
+    for running_workflow in github_repo.get_workflow_runs(status="in_progress"):
+        if running_workflow.name not in WORKFLOWS_TO_TRACK:
+            continue
+        for running_workflow_job in running_workflow.jobs():
+            job_name = running_workflow_job.name
+            if running_workflow_job.status != "in_progress":
+                continue
+
+            if job_name not in running_job_counts:
+                running_job_counts[job_name] = 1
+            else:
+                running_job_counts[job_name] += 1

     workflow_metrics = []
-    workflow_metrics.append(
-        GaugeMetric(
-            "workflow_queue_size",
-            queued_workflow_count,
-            time.time_ns(),
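+    # Emit one gauge per job name so queue depth and the number of running
+    # jobs can be tracked per job in Grafana.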
+    for queued_job in queued_job_counts:
+        workflow_metrics.append(
+            GaugeMetric(
+                f"workflow_queue_size_{queued_job}",
+                queued_job_counts[queued_job],
+                time.time_ns(),
+            )
         )
-    )
-    workflow_metrics.append(
-        GaugeMetric(
-            "running_workflow_count",
-            running_workflow_count,
-            time.time_ns(),
+    for running_job in running_job_counts:
+        workflow_metrics.append(
+            GaugeMetric(
+                f"running_workflow_count_{running_job}",
+                running_job_counts[running_job],
+                time.time_ns(),
+            )
         )
-    )
     # Always send a heartbeat metric so we can monitor if this container is still able to log to Grafana.
     workflow_metrics.append(
         GaugeMetric("metrics_container_heartbeat", 1, time.time_ns())
@@ -130,34 +151,6 @@ def get_per_workflow_metrics(
         workflow_jobs = workflow_run.jobs()
         if workflow_jobs.totalCount == 0:
             continue
-        if workflow_jobs.totalCount > 1:
-            raise ValueError(
-                f"Encountered an unexpected number of jobs: {workflow_jobs.totalCount}"
-            )
-
-        created_at = workflow_jobs[0].created_at
-        started_at = workflow_jobs[0].started_at
-        completed_at = workflow_jobs[0].completed_at
-
-        job_result = int(workflow_jobs[0].conclusion == "success")
-        if job_result:
-            # We still might want to mark the job as a failure if one of the steps
-            # failed. This is required due to use setting continue-on-error in
-            # the premerge pipeline to prevent sending emails while we are
-            # testing the infrastructure.
-            # TODO(boomanaiden154): Remove this once the premerge pipeline is no
-            # longer in a testing state and we can directly assert the workflow
-            # result.
-            for step in workflow_jobs[0].steps:
-                if step.conclusion != "success":
-                    job_result = 0
-                    break
-
-        queue_time = started_at - created_at
-        run_time = completed_at - started_at
-
-        if run_time.seconds == 0:
-            continue

         if (
             workflows_to_track[workflow_run.name] is None
@@ -170,20 +163,46 @@ def get_per_workflow_metrics(
         ):
             break

-        # The timestamp associated with the event is expected by Grafana to be
-        # in nanoseconds.
-        created_at_ns = int(created_at.timestamp()) * 10**9
-
-        workflow_metrics.append(
-            JobMetrics(
-                workflow_run.name,
-                queue_time.seconds,
-                run_time.seconds,
-                job_result,
-                created_at_ns,
-                workflow_run.id,
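+        # Collect metrics for every job in the run rather than assuming each
+        # workflow has exactly one job.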
+        for workflow_job in workflow_jobs:
+            created_at = workflow_job.created_at
+            started_at = workflow_job.started_at
+            completed_at = workflow_job.completed_at
+
+            job_result = int(workflow_job.conclusion == "success")
+            if job_result:
+                # We still might want to mark the job as a failure if one of the steps
+                # failed. This is required due to us setting continue-on-error in
+                # the premerge pipeline to prevent sending emails while we are
+                # testing the infrastructure.
+                # TODO(boomanaiden154): Remove this once the premerge pipeline is no
+                # longer in a testing state and we can directly assert the workflow
+                # result.
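+                # A step that was skipped is not treated as a failure.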
+                for step in workflow_job.steps:
+                    if step.conclusion != "success" and step.conclusion != "skipped":
+                        job_result = 0
+                        break
+
+            queue_time = started_at - created_at
+            run_time = completed_at - started_at
+
+            if run_time.seconds == 0:
+                continue
+
+            # The timestamp associated with the event is expected by Grafana to be
+            # in nanoseconds.
+            created_at_ns = int(created_at.timestamp()) * 10**9
+
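+            # Name the metric "<workflow name>-<job name>" so every job in a
+            # multi-job workflow gets its own time series.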
+            workflow_metrics.append(
+                JobMetrics(
+                    workflow_run.name + "-" + workflow_job.name,
+                    queue_time.seconds,
+                    run_time.seconds,
+                    job_result,
+                    created_at_ns,
+                    workflow_run.id,
+                    workflow_run.name,
+                )
             )
-        )

     return workflow_metrics

@@ -238,8 +257,6 @@ def upload_metrics(workflow_metrics, metrics_userid, api_key):
 def main():
     # Authenticate with Github
     auth = Auth.Token(os.environ["GITHUB_TOKEN"])
-    github_object = Github(auth=auth)
-    github_repo = github_object.get_repo("llvm/llvm-project")

     grafana_api_key = os.environ["GRAFANA_API_KEY"]
     grafana_metrics_userid = os.environ["GRAFANA_METRICS_USERID"]
@@ -251,20 +268,19 @@ def main():
     # Enter the main loop. Every five minutes we wake up and dump metrics for
     # the relevant jobs.
     while True:
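+        # Construct a fresh Github client on each iteration instead of reusing
+        # a single long-lived object across the polling loop.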
+        github_object = Github(auth=auth)
+        github_repo = github_object.get_repo("llvm/llvm-project")
+
         current_metrics = get_per_workflow_metrics(github_repo, workflows_to_track)
         current_metrics += get_sampled_workflow_metrics(github_repo)
-        # Always send a hearbeat metric so we can monitor is this container is still able to log to Grafana.
-        current_metrics.append(
-            GaugeMetric("metrics_container_heartbeat", 1, time.time_ns())
-        )

         upload_metrics(current_metrics, grafana_metrics_userid, grafana_api_key)
         print(f"Uploaded {len(current_metrics)} metrics", file=sys.stderr)

         for workflow_metric in reversed(current_metrics):
             if isinstance(workflow_metric, JobMetrics):
                 workflows_to_track[
-                    workflow_metric.job_name
+                    workflow_metric.workflow_name
                 ] = workflow_metric.workflow_id

         time.sleep(SCRAPE_INTERVAL_SECONDS)