     "https://influx-prod-13-prod-us-east-0.grafana.net/api/v1/push/influx/write"
 )
 GITHUB_PROJECT = "llvm/llvm-project"
-WORKFLOWS_TO_TRACK = ["Check code formatting", "LLVM Premerge Checks"]
+WORKFLOWS_TO_TRACK = ["LLVM Premerge Checks"]
 SCRAPE_INTERVAL_SECONDS = 5 * 60
 
 
@@ -26,7 +26,67 @@ class JobMetrics:
     workflow_id: int
 
 
-def get_metrics(github_repo: github.Repository, workflows_to_track: dict[str, int]):
+@dataclass
+class GaugeMetric:
+    name: str
+    value: int
+    time_ns: int
+
+
+def get_sampled_workflow_metrics(github_repo: github.Repository):
+    """Gets global statistics about the Github workflow queue
+
+    Args:
+      github_repo: A github repo object to use to query the relevant information.
+
+    Returns:
+      Returns a list of GaugeMetric objects, containing the relevant metrics about
+      the workflow
+    """
+
+    # Other states are available (pending, waiting, etc), but the meaning
+    # is not documented (See #70540).
+    # "queued" seems to be the info we want.
+    queued_workflow_count = len(
+        [
+            x
+            for x in github_repo.get_workflow_runs(status="queued")
+            if x.name in WORKFLOWS_TO_TRACK
+        ]
+    )
+    running_workflow_count = len(
+        [
+            x
+            for x in github_repo.get_workflow_runs(status="in_progress")
+            if x.name in WORKFLOWS_TO_TRACK
+        ]
+    )
+
+    workflow_metrics = []
+    workflow_metrics.append(
+        GaugeMetric(
+            "workflow_queue_size",
+            queued_workflow_count,
+            time.time_ns(),
+        )
+    )
+    workflow_metrics.append(
+        GaugeMetric(
+            "running_workflow_count",
+            running_workflow_count,
+            time.time_ns(),
+        )
+    )
+    # Always send a heartbeat metric so we can monitor if this container is still able to log to Grafana.
+    workflow_metrics.append(
+        GaugeMetric("metrics_container_heartbeat", 1, time.time_ns())
+    )
+    return workflow_metrics
+
+
+def get_per_workflow_metrics(
+    github_repo: github.Repository, workflows_to_track: dict[str, int]
+):
     """Gets the metrics for specified Github workflows.
 
     This function takes in a list of workflows to track, and optionally the
@@ -43,14 +103,14 @@ def get_metrics(github_repo: github.Repository, workflows_to_track: dict[str, in
       Returns a list of JobMetrics objects, containing the relevant metrics about
       the workflow.
     """
-    workflow_runs = iter(github_repo.get_workflow_runs())
-
     workflow_metrics = []
 
     workflows_to_include = set(workflows_to_track.keys())
 
-    while len(workflows_to_include) > 0:
-        workflow_run = next(workflow_runs)
+    for workflow_run in iter(github_repo.get_workflow_runs()):
+        if len(workflows_to_include) == 0:
+            break
+
         if workflow_run.status != "completed":
             continue
 
@@ -70,34 +130,6 @@ def get_metrics(github_repo: github.Repository, workflows_to_track: dict[str, in
         workflow_jobs = workflow_run.jobs()
         if workflow_jobs.totalCount == 0:
             continue
-        if workflow_jobs.totalCount > 1:
-            raise ValueError(
-                f"Encountered an unexpected number of jobs: {workflow_jobs.totalCount}"
-            )
-
-        created_at = workflow_jobs[0].created_at
-        started_at = workflow_jobs[0].started_at
-        completed_at = workflow_jobs[0].completed_at
-
-        job_result = int(workflow_jobs[0].conclusion == "success")
-        if job_result:
-            # We still might want to mark the job as a failure if one of the steps
-            # failed. This is required due to use setting continue-on-error in
-            # the premerge pipeline to prevent sending emails while we are
-            # testing the infrastructure.
-            # TODO(boomanaiden154): Remove this once the premerge pipeline is no
-            # longer in a testing state and we can directly assert the workflow
-            # result.
-            for step in workflow_jobs[0].steps:
-                if step.conclusion != "success":
-                    job_result = 0
-                    break
-
-        queue_time = started_at - created_at
-        run_time = completed_at - started_at
-
-        if run_time.seconds == 0:
-            continue
 
         if (
             workflows_to_track[workflow_run.name] is None
@@ -110,20 +142,45 @@ def get_metrics(github_repo: github.Repository, workflows_to_track: dict[str, in
         ):
             break
 
-        # The timestamp associated with the event is expected by Grafana to be
-        # in nanoseconds.
-        created_at_ns = int(created_at.timestamp()) * 10**9
-
-        workflow_metrics.append(
-            JobMetrics(
-                workflow_run.name,
-                queue_time.seconds,
-                run_time.seconds,
-                job_result,
-                created_at_ns,
-                workflow_run.id,
+        for workflow_job in workflow_jobs:
+            created_at = workflow_job.created_at
+            started_at = workflow_job.started_at
+            completed_at = workflow_job.completed_at
+
+            job_result = int(workflow_job.conclusion == "success")
+            if job_result:
+                # We still might want to mark the job as a failure if one of the steps
+                # failed. This is required due to us setting continue-on-error in
+                # the premerge pipeline to prevent sending emails while we are
+                # testing the infrastructure.
+                # TODO(boomanaiden154): Remove this once the premerge pipeline is no
+                # longer in a testing state and we can directly assert the workflow
+                # result.
+                for step in workflow_job.steps:
+                    if step.conclusion != "success":
+                        job_result = 0
+                        break
+
+            queue_time = started_at - created_at
+            run_time = completed_at - started_at
+
+            if run_time.seconds == 0:
+                continue
+
+            # The timestamp associated with the event is expected by Grafana to be
+            # in nanoseconds.
+            created_at_ns = int(created_at.timestamp()) * 10**9
+
+            workflow_metrics.append(
+                JobMetrics(
+                    workflow_run.name + "-" + workflow_job.name,
+                    queue_time.seconds,
+                    run_time.seconds,
+                    job_result,
+                    created_at_ns,
+                    workflow_run.id,
+                )
             )
-        )
 
     return workflow_metrics
 
@@ -139,12 +196,27 @@ def upload_metrics(workflow_metrics, metrics_userid, api_key):
       metrics_userid: The userid to use for the upload.
       api_key: The API key to use for the upload.
     """
+
+    if len(workflow_metrics) == 0:
+        print("No metrics found to upload.", file=sys.stderr)
+        return
+
     metrics_batch = []
     for workflow_metric in workflow_metrics:
-        workflow_formatted_name = workflow_metric.job_name.lower().replace(" ", "_")
-        metrics_batch.append(
-            f"{workflow_formatted_name} queue_time={workflow_metric.queue_time},run_time={workflow_metric.run_time},status={workflow_metric.status} {workflow_metric.created_at_ns}"
-        )
+        if isinstance(workflow_metric, GaugeMetric):
+            name = workflow_metric.name.lower().replace(" ", "_")
+            metrics_batch.append(
+                f"{name} value={workflow_metric.value} {workflow_metric.time_ns}"
+            )
+        elif isinstance(workflow_metric, JobMetrics):
+            name = workflow_metric.job_name.lower().replace(" ", "_")
+            metrics_batch.append(
+                f"{name} queue_time={workflow_metric.queue_time},run_time={workflow_metric.run_time},status={workflow_metric.status} {workflow_metric.created_at_ns}"
+            )
+        else:
+            raise ValueError(
+                f"Unsupported object type {type(workflow_metric)}: {str(workflow_metric)}"
+            )
 
     request_data = "\n".join(metrics_batch)
     response = requests.post(
@@ -176,16 +248,21 @@ def main():
     # Enter the main loop. Every five minutes we wake up and dump metrics for
     # the relevant jobs.
     while True:
-        current_metrics = get_metrics(github_repo, workflows_to_track)
-        if len(current_metrics) == 0:
-            print("No metrics found to upload.", file=sys.stderr)
-            continue
+        current_metrics = get_per_workflow_metrics(github_repo, workflows_to_track)
+        current_metrics += get_sampled_workflow_metrics(github_repo)
+        # Always send a heartbeat metric so we can monitor if this container is still able to log to Grafana.
+        current_metrics.append(
+            GaugeMetric("metrics_container_heartbeat", 1, time.time_ns())
+        )
 
         upload_metrics(current_metrics, grafana_metrics_userid, grafana_api_key)
         print(f"Uploaded {len(current_metrics)} metrics", file=sys.stderr)
 
         for workflow_metric in reversed(current_metrics):
-            workflows_to_track[workflow_metric.job_name] = workflow_metric.workflow_id
+            if isinstance(workflow_metric, JobMetrics):
+                workflows_to_track[
+                    workflow_metric.job_name
+                ] = workflow_metric.workflow_id
 
         time.sleep(SCRAPE_INTERVAL_SECONDS)
 
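For reference on the payload that upload_metrics assembles above: each metric becomes one InfluxDB line-protocol record of the shape "<measurement> <field>=<value>[,<field>=<value>...] <timestamp_ns>". A minimal, hypothetical sketch of the two record shapes the new code emits (the sample values and the job name are invented for illustration, not taken from this commit):

    # GaugeMetric records carry a single "value" field:
    example_gauge_record = "workflow_queue_size value=3 1700000000000000000"
    # JobMetrics records carry queue_time, run_time, and status fields, keyed by
    # "<workflow name>-<job name>" lowercased with spaces replaced by underscores:
    example_job_record = (
        "llvm_premerge_checks-linux_premerge "
        "queue_time=42,run_time=900,status=1 1700000000000000000"
    )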