diff --git a/.ci/metrics/metrics.py b/.ci/metrics/metrics.py index a5f76428cb3cc..bd2b51154768d 100644 --- a/.ci/metrics/metrics.py +++ b/.ci/metrics/metrics.py @@ -1,11 +1,8 @@ import requests -import dateutil -import json import time import os from dataclasses import dataclass import sys -import collections import logging import github @@ -15,35 +12,10 @@ GRAFANA_URL = ( "https://influx-prod-13-prod-us-east-0.grafana.net/api/v1/push/influx/write" ) +GITHUB_PROJECT = "llvm/llvm-project" +WORKFLOWS_TO_TRACK = ["LLVM Premerge Checks"] SCRAPE_INTERVAL_SECONDS = 5 * 60 -# Number of builds to fetch per page. Since we scrape regularly, this can -# remain small. -BUILDKITE_GRAPHQL_BUILDS_PER_PAGE = 10 - -# Lists the Github workflows we want to track. Maps the Github job name to -# the metric name prefix in grafana. -# This metric name is also used as a key in the job->name map. -GITHUB_WORKFLOW_TO_TRACK = {"LLVM Premerge Checks": "github_llvm_premerge_checks"} - -# Lists the Github jobs to track for a given workflow. The key is the stable -# name (metric name) of the workflow (see GITHUB_WORKFLOW_TO_TRACK). -# Each value is a map to link the github job name to the corresponding metric -# name. -GITHUB_JOB_TO_TRACK = { - "github_llvm_premerge_checks": { - "Linux Premerge Checks (Test Only - Please Ignore Results)": "premerge_linux", - "Windows Premerge Checks (Test Only - Please Ignore Results)": "premerge_windows", - } -} - -# Lists the BuildKite jobs we want to track. Maps the BuildKite job name to -# the metric name in Grafana. This is important not to lose metrics history -# if the workflow name changes. -BUILDKITE_WORKFLOW_TO_TRACK = { - ":linux: Linux x64": "buildkite_linux", - ":windows: Windows x64": "buildkite_windows", -} @dataclass class JobMetrics: @@ -63,214 +35,6 @@ class GaugeMetric: time_ns: int -def buildkite_fetch_page_build_list( - buildkite_token: str, after_cursor: str = None -) -> list[dict[str, str]]: - """Fetches a page of the build list using the GraphQL BuildKite API. - Returns the BUILDKITE_GRAPHQL_BUILDS_PER_PAGE last **finished** builds by - default, or the BUILDKITE_GRAPHQL_BUILDS_PER_PAGE **finished** builds - older than the one pointer by |cursor| if provided. - The |cursor| value is taken from the previous page returned by the API. - - The returned data had the following format: - - Args: - buildkite_token: the secret token to authenticate GraphQL requests. - after_cursor: cursor after which to start the page fetch. - - Returns: - The most recent builds after cursor (if set) with the following format: - [ - { - "cursor": , - "number": , - } - ] - """ - - BUILDKITE_GRAPHQL_QUERY = """ - query OrganizationShowQuery {{ - organization(slug: "llvm-project") {{ - pipelines(search: "Github pull requests", first: 1) {{ - edges {{ - node {{ - builds (state: [FAILED, PASSED], first: {PAGE_SIZE}, after: {AFTER}) {{ - edges {{ - cursor - node {{ - number - }} - }} - }} - }} - }} - }} - }} - }} - """ - data = BUILDKITE_GRAPHQL_QUERY.format( - PAGE_SIZE=BUILDKITE_GRAPHQL_BUILDS_PER_PAGE, - AFTER="null" if after_cursor is None else '"{}"'.format(after_cursor), - ) - data = data.replace("\n", "").replace('"', '\\"') - data = '{ "query": "' + data + '" }' - url = "https://graphql.buildkite.com/v1" - headers = { - "Authorization": "Bearer " + buildkite_token, - "Content-Type": "application/json", - } - r = requests.post(url, data=data, headers=headers) - data = r.json() - # De-nest the build list. 
- builds = data["data"]["organization"]["pipelines"]["edges"][0]["node"]["builds"][ - "edges" - ] - # Fold cursor info into the node dictionnary. - return [{**x["node"], "cursor": x["cursor"]} for x in builds] - - -def buildkite_get_build_info(build_number: str) -> dict: - """Returns all the info associated with the provided build number. - Note: for unknown reasons, graphql returns no jobs for a given build, - while this endpoint does, hence why this uses this API instead of graphql. - - Args: - build_number: which build number to fetch info for. - - Returns: - The info for the target build, a JSON dictionnary. - """ - - URL = "https://buildkite.com/llvm-project/github-pull-requests/builds/{}.json" - return requests.get(URL.format(build_number)).json() - - -def buildkite_get_builds_up_to(buildkite_token: str, last_cursor: str = None) -> list: - """Returns the last BUILDKITE_GRAPHQL_BUILDS_PER_PAGE builds by default, or - until the build pointed by |last_cursor| is found. - - Args: - buildkite_token: the secret token to authenticate GraphQL requests. - last_cursor: the cursor to stop at if set. If None, a full page is fetched. - """ - output = [] - cursor = None - - while True: - page = buildkite_fetch_page_build_list(buildkite_token, cursor) - # No cursor provided, return the first page. - if last_cursor is None: - return page - - # Cursor has been provided, check if present in this page. - match_index = None - for index, item in enumerate(page): - if item["cursor"] == last_cursor: - match_index = index - break - - # Not present, continue loading more pages. - if match_index is None: - output += page - cursor = page[-1]["cursor"] - continue - # Cursor found, keep results up to cursor - output += page[:match_index] - return output - - -def buildkite_get_metrics( - buildkite_token: str, last_cursor: str = None -) -> (list[JobMetrics], str): - """Returns a tuple with: - - the metrics to record until |last_cursor| is reached, or none if last cursor is None. - - the cursor of the most recent build processed. - - Args: - buildkite_token: the secret token to authenticate GraphQL requests. - last_cursor: the cursor to stop at if set. If None, a full page is fetched. - """ - builds = buildkite_get_builds_up_to(buildkite_token, last_cursor) - # Don't return any metrics if last_cursor is None. - # This happens when the program starts. - if last_cursor is None: - return [], builds[0]["cursor"] - - last_recorded_build = last_cursor - output = [] - for build in reversed(builds): - info = buildkite_get_build_info(build["number"]) - last_recorded_build = build["cursor"] - for job in info["jobs"]: - # Skip this job. 
- if job["name"] not in BUILDKITE_WORKFLOW_TO_TRACK: - continue - - created_at = dateutil.parser.isoparse(job["created_at"]) - scheduled_at = dateutil.parser.isoparse(job["scheduled_at"]) - started_at = dateutil.parser.isoparse(job["started_at"]) - finished_at = dateutil.parser.isoparse(job["finished_at"]) - - job_name = BUILDKITE_WORKFLOW_TO_TRACK[job["name"]] - queue_time = (started_at - scheduled_at).seconds - run_time = (finished_at - started_at).seconds - status = bool(job["passed"]) - finished_at_ns = int(finished_at.timestamp()) * 10**9 - workflow_id = build["number"] - workflow_name = "Github pull requests" - output.append( - JobMetrics( - job_name, - queue_time, - run_time, - status, - finished_at_ns, - workflow_id, - workflow_name, - ) - ) - - return output, last_recorded_build - - -def github_job_name_to_metric_name(workflow_name, job_name): - workflow_key = GITHUB_WORKFLOW_TO_TRACK[workflow_name] - job_key = GITHUB_JOB_TO_TRACK[workflow_key][job_name] - return f"{workflow_key}_{job_key}" - - -def github_count_queued_running_workflows(workflow_list): - """Returns the per-job count of running & queued jobs in the passed - workflow list. - - Args: - workflow_list: an iterable of workflows. - - Returns: - A tuple, (per-job-queue-size, per-job-running-count). The key - is the pretty job name, and the value the count of jobs. - """ - queued_count = collections.Counter() - running_count = collections.Counter() - - for workflow in workflow_list: - if workflow.name not in GITHUB_WORKFLOW_TO_TRACK: - continue - - workflow_key = GITHUB_WORKFLOW_TO_TRACK[workflow.name] - for job in workflow.jobs(): - if job.name not in GITHUB_JOB_TO_TRACK[workflow_key]: - continue - job_key = GITHUB_JOB_TO_TRACK[workflow_key][job.name] - metric_name = f"{workflow_key}_{job_key}" - - if job.status == "queued": - queued_count[metric_name] += 1 - elif job.status == "in_progress": - running_count[metric_name] += 1 - return queued_count, running_count - - def get_sampled_workflow_metrics(github_repo: github.Repository): """Gets global statistics about the Github workflow queue @@ -281,83 +45,131 @@ def get_sampled_workflow_metrics(github_repo: github.Repository): Returns a list of GaugeMetric objects, containing the relevant metrics about the workflow """ + queued_job_counts = {} + running_job_counts = {} + # Other states are available (pending, waiting, etc), but the meaning # is not documented (See #70540). # "queued" seems to be the info we want. - queued_1, running_1 = github_count_queued_running_workflows( - github_repo.get_workflow_runs(status="queued") - ) - queued_2, running_2 = github_count_queued_running_workflows( - github_repo.get_workflow_runs(status="in_progress") - ) + for queued_workflow in github_repo.get_workflow_runs(status="queued"): + if queued_workflow.name not in WORKFLOWS_TO_TRACK: + continue + for queued_workflow_job in queued_workflow.jobs(): + job_name = queued_workflow_job.name + # Workflows marked as queued can potentially only have some jobs + # queued, so make sure to also count jobs currently in progress. 
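+            # For example, a "queued" premerge run may already be running its
+            # Linux job while its Windows job is still waiting for a runner.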
+            if queued_workflow_job.status == "queued":
+                if job_name not in queued_job_counts:
+                    queued_job_counts[job_name] = 1
+                else:
+                    queued_job_counts[job_name] += 1
+            elif queued_workflow_job.status == "in_progress":
+                if job_name not in running_job_counts:
+                    running_job_counts[job_name] = 1
+                else:
+                    running_job_counts[job_name] += 1
+
+    for running_workflow in github_repo.get_workflow_runs(status="in_progress"):
+        if running_workflow.name not in WORKFLOWS_TO_TRACK:
+            continue
+        for running_workflow_job in running_workflow.jobs():
+            job_name = running_workflow_job.name
+            if running_workflow_job.status != "in_progress":
+                continue
+
+            if job_name not in running_job_counts:
+                running_job_counts[job_name] = 1
+            else:
+                running_job_counts[job_name] += 1
 
     workflow_metrics = []
-    for key, value in (queued_1 + queued_2).items():
+    for queued_job in queued_job_counts:
         workflow_metrics.append(
-            GaugeMetric(f"workflow_queue_size_{key}", value, time.time_ns())
+            GaugeMetric(
+                f"workflow_queue_size_{queued_job}",
+                queued_job_counts[queued_job],
+                time.time_ns(),
+            )
         )
-    for key, value in (running_1 + running_2).items():
+    for running_job in running_job_counts:
         workflow_metrics.append(
-            GaugeMetric(f"running_workflow_count_{key}", value, time.time_ns())
+            GaugeMetric(
+                f"running_workflow_count_{running_job}",
+                running_job_counts[running_job],
+                time.time_ns(),
+            )
         )
-
-    # Always send a hearbeat metric so we can monitor is this container is
-    # still able to log to Grafana.
+    # Always send a heartbeat metric so we can monitor if this container is still able to log to Grafana.
     workflow_metrics.append(
         GaugeMetric("metrics_container_heartbeat", 1, time.time_ns())
     )
     return workflow_metrics
 
 
-def get_per_workflow_metrics(github_repo: github.Repository, last_workflow_id: str):
+def get_per_workflow_metrics(
+    github_repo: github.Repository, workflows_to_track: dict[str, int]
+):
     """Gets the metrics for specified Github workflows.
 
-    This function loads the last workflows from GitHub up to
-    `last_workflow_id` and logs their metrics if they are referenced in
-    GITHUB_WORKFLOW_TO_TRACK.
-    The function returns a list of metrics, and the most recent processed
-    workflow.
-    If `last_workflow_id` is None, no metrics are returned, and the last
-    completed github workflow ID is returned. This is used once when the
-    program starts.
+    This function takes a dictionary mapping the workflow names to track to
+    the ID of the last invocation already recorded (if any). It grabs the
+    relevant data from Github and returns it to the caller.
 
     Args:
      github_repo: A github repo object to use to query the relevant
        information.
-      last_workflow_id: the last workflow we checked.
+      workflows_to_track: A dictionary mapping workflow names to the last
+        invocation ID where metrics have been collected, or None to collect
+        only the most recent completed run.
 
     Returns:
      Returns a list of JobMetrics objects, containing the relevant metrics
      about the workflow.
    """
    workflow_metrics = []
-    last_recorded_workflow = None
-    for workflow_run in iter(github_repo.get_workflow_runs(status="completed")):
-        # Record the first workflow of this list as the most recent one.
-        if last_recorded_workflow is None:
-            last_recorded_workflow = workflow_run.id
-
-        # If we saw this workflow already, break. We also break if no
-        # workflow has been seen, as this means the script just started.
-        if last_workflow_id == workflow_run.id or last_workflow_id is None:
+
+    workflows_to_include = set(workflows_to_track.keys())
+
+    for workflow_run in iter(github_repo.get_workflow_runs()):
+        if len(workflows_to_include) == 0:
             break
 
-        # This workflow is not interesting to us. Skipping.
-        if workflow_run.name not in GITHUB_WORKFLOW_TO_TRACK:
+        if workflow_run.status != "completed":
             continue
 
-        workflow_key = GITHUB_WORKFLOW_TO_TRACK[workflow_run.name]
+        # This workflow was already sampled during this scrape, or is not
+        # tracked at all. Ignoring.
+        if workflow_run.name not in workflows_to_include:
+            continue
 
-        for workflow_job in workflow_run.jobs():
-            # This job is not interesting, skipping.
-            if workflow_job.name not in GITHUB_JOB_TO_TRACK[workflow_key]:
-                continue
+        # There were no new workflow invocations since the previous scrape.
+        # The API returns a sorted list with the most recent invocations first,
+        # so we can stop looking for this particular workflow. Continue to grab
+        # information on the other workflows of interest, if present.
+        if workflows_to_track[workflow_run.name] == workflow_run.id:
+            workflows_to_include.remove(workflow_run.name)
+            continue
+
+        workflow_jobs = workflow_run.jobs()
+        if workflow_jobs.totalCount == 0:
+            continue
+
+        if (
+            workflows_to_track[workflow_run.name] is None
+            or workflows_to_track[workflow_run.name] == workflow_run.id
+        ):
+            workflows_to_include.remove(workflow_run.name)
+        if (
+            workflows_to_track[workflow_run.name] is not None
+            and len(workflows_to_include) == 0
+        ):
+            break
 
+        for workflow_job in workflow_jobs:
             created_at = workflow_job.created_at
             started_at = workflow_job.started_at
             completed_at = workflow_job.completed_at
 
-            job_result = int(workflow_job.conclusion == "success")
-            job_key = GITHUB_JOB_TO_TRACK[workflow_key][workflow_job.name]
 
+            job_result = int(workflow_job.conclusion == "success")
             if job_result:
                 # We still might want to mark the job as a failure if one of the steps
                 # failed. This is required due to us setting continue-on-error in
@@ -387,7 +199,7 @@ def get_per_workflow_metrics(github_repo: github.Repository, last_workflow_id: str):
 
             workflow_metrics.append(
                 JobMetrics(
-                    workflow_key + "_" + job_key,
+                    workflow_run.name + "-" + workflow_job.name,
                     queue_time.seconds,
                     run_time.seconds,
                     job_result,
@@ -397,7 +209,8 @@ def get_per_workflow_metrics(github_repo: github.Repository, last_workflow_id: str):
             )
         )
 
-    return workflow_metrics, last_recorded_workflow
+    return workflow_metrics
+
 
 def upload_metrics(workflow_metrics, metrics_userid, api_key):
     """Upload metrics to Grafana.
@@ -447,14 +260,13 @@ def main():
     # Authenticate with Github
     auth = Auth.Token(os.environ["GITHUB_TOKEN"])
+    grafana_api_key = os.environ["GRAFANA_API_KEY"]
     grafana_metrics_userid = os.environ["GRAFANA_METRICS_USERID"]
-    buildkite_token = os.environ["BUILDKITE_TOKEN"]
 
-    # This script only records workflows/jobs/builds finished after it
-    # started. So we need to keep track of the last known build.
-    buildkite_last_cursor = None
-    github_last_workflow_id = None
+    workflows_to_track = {}
+    for workflow_to_track in WORKFLOWS_TO_TRACK:
+        workflows_to_track[workflow_to_track] = None
 
     # Enter the main loop. Every five minutes we wake up and dump metrics for
     # the relevant jobs.
@@ -462,17 +274,17 @@ def main():
         github_object = Github(auth=auth)
-        github_repo = github_object.get_repo("llvm/llvm-project")
+        github_repo = github_object.get_repo(GITHUB_PROJECT)
 
-        buildkite_metrics, buildkite_last_cursor = buildkite_get_metrics(
-            buildkite_token, buildkite_last_cursor
-        )
-        github_metrics, github_last_workflow_id = get_per_workflow_metrics(
-            github_repo, github_last_workflow_id
-        )
-        sampled_metrics = get_sampled_workflow_metrics(github_repo)
+        current_metrics = get_per_workflow_metrics(github_repo, workflows_to_track)
+        current_metrics += get_sampled_workflow_metrics(github_repo)
+
+        upload_metrics(current_metrics, grafana_metrics_userid, grafana_api_key)
+        logging.info(f"Uploaded {len(current_metrics)} metrics")
 
-        metrics = buildkite_metrics + github_metrics + sampled_metrics
-        upload_metrics(metrics, grafana_metrics_userid, grafana_api_key)
-        logging.info(f"Uploaded {len(metrics)} metrics")
+        for workflow_metric in reversed(current_metrics):
+            if isinstance(workflow_metric, JobMetrics):
+                workflows_to_track[
+                    workflow_metric.workflow_name
+                ] = workflow_metric.workflow_id
 
         time.sleep(SCRAPE_INTERVAL_SECONDS)
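
Reviewer note, not part of the patch: a minimal sketch of how the tracking
state in the new main() loop is expected to advance between scrapes, based on
the code above. The run ID below is hypothetical.

    # First scrape: every tracked workflow starts at None, so
    # get_per_workflow_metrics() records only the most recent completed run.
    workflows_to_track = {"LLVM Premerge Checks": None}

    # main() then walks the returned metrics oldest-to-newest, so the newest
    # JobMetrics.workflow_id per workflow is the one that sticks:
    workflows_to_track["LLVM Premerge Checks"] = 12345678  # hypothetical run ID

    # Next scrape: get_per_workflow_metrics() stops iterating for this
    # workflow as soon as run 12345678 is seen again, so each run is reported
    # to Grafana exactly once.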