Commit 56892b3

[CI] Rework github workflow processing
Before this patch, the job/workflow name impacted the metric name, meaning a change in the workflow definition could break monitoring. This patch adds a map to get a stable metric name from a workflow name.

In addition, it reworks a bit how we track the last processed workflow, both to simplify the behavior and to work around an API issue which returns bogus results if a filter is used.

This PR is a first step to bring buildkite metrics monitoring.

Signed-off-by: Nathan Gauër <[email protected]>
1 parent e9de91e commit 56892b3
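The naming scheme the patch introduces is worth spelling out: a workflow display name maps to a stable prefix, and each tracked job name maps to a stable suffix, so the Grafana series name survives renames in the workflow YAML and only the maps need updating. A minimal, self-contained sketch of the lookup, using values copied from the diff below:

# Sketch: deriving a stable metric name from Github's mutable display
# names, using the two maps this patch introduces (values from the diff).
GITHUB_WORKFLOW_TO_TRACK = {"LLVM Premerge Checks": "github_llvm_premerge_checks"}
GITHUB_JOB_TO_TRACK = {
    "github_llvm_premerge_checks": {
        "Linux Premerge Checks (Test Only - Please Ignore Results)": "premerge_linux",
    }
}

workflow_name = "LLVM Premerge Checks"
job_name = "Linux Premerge Checks (Test Only - Please Ignore Results)"

prefix_name = GITHUB_WORKFLOW_TO_TRACK[workflow_name]
suffix_name = GITHUB_JOB_TO_TRACK[prefix_name][job_name]
print(f"{prefix_name}_{suffix_name}")
# -> github_llvm_premerge_checks_premerge_linux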

File tree

1 file changed: +114, -115 lines

.ci/metrics/metrics.py

Lines changed: 114 additions & 115 deletions
@@ -1,4 +1,5 @@
 import requests
+import collections
 import time
 import os
 from dataclasses import dataclass
@@ -12,9 +13,29 @@
 GRAFANA_URL = (
     "https://influx-prod-13-prod-us-east-0.grafana.net/api/v1/push/influx/write"
 )
-GITHUB_PROJECT = "llvm/llvm-project"
-WORKFLOWS_TO_TRACK = ["LLVM Premerge Checks"]
-SCRAPE_INTERVAL_SECONDS = 5 * 60
+SCRAPE_INTERVAL_SECONDS = 60
+
+# Lists the Github workflows we want to track. Maps the Github job name to
+# the metric name prefix in grafana.
+# This metric name is also used as a key in the job->name map.
+GITHUB_WORKFLOW_TO_TRACK = {"LLVM Premerge Checks": "github_llvm_premerge_checks"}
+
+# Lists the Github jobs to track for a given workflow. The key is the stable
+# name (metric name) of the workflow (see GITHUB_WORKFLOW_TO_TRACK).
+# Each value is a map to link the github job name to the corresponding metric
+# name.
+GITHUB_JOB_TO_TRACK = {
+    "github_llvm_premerge_checks": {
+        "Linux Premerge Checks (Test Only - Please Ignore Results)": "premerge_linux",
+        "Windows Premerge Checks (Test Only - Please Ignore Results)": "premerge_windows",
+    }
+}
+
+# The number of workflows to pull when sampling queue size & running count.
+# Filtering at the query level doesn't work, and thus sampling workflow counts
+# cannot be done in a clean way.
+# If we miss running/queued workflows, we might want to bump this value.
+GITHUB_WORKFLOWS_COUNT_FOR_SAMPLING = 200
 
 
 @dataclass
@@ -34,7 +55,6 @@ class GaugeMetric:
     value: int
     time_ns: int
 
-
 def get_sampled_workflow_metrics(github_repo: github.Repository):
     """Gets global statistics about the Github workflow queue
 
@@ -45,131 +65,117 @@ def get_sampled_workflow_metrics(github_repo: github.Repository):
     Returns a list of GaugeMetric objects, containing the relevant metrics about
     the workflow
     """
-    queued_job_counts = {}
-    running_job_counts = {}
-
-    # Other states are available (pending, waiting, etc), but the meaning
-    # is not documented (See #70540).
-    # "queued" seems to be the info we want.
-    for queued_workflow in github_repo.get_workflow_runs(status="queued"):
-        if queued_workflow.name not in WORKFLOWS_TO_TRACK:
-            continue
-        for queued_workflow_job in queued_workflow.jobs():
-            job_name = queued_workflow_job.name
-            # Workflows marked as queued can potentially only have some jobs
-            # queued, so make sure to also count jobs currently in progress.
-            if queued_workflow_job.status == "queued":
-                if job_name not in queued_job_counts:
-                    queued_job_counts[job_name] = 1
-                else:
-                    queued_job_counts[job_name] += 1
-            elif queued_workflow_job.status == "in_progress":
-                if job_name not in running_job_counts:
-                    running_job_counts[job_name] = 1
-                else:
-                    running_job_counts[job_name] += 1
-
-    for running_workflow in github_repo.get_workflow_runs(status="in_progress"):
-        if running_workflow.name not in WORKFLOWS_TO_TRACK:
+    queued_count = collections.Counter()
+    running_count = collections.Counter()
+
+    # Do not apply any filters to this query.
+    # See https://github.com/orgs/community/discussions/86766
+    # Applying filters like `status=completed` will break pagination, and
+    # return a non-sorted and incomplete list of workflows.
+    i = 0
+    for task in iter(github_repo.get_workflow_runs()):
+        if i > GITHUB_WORKFLOWS_COUNT_FOR_SAMPLING:
+            break
+        i += 1
+
+        if task.name not in GITHUB_WORKFLOW_TO_TRACK:
             continue
-        for running_workflow_job in running_workflow.jobs():
-            job_name = running_workflow_job.name
-            if running_workflow_job.status != "in_progress":
+
+        prefix_name = GITHUB_WORKFLOW_TO_TRACK[task.name]
+        for job in task.jobs():
+            if job.name not in GITHUB_JOB_TO_TRACK[prefix_name]:
                 continue
+            suffix_name = GITHUB_JOB_TO_TRACK[prefix_name][job.name]
+            metric_name = f"{prefix_name}_{suffix_name}"
+
+            # Other states are available (pending, waiting, etc), but the meaning
+            # is not documented (See #70540).
+            # "queued" seems to be the info we want.
+            if job.status == "queued":
+                queued_count[metric_name] += 1
+            elif job.status == "in_progress":
+                running_count[metric_name] += 1
 
-            if job_name not in running_job_counts:
-                running_job_counts[job_name] = 1
-            else:
-                running_job_counts[job_name] += 1
 
     workflow_metrics = []
-    for queued_job in queued_job_counts:
+    for name, value in queued_count.items():
         workflow_metrics.append(
-            GaugeMetric(
-                f"workflow_queue_size_{queued_job}",
-                queued_job_counts[queued_job],
-                time.time_ns(),
-            )
+            GaugeMetric(f"workflow_queue_size_{name}", value, time.time_ns())
         )
-    for running_job in running_job_counts:
+    for name, value in running_count.items():
         workflow_metrics.append(
-            GaugeMetric(
-                f"running_workflow_count_{running_job}",
-                running_job_counts[running_job],
-                time.time_ns(),
-            )
+            GaugeMetric(f"running_workflow_count_{name}", value, time.time_ns())
         )
+
     # Always send a hearbeat metric so we can monitor is this container is still able to log to Grafana.
     workflow_metrics.append(
         GaugeMetric("metrics_container_heartbeat", 1, time.time_ns())
     )
     return workflow_metrics
 
 
-def get_per_workflow_metrics(
-    github_repo: github.Repository, workflows_to_track: dict[str, int]
-):
+def get_per_workflow_metrics(github_repo: github.Repository, last_seen_workflow: str):
     """Gets the metrics for specified Github workflows.
 
     This function takes in a list of workflows to track, and optionally the
     workflow ID of the last tracked invocation. It grabs the relevant data
     from Github, returning it to the caller.
+    If the last_seen_workflow parameter is None, this returns no metrics, but
+    returns the id of the most recent workflow.
 
     Args:
       github_repo: A github repo object to use to query the relevant information.
-      workflows_to_track: A dictionary mapping workflow names to the last
-        invocation ID where metrics have been collected, or None to collect the
-        last five results.
+      last_seen_workflow: the last workflow this function processed.
 
     Returns:
-      Returns a list of JobMetrics objects, containing the relevant metrics about
-      the workflow.
+      Returns a tuple with 2 elements:
+        - a list of JobMetrics objects, one per processed job.
+        - the ID of the most recent processed workflow run.
     """
     workflow_metrics = []
+    most_recent_workflow_processed = None
+
+    # Do not apply any filters to this query.
+    # See https://github.com/orgs/community/discussions/86766
+    # Applying filters like `status=completed` will break pagination, and
+    # return a non-sorted and incomplete list of workflows.
+    for task in iter(github_repo.get_workflow_runs()):
+        # Ignoring non-completed workflows.
+        if task.status != "completed":
+            continue
 
-    workflows_to_include = set(workflows_to_track.keys())
+        # Record the most recent workflow we processed so this script
+        # only processes it once.
+        if most_recent_workflow_processed is None:
+            most_recent_workflow_processed = task.id
 
-    for workflow_run in iter(github_repo.get_workflow_runs()):
-        if len(workflows_to_include) == 0:
+        # This condition only happens when this script starts:
+        # this is used to determine a start point. Don't return any
+        # metrics, just the most recent workflow ID.
+        if last_seen_workflow is None:
             break
 
-        if workflow_run.status != "completed":
-            continue
-
-        # This workflow was already sampled for this run, or is not tracked at
-        # all. Ignoring.
-        if workflow_run.name not in workflows_to_include:
-            continue
+        # This workflow has already been processed. We can stop now.
+        if last_seen_workflow == task.id:
+            break
 
-        # There were no new workflow invocations since the previous scrape.
-        # The API returns a sorted list with the most recent invocations first,
-        # so we can stop looking for this particular workflow. Continue to grab
-        # information on the other workflows of interest, if present.
-        if workflows_to_track[workflow_run.name] == workflow_run.id:
-            workflows_to_include.remove(workflow_run.name)
+        # This workflow is not interesting to us.
+        if task.name not in GITHUB_WORKFLOW_TO_TRACK:
             continue
 
-        workflow_jobs = workflow_run.jobs()
-        if workflow_jobs.totalCount == 0:
-            continue
+        name_prefix = GITHUB_WORKFLOW_TO_TRACK[task.name]
 
-        if (
-            workflows_to_track[workflow_run.name] is None
-            or workflows_to_track[workflow_run.name] == workflow_run.id
-        ):
-            workflows_to_include.remove(workflow_run.name)
-            if (
-                workflows_to_track[workflow_run.name] is not None
-                and len(workflows_to_include) == 0
-            ):
-                break
+        for job in task.jobs():
+            # This job is not interesting to us.
+            if job.name not in GITHUB_JOB_TO_TRACK[name_prefix]:
+                continue
 
-        for workflow_job in workflow_jobs:
-            created_at = workflow_job.created_at
-            started_at = workflow_job.started_at
-            completed_at = workflow_job.completed_at
+            name_suffix = GITHUB_JOB_TO_TRACK[name_prefix][job.name]
+            created_at = job.created_at
+            started_at = job.started_at
+            completed_at = job.completed_at
 
-            job_result = int(workflow_job.conclusion == "success")
+            job_result = int(job.conclusion == "success")
             if job_result:
                 # We still might want to mark the job as a failure if one of the steps
                 # failed. This is required due to use setting continue-on-error in
@@ -178,7 +184,7 @@ def get_per_workflow_metrics(
                 # TODO(boomanaiden154): Remove this once the premerge pipeline is no
                 # longer in a testing state and we can directly assert the workflow
                 # result.
-                for step in workflow_job.steps:
+                for step in job.steps:
                     if step.conclusion != "success" and step.conclusion != "skipped":
                         job_result = 0
                         break
@@ -191,25 +197,23 @@ def get_per_workflow_metrics(
 
             # The timestamp associated with the event is expected by Grafana to be
             # in nanoseconds.
-            created_at_ns = int(created_at.timestamp()) * 10**9
+            completed_at_ns = int(completed_at.timestamp()) * 10**9
 
-            logging.info(
-                f"Adding a job metric for job {workflow_job.id} in workflow {workflow_run.id}"
-            )
+            logging.info(f"Adding a job metric for job {job.id} in workflow {task.id}")
 
             workflow_metrics.append(
                 JobMetrics(
-                    workflow_run.name + "-" + workflow_job.name,
+                    name_prefix + "_" + name_suffix,
                     queue_time.seconds,
                     run_time.seconds,
                     job_result,
-                    created_at_ns,
+                    completed_at_ns,
                     workflow_run.id,
                     workflow_run.name,
                 )
             )
 
-    return workflow_metrics
+    return workflow_metrics, most_recent_workflow_processed
 
 
 def upload_metrics(workflow_metrics, metrics_userid, api_key):
def upload_metrics(workflow_metrics, metrics_userid, api_key):
@@ -259,32 +263,27 @@ def upload_metrics(workflow_metrics, metrics_userid, api_key):
 
 def main():
     # Authenticate with Github
-    auth = Auth.Token(os.environ["GITHUB_TOKEN"])
-
+    github_auth = Auth.Token(os.environ["GITHUB_TOKEN"])
     grafana_api_key = os.environ["GRAFANA_API_KEY"]
     grafana_metrics_userid = os.environ["GRAFANA_METRICS_USERID"]
 
-    workflows_to_track = {}
-    for workflow_to_track in WORKFLOWS_TO_TRACK:
-        workflows_to_track[workflow_to_track] = None
+    # The last workflow this script processed.
+    github_last_seen_workflow = None
 
     # Enter the main loop. Every five minutes we wake up and dump metrics for
     # the relevant jobs.
     while True:
-        github_object = Github(auth=auth)
+        github_object = Github(auth=github_auth)
         github_repo = github_object.get_repo("llvm/llvm-project")
 
-        current_metrics = get_per_workflow_metrics(github_repo, workflows_to_track)
-        current_metrics += get_sampled_workflow_metrics(github_repo)
-
-        upload_metrics(current_metrics, grafana_metrics_userid, grafana_api_key)
-        logging.info(f"Uploaded {len(current_metrics)} metrics")
+        github_metrics, github_last_seen_workflow = get_per_workflow_metrics(
+            github_repo, github_last_seen_workflow
+        )
+        sampled_metrics = get_sampled_workflow_metrics(github_repo)
+        metrics = github_metrics + sampled_metrics
 
-        for workflow_metric in reversed(current_metrics):
-            if isinstance(workflow_metric, JobMetrics):
-                workflows_to_track[
-                    workflow_metric.workflow_name
-                ] = workflow_metric.workflow_id
+        upload_metrics(metrics, grafana_metrics_userid, grafana_api_key)
+        logging.info(f"Uploaded {len(metrics)} metrics")
 
         time.sleep(SCRAPE_INTERVAL_SECONDS)
 
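Both rewritten functions walk the unfiltered, newest-first run list because, per the discussion linked in the patch, filters such as status=completed break pagination on this endpoint. The sampling side caps the walk with a manual counter; below is a sketch of the same capped-walk pattern using itertools.islice. This is an equivalent alternative for illustration, not the patch's code; github_repo is assumed to be a PyGithub repository object as elsewhere in the script.

import itertools

GITHUB_WORKFLOWS_COUNT_FOR_SAMPLING = 200

def recent_runs(github_repo):
    # Deliberately unfiltered: a status= filter can yield unsorted,
    # incomplete pages on this endpoint. Instead, walk the newest-first
    # run list and stop after a fixed number of runs.
    yield from itertools.islice(
        github_repo.get_workflow_runs(), GITHUB_WORKFLOWS_COUNT_FOR_SAMPLING
    )

This matches the patch's loop up to an off-by-one: the manual counter only breaks once i exceeds the limit, so it visits one extra run.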
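With this patch, the last-processed bookkeeping reduces to a single run ID threaded through the main loop. A simplified sketch of that handshake; repo, userid, and api_key are placeholders, and the functions are the ones defined in the patch:

last_seen = None
while True:
    # First iteration: get_per_workflow_metrics() returns ([], <ID of the
    # most recent completed run>), which only establishes a starting point.
    # Later iterations walk newest-first and stop at last_seen, so each
    # newly completed run is processed exactly once.
    job_metrics, last_seen = get_per_workflow_metrics(repo, last_seen)
    upload_metrics(job_metrics + get_sampled_workflow_metrics(repo), userid, api_key)
    time.sleep(SCRAPE_INTERVAL_SECONDS)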