Skip to content

Commit a952994

Browse files
committed
iterate over fixed depth
1 parent fbf6505 commit a952994

File tree

1 file changed

+116
-108
lines changed

1 file changed

+116
-108
lines changed

.ci/metrics/metrics.py

Lines changed: 116 additions & 108 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,20 @@
1-
import requests
21
import collections
3-
import time
2+
import datetime
3+
import github
4+
import logging
45
import os
5-
from dataclasses import dataclass
6+
import requests
67
import sys
7-
import logging
8+
import time
89

9-
import github
10-
from github import Github
10+
from dataclasses import dataclass
1111
from github import Auth
12+
from github import Github
1213

1314
GRAFANA_URL = (
1415
"https://influx-prod-13-prod-us-east-0.grafana.net/api/v1/push/influx/write"
1516
)
16-
SCRAPE_INTERVAL_SECONDS = 60
17+
SCRAPE_INTERVAL_SECONDS = 5 * 60
1718

1819
# Lists the Github workflows we want to track. Maps the Github job name to
1920
# the metric name prefix in grafana.
@@ -31,89 +32,47 @@
3132
}
3233
}
3334

34-
# The number of workflows to pull when sampling queue size & running count.
35-
# Filtering at the query level doesn't work, and thus sampling workflow counts
36-
# cannot be done in a clean way.
37-
# If we miss running/queued workflows, we might want to bump this value.
38-
GITHUB_WORKFLOWS_COUNT_FOR_SAMPLING = 200
39-
35+
# The number of workflows to pull when sampling Github workflows.
36+
# - Github API filtering is broken: we cannot apply any filtering:
37+
# - See https://github.com/orgs/community/discussions/86766
38+
# - A workflow can complete before another workflow, even when starting later.
39+
# - We don't want to sample the same workflow twice.
40+
#
41+
# This means we essentially have a list of workflows sorted by creation date,
42+
# and that's all we can deduce from it. So for each iteration, we'll blindly
43+
# process the last N workflows.
44+
GITHUB_WORKFLOWS_MAX_PROCESS_COUNT = 1000
45+
# Second reason for the cut: reaching a workflow older than X.
46+
# This means we will miss long-tails (exceptional jobs running for more than
47+
# X hours), but that's also the case with the count cutoff above.
48+
# Only solution to avoid missing any workflow would be to process the complete
49+
# list, which is not possible.
50+
GITHUB_WORKFLOW_MAX_CREATED_AGE_HOURS = 8
51+
52+
# Grafana will fail to insert any metric older than ~2 hours (value determined
53+
# by trial and error).
54+
GRAFANA_METRIC_MAX_AGE_MN = 120
4055

4156
@dataclass
4257
class JobMetrics:
4358
job_name: str
4459
queue_time: int
4560
run_time: int
4661
status: int
47-
created_at_ns: int
62+
completed_at_ns: int
4863
workflow_id: int
4964
workflow_name: str
5065

51-
5266
@dataclass
5367
class GaugeMetric:
5468
name: str
5569
value: int
5670
time_ns: int
5771

58-
def get_sampled_workflow_metrics(github_repo: github.Repository):
59-
"""Gets global statistics about the Github workflow queue
60-
61-
Args:
62-
github_repo: A github repo object to use to query the relevant information.
63-
64-
Returns:
65-
Returns a list of GaugeMetric objects, containing the relevant metrics about
66-
the workflow
67-
"""
68-
queued_count = collections.Counter()
69-
running_count = collections.Counter()
70-
71-
# Do not apply any filters to this query.
72-
# See https://github.com/orgs/community/discussions/86766
73-
# Applying filters like `status=completed` will break pagination, and
74-
# return a non-sorted and incomplete list of workflows.
75-
i = 0
76-
for task in iter(github_repo.get_workflow_runs()):
77-
if i > GITHUB_WORKFLOWS_COUNT_FOR_SAMPLING:
78-
break
79-
i += 1
80-
81-
if task.name not in GITHUB_WORKFLOW_TO_TRACK:
82-
continue
83-
84-
prefix_name = GITHUB_WORKFLOW_TO_TRACK[task.name]
85-
for job in task.jobs():
86-
if job.name not in GITHUB_JOB_TO_TRACK[prefix_name]:
87-
continue
88-
suffix_name = GITHUB_JOB_TO_TRACK[prefix_name][job.name]
89-
metric_name = f"{prefix_name}_{suffix_name}"
90-
91-
# Other states are available (pending, waiting, etc), but the meaning
92-
# is not documented (See #70540).
93-
# "queued" seems to be the info we want.
94-
if job.status == "queued":
95-
queued_count[metric_name] += 1
96-
elif job.status == "in_progress":
97-
running_count[metric_name] += 1
98-
99-
workflow_metrics = []
100-
for name, value in queued_count.items():
101-
workflow_metrics.append(
102-
GaugeMetric(f"workflow_queue_size_{name}", value, time.time_ns())
103-
)
104-
for name, value in running_count.items():
105-
workflow_metrics.append(
106-
GaugeMetric(f"running_workflow_count_{name}", value, time.time_ns())
107-
)
108-
109-
    # Always send a heartbeat metric so we can monitor if this container is still able to log to Grafana.
110-
workflow_metrics.append(
111-
GaugeMetric("metrics_container_heartbeat", 1, time.time_ns())
112-
)
113-
return workflow_metrics
114-
11572

116-
def get_per_workflow_metrics(github_repo: github.Repository, last_seen_workflow: str):
73+
def github_get_metrics(
74+
github_repo: github.Repository, last_workflows_seen_as_completed: set[int]
75+
):
11776
"""Gets the metrics for specified Github workflows.
11877
11978
This function takes in a list of workflows to track, and optionally the
@@ -132,47 +91,65 @@ def get_per_workflow_metrics(github_repo: github.Repository, last_seen_workflow:
13291
- the ID of the most recent processed workflow run.
13392
"""
13493
workflow_metrics = []
135-
most_recent_workflow_processed = None
94+
queued_count = collections.Counter()
95+
running_count = collections.Counter()
96+
97+
# The list of workflows this iteration will process.
98+
# MaxSize = GITHUB_WORKFLOWS_MAX_PROCESS_COUNT
99+
workflow_seen_as_completed = set()
100+
101+
# Since we process a fixed count of workflows, we want to know when
102+
# the depth is too small and if we miss workflows.
103+
    # E.g.: if there were more than N workflows in the last 2 hours.
104+
# To monitor this, we'll log the age of the oldest workflow processed,
105+
    # and set up alerting in Grafana to help us adjust this depth.
106+
oldest_seen_workflow_age_mn = None
136107

137108
# Do not apply any filters to this query.
138109
# See https://github.com/orgs/community/discussions/86766
139110
# Applying filters like `status=completed` will break pagination, and
140111
# return a non-sorted and incomplete list of workflows.
112+
i = 0
141113
for task in iter(github_repo.get_workflow_runs()):
142-
# Ignoring non-completed workflows.
143-
if task.status != "completed":
144-
continue
145-
146-
# Record the most recent workflow we processed so this script
147-
# only processes it once.
148-
if most_recent_workflow_processed is None:
149-
most_recent_workflow_processed = task.id
150-
151-
# This condition only happens when this script starts:
152-
# this is used to determine a start point. Don't return any
153-
# metrics, just the most recent workflow ID.
154-
if last_seen_workflow is None:
114+
# Max depth reached, stopping.
115+
if i >= GITHUB_WORKFLOWS_MAX_PROCESS_COUNT:
155116
break
117+
i += 1
156118

157-
# This workflow has already been processed. We can stop now.
158-
if last_seen_workflow == task.id:
119+
workflow_age_mn = (
120+
datetime.datetime.now(datetime.timezone.utc) - task.created_at
121+
).total_seconds() / 60
122+
oldest_seen_workflow_age_mn = workflow_age_mn
123+
# If we reach a workflow older than X, stop.
124+
if workflow_age_mn > GITHUB_WORKFLOW_MAX_CREATED_AGE_HOURS * 60:
159125
break
160126

161127
# This workflow is not interesting to us.
162128
if task.name not in GITHUB_WORKFLOW_TO_TRACK:
163129
continue
164130

165-
name_prefix = GITHUB_WORKFLOW_TO_TRACK[task.name]
131+
if task.status == "completed":
132+
workflow_seen_as_completed.add(task.id)
133+
134+
# This workflow has already been seen completed in the previous run.
135+
if task.id in last_workflows_seen_as_completed:
136+
continue
166137

138+
name_prefix = GITHUB_WORKFLOW_TO_TRACK[task.name]
167139
for job in task.jobs():
168140
# This job is not interesting to us.
169141
if job.name not in GITHUB_JOB_TO_TRACK[name_prefix]:
170142
continue
171143

172144
name_suffix = GITHUB_JOB_TO_TRACK[name_prefix][job.name]
173-
created_at = job.created_at
174-
started_at = job.started_at
175-
completed_at = job.completed_at
145+
metric_name = name_prefix + "_" + name_suffix
146+
147+
if task.status != "completed":
148+
if job.status == "queued":
149+
queued_count[metric_name] += 1
150+
elif job.status == "in_progress":
151+
running_count[metric_name] += 1
152+
continue
176153

177154
job_result = int(job.conclusion == "success")
178155
if job_result:
@@ -188,31 +165,63 @@ def get_per_workflow_metrics(github_repo: github.Repository, last_seen_workflow:
188165
job_result = 0
189166
break
190167

168+
created_at = job.created_at
169+
started_at = job.started_at
170+
completed_at = job.completed_at
191171
queue_time = started_at - created_at
192172
run_time = completed_at - started_at
193-
194173
if run_time.seconds == 0:
195174
continue
196175

176+
# Grafana will refuse to ingest metrics older than ~2 hours, so we
177+
# should avoid sending historical data.
178+
metric_age_mn = (
179+
datetime.datetime.now(datetime.timezone.utc) - completed_at
180+
).total_seconds() / 60
181+
if metric_age_mn > GRAFANA_METRIC_MAX_AGE_MN:
182+
continue
183+
184+
logging.info(f"Adding a job metric for job {job.id} in workflow {task.id}")
197185
# The timestamp associated with the event is expected by Grafana to be
198186
# in nanoseconds.
199187
completed_at_ns = int(completed_at.timestamp()) * 10**9
200-
201-
logging.info(f"Adding a job metric for job {job.id} in workflow {task.id}")
202-
203188
workflow_metrics.append(
204189
JobMetrics(
205-
name_prefix + "_" + name_suffix,
190+
metric_name,
206191
queue_time.seconds,
207192
run_time.seconds,
208193
job_result,
209194
completed_at_ns,
210-
workflow_run.id,
211-
workflow_run.name,
195+
task.id,
196+
task.name,
212197
)
213198
)
214199

215-
return workflow_metrics, most_recent_workflow_processed
200+
for name, value in queued_count.items():
201+
workflow_metrics.append(
202+
GaugeMetric(f"workflow_queue_size_{name}", value, time.time_ns())
203+
)
204+
for name, value in running_count.items():
205+
workflow_metrics.append(
206+
GaugeMetric(f"running_workflow_count_{name}", value, time.time_ns())
207+
)
208+
209+
    # Always send a heartbeat metric so we can monitor if this container is still able to log to Grafana.
210+
workflow_metrics.append(
211+
GaugeMetric("metrics_container_heartbeat", 1, time.time_ns())
212+
)
213+
214+
# Log the oldest workflow we saw, allowing us to monitor if the processing
215+
# depth is correctly set-up.
216+
if oldest_seen_workflow_age_mn is not None:
217+
workflow_metrics.append(
218+
GaugeMetric(
219+
"github_oldest_processed_workflow_mn",
220+
oldest_seen_workflow_age_mn,
221+
time.time_ns(),
222+
)
223+
)
224+
return workflow_metrics, workflow_seen_as_completed
216225

217226

218227
def upload_metrics(workflow_metrics, metrics_userid, api_key):
@@ -241,7 +250,7 @@ def upload_metrics(workflow_metrics, metrics_userid, api_key):
241250
elif isinstance(workflow_metric, JobMetrics):
242251
name = workflow_metric.job_name.lower().replace(" ", "_")
243252
metrics_batch.append(
244-
f"{name} queue_time={workflow_metric.queue_time},run_time={workflow_metric.run_time},status={workflow_metric.status} {workflow_metric.created_at_ns}"
253+
f"{name} queue_time={workflow_metric.queue_time},run_time={workflow_metric.run_time},status={workflow_metric.status} {workflow_metric.completed_at_ns}"
245254
)
246255
else:
247256
raise ValueError(
@@ -267,20 +276,19 @@ def main():
267276
grafana_metrics_userid = os.environ["GRAFANA_METRICS_USERID"]
268277

269278
# The last workflow this script processed.
270-
github_last_seen_workflow = None
279+
# Because the Github queries are broken, we'll simply log a 'processed'
280+
# bit for the last COUNT_TO_PROCESS workflows.
281+
gh_last_workflows_seen_as_completed = set()
271282

272283
# Enter the main loop. Every five minutes we wake up and dump metrics for
273284
# the relevant jobs.
274285
while True:
275286
github_object = Github(auth=github_auth)
276287
github_repo = github_object.get_repo("llvm/llvm-project")
277288

278-
github_metrics, github_last_seen_workflow = get_per_workflow_metrics(
279-
github_repo, github_last_seen_workflow
289+
metrics, gh_last_workflows_seen_as_completed = github_get_metrics(
290+
github_repo, gh_last_workflows_seen_as_completed
280291
)
281-
sampled_metrics = get_sampled_workflow_metrics(github_repo)
282-
metrics = github_metrics + sampled_metrics
283-
284292
upload_metrics(metrics, grafana_metrics_userid, grafana_api_key)
285293
logging.info(f"Uploaded {len(metrics)} metrics")
286294

0 commit comments

Comments
 (0)