diff --git a/.ci/metrics/metrics.py b/.ci/metrics/metrics.py
index bd2b51154768d..a5f76428cb3cc 100644
--- a/.ci/metrics/metrics.py
+++ b/.ci/metrics/metrics.py
@@ -1,8 +1,11 @@
 import requests
+import dateutil.parser
+import json
 import time
 import os
 from dataclasses import dataclass
 import sys
+import collections
 import logging
 import github
@@ -12,10 +15,35 @@
 GRAFANA_URL = (
     "https://influx-prod-13-prod-us-east-0.grafana.net/api/v1/push/influx/write"
 )
-GITHUB_PROJECT = "llvm/llvm-project"
-WORKFLOWS_TO_TRACK = ["LLVM Premerge Checks"]
 SCRAPE_INTERVAL_SECONDS = 5 * 60

+# Number of builds to fetch per page. Since we scrape regularly, this can
+# remain small.
+BUILDKITE_GRAPHQL_BUILDS_PER_PAGE = 10
+
+# Lists the GitHub workflows we want to track. Maps the GitHub workflow name
+# to the metric name prefix in Grafana.
+# This prefix is also used as the key in GITHUB_JOB_TO_TRACK.
+GITHUB_WORKFLOW_TO_TRACK = {"LLVM Premerge Checks": "github_llvm_premerge_checks"}
+
+# Lists the GitHub jobs to track for a given workflow. The key is the stable
+# metric name prefix of the workflow (see GITHUB_WORKFLOW_TO_TRACK).
+# Each value maps a GitHub job name to the corresponding metric name.
+GITHUB_JOB_TO_TRACK = {
+    "github_llvm_premerge_checks": {
+        "Linux Premerge Checks (Test Only - Please Ignore Results)": "premerge_linux",
+        "Windows Premerge Checks (Test Only - Please Ignore Results)": "premerge_windows",
+    }
+}
+
+# Lists the BuildKite jobs we want to track. Maps the BuildKite job name to
+# the metric name in Grafana. Mapping to a stable metric name ensures we do
+# not lose metric history if the job name changes.
+BUILDKITE_WORKFLOW_TO_TRACK = {
+    ":linux: Linux x64": "buildkite_linux",
+    ":windows: Windows x64": "buildkite_windows",
+}

 @dataclass
 class JobMetrics:
@@ -35,6 +63,214 @@ class GaugeMetric:
     time_ns: int


+def buildkite_fetch_page_build_list(
+    buildkite_token: str, after_cursor: str = None
+) -> list[dict[str, str]]:
+    """Fetches a page of the build list using the GraphQL BuildKite API.
+
+    Returns the last BUILDKITE_GRAPHQL_BUILDS_PER_PAGE **finished** builds
+    by default, or the BUILDKITE_GRAPHQL_BUILDS_PER_PAGE **finished** builds
+    older than the one pointed to by |after_cursor|, if provided.
+    The |after_cursor| value is taken from a previous page returned by the
+    API.
+
+    Args:
+      buildkite_token: the secret token to authenticate GraphQL requests.
+      after_cursor: cursor after which to start the page fetch.
+
+    Returns:
+      The most recent builds after cursor (if set) with the following format:
+      [
+        {
+          "cursor": <cursor>,
+          "number": <build-number>,
+        }
+      ]
+    """
+
+    BUILDKITE_GRAPHQL_QUERY = """
+  query OrganizationShowQuery {{
+    organization(slug: "llvm-project") {{
+      pipelines(search: "Github pull requests", first: 1) {{
+        edges {{
+          node {{
+            builds (state: [FAILED, PASSED], first: {PAGE_SIZE}, after: {AFTER}) {{
+              edges {{
+                cursor
+                node {{
+                  number
+                }}
+              }}
+            }}
+          }}
+        }}
+      }}
+    }}
+  }}
+"""
+    query = BUILDKITE_GRAPHQL_QUERY.format(
+        PAGE_SIZE=BUILDKITE_GRAPHQL_BUILDS_PER_PAGE,
+        AFTER="null" if after_cursor is None else '"{}"'.format(after_cursor),
+    )
+    # Let json.dumps handle the quoting and escaping of the query string.
+    data = json.dumps({"query": query})
+    url = "https://graphql.buildkite.com/v1"
+    headers = {
+        "Authorization": "Bearer " + buildkite_token,
+        "Content-Type": "application/json",
+    }
+    r = requests.post(url, data=data, headers=headers)
+    data = r.json()
+    # De-nest the build list.
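+    # For illustration, the response is expected to have this (abridged)
+    # shape, inferred from the query above:
+    #   {"data": {"organization": {"pipelines": {"edges": [
+    #       {"node": {"builds": {"edges": [
+    #           {"cursor": "<cursor>", "node": {"number": <build-number>}},
+    #           ...
+    #       ]}}}
+    #   ]}}}}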
+ builds = data["data"]["organization"]["pipelines"]["edges"][0]["node"]["builds"][ + "edges" + ] + # Fold cursor info into the node dictionnary. + return [{**x["node"], "cursor": x["cursor"]} for x in builds] + + +def buildkite_get_build_info(build_number: str) -> dict: + """Returns all the info associated with the provided build number. + Note: for unknown reasons, graphql returns no jobs for a given build, + while this endpoint does, hence why this uses this API instead of graphql. + + Args: + build_number: which build number to fetch info for. + + Returns: + The info for the target build, a JSON dictionnary. + """ + + URL = "https://buildkite.com/llvm-project/github-pull-requests/builds/{}.json" + return requests.get(URL.format(build_number)).json() + + +def buildkite_get_builds_up_to(buildkite_token: str, last_cursor: str = None) -> list: + """Returns the last BUILDKITE_GRAPHQL_BUILDS_PER_PAGE builds by default, or + until the build pointed by |last_cursor| is found. + + Args: + buildkite_token: the secret token to authenticate GraphQL requests. + last_cursor: the cursor to stop at if set. If None, a full page is fetched. + """ + output = [] + cursor = None + + while True: + page = buildkite_fetch_page_build_list(buildkite_token, cursor) + # No cursor provided, return the first page. + if last_cursor is None: + return page + + # Cursor has been provided, check if present in this page. + match_index = None + for index, item in enumerate(page): + if item["cursor"] == last_cursor: + match_index = index + break + + # Not present, continue loading more pages. + if match_index is None: + output += page + cursor = page[-1]["cursor"] + continue + # Cursor found, keep results up to cursor + output += page[:match_index] + return output + + +def buildkite_get_metrics( + buildkite_token: str, last_cursor: str = None +) -> (list[JobMetrics], str): + """Returns a tuple with: + - the metrics to record until |last_cursor| is reached, or none if last cursor is None. + - the cursor of the most recent build processed. + + Args: + buildkite_token: the secret token to authenticate GraphQL requests. + last_cursor: the cursor to stop at if set. If None, a full page is fetched. + """ + builds = buildkite_get_builds_up_to(buildkite_token, last_cursor) + # Don't return any metrics if last_cursor is None. + # This happens when the program starts. + if last_cursor is None: + return [], builds[0]["cursor"] + + last_recorded_build = last_cursor + output = [] + for build in reversed(builds): + info = buildkite_get_build_info(build["number"]) + last_recorded_build = build["cursor"] + for job in info["jobs"]: + # Skip this job. 
+ if job["name"] not in BUILDKITE_WORKFLOW_TO_TRACK: + continue + + created_at = dateutil.parser.isoparse(job["created_at"]) + scheduled_at = dateutil.parser.isoparse(job["scheduled_at"]) + started_at = dateutil.parser.isoparse(job["started_at"]) + finished_at = dateutil.parser.isoparse(job["finished_at"]) + + job_name = BUILDKITE_WORKFLOW_TO_TRACK[job["name"]] + queue_time = (started_at - scheduled_at).seconds + run_time = (finished_at - started_at).seconds + status = bool(job["passed"]) + finished_at_ns = int(finished_at.timestamp()) * 10**9 + workflow_id = build["number"] + workflow_name = "Github pull requests" + output.append( + JobMetrics( + job_name, + queue_time, + run_time, + status, + finished_at_ns, + workflow_id, + workflow_name, + ) + ) + + return output, last_recorded_build + + +def github_job_name_to_metric_name(workflow_name, job_name): + workflow_key = GITHUB_WORKFLOW_TO_TRACK[workflow_name] + job_key = GITHUB_JOB_TO_TRACK[workflow_key][job_name] + return f"{workflow_key}_{job_key}" + + +def github_count_queued_running_workflows(workflow_list): + """Returns the per-job count of running & queued jobs in the passed + workflow list. + + Args: + workflow_list: an iterable of workflows. + + Returns: + A tuple, (per-job-queue-size, per-job-running-count). The key + is the pretty job name, and the value the count of jobs. + """ + queued_count = collections.Counter() + running_count = collections.Counter() + + for workflow in workflow_list: + if workflow.name not in GITHUB_WORKFLOW_TO_TRACK: + continue + + workflow_key = GITHUB_WORKFLOW_TO_TRACK[workflow.name] + for job in workflow.jobs(): + if job.name not in GITHUB_JOB_TO_TRACK[workflow_key]: + continue + job_key = GITHUB_JOB_TO_TRACK[workflow_key][job.name] + metric_name = f"{workflow_key}_{job_key}" + + if job.status == "queued": + queued_count[metric_name] += 1 + elif job.status == "in_progress": + running_count[metric_name] += 1 + return queued_count, running_count + + def get_sampled_workflow_metrics(github_repo: github.Repository): """Gets global statistics about the Github workflow queue @@ -45,131 +281,83 @@ def get_sampled_workflow_metrics(github_repo: github.Repository): Returns a list of GaugeMetric objects, containing the relevant metrics about the workflow """ - queued_job_counts = {} - running_job_counts = {} - # Other states are available (pending, waiting, etc), but the meaning # is not documented (See #70540). # "queued" seems to be the info we want. - for queued_workflow in github_repo.get_workflow_runs(status="queued"): - if queued_workflow.name not in WORKFLOWS_TO_TRACK: - continue - for queued_workflow_job in queued_workflow.jobs(): - job_name = queued_workflow_job.name - # Workflows marked as queued can potentially only have some jobs - # queued, so make sure to also count jobs currently in progress. 
-            if queued_workflow_job.status == "queued":
-                if job_name not in queued_job_counts:
-                    queued_job_counts[job_name] = 1
-                else:
-                    queued_job_counts[job_name] += 1
-            elif queued_workflow_job.status == "in_progress":
-                if job_name not in running_job_counts:
-                    running_job_counts[job_name] = 1
-                else:
-                    running_job_counts[job_name] += 1
-
-    for running_workflow in github_repo.get_workflow_runs(status="in_progress"):
-        if running_workflow.name not in WORKFLOWS_TO_TRACK:
-            continue
-        for running_workflow_job in running_workflow.jobs():
-            job_name = running_workflow_job.name
-            if running_workflow_job.status != "in_progress":
-                continue
-
-            if job_name not in running_job_counts:
-                running_job_counts[job_name] = 1
-            else:
-                running_job_counts[job_name] += 1
+    queued_1, running_1 = github_count_queued_running_workflows(
+        github_repo.get_workflow_runs(status="queued")
+    )
+    queued_2, running_2 = github_count_queued_running_workflows(
+        github_repo.get_workflow_runs(status="in_progress")
+    )

     workflow_metrics = []
-    for queued_job in queued_job_counts:
+    for key, value in (queued_1 + queued_2).items():
         workflow_metrics.append(
-            GaugeMetric(
-                f"workflow_queue_size_{queued_job}",
-                queued_job_counts[queued_job],
-                time.time_ns(),
-            )
+            GaugeMetric(f"workflow_queue_size_{key}", value, time.time_ns())
         )
-    for running_job in running_job_counts:
+    for key, value in (running_1 + running_2).items():
         workflow_metrics.append(
-            GaugeMetric(
-                f"running_workflow_count_{running_job}",
-                running_job_counts[running_job],
-                time.time_ns(),
-            )
+            GaugeMetric(f"running_workflow_count_{key}", value, time.time_ns())
         )
-    # Always send a hearbeat metric so we can monitor is this container is still able to log to Grafana.
+
+    # Always send a heartbeat metric so we can monitor if this container is
+    # still able to log to Grafana.
     workflow_metrics.append(
         GaugeMetric("metrics_container_heartbeat", 1, time.time_ns())
     )
     return workflow_metrics


-def get_per_workflow_metrics(
-    github_repo: github.Repository, workflows_to_track: dict[str, int]
-):
+def get_per_workflow_metrics(github_repo: github.Repository, last_workflow_id: str):
     """Gets the metrics for specified Github workflows.

-    This function takes in a list of workflows to track, and optionally the
-    workflow ID of the last tracked invocation. It grabs the relevant data
-    from Github, returning it to the caller.
+    This function fetches the most recent completed workflow runs from
+    GitHub, stopping at `last_workflow_id`, and records metrics for the runs
+    referenced in GITHUB_WORKFLOW_TO_TRACK.
+    If `last_workflow_id` is None, no metrics are returned, and only the ID
+    of the most recent completed workflow run is returned. This is used
+    once, when the program starts.

     Args:
       github_repo: A github repo object to use to query the relevant information.
-      workflows_to_track: A dictionary mapping workflow names to the last
-        invocation ID where metrics have been collected, or None to collect the
-        last five results.
+      last_workflow_id: the ID of the most recent workflow run processed by
+        the previous scrape, or None on the first iteration.

     Returns:
-      Returns a list of JobMetrics objects, containing the relevant metrics about
-      the workflow.
+      A tuple with:
+       - a list of JobMetrics objects, containing the relevant metrics about
+         the workflows,
+       - the ID of the most recent processed workflow run.
     """
     workflow_metrics = []
-
-    workflows_to_include = set(workflows_to_track.keys())
-
-    for workflow_run in iter(github_repo.get_workflow_runs()):
-        if len(workflows_to_include) == 0:
+    last_recorded_workflow = None
+    for workflow_run in iter(github_repo.get_workflow_runs(status="completed")):
+        # Record the first workflow of this list as the most recent one.
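+        # The GitHub API returns runs sorted with the most recent first, so
+        # the first entry we see is the newest completed run.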
+        if last_recorded_workflow is None:
+            last_recorded_workflow = workflow_run.id
+
+        # If we already saw this workflow, break. We also break if no
+        # workflow has been seen yet, which means the script just started.
+        if last_workflow_id == workflow_run.id or last_workflow_id is None:
             break

-        if workflow_run.status != "completed":
+        # Skip workflows we do not track.
+        if workflow_run.name not in GITHUB_WORKFLOW_TO_TRACK:
             continue

-        # This workflow was already sampled for this run, or is not tracked at
-        # all. Ignoring.
-        if workflow_run.name not in workflows_to_include:
-            continue
+        workflow_key = GITHUB_WORKFLOW_TO_TRACK[workflow_run.name]

-        # There were no new workflow invocations since the previous scrape.
-        # The API returns a sorted list with the most recent invocations first,
-        # so we can stop looking for this particular workflow. Continue to grab
-        # information on the other workflows of interest, if present.
-        if workflows_to_track[workflow_run.name] == workflow_run.id:
-            workflows_to_include.remove(workflow_run.name)
-            continue
-
-        workflow_jobs = workflow_run.jobs()
-        if workflow_jobs.totalCount == 0:
-            continue
-
-        if (
-            workflows_to_track[workflow_run.name] is None
-            or workflows_to_track[workflow_run.name] == workflow_run.id
-        ):
-            workflows_to_include.remove(workflow_run.name)
-        if (
-            workflows_to_track[workflow_run.name] is not None
-            and len(workflows_to_include) == 0
-        ):
-            break
+        for workflow_job in workflow_run.jobs():
+            # Skip jobs that are not tracked for this workflow.
+            if workflow_job.name not in GITHUB_JOB_TO_TRACK[workflow_key]:
+                continue

-        for workflow_job in workflow_jobs:
             created_at = workflow_job.created_at
             started_at = workflow_job.started_at
             completed_at = workflow_job.completed_at

             job_result = int(workflow_job.conclusion == "success")
+            job_key = GITHUB_JOB_TO_TRACK[workflow_key][workflow_job.name]
+
             if job_result:
                 # We still might want to mark the job as a failure if one of the steps
                 # failed. This is required due to use setting continue-on-error in
@@ -199,7 +387,7 @@ def get_per_workflow_metrics(

             workflow_metrics.append(
                 JobMetrics(
-                    workflow_run.name + "-" + workflow_job.name,
+                    workflow_key + "_" + job_key,
                     queue_time.seconds,
                     run_time.seconds,
                     job_result,
@@ -209,8 +397,7 @@ def get_per_workflow_metrics(
             )
         )

-    return workflow_metrics
+    return workflow_metrics, last_recorded_workflow


 def upload_metrics(workflow_metrics, metrics_userid, api_key):
     """Upload metrics to Grafana.
@@ -260,13 +447,14 @@ def upload_metrics(workflow_metrics, metrics_userid, api_key):
 def main():
     # Authenticate with Github
     auth = Auth.Token(os.environ["GITHUB_TOKEN"])
-
     grafana_api_key = os.environ["GRAFANA_API_KEY"]
     grafana_metrics_userid = os.environ["GRAFANA_METRICS_USERID"]
+    buildkite_token = os.environ["BUILDKITE_TOKEN"]

-    workflows_to_track = {}
-    for workflow_to_track in WORKFLOWS_TO_TRACK:
-        workflows_to_track[workflow_to_track] = None
+    # This script only records workflows/jobs/builds that finished after it
+    # started, so we keep track of the last processed build and workflow run.
+    buildkite_last_cursor = None
+    github_last_workflow_id = None

     # Enter the main loop. Every five minutes we wake up and dump metrics for
     # the relevant jobs.
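+    # Both variables persist across loop iterations, so each scrape only
+    # records builds and workflows that finished since the previous one.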
@@ -274,17 +462,17 @@ def main(): github_object = Github(auth=auth) github_repo = github_object.get_repo("llvm/llvm-project") - current_metrics = get_per_workflow_metrics(github_repo, workflows_to_track) - current_metrics += get_sampled_workflow_metrics(github_repo) - - upload_metrics(current_metrics, grafana_metrics_userid, grafana_api_key) - logging.info(f"Uploaded {len(current_metrics)} metrics") + buildkite_metrics, buildkite_last_cursor = buildkite_get_metrics( + buildkite_token, buildkite_last_cursor + ) + github_metrics, github_last_workflow_id = get_per_workflow_metrics( + github_repo, github_last_workflow_id + ) + sampled_metrics = get_sampled_workflow_metrics(github_repo) - for workflow_metric in reversed(current_metrics): - if isinstance(workflow_metric, JobMetrics): - workflows_to_track[ - workflow_metric.workflow_name - ] = workflow_metric.workflow_id + metrics = buildkite_metrics + github_metrics + sampled_metrics + upload_metrics(metrics, grafana_metrics_userid, grafana_api_key) + logging.info(f"Uploaded {len(metrics)} metrics") time.sleep(SCRAPE_INTERVAL_SECONDS)