Approve Test Queue #20137
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. | |
| # | |
| # Licensed under the Apache License, Version 2.0 (the "License"); | |
| # you may not use this file except in compliance with the License. | |
| # You may obtain a copy of the License at | |
| # | |
| # http://www.apache.org/licenses/LICENSE-2.0 | |
| # | |
| # Unless required by applicable law or agreed to in writing, software | |
| # distributed under the License is distributed on an "AS IS" BASIS, | |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
| # See the License for the specific language governing permissions and | |
| # limitations under the License. | |
name: Approve Test Queue

on:
  schedule:
    - cron: "*/5 * * * *" # Runs every 5 minutes
  workflow_dispatch: # Allows manual triggering
jobs:
  # One matrix cell per (target branch, contributor type) pair; each cell
  # enforces its own concurrency limit in the approval step.
  approve-queue:
    runs-on: ubuntu-latest
    # Runs in the `main` environment (grants access to that environment's
    # secrets/rules — NOTE(review): confirm this is the intended environment).
    environment: main
    # Only run in the upstream repository, never in forks.
    if: github.repository == 'NVIDIA/Megatron-LM'
    strategy:
      matrix:
        branch: [main, dev, others]
        contributor_type: [internal, external]
    steps:
      - name: Checkout repository
        uses: actions/checkout@v6

      - name: Set up Python
        uses: actions/setup-python@v6
        with:
          python-version: "3.12"

      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install requests

      # Best effort: fall back to an empty JSON object when the release asset
      # cannot be downloaded, so the later SSO lookup finds no org roles and
      # classifies every author as external.
      - name: Download SSO users list
        run: |
          gh release download v0.1.0 \
            --repo NVIDIA-GitHub-Management/github-audits \
            --pattern users_sso.json \
            --output users_sso.json || echo '{}' > users_sso.json
        env:
          GH_TOKEN: ${{ secrets.NVIDIA_MANAGEMENT_ORG_PAT }}
| - name: Approve waiting deployments | |
| env: | |
| GITHUB_TOKEN: ${{ secrets.PAT }} | |
| MAX_CONCURRENCY: ${{ vars.MAX_CONCURRENCY || 1 }} | |
| MAX_CONCURRENCY_EXTERNAL: ${{ vars.MAX_CONCURRENCY_EXTERNAL || 1 }} | |
| CONTRIBUTOR_TYPE: ${{ matrix.contributor_type }} | |
| SSO_USERS_FILE: users_sso.json | |
| PYTHONUNBUFFERED: 1 | |
| shell: python | |
| run: | | |
| import os | |
| import json | |
| import requests | |
| import re | |
| # GitHub API configuration | |
| GITHUB_TOKEN = os.environ["GITHUB_TOKEN"] | |
| REPO = os.environ["GITHUB_REPOSITORY"] | |
| CONTRIBUTOR_TYPE = os.environ["CONTRIBUTOR_TYPE"] | |
| if CONTRIBUTOR_TYPE == "external": | |
| # Global limit across all branches — no division needed since we count globally. | |
| MAX_CONCURRENCY = int(os.environ["MAX_CONCURRENCY_EXTERNAL"]) | |
| else: | |
| MAX_CONCURRENCY = int(os.environ["MAX_CONCURRENCY"]) // 2 | |
| API_BASE = f"https://api.github.com/repos/NVIDIA/Megatron-LM" | |
| # Load SSO users for internal/external classification | |
| with open(os.environ["SSO_USERS_FILE"]) as f: | |
| sso_users = json.load(f) | |
| # Headers for GitHub API | |
| headers = { | |
| "Authorization": f"token {GITHUB_TOKEN}", | |
| "Accept": "application/vnd.github.v3+json", | |
| "X-GitHub-Api-Version": "2022-11-28", | |
| } | |
| def make_request(endpoint, method="GET", data=None): | |
| """Make a request to the GitHub API with error handling.""" | |
| url = f"{API_BASE}/{endpoint}" | |
| try: | |
| if method == "GET": | |
| response = requests.get(url, headers=headers) | |
| else: | |
| response = requests.post(url, headers=headers, json=data) | |
| response.raise_for_status() | |
| return response.json() | |
| except requests.exceptions.RequestException as e: | |
| print(f"Error making request to {endpoint}: {str(e)}") | |
| if hasattr(e.response, 'text'): | |
| print(f"Response: {e.response.text}") | |
| return None | |
| def is_internal_contributor(pr_info): | |
| """Return True if the PR author is a member of NVIDIA or NVIDIA-NeMo org (is_org_member).""" | |
| login = pr_info.get("user", {}).get("login", "") | |
| org_roles = sso_users.get(login, {}).get("org_roles", []) | |
| return any(role in ("NVIDIA:Member", "NVIDIA-NeMo:Member") for role in org_roles) | |
| def get_pr_base_branch(workflow_run): | |
| """ | |
| Return the base branch of the PR associated with a workflow run, or None. | |
| Extracts PR number from head branch like 'pull-request/1913' and fetches PR info. | |
| Returns (base_branch, pr_info) tuple, or (None, None) if not a PR run. | |
| """ | |
| print(workflow_run.get("head_branch", "")) | |
| head_branch = workflow_run.get("head_branch", "") | |
| match = re.match(r"pull-request/(\d+)", head_branch) | |
| if not match: | |
| return None, None # Not a PR branch pattern | |
| pr_number = int(match.group(1)) | |
| # Fetch PR info from GitHub API | |
| pr_info = make_request(f"pulls/{pr_number}") | |
| if not pr_info: | |
| print(f"Failed to fetch PR #{pr_number}") | |
| return None, None | |
| base_branch = pr_info.get("base", {}).get("ref") | |
| return base_branch, pr_info | |
| def matches_contributor(workflow_run, contributor_type): | |
| """Return True if the workflow run matches the contributor type (ignores branch).""" | |
| _, pr_info = get_pr_base_branch(workflow_run) | |
| if pr_info is None: | |
| return False | |
| internal = is_internal_contributor(pr_info) | |
| return (contributor_type == "internal") == internal | |
| def matches_queue(workflow_run, target_branch, contributor_type): | |
| """ | |
| Return True if the workflow run belongs to this queue cell: | |
| matching target branch AND matching contributor type (internal/external). | |
| """ | |
| base_branch, pr_info = get_pr_base_branch(workflow_run) | |
| if base_branch is None: | |
| return False | |
| branch_match = ( | |
| (base_branch == target_branch) or | |
| (base_branch != "main" and base_branch != "dev" and target_branch == "others") | |
| ) | |
| if not branch_match: | |
| return False | |
| pr_number = re.match(r"pull-request/(\d+)", workflow_run.get("head_branch", "")).group(1) | |
| internal = is_internal_contributor(pr_info) | |
| contributor_match = (contributor_type == "internal") == internal | |
| if branch_match and contributor_match: | |
| print(f"PR #{pr_number} targets {target_branch}, contributor_type={contributor_type} (internal={internal})") | |
| return branch_match and contributor_match | |
| # Get current running and queued workflows | |
| print("Fetching workflow runs...") | |
| queued_workflow_runs = make_request("actions/runs?status=queued").get("workflow_runs", []) | |
| in_progress_workflow_runs = make_request("actions/runs?status=in_progress").get("workflow_runs", []) | |
| # For external contributors, enforce a single global concurrency limit across ALL branches. | |
| # For internal contributors, enforce per-branch limits as before. | |
| if CONTRIBUTOR_TYPE == "external": | |
| queued_workflow_runs = [run for run in queued_workflow_runs | |
| if run["name"] == "CICD Megatron-LM" and matches_contributor(run, CONTRIBUTOR_TYPE)] | |
| in_progress_workflow_runs = [run for run in in_progress_workflow_runs | |
| if run["name"] == "CICD Megatron-LM" and matches_contributor(run, CONTRIBUTOR_TYPE)] | |
| else: | |
| # Filter for workflows belonging to PRs targeting ${{ matrix.branch }} with matching contributor type | |
| queued_workflow_runs = [run for run in queued_workflow_runs | |
| if run["name"] == "CICD Megatron-LM" and matches_queue(run, "${{ matrix.branch }}", CONTRIBUTOR_TYPE)] | |
| in_progress_workflow_runs = [run for run in in_progress_workflow_runs | |
| if run["name"] == "CICD Megatron-LM" and matches_queue(run, "${{ matrix.branch }}", CONTRIBUTOR_TYPE)] | |
| # Count running and queued workflows | |
| queued_workflows = len(queued_workflow_runs) | |
| in_progress_workflows = len(in_progress_workflow_runs) | |
| total_workflows = queued_workflows + in_progress_workflows | |
| print(f"Current queued workflows (PRs targeting ${{ matrix.branch }}, {CONTRIBUTOR_TYPE}): {queued_workflows}") | |
| print(f"Current running workflows (PRs targeting ${{ matrix.branch }}, {CONTRIBUTOR_TYPE}): {in_progress_workflows}") | |
| print(f"Total workflows: {total_workflows}") | |
| print(f"Max concurrency: {MAX_CONCURRENCY}") | |
| if total_workflows >= MAX_CONCURRENCY: | |
| print("Maximum concurrency reached, no new approvals will be made") | |
| exit(0) | |
| # Get waiting CI workflows for test environment | |
| print("Fetching deployments...") | |
| pending_workflows = make_request("actions/runs?status=waiting").get("workflow_runs", []) | |
| print("Pending workflows:", len(pending_workflows)) | |
| pending_workflows = [run for run in pending_workflows | |
| if run["name"] == "CICD Megatron-LM" and matches_queue(run, "${{ matrix.branch }}", CONTRIBUTOR_TYPE)] | |
| # Sort deployments by creation date (oldest first) | |
| print("Sorting workflows...") | |
| pending_workflows = sorted(pending_workflows, key=lambda x: x["created_at"]) | |
| # Process each deployment | |
| print(f"Processing {len(pending_workflows)} pending workflows...") | |
| for workflow in pending_workflows: | |
| if total_workflows >= MAX_CONCURRENCY: | |
| print("Maximum concurrency reached, stopping approvals") | |
| break | |
| workflow_id = workflow["id"] | |
| workflow_name = workflow["display_title"] | |
| print(f"Approving workflow {workflow_name} with Run Id: {workflow_id}") | |
| deployment_url = f"actions/runs/{workflow_id}/pending_deployments" | |
| deployment = make_request(deployment_url)[0] | |
| environment_id = deployment["environment"]["id"] | |
| # Approve the deployment | |
| status_data = { | |
| "environment_ids": [environment_id], | |
| "state": "approved", | |
| "comment": "Automatically approved by queue manager" | |
| } | |
| result = make_request(deployment_url, method="POST", data=status_data) | |
| if result: | |
| total_workflows += 1 | |
| else: | |
| print(f"Failed to approve deployment {deployment['id']}") | |
| exit(1) | |
  # Best-effort Slack ping when any approve-queue matrix cell fails.
  notify:
    if: failure()
    runs-on: ubuntu-latest
    needs: [approve-queue]
    steps:
      - name: Notify
        env:
          SLACK_WEBHOOK: ${{ secrets.SLACK_CI_CHANNEL_WEBHOOK }}
          # Slack user-group mention syntax, e.g. <!subteam^ID>.
          SLACK_WEBHOOK_ADMIN: <!subteam^${{ secrets.SLACK_TEAM_GROUP_ID }}>
          GITHUB_RUN_ID: ${{ github.run_id }}
          GITHUB_REPOSITORY: ${{ github.repository }}
        run: |
          # Post a failure message that links back to this workflow run and
          # cc's the admin group. Note: curl exit status is not checked, so a
          # webhook failure does not fail this job.
          curl -X POST \
            -H 'Content-type: application/json' \
            --data "{\"text\":\":robot_joy: <https://github.com/${GITHUB_REPOSITORY}/actions/runs/${GITHUB_RUN_ID}|Test-queue-approval-bot workflow> failed. Please review manually.\n\ncc ${SLACK_WEBHOOK_ADMIN}\"}" \
            $SLACK_WEBHOOK