[CI] Extend metrics container to log BuildKite metrics #130996

Merged: 5 commits, Mar 14, 2025

Changes from all commits
213 changes: 211 additions & 2 deletions .ci/metrics/metrics.py
@@ -1,6 +1,8 @@
import collections
import datetime
import dateutil.parser
import github
import json
import logging
import os
import requests
@@ -53,6 +55,18 @@
# by trial and error).
GRAFANA_METRIC_MAX_AGE_MN = 120

# Maps the BuildKite job names we want to track to their metric names in
# Grafana. Keeping this mapping stable matters: it prevents losing metrics
# history when a workflow is renamed.
BUILDKITE_WORKFLOW_TO_TRACK = {
":linux: Linux x64": "buildkite_linux",
":windows: Windows x64": "buildkite_windows",
}

# Number of builds to fetch per page. Since we scrape regularly, this can
# remain small.
BUILDKITE_GRAPHQL_BUILDS_PER_PAGE = 50

@dataclass
class JobMetrics:
job_name: str
@@ -70,6 +84,191 @@ class GaugeMetric:
time_ns: int


def buildkite_fetch_page_build_list(
buildkite_token: str, after_cursor: str = None
) -> list[dict[str, str]]:
"""Fetches a page of the build list using the GraphQL BuildKite API.

    Returns the last BUILDKITE_GRAPHQL_BUILDS_PER_PAGE running/queued builds,
    or, if |after_cursor| is provided, the BUILDKITE_GRAPHQL_BUILDS_PER_PAGE
    running/queued builds older than the one it points to.
    The |after_cursor| value is taken from the previous page returned by the
    API.

Args:
buildkite_token: the secret token to authenticate GraphQL requests.
after_cursor: cursor after which to start the page fetch.

Returns:
The most recent builds after cursor (if set) with the following format:
[
{
"cursor": <value>,
"number": <build-number>,
}
]
"""

BUILDKITE_GRAPHQL_QUERY = """
query OrganizationShowQuery {{
organization(slug: "llvm-project") {{
pipelines(search: "Github pull requests", first: 1) {{
edges {{
node {{
builds (state: [CANCELING, CREATING, FAILING, RUNNING], first: {PAGE_SIZE}, after: {AFTER}) {{
edges {{
cursor
node {{
number
}}
}}
}}
}}
}}
}}
}}
}}
"""
query = BUILDKITE_GRAPHQL_QUERY.format(
PAGE_SIZE=BUILDKITE_GRAPHQL_BUILDS_PER_PAGE,
AFTER="null" if after_cursor is None else '"{}"'.format(after_cursor),
)
query = json.dumps({"query": query})
url = "https://graphql.buildkite.com/v1"
headers = {
"Authorization": "Bearer " + buildkite_token,
"Content-Type": "application/json",
}
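    # POST the query. On failure, BuildKite reports the problem through an
    # "errors" field in the response body, which is checked below.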
data = requests.post(url, data=query, headers=headers).json()
# De-nest the build list.
if "errors" in data:
logging.info("Failed to fetch BuildKite jobs: {}".format(data["errors"]))
return []
builds = data["data"]["organization"]["pipelines"]["edges"][0]["node"]["builds"][
"edges"
]
    # Fold cursor info into the node dictionary.
return [{**x["node"], "cursor": x["cursor"]} for x in builds]


def buildkite_get_build_info(build_number: str) -> dict:
"""Returns all the info associated with the provided build number.

    Note: for unknown reasons, the GraphQL API returns no jobs for a given
    build while this endpoint does, which is why this function uses it
    instead of GraphQL.

Args:
build_number: which build number to fetch info for.

Returns:
        The info for the target build, as a JSON dictionary.
"""

URL = "https://buildkite.com/llvm-project/github-pull-requests/builds/{}.json"
return requests.get(URL.format(build_number)).json()


def buildkite_get_incomplete_tasks(buildkite_token: str) -> list:
    """Returns all the running/pending BuildKite builds.

    Args:
      buildkite_token: the secret token to authenticate GraphQL requests.
    """
output = []
cursor = None
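    # Walk the paginated build list until an empty page is returned,
    # advancing the cursor to the last build of each page.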
while True:
page = buildkite_fetch_page_build_list(buildkite_token, cursor)
if len(page) == 0:
break
cursor = page[-1]["cursor"]
output += page
return output


def buildkite_get_metrics(
    buildkite_token: str, previously_incomplete: set[int]
) -> tuple[list[JobMetrics], set[int]]:
"""Returns a tuple with:

- the metrics recorded for newly completed workflow jobs.
    - the set of workflows still running now.

Args:
buildkite_token: the secret token to authenticate GraphQL requests.
previously_incomplete: the set of running workflows the last time this
function was called.
"""

running_builds = buildkite_get_incomplete_tasks(buildkite_token)
incomplete_now = set([x["number"] for x in running_builds])
output = []

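    # A build that was incomplete on the previous pass and is no longer
    # incomplete has finished since then; emit metrics for its tracked jobs.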
for build_id in previously_incomplete:
if build_id in incomplete_now:
continue

info = buildkite_get_build_info(build_id)
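        # Use the build's completion time as the timestamp for all of its
        # job metrics.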
metric_timestamp = dateutil.parser.isoparse(info["finished_at"])
for job in info["jobs"]:
            # Skip jobs that we do not track.
if job["name"] not in BUILDKITE_WORKFLOW_TO_TRACK:
continue

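            # Each timestamp falls back to the previous lifecycle step when
            # missing: created -> scheduled -> started -> finished/canceled.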
created_at = dateutil.parser.isoparse(job["created_at"])
scheduled_at = (
created_at
if job["scheduled_at"] is None
else dateutil.parser.isoparse(job["scheduled_at"])
)
started_at = (
scheduled_at
if job["started_at"] is None
else dateutil.parser.isoparse(job["started_at"])
)
if job["canceled_at"] is None:
finished_at = (
started_at
if job["finished_at"] is None
else dateutil.parser.isoparse(job["finished_at"])
)
else:
finished_at = dateutil.parser.isoparse(job["canceled_at"])

job_name = BUILDKITE_WORKFLOW_TO_TRACK[job["name"]]
            queue_time = int((started_at - scheduled_at).total_seconds())
            run_time = int((finished_at - started_at).total_seconds())
status = bool(job["passed"])

# Grafana will refuse to ingest metrics older than ~2 hours, so we
# should avoid sending historical data.
metric_age_mn = (
datetime.datetime.now(datetime.timezone.utc) - metric_timestamp
).total_seconds() / 60
if metric_age_mn > GRAFANA_METRIC_MAX_AGE_MN:
logging.warning(
f"Job {job['name']} from workflow {build_id} dropped due"
+ f" to staleness: {metric_age_mn}mn old."
)
continue

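            # Convert the completion timestamp from seconds to nanoseconds.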
metric_timestamp_ns = int(metric_timestamp.timestamp()) * 10**9
workflow_id = build_id
workflow_name = "Github pull requests"
output.append(
JobMetrics(
job_name,
queue_time,
run_time,
status,
metric_timestamp_ns,
workflow_id,
workflow_name,
)
)

return output, incomplete_now

def github_get_metrics(
github_repo: github.Repository, last_workflows_seen_as_completed: set[int]
) -> tuple[list[JobMetrics], int]:
@@ -195,7 +394,7 @@ def github_get_metrics(
datetime.datetime.now(datetime.timezone.utc) - completed_at
).total_seconds() / 60
if metric_age_mn > GRAFANA_METRIC_MAX_AGE_MN:
                logging.warning(
f"Job {job.id} from workflow {task.id} dropped due"
+ f" to staleness: {metric_age_mn}mn old."
)
@@ -292,23 +491,33 @@ def upload_metrics(workflow_metrics, metrics_userid, api_key):
def main():
# Authenticate with Github
github_auth = Auth.Token(os.environ["GITHUB_TOKEN"])
buildkite_token = os.environ["BUILDKITE_TOKEN"]
grafana_api_key = os.environ["GRAFANA_API_KEY"]
grafana_metrics_userid = os.environ["GRAFANA_METRICS_USERID"]

# The last workflow this script processed.
# Because the Github queries are broken, we'll simply log a 'processed'
# bit for the last COUNT_TO_PROCESS workflows.
gh_last_workflows_seen_as_completed = set()
# Stores the list of pending/running builds in BuildKite we need to check
# at the next iteration.
bk_incomplete = set()

# Enter the main loop. Every five minutes we wake up and dump metrics for
# the relevant jobs.
while True:
github_object = Github(auth=github_auth)
github_repo = github_object.get_repo("llvm/llvm-project")

        gh_metrics, gh_last_workflows_seen_as_completed = github_get_metrics(
github_repo, gh_last_workflows_seen_as_completed
)

bk_metrics, bk_incomplete = buildkite_get_metrics(
buildkite_token, bk_incomplete
)

metrics = gh_metrics + bk_metrics
upload_metrics(metrics, grafana_metrics_userid, grafana_api_key)
logging.info(f"Uploaded {len(metrics)} metrics")

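A minimal sketch of how the new BuildKite path can be exercised on its own, outside the infinite loop in main(). It assumes the file above is importable as a module named metrics and that a valid BUILDKITE_TOKEN is set in the environment; both are assumptions for illustration, not part of the change:

import os

import metrics  # assumed module name for the file above

# One-shot run of the BuildKite path.
token = os.environ["BUILDKITE_TOKEN"]

# First pass: nothing was previously incomplete, so no metrics are
# produced; we only learn which builds are currently running/queued.
job_metrics, incomplete = metrics.buildkite_get_metrics(token, set())
print(f"{len(incomplete)} builds currently running or queued")

# A later pass: any build that left the incomplete set since the first
# call has finished, and its tracked jobs yield JobMetrics entries.
job_metrics, incomplete = metrics.buildkite_get_metrics(token, incomplete)
print(f"{len(job_metrics)} newly completed job metrics")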