Approve Test Queue #37756
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. | |
| # | |
| # Licensed under the Apache License, Version 2.0 (the "License"); | |
| # you may not use this file except in compliance with the License. | |
| # You may obtain a copy of the License at | |
| # | |
| # http://www.apache.org/licenses/LICENSE-2.0 | |
| # | |
| # Unless required by applicable law or agreed to in writing, software | |
| # distributed under the License is distributed on an "AS IS" BASIS, | |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
| # See the License for the specific language governing permissions and | |
| # limitations under the License. | |
# Queue manager: on a timer (or on manual dispatch) approves "CICD NeMo" runs that are
# waiting on an environment approval, up to a per-queue concurrency limit.
| name: Approve Test Queue | |
| on: | |
| schedule: | |
| - cron: "*/5 * * * *" # Runs every 5 minutes | |
| workflow_dispatch: # Allows manual triggering | |
| jobs: | |
# approve-queue fans out over the matrix below; each cell runs the approval script once
# for its own (branch, contributor_type) queue.
| approve-queue: | |
| runs-on: ubuntu-latest | |
| environment: main | |
# Skipped in every repository other than NVIDIA-NeMo/Megatron-Bridge.
| if: github.repository == 'NVIDIA-NeMo/Megatron-Bridge' | |
| strategy: | |
# 3 branch queues x 2 contributor types = 6 queue cells per trigger.
| matrix: | |
| branch: [main, others, workflow_dispatch] | |
| contributor_type: [internal, external] | |
| steps: | |
| - name: Checkout repository | |
| uses: actions/checkout@v4 | |
| - name: Set up Python | |
| uses: actions/setup-python@v5 | |
| with: | |
| python-version: "3.12" | |
# requests is the only third-party package the approval script imports.
| - name: Install dependencies | |
| run: | | |
| python -m pip install --upgrade pip | |
| pip install requests | |
# Fetches users_sso.json from the v0.1.0 release of NVIDIA-GitHub-Management/github-audits.
# If the download fails, an empty JSON object is written instead; the script then finds no
# org roles for anyone and classifies every author/actor as external.
| - name: Download SSO users list | |
| run: | | |
| gh release download v0.1.0 \ | |
| --repo NVIDIA-GitHub-Management/github-audits \ | |
| --pattern users_sso.json \ | |
| --output users_sso.json || echo '{}' > users_sso.json | |
| env: | |
| GH_TOKEN: ${{ secrets.NVIDIA_MANAGEMENT_ORG_PAT }} | |
| - name: Approve waiting deployments | |
| env: | |
# Token the script sends on every GitHub API call.
| GITHUB_TOKEN: ${{ secrets.PAT }} | |
# Concurrency limits per queue; the literal after || is the fallback when the repository
# variable is empty.
| MAX_CONCURRENCY: ${{ vars.MAX_CONCURRENCY || 1 }} | |
| MAX_CONCURRENCY_EXTERNAL: ${{ vars.MAX_CONCURRENCY_EXTERNAL || 3 }} | |
# NOTE(review): this reads vars.MAX_CONCURRENCY, not a dedicated
# vars.MAX_CONCURRENCY_WORKFLOW_DISPATCH - confirm that sharing the limit is intentional.
| MAX_CONCURRENCY_WORKFLOW_DISPATCH: ${{ vars.MAX_CONCURRENCY || 1 }} | |
# The matrix cell is handed to the script through these two variables.
| CONTRIBUTOR_TYPE: ${{ matrix.contributor_type }} | |
| MATRIX_BRANCH: ${{ matrix.branch }} | |
| SSO_USERS_FILE: users_sso.json | |
# Unbuffered stdout so the script's print() output reaches the job log immediately.
| PYTHONUNBUFFERED: 1 | |
# The run block that follows is executed as a Python script, not as shell.
| shell: python | |
| run: | | |
| import os | |
| import json | |
| import requests | |
| import re | |
# --- Runtime configuration -------------------------------------------------
# Everything below is supplied by the workflow step's `env:` block (or by the
# runner itself, for GITHUB_REPOSITORY); a missing variable raises KeyError.
GITHUB_TOKEN, REPO, CONTRIBUTOR_TYPE, MATRIX_BRANCH = (
    os.environ[key]
    for key in ("GITHUB_TOKEN", "GITHUB_REPOSITORY", "CONTRIBUTOR_TYPE", "MATRIX_BRANCH")
)

# Only runs of this workflow are counted and approved.
WORKFLOW_NAME = "CICD NeMo"

# Pick the concurrency limit and API root for this queue cell:
#   workflow_dispatch queue -> its own limit, API root built from the current repo
#   PR queues               -> external or internal limit, fixed API root
if MATRIX_BRANCH == "workflow_dispatch":
    MAX_CONCURRENCY = int(os.environ["MAX_CONCURRENCY_WORKFLOW_DISPATCH"])
    API_BASE = f"https://api.github.com/repos/{REPO}"
elif CONTRIBUTOR_TYPE == "external":
    MAX_CONCURRENCY = int(os.environ["MAX_CONCURRENCY_EXTERNAL"])
    API_BASE = "https://api.github.com/repos/NVIDIA-NeMo/Megatron-Bridge"
else:
    MAX_CONCURRENCY = int(os.environ["MAX_CONCURRENCY"])
    API_BASE = "https://api.github.com/repos/NVIDIA-NeMo/Megatron-Bridge"

# Mapping of GitHub login -> record containing "org_roles"; used to decide
# whether an author/actor counts as internal.
with open(os.environ["SSO_USERS_FILE"]) as sso_file:
    sso_users = json.loads(sso_file.read())

# Sent with every GitHub API request.
headers = {
    "Authorization": "token " + GITHUB_TOKEN,
    "Accept": "application/vnd.github.v3+json",
    "X-GitHub-Api-Version": "2022-11-28",
}
def make_request(endpoint, method="GET", data=None, timeout=30):
    """Call the GitHub REST API under ``API_BASE`` and return the decoded JSON.

    Args:
        endpoint: Path (plus optional query string) relative to ``API_BASE``,
            without a leading slash, e.g. ``"actions/runs?status=queued"``.
        method: ``"GET"`` issues a GET request; any other value issues a POST.
        data: JSON-serialisable payload sent as the POST body (unused for GET).
        timeout: Seconds to wait for the server before giving up. Without a
            timeout, ``requests`` can block indefinitely on a stalled
            connection and hang the whole job.

    Returns:
        The parsed JSON body on success, or ``None`` if the request failed
        (connection error, timeout, non-2xx status, undecodable body). Failures
        are printed rather than raised, so callers must check for ``None``.
    """
    url = f"{API_BASE}/{endpoint}"
    try:
        if method == "GET":
            response = requests.get(url, headers=headers, timeout=timeout)
        else:
            response = requests.post(url, headers=headers, json=data, timeout=timeout)
        response.raise_for_status()
        return response.json()
    except requests.exceptions.RequestException as e:
        print(f"Error making request to {endpoint}: {e}")
        # There is no response object when the failure happened before any
        # HTTP response was received (DNS error, refused connection, timeout).
        failed_response = getattr(e, "response", None)
        if failed_response is not None:
            print(f"Response: {failed_response.text}")
        return None
def is_internal_contributor(pr_info):
    """Return True if the PR author holds a Member role in NVIDIA or NVIDIA-NeMo.

    Args:
        pr_info: Pull-request object as returned by the GitHub API; only
            ``pr_info["user"]["login"]`` is read.

    Returns:
        True when the author's entry in ``sso_users`` lists ``"NVIDIA:Member"``
        or ``"NVIDIA-NeMo:Member"`` among its ``org_roles``; False otherwise,
        including when the author is unknown or has no entry.
    """
    # `or {}` covers both a missing key and an explicit null value, the same
    # way is_internal_actor guards its actor lookup. A plain
    # .get("user", {}) would return None for a null user and then crash.
    login = (pr_info.get("user") or {}).get("login", "")
    org_roles = (sso_users.get(login) or {}).get("org_roles") or []
    return any(role in ("NVIDIA:Member", "NVIDIA-NeMo:Member") for role in org_roles)
def get_pr_base_branch(workflow_run):
    """Resolve the pull request behind a workflow run and return its base branch.

    The PR number is parsed from a head branch of the form
    ``pull-request/<number>`` (e.g. ``pull-request/1913``) and the PR is then
    fetched through ``make_request``.

    Args:
        workflow_run: Workflow-run object from the GitHub API; only
            ``head_branch`` is read.

    Returns:
        A ``(base_branch, pr_info)`` tuple. ``(None, None)`` is returned when
        the head branch does not follow the PR pattern (including a missing or
        null ``head_branch``) or when the PR could not be fetched.
    """
    # head_branch may be present but null; normalise to "" so re.match
    # never receives None (which would raise TypeError).
    head_branch = workflow_run.get("head_branch") or ""
    print(head_branch)
    match = re.match(r"pull-request/(\d+)", head_branch)
    if not match:
        return None, None  # Not a PR branch pattern
    pr_number = int(match.group(1))

    # Fetch PR info from GitHub API
    pr_info = make_request(f"pulls/{pr_number}")
    if not pr_info:
        print(f"Failed to fetch PR #{pr_number}")
        return None, None

    base_branch = (pr_info.get("base") or {}).get("ref")
    return base_branch, pr_info
def is_internal_actor(workflow_run):
    """Tell whether the user who triggered a run is an NVIDIA / NVIDIA-NeMo member.

    The triggering actor is preferred; the run's actor is the fallback. The
    login is looked up in ``sso_users`` and its ``org_roles`` are checked for
    a Member role in either organisation.
    """
    actor = workflow_run.get("triggering_actor") or workflow_run.get("actor") or {}
    login = actor.get("login", "")
    member_roles = ("NVIDIA:Member", "NVIDIA-NeMo:Member")
    for role in sso_users.get(login, {}).get("org_roles", []):
        if role in member_roles:
            return True
    return False
def is_pr_run(workflow_run):
    """Return True if the run's head branch starts with ``pull-request/<number>``.

    A missing or null ``head_branch`` is treated as "not a PR run" instead of
    raising ``TypeError`` inside ``re.match``.
    """
    head_branch = workflow_run.get("head_branch") or ""
    return bool(re.match(r"pull-request/\d+", head_branch))
def is_workflow_dispatch_run(workflow_run):
    """Return True if the run's head branch starts with ``mcore-testing-``.

    That prefix is how this script recognises manually triggered runs. A
    missing or null ``head_branch`` yields False instead of raising
    ``AttributeError`` on ``None.startswith``.
    """
    head_branch = workflow_run.get("head_branch") or ""
    return head_branch.startswith("mcore-testing-")
def matches_queue(workflow_run, target_branch, contributor_type):
    """Decide whether a workflow run belongs to one queue cell.

    A cell is the pair (target_branch, contributor_type):

    * ``target_branch == "workflow_dispatch"`` accepts only runs whose head
      branch is ``mcore-testing-*``; internal/external is judged from the actor
      that triggered the run.
    * Any other ``target_branch`` accepts only PR runs (head branch
      ``pull-request/<n>``). The PR's base branch must equal ``target_branch``,
      or, for the ``"others"`` cell, be anything except ``main`` and ``dev``.
      Internal/external is judged from the PR author.

    A line is printed for every run that matches. Returns True on a match,
    False otherwise.
    """
    wants_internal = contributor_type == "internal"

    # --- workflow_dispatch queue ---
    if target_branch == "workflow_dispatch":
        if not is_workflow_dispatch_run(workflow_run):
            return False
        internal = is_internal_actor(workflow_run)
        if wants_internal != internal:
            return False
        triggered_by = workflow_run.get("triggering_actor") or workflow_run.get("actor") or {}
        actor = triggered_by.get("login", "unknown")
        print(f"workflow_dispatch run by {actor}, contributor_type={contributor_type} (internal={internal})")
        return True

    # --- PR queues: anything that is not a PR run is ignored ---
    if not is_pr_run(workflow_run):
        return False

    base_branch, pr_info = get_pr_base_branch(workflow_run)
    if base_branch is None:
        return False

    falls_into_others = target_branch == "others" and base_branch not in ("main", "dev")
    if base_branch != target_branch and not falls_into_others:
        return False

    pr_number = re.match(r"pull-request/(\d+)", workflow_run.get("head_branch", "")).group(1)
    internal = is_internal_contributor(pr_info)
    if wants_internal != internal:
        return False

    print(f"PR #{pr_number} targets {target_branch}, contributor_type={contributor_type} (internal={internal})")
    return True
# Get current running and queued workflows.
# MATRIX_BRANCH (read from the environment) carries the same value as the
# matrix expression, so the script no longer relies on GitHub substituting
# text into the Python source.
print(f"\n=== Queue cell: branch={MATRIX_BRANCH}, contributor_type={CONTRIBUTOR_TYPE} ===")
print("Fetching workflow runs...")

# per_page=100 is the API maximum (the default is 30).
# NOTE: only the first page is read, so more than 100 runs in one state are still undercounted.
queued_response = make_request("actions/runs?status=queued&per_page=100")
in_progress_response = make_request("actions/runs?status=in_progress&per_page=100")

# make_request returns None on failure. Approving without knowing how many
# runs are active could exceed the limit, so stop with a clear message
# (non-zero exit) instead of an AttributeError on None.
if queued_response is None or in_progress_response is None:
    raise SystemExit("Could not list queued/in-progress workflow runs; aborting without approvals")

queued_workflow_runs = queued_response.get("workflow_runs", [])
in_progress_workflow_runs = in_progress_response.get("workflow_runs", [])
def log_and_filter(runs, label):
    """Log the runs of ``WORKFLOW_NAME`` and return those in this queue cell.

    Args:
        runs: Workflow-run objects as returned by the GitHub API.
        label: Name of the run state being listed; used only in log output.

    Returns:
        The runs named ``WORKFLOW_NAME`` for which ``matches_queue`` is true
        for this job's (MATRIX_BRANCH, CONTRIBUTOR_TYPE) cell, in input order.
    """
    cicd_runs = [r for r in runs if r["name"] == WORKFLOW_NAME]
    print(f"{label}: {len(runs)} total, {len(cicd_runs)} {WORKFLOW_NAME}")

    matched_runs = []
    for r in cicd_runs:
        actor = (r.get("triggering_actor") or r.get("actor") or {}).get("login", "unknown")
        # Evaluate matches_queue exactly once per run: for PR runs it performs
        # an API request and prints a line, so a second call for the filter
        # would double both the traffic and the log output.
        matched = matches_queue(r, MATRIX_BRANCH, CONTRIBUTOR_TYPE)
        print(f" run={r['id']} head_branch={r.get('head_branch')} event={r.get('event')} actor={actor} -> matched={matched}")
        if matched:
            matched_runs.append(r)
    return matched_runs
# Keep only the runs that belong to this queue cell.
queued_workflow_runs = log_and_filter(queued_workflow_runs, "queued")
in_progress_workflow_runs = log_and_filter(in_progress_workflow_runs, "in_progress")

# Count running and queued workflows
queued_workflows = len(queued_workflow_runs)
in_progress_workflows = len(in_progress_workflow_runs)
total_workflows = queued_workflows + in_progress_workflows
print(f"Current queued workflows (PRs targeting {MATRIX_BRANCH}, {CONTRIBUTOR_TYPE}): {queued_workflows}")
print(f"Current running workflows (PRs targeting {MATRIX_BRANCH}, {CONTRIBUTOR_TYPE}): {in_progress_workflows}")
print(f"Total workflows: {total_workflows}")
print(f"Max concurrency: {MAX_CONCURRENCY}")

if total_workflows >= MAX_CONCURRENCY:
    print("Maximum concurrency reached, no new approvals will be made")
    # SystemExit is the builtin behind exit(); raising it does not depend on
    # the `site` module having injected exit() into builtins.
    raise SystemExit(0)

# Get waiting CI workflows for test environment
print("Fetching waiting deployments...")
waiting_response = make_request("actions/runs?status=waiting&per_page=100")
if waiting_response is None:
    raise SystemExit("Could not list waiting workflow runs; aborting without approvals")
pending_workflows = log_and_filter(waiting_response.get("workflow_runs", []), "waiting")

# Sort deployments by creation date (oldest first)
print("Sorting workflows...")
pending_workflows = sorted(pending_workflows, key=lambda x: x["created_at"])

# Process each deployment until the concurrency budget is used up
print(f"Processing {len(pending_workflows)} pending workflows...")
for workflow in pending_workflows:
    if total_workflows >= MAX_CONCURRENCY:
        print("Maximum concurrency reached, stopping approvals")
        break

    workflow_id = workflow["id"]
    workflow_name = workflow["display_title"]
    print(f"Approving workflow {workflow_name} with Run Id: {workflow_id}")

    deployment_url = f"actions/runs/{workflow_id}/pending_deployments"
    pending_deployments = make_request(deployment_url)
    if pending_deployments is None:
        # Request failed: fail the job (as before) but with a readable reason.
        raise SystemExit(f"Failed to fetch pending deployments for run {workflow_id}")
    if not pending_deployments:
        # Nothing left to approve, e.g. the run was approved or cancelled
        # between listing and now. Move on to the next run.
        print(f"Run {workflow_id} has no pending deployments, skipping")
        continue

    # As before, only the first pending deployment's environment is approved.
    environment = pending_deployments[0]["environment"]

    # Approve the deployment
    status_data = {
        "environment_ids": [environment["id"]],
        "state": "approved",
        "comment": "Automatically approved by queue manager"
    }
    result = make_request(deployment_url, method="POST", data=status_data)
    if result:
        total_workflows += 1
    else:
        # Report by run id and environment: the pending-deployment object
        # documented by GitHub has no top-level "id", so reading it here would
        # raise KeyError and hide the real failure.
        print(f"Failed to approve deployment for run {workflow_id} (environment {environment.get('name')})")
        raise SystemExit(1)
# notify: runs after approve-queue and only when it failed; posts a message to Slack
# that links to this workflow run.
| notify: | |
| if: failure() | |
| runs-on: ubuntu-latest | |
| needs: [approve-queue] | |
| steps: | |
# SLACK_WEBHOOK is the URL the message is POSTed to. SLACK_WEBHOOK_ADMIN is not a webhook:
# it holds a Slack <!subteam^ID> group mention that is appended to the message as the "cc".
| - name: Notify | |
| env: | |
| SLACK_WEBHOOK: ${{ secrets.SLACK_CI_CHANNEL_WEBHOOK }} | |
| SLACK_WEBHOOK_ADMIN: <!subteam^${{ secrets.SLACK_TEAM_GROUP_ID }}> | |
| GITHUB_RUN_ID: ${{ github.run_id }} | |
| GITHUB_REPOSITORY: ${{ github.repository }} | |
| run: | | |
| curl -X POST \ | |
| -H 'Content-type: application/json' \ | |
| --data "{\"text\":\":robot_joy: <https://github.com/${GITHUB_REPOSITORY}/actions/runs/${GITHUB_RUN_ID}|Test-queue-approval-bot workflow> failed. Please review manually.\n\ncc ${SLACK_WEBHOOK_ADMIN}\"}" \ | |
| $SLACK_WEBHOOK | |