# Helper to get the id of the currently running job in a GitHub Actions
# workflow. GitHub does not provide this information to workflow runs, so we
# need to figure it out based on what they *do* provide.
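#
# Example invocation (the file name here is illustrative; GITHUB_RUN_ID and
# RUNNER_NAME are provided by the workflow environment):
#
#   GITHUB_TOKEN="<token>" python3 get_workflow_job_id.py "$GITHUB_RUN_ID" "$RUNNER_NAME"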

import argparse
import json
import operator
import os
import re
import sys
import time
import urllib.error
import urllib.parse
from typing import Any, Callable, Dict, List, Optional, Tuple
from urllib.request import Request, urlopen


def parse_json_and_links(conn: Any) -> Tuple[Any, Dict[str, Dict[str, str]]]:
    links = {}
    # Extract the links that GitHub uses for pagination; see the example after
    # this function and
    # https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Link
    if "Link" in conn.headers:
        for elem in re.split(", *<", conn.headers["Link"]):
            try:
                url, params_ = elem.split(";", 1)
            except ValueError:
                continue
            url = urllib.parse.unquote(url.strip("<> "))
            qparams = urllib.parse.parse_qs(params_.strip(), separator=";")
            params = {
                k: v[0].strip('"')
                for k, v in qparams.items()
                if type(v) is list and len(v) > 0
            }
            params["url"] = url
            if "rel" in params:
                links[params["rel"]] = params

    return json.load(conn), links
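
# For reference, parse_json_and_links turns a Link header of the form (the URL
# here is illustrative)
#
#   <https://api.github.com/resource?page=2>; rel="next"
#
# into {"next": {"rel": "next", "url": "https://api.github.com/resource?page=2"}}.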


def fetch_url(
    url: str,
    *,
    headers: Optional[Dict[str, str]] = None,
    reader: Callable[[Any], Any] = lambda x: x.read(),
    retries: Optional[int] = 3,
    backoff_timeout: float = 0.5,
) -> Any:
    if headers is None:
        headers = {}
    try:
        with urlopen(Request(url, headers=headers)) as conn:
            return reader(conn)
    except urllib.error.HTTPError as err:
        if isinstance(retries, (int, float)) and retries > 0:
            time.sleep(backoff_timeout)
            return fetch_url(
                url,
                headers=headers,
                reader=reader,
                retries=retries - 1,
                backoff_timeout=backoff_timeout,
            )
        exception_message = (
            "Is GitHub alright? "
            f"Received status code '{err.code}' when attempting to retrieve {url}:\n"
            f"{err.reason}\n\nheaders={err.headers}"
        )
        raise RuntimeError(exception_message) from err
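
# A quick usage sketch for fetch_url (the endpoint is just for illustration):
#
#   data = fetch_url("https://api.github.com/rate_limit", reader=json.load)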


def parse_args() -> Any:
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "workflow_run_id", help="The id of the workflow run, should be GITHUB_RUN_ID"
    )
    parser.add_argument(
        "runner_name",
        help="The name of the runner to retrieve the job id, should be RUNNER_NAME",
    )

    return parser.parse_args()


def fetch_jobs(url: str, headers: Dict[str, str]) -> List[Dict[str, Any]]:
    # Follow the "next" pagination links until we have every job in the run
    response, links = fetch_url(url, headers=headers, reader=parse_json_and_links)
    jobs = response["jobs"]
    assert type(jobs) is list
    while "next" in links:
        response, links = fetch_url(
            links["next"]["url"], headers=headers, reader=parse_json_and_links
        )
        jobs.extend(response["jobs"])

    return jobs


# Our strategy is to retrieve the parent workflow run, then filter its jobs on
# RUNNER_NAME to figure out which job we're currently running.
#
# Why RUNNER_NAME? Because it's the only thing that uniquely identifies a job
# within a workflow. GITHUB_JOB doesn't work, as it corresponds to the job
# yaml id (https://bit.ly/37e78oI), which has two problems:
# 1. It's not present in the workflow job JSON object, so we can't use it as a
#    filter.
# 2. It isn't unique; for matrix jobs the job yaml id is the same for all jobs
#    in the matrix.
#
# RUNNER_NAME, on the other hand, is unique across the pool of runners. Also,
# since only one job can be scheduled on a runner at a time, we know that
# looking for RUNNER_NAME will uniquely identify the job we're currently
# running.
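
# For illustration, an abridged job object as returned by the jobs API, showing
# only the fields this script reads (the values are made up):
#
#   {
#       "id": 123456789,
#       "name": "test (default, 1, 2, linux.2xlarge)",
#       "runner_name": "i-01234example",
#       "started_at": "2022-01-01T00:00:00Z",
#   }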


def find_job_id_name(args: Any) -> Tuple[int, str]:
    # From https://docs.github.com/en/actions/learn-github-actions/environment-variables
    PYTORCH_REPO = os.environ.get("GITHUB_REPOSITORY", "pytorch/pytorch")
    PYTORCH_GITHUB_API = f"https://api.github.com/repos/{PYTORCH_REPO}"
    GITHUB_TOKEN = os.environ["GITHUB_TOKEN"]
    REQUEST_HEADERS = {
        "Accept": "application/vnd.github.v3+json",
        "Authorization": "token " + GITHUB_TOKEN,
    }

    url = f"{PYTORCH_GITHUB_API}/actions/runs/{args.workflow_run_id}/jobs?per_page=100"
    jobs = fetch_jobs(url, REQUEST_HEADERS)

    # Sort the jobs by start time, in descending order. We want the most
    # recently scheduled job on the runner.
    jobs.sort(key=operator.itemgetter("started_at"), reverse=True)

    for job in jobs:
        if job["runner_name"] == args.runner_name:
            return (job["id"], job["name"])

    raise RuntimeError(f"Can't find job id for runner {args.runner_name}")


def set_output(name: str, val: Any) -> None:
    # Prefer the GITHUB_OUTPUT file when available; fall back to the
    # deprecated ::set-output workflow command otherwise
    if os.getenv("GITHUB_OUTPUT"):
        with open(str(os.getenv("GITHUB_OUTPUT")), "a") as env:
            print(f"{name}={val}", file=env)
        print(f"setting {name}={val}")
    else:
        print(f"::set-output name={name}::{val}")


def main() -> None:
    args = parse_args()
    try:
        # Get both the job id and job name, since we've already spent the API
        # requests to fetch the job info anyway
        job_id, job_name = find_job_id_name(args)
        set_output("job-id", job_id)
        set_output("job-name", job_name)
    except Exception as e:
        print(repr(e), file=sys.stderr)
        # Fall back to an identifier derived from the workflow run id
        print(f"workflow-{args.workflow_run_id}")


if __name__ == "__main__":
    main()