Commit 5f79857

[CI] Add queue size, running count metrics
This commit allows the container to report three additional metrics at every sampling event:
- a heartbeat
- the size of the workflow queue (filtered)
- the number of running workflows (filtered)

The heartbeat is a simple metric that lets us monitor the health of the metrics pipeline itself. Before this commit, a metric was pushed only when a workflow completed, so we had to wait a few hours before noticing that the metrics container was unable to push metrics.

In addition, this commit adds sampling of the workflow queue size and the running-workflow count. This should allow us to better understand the load and to improve the autoscaling values we pick for the cluster.

Signed-off-by: Nathan Gauër <[email protected]>
1 parent 05f9cdd · commit 5f79857

File tree: 1 file changed (+83, -14 lines)

.ci/metrics/metrics.py

Lines changed: 83 additions & 14 deletions
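Before the diff itself, a minimal sketch of what one sampling event now emits, mirroring the f-strings this commit adds to upload_metrics; the counts and the timestamp are illustrative, not real data:

import time

# Illustrative values only; in the script the counts come from the GitHub API.
now_ns = time.time_ns()
batch = [
    f"workflow_queue_size value=3 {now_ns}",
    f"running_workflow_count value=7 {now_ns}",
    f"metrics_container_heartbeat value=1 {now_ns}",
]
print("\n".join(batch))  # the joined string becomes the POST body sent to Grafana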
@@ -26,7 +26,67 @@ class JobMetrics:
     workflow_id: int
 
 
-def get_metrics(github_repo: github.Repository, workflows_to_track: dict[str, int]):
+@dataclass
+class GaugeMetric:
+    name: str
+    value: int
+    time_ns: int
+
+
+def get_sampled_workflow_metrics(github_repo: github.Repository):
+    """Gets global statistics about the Github workflow queue
+
+    Args:
+      github_repo: A github repo object to use to query the relevant information.
+
+    Returns:
+      Returns a list of GaugeMetric objects, containing the relevant metrics about
+      the workflow
+    """
+
+    # Other states are available (pending, waiting, etc), but the meaning
+    # is not documented (See #70540).
+    # "queued" seems to be the info we want.
+    queued_workflow_count = len(
+        [
+            x
+            for x in github_repo.get_workflow_runs(status="queued")
+            if x.name in WORKFLOWS_TO_TRACK
+        ]
+    )
+    running_workflow_count = len(
+        [
+            x
+            for x in github_repo.get_workflow_runs(status="in_progress")
+            if x.name in WORKFLOWS_TO_TRACK
+        ]
+    )
+
+    workflow_metrics = []
+    workflow_metrics.append(
+        GaugeMetric(
+            "workflow_queue_size",
+            queued_workflow_count,
+            time.time_ns(),
+        )
+    )
+    workflow_metrics.append(
+        GaugeMetric(
+            "running_workflow_count",
+            running_workflow_count,
+            time.time_ns(),
+        )
+    )
+    # Always send a heartbeat metric so we can monitor if this container is still able to log to Grafana.
+    workflow_metrics.append(
+        GaugeMetric("metrics_container_heartbeat", 1, time.time_ns())
+    )
+    return workflow_metrics
+
+
+def get_per_workflow_metrics(
+    github_repo: github.Repository, workflows_to_track: dict[str, int]
+):
     """Gets the metrics for specified Github workflows.
 
     This function takes in a list of workflows to track, and optionally the
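A hypothetical smoke test for the new sampler, assuming PyGithub is installed, a token is available in GITHUB_TOKEN, and the monitored repository is llvm/llvm-project; this snippet is not part of the commit:

import os
import github

gh = github.Github(os.environ["GITHUB_TOKEN"])
repo = gh.get_repo("llvm/llvm-project")
for metric in get_sampled_workflow_metrics(repo):
    print(metric.name, metric.value, metric.time_ns)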
@@ -43,14 +103,14 @@ def get_metrics(github_repo: github.Repository, workflows_to_track: dict[str, in
     Returns a list of JobMetrics objects, containing the relevant metrics about
     the workflow.
     """
-    workflow_runs = iter(github_repo.get_workflow_runs())
-
     workflow_metrics = []
 
     workflows_to_include = set(workflows_to_track.keys())
 
-    while len(workflows_to_include) > 0:
-        workflow_run = next(workflow_runs)
+    for workflow_run in iter(github_repo.get_workflow_runs()):
+        if len(workflows_to_include) == 0:
+            break
+
         if workflow_run.status != "completed":
             continue
 
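The loop rewrite above avoids calling next() by hand: on a drained iterator next() raises StopIteration, while the for form simply stops. A minimal sketch of the same pattern, independent of the GitHub API:

# Stop once every target is seen or the source runs out, without manual next().
def collect(runs, targets):
    remaining = set(targets)
    found = []
    for run in runs:  # terminates cleanly when runs is exhausted
        if not remaining:
            break
        if run in remaining:
            remaining.discard(run)
            found.append(run)
    return found

print(collect(["a", "b", "c"], {"b", "z"}))  # ['b']; 'z' never appears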
@@ -141,10 +201,16 @@ def upload_metrics(workflow_metrics, metrics_userid, api_key):
     """
     metrics_batch = []
     for workflow_metric in workflow_metrics:
-        workflow_formatted_name = workflow_metric.job_name.lower().replace(" ", "_")
-        metrics_batch.append(
-            f"{workflow_formatted_name} queue_time={workflow_metric.queue_time},run_time={workflow_metric.run_time},status={workflow_metric.status} {workflow_metric.created_at_ns}"
-        )
+        if isinstance(workflow_metric, GaugeMetric):
+            name = workflow_metric.name.lower().replace(" ", "_")
+            metrics_batch.append(
+                f"{name} value={workflow_metric.value} {workflow_metric.time_ns}"
+            )
+        else:
+            name = workflow_metric.job_name.lower().replace(" ", "_")
+            metrics_batch.append(
+                f"{name} queue_time={workflow_metric.queue_time},run_time={workflow_metric.run_time},status={workflow_metric.status} {workflow_metric.created_at_ns}"
+            )
 
     request_data = "\n".join(metrics_batch)
     response = requests.post(
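Worth noting: the two branches serialize different field sets, so a mixed batch contains both line shapes. An illustrative rendering only; the job name and every number below are made up:

# Illustrative only: one gauge line and one per-job line after this change.
request_data = "\n".join(
    [
        "workflow_queue_size value=3 1736120000000000000",
        "some_tracked_job queue_time=42,run_time=900,status=1 1736120000000000000",
    ]
)
print(request_data)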
@@ -176,16 +242,19 @@ def main():
     # Enter the main loop. Every five minutes we wake up and dump metrics for
     # the relevant jobs.
     while True:
-        current_metrics = get_metrics(github_repo, workflows_to_track)
+        current_metrics = get_per_workflow_metrics(github_repo, workflows_to_track)
+        current_metrics += get_sampled_workflow_metrics(github_repo)
         if len(current_metrics) == 0:
-            print("No metrics found to upload.", file=sys.stderr)
-            continue
+            print("No metrics found to upload.", file=sys.stdout)
 
         upload_metrics(current_metrics, grafana_metrics_userid, grafana_api_key)
-        print(f"Uploaded {len(current_metrics)} metrics", file=sys.stderr)
+        print(f"Uploaded {len(current_metrics)} metrics", file=sys.stdout)
 
         for workflow_metric in reversed(current_metrics):
-            workflows_to_track[workflow_metric.job_name] = workflow_metric.workflow_id
+            if isinstance(workflow_metric, JobMetrics):
+                workflows_to_track[
+                    workflow_metric.job_name
+                ] = workflow_metric.workflow_id
 
         time.sleep(SCRAPE_INTERVAL_SECONDS)