[CI] Rework github workflow processing #130317

Merged · 5 commits · Mar 11, 2025
Changes from 2 commits
228 changes: 113 additions & 115 deletions .ci/metrics/metrics.py
@@ -1,4 +1,5 @@
import requests
import collections
import time
import os
from dataclasses import dataclass
@@ -12,9 +13,29 @@
GRAFANA_URL = (
"https://influx-prod-13-prod-us-east-0.grafana.net/api/v1/push/influx/write"
)
GITHUB_PROJECT = "llvm/llvm-project"
WORKFLOWS_TO_TRACK = ["LLVM Premerge Checks"]
SCRAPE_INTERVAL_SECONDS = 5 * 60
SCRAPE_INTERVAL_SECONDS = 60

# Lists the Github workflows we want to track. Maps the Github workflow name
# to the metric name prefix in grafana.
# This metric name is also used as a key in the job->name map.
GITHUB_WORKFLOW_TO_TRACK = {"LLVM Premerge Checks": "github_llvm_premerge_checks"}

# Lists the Github jobs to track for a given workflow. The key is the stable
# name (metric name) of the workflow (see GITHUB_WORKFLOW_TO_TRACK).
# Each value is a map from the Github job name to the corresponding metric
# name suffix.
GITHUB_JOB_TO_TRACK = {
"github_llvm_premerge_checks": {
"Linux Premerge Checks (Test Only - Please Ignore Results)": "premerge_linux",
"Windows Premerge Checks (Test Only - Please Ignore Results)": "premerge_windows",
}
}

# The number of workflows to pull when sampling queue size & running count.
# Filtering at the query level doesn't work, and thus sampling workflow counts
# cannot be done in a clean way.
# If we miss running/queued workflows, we might want to bump this value.
GITHUB_WORKFLOWS_COUNT_FOR_SAMPLING = 200
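
For reference (not part of the diff), the two maps above compose into a single Grafana metric name; a minimal sketch using the constants as defined:

# Illustration only: how a workflow/job pair maps to a metric name.
workflow_name = "LLVM Premerge Checks"
job_name = "Linux Premerge Checks (Test Only - Please Ignore Results)"
prefix = GITHUB_WORKFLOW_TO_TRACK[workflow_name]  # "github_llvm_premerge_checks"
suffix = GITHUB_JOB_TO_TRACK[prefix][job_name]  # "premerge_linux"
metric_name = f"{prefix}_{suffix}"  # "github_llvm_premerge_checks_premerge_linux"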


@dataclass
@@ -34,7 +55,6 @@ class GaugeMetric:
value: int
time_ns: int


def get_sampled_workflow_metrics(github_repo: github.Repository):
"""Gets global statistics about the Github workflow queue

@@ -45,131 +65,116 @@ def get_sampled_workflow_metrics(github_repo: github.Repository):
Returns a list of GaugeMetric objects, containing the relevant metrics about
the workflow
"""
queued_job_counts = {}
running_job_counts = {}

# Other states are available (pending, waiting, etc), but the meaning
# is not documented (See #70540).
# "queued" seems to be the info we want.
for queued_workflow in github_repo.get_workflow_runs(status="queued"):
if queued_workflow.name not in WORKFLOWS_TO_TRACK:
continue
for queued_workflow_job in queued_workflow.jobs():
job_name = queued_workflow_job.name
# Workflows marked as queued can potentially only have some jobs
# queued, so make sure to also count jobs currently in progress.
if queued_workflow_job.status == "queued":
if job_name not in queued_job_counts:
queued_job_counts[job_name] = 1
else:
queued_job_counts[job_name] += 1
elif queued_workflow_job.status == "in_progress":
if job_name not in running_job_counts:
running_job_counts[job_name] = 1
else:
running_job_counts[job_name] += 1

for running_workflow in github_repo.get_workflow_runs(status="in_progress"):
if running_workflow.name not in WORKFLOWS_TO_TRACK:
queued_count = collections.Counter()
running_count = collections.Counter()

# Do not apply any filters to this query.
# See https://github.com/orgs/community/discussions/86766
# Applying filters like `status=completed` will break pagination, and
# return a non-sorted and incomplete list of workflows.
i = 0
for task in iter(github_repo.get_workflow_runs()):
if i > GITHUB_WORKFLOWS_COUNT_FOR_SAMPLING:
break
i += 1

if task.name not in GITHUB_WORKFLOW_TO_TRACK:
continue
for running_workflow_job in running_workflow.jobs():
job_name = running_workflow_job.name
if running_workflow_job.status != "in_progress":

prefix_name = GITHUB_WORKFLOW_TO_TRACK[task.name]
for job in task.jobs():
if job.name not in GITHUB_JOB_TO_TRACK[prefix_name]:
continue
suffix_name = GITHUB_JOB_TO_TRACK[prefix_name][job.name]
metric_name = f"{prefix_name}_{suffix_name}"

if job_name not in running_job_counts:
running_job_counts[job_name] = 1
else:
running_job_counts[job_name] += 1
# Other states are available (pending, waiting, etc), but the meaning
# is not documented (See #70540).
# "queued" seems to be the info we want.
if job.status == "queued":
queued_count[metric_name] += 1
elif job.status == "in_progress":
running_count[metric_name] += 1

workflow_metrics = []
for queued_job in queued_job_counts:
for name, value in queued_count.items():
workflow_metrics.append(
GaugeMetric(
f"workflow_queue_size_{queued_job}",
queued_job_counts[queued_job],
time.time_ns(),
)
GaugeMetric(f"workflow_queue_size_{name}", value, time.time_ns())
)
for running_job in running_job_counts:
for name, value in running_count.items():
workflow_metrics.append(
GaugeMetric(
f"running_workflow_count_{running_job}",
running_job_counts[running_job],
time.time_ns(),
)
GaugeMetric(f"running_workflow_count_{name}", value, time.time_ns())
)

# Always send a heartbeat metric so we can monitor if this container is still able to log to Grafana.
workflow_metrics.append(
GaugeMetric("metrics_container_heartbeat", 1, time.time_ns())
)
return workflow_metrics
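
As a side note (not part of the diff), the manual counter that caps the unfiltered query could be expressed with itertools.islice; a minimal equivalent sketch, modulo the off-by-one in the `i > GITHUB_WORKFLOWS_COUNT_FOR_SAMPLING` check:

import itertools

# Capped iteration over the unfiltered, most-recent-first run list.
for task in itertools.islice(
    github_repo.get_workflow_runs(), GITHUB_WORKFLOWS_COUNT_FOR_SAMPLING
):
    ...  # same per-run filtering as above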


def get_per_workflow_metrics(
github_repo: github.Repository, workflows_to_track: dict[str, int]
):
def get_per_workflow_metrics(github_repo: github.Repository, last_seen_workflow: str):
"""Gets the metrics for specified Github workflows.

This function takes in a list of workflows to track, and optionally the
workflow ID of the last tracked invocation. It grabs the relevant data
from Github, returning it to the caller.
If the last_seen_workflow parameter is None, this returns no metrics, but
returns the id of the most recent workflow.

Args:
github_repo: A github repo object to use to query the relevant information.
workflows_to_track: A dictionary mapping workflow names to the last
invocation ID where metrics have been collected, or None to collect the
last five results.
last_seen_workflow: the ID of the last workflow run this function processed.

Returns:
Returns a list of JobMetrics objects, containing the relevant metrics about
the workflow.
Returns a tuple with 2 elements:
- a list of JobMetrics objects, one per processed job.
- the ID of the most recent processed workflow run.
"""
workflow_metrics = []
most_recent_workflow_processed = None

# Do not apply any filters to this query.
# See https://github.com/orgs/community/discussions/86766
# Applying filters like `status=completed` will break pagination, and
# return a non-sorted and incomplete list of workflows.
for task in iter(github_repo.get_workflow_runs()):
# Ignoring non-completed workflows.
if task.status != "completed":
continue

workflows_to_include = set(workflows_to_track.keys())
# Record the most recent workflow we processed so this script
# only processes it once.
if most_recent_workflow_processed is None:
most_recent_workflow_processed = task.id

for workflow_run in iter(github_repo.get_workflow_runs()):
if len(workflows_to_include) == 0:
# This only happens on the first iteration after the script starts:
# we just determine a starting point. Don't return any
# metrics, just the most recent workflow ID.
if last_seen_workflow is None:
break

if workflow_run.status != "completed":
continue

# This workflow was already sampled for this run, or is not tracked at
# all. Ignoring.
if workflow_run.name not in workflows_to_include:
continue
# This workflow has already been processed. We can stop now.
if last_seen_workflow == task.id:
break

# There were no new workflow invocations since the previous scrape.
# The API returns a sorted list with the most recent invocations first,
# so we can stop looking for this particular workflow. Continue to grab
# information on the other workflows of interest, if present.
if workflows_to_track[workflow_run.name] == workflow_run.id:
workflows_to_include.remove(workflow_run.name)
# This workflow is not interesting to us.
if task.name not in GITHUB_WORKFLOW_TO_TRACK:
continue

workflow_jobs = workflow_run.jobs()
if workflow_jobs.totalCount == 0:
continue
name_prefix = GITHUB_WORKFLOW_TO_TRACK[task.name]

if (
workflows_to_track[workflow_run.name] is None
or workflows_to_track[workflow_run.name] == workflow_run.id
):
workflows_to_include.remove(workflow_run.name)
if (
workflows_to_track[workflow_run.name] is not None
and len(workflows_to_include) == 0
):
break
for job in task.jobs():
# This job is not interesting to us.
if job.name not in GITHUB_JOB_TO_TRACK[name_prefix]:
continue

for workflow_job in workflow_jobs:
created_at = workflow_job.created_at
started_at = workflow_job.started_at
completed_at = workflow_job.completed_at
name_suffix = GITHUB_JOB_TO_TRACK[name_prefix][job.name]
created_at = job.created_at
started_at = job.started_at
completed_at = job.completed_at

job_result = int(workflow_job.conclusion == "success")
job_result = int(job.conclusion == "success")
if job_result:
# We still might want to mark the job as a failure if one of the steps
# failed. This is required due to us setting continue-on-error in
@@ -178,7 +183,7 @@ def get_per_workflow_metrics(
# TODO(boomanaiden154): Remove this once the premerge pipeline is no
# longer in a testing state and we can directly assert the workflow
# result.
for step in workflow_job.steps:
for step in job.steps:
if step.conclusion != "success" and step.conclusion != "skipped":
job_result = 0
break
@@ -191,25 +196,23 @@ def get_per_workflow_metrics(

# The timestamp associated with the event is expected by Grafana to be
# in nanoseconds.
created_at_ns = int(created_at.timestamp()) * 10**9
completed_at_ns = int(completed_at.timestamp()) * 10**9

logging.info(
f"Adding a job metric for job {workflow_job.id} in workflow {workflow_run.id}"
)
logging.info(f"Adding a job metric for job {job.id} in workflow {task.id}")

workflow_metrics.append(
JobMetrics(
workflow_run.name + "-" + workflow_job.name,
name_prefix + "_" + name_suffix,
queue_time.seconds,
run_time.seconds,
job_result,
created_at_ns,
completed_at_ns,
workflow_run.id,
workflow_run.name,
)
)

return workflow_metrics
return workflow_metrics, most_recent_workflow_processed
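
To summarize the new contract (a usage sketch, not part of the diff): the first call bootstraps a starting point and returns no metrics; later calls only process runs newer than the remembered ID.

# First call: no metrics, just the ID of the newest completed run.
metrics, last_seen = get_per_workflow_metrics(github_repo, None)
assert metrics == []

# Subsequent calls: JobMetrics for runs completed since last_seen.
metrics, last_seen = get_per_workflow_metrics(github_repo, last_seen)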


def upload_metrics(workflow_metrics, metrics_userid, api_key):
@@ -259,32 +262,27 @@ def upload_metrics(workflow_metrics, metrics_userid, api_key):

def main():
# Authenticate with Github
auth = Auth.Token(os.environ["GITHUB_TOKEN"])

github_auth = Auth.Token(os.environ["GITHUB_TOKEN"])
grafana_api_key = os.environ["GRAFANA_API_KEY"]
grafana_metrics_userid = os.environ["GRAFANA_METRICS_USERID"]

workflows_to_track = {}
for workflow_to_track in WORKFLOWS_TO_TRACK:
workflows_to_track[workflow_to_track] = None
# The last workflow this script processed.
github_last_seen_workflow = None

# Enter the main loop. Every SCRAPE_INTERVAL_SECONDS we wake up and dump
# metrics for the relevant jobs.
while True:
github_object = Github(auth=auth)
github_object = Github(auth=github_auth)
github_repo = github_object.get_repo("llvm/llvm-project")

current_metrics = get_per_workflow_metrics(github_repo, workflows_to_track)
current_metrics += get_sampled_workflow_metrics(github_repo)

upload_metrics(current_metrics, grafana_metrics_userid, grafana_api_key)
logging.info(f"Uploaded {len(current_metrics)} metrics")
github_metrics, github_last_seen_workflow = get_per_workflow_metrics(
github_repo, github_last_seen_workflow
)
sampled_metrics = get_sampled_workflow_metrics(github_repo)
metrics = github_metrics + sampled_metrics

for workflow_metric in reversed(current_metrics):
if isinstance(workflow_metric, JobMetrics):
workflows_to_track[
workflow_metric.workflow_name
] = workflow_metric.workflow_id
upload_metrics(metrics, grafana_metrics_userid, grafana_api_key)
logging.info(f"Uploaded {len(metrics)} metrics")

time.sleep(SCRAPE_INTERVAL_SECONDS)
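
The body of upload_metrics is collapsed above. Assuming it pushes influx line protocol to GRAFANA_URL with basic auth (the Grafana metrics user ID and API key from the environment), a minimal sketch of one gauge write could look like this; the function name and exact payload shape are hypothetical:

import requests

def push_gauge_sketch(name, value, time_ns, userid, api_key):
    # Hypothetical: one gauge in influx line protocol, e.g.
    # "metrics_container_heartbeat value=1 1741651200000000000".
    line = f"{name} value={value} {time_ns}"
    response = requests.post(GRAFANA_URL, data=line, auth=(userid, api_key))
    response.raise_for_status()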
