-
Notifications
You must be signed in to change notification settings - Fork 213
284 lines (246 loc) · 12.9 KB
/
cicd-approve-test-queue.yml
File metadata and controls
284 lines (246 loc) · 12.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
name: Approve Test Queue

# Queue manager: periodically approves "waiting" CICD deployments, per
# (branch, contributor_type) matrix cell, up to a concurrency cap.
on:
  schedule:
    - cron: "*/5 * * * *" # Runs every 5 minutes
  workflow_dispatch: # Allows manual triggering

jobs:
  approve-queue:
    runs-on: ubuntu-latest
    environment: main
    if: github.repository == 'NVIDIA-NeMo/Megatron-Bridge'
    strategy:
      matrix:
        # Each cell manages an independent approval queue.
        branch: [main, others, workflow_dispatch]
        contributor_type: [internal, external]
    steps:
      - name: Checkout repository
        uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: "3.12"

      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install requests

      - name: Download SSO users list
        # Falls back to an empty mapping if the release asset is unavailable,
        # so later classification treats everyone as external.
        run: |
          gh release download v0.1.0 \
            --repo NVIDIA-GitHub-Management/github-audits \
            --pattern users_sso.json \
            --output users_sso.json || echo '{}' > users_sso.json
        env:
          GH_TOKEN: ${{ secrets.NVIDIA_MANAGEMENT_ORG_PAT }}

      - name: Approve waiting deployments
        env:
          GITHUB_TOKEN: ${{ secrets.PAT }}
          MAX_CONCURRENCY: ${{ vars.MAX_CONCURRENCY || 1 }}
          MAX_CONCURRENCY_EXTERNAL: ${{ vars.MAX_CONCURRENCY_EXTERNAL || 3 }}
          # NOTE(review): this reads vars.MAX_CONCURRENCY — confirm whether
          # vars.MAX_CONCURRENCY_WORKFLOW_DISPATCH was intended here.
          MAX_CONCURRENCY_WORKFLOW_DISPATCH: ${{ vars.MAX_CONCURRENCY || 1 }}
          CONTRIBUTOR_TYPE: ${{ matrix.contributor_type }}
          MATRIX_BRANCH: ${{ matrix.branch }}
          SSO_USERS_FILE: users_sso.json
          PYTHONUNBUFFERED: 1
        shell: python
        run: |
          import os
          import json
          import requests
          import re

          # GitHub API configuration
          GITHUB_TOKEN = os.environ["GITHUB_TOKEN"]
          REPO = os.environ["GITHUB_REPOSITORY"]
          CONTRIBUTOR_TYPE = os.environ["CONTRIBUTOR_TYPE"]
          MATRIX_BRANCH = os.environ["MATRIX_BRANCH"]

          if MATRIX_BRANCH == "workflow_dispatch":
              MAX_CONCURRENCY = int(os.environ["MAX_CONCURRENCY_WORKFLOW_DISPATCH"])
              API_BASE = f"https://api.github.com/repos/{REPO}"
              WORKFLOW_NAME = "CICD NeMo"
          else:
              if CONTRIBUTOR_TYPE == "external":
                  MAX_CONCURRENCY = int(os.environ["MAX_CONCURRENCY_EXTERNAL"])
              else:
                  MAX_CONCURRENCY = int(os.environ["MAX_CONCURRENCY"])
              API_BASE = "https://api.github.com/repos/NVIDIA-NeMo/Megatron-Bridge"
              WORKFLOW_NAME = "CICD NeMo"

          # Load SSO users for internal/external classification
          with open(os.environ["SSO_USERS_FILE"]) as f:
              sso_users = json.load(f)

          # Headers for GitHub API
          headers = {
              "Authorization": f"token {GITHUB_TOKEN}",
              "Accept": "application/vnd.github.v3+json",
              "X-GitHub-Api-Version": "2022-11-28",
          }

          def make_request(endpoint, method="GET", data=None):
              """Make a request to the GitHub API; return parsed JSON or None on error."""
              url = f"{API_BASE}/{endpoint}"
              try:
                  if method == "GET":
                      response = requests.get(url, headers=headers)
                  else:
                      response = requests.post(url, headers=headers, json=data)
                  response.raise_for_status()
                  return response.json()
              except requests.exceptions.RequestException as e:
                  print(f"Error making request to {endpoint}: {str(e)}")
                  if hasattr(e.response, 'text'):
                      print(f"Response: {e.response.text}")
                  return None

          def is_internal_contributor(pr_info):
              """Return True if the PR author is a member of NVIDIA or NVIDIA-NeMo org."""
              login = pr_info.get("user", {}).get("login", "")
              org_roles = sso_users.get(login, {}).get("org_roles", [])
              return any(role in ("NVIDIA:Member", "NVIDIA-NeMo:Member") for role in org_roles)

          def get_pr_base_branch(workflow_run):
              """
              Return the base branch of the PR associated with a workflow run, or None.
              Extracts PR number from head branch like 'pull-request/1913' and fetches PR info.
              Returns (base_branch, pr_info) tuple, or (None, None) if not a PR run.
              """
              head_branch = workflow_run.get("head_branch", "")
              match = re.match(r"pull-request/(\d+)", head_branch)
              if not match:
                  return None, None  # Not a PR branch pattern
              pr_number = int(match.group(1))
              # Fetch PR info from GitHub API
              pr_info = make_request(f"pulls/{pr_number}")
              if not pr_info:
                  print(f"Failed to fetch PR #{pr_number}")
                  return None, None
              base_branch = pr_info.get("base", {}).get("ref")
              return base_branch, pr_info

          def is_internal_actor(workflow_run):
              """Return True if the actor who triggered the run is an NVIDIA/NVIDIA-NeMo member."""
              login = (workflow_run.get("triggering_actor") or workflow_run.get("actor") or {}).get("login", "")
              org_roles = sso_users.get(login, {}).get("org_roles", [])
              return any(role in ("NVIDIA:Member", "NVIDIA-NeMo:Member") for role in org_roles)

          def is_pr_run(workflow_run):
              """Return True if this run was triggered by a PR (head_branch matches pull-request/<number>)."""
              return bool(re.match(r"pull-request/\d+", workflow_run.get("head_branch", "")))

          def is_workflow_dispatch_run(workflow_run):
              """Return True if this run was manually triggered (head_branch starts with mcore-testing-)."""
              return workflow_run.get("head_branch", "").startswith("mcore-testing-")

          def matches_queue(workflow_run, target_branch, contributor_type):
              """
              Return True if the workflow run belongs to this queue cell:
              matching target branch AND matching contributor type (internal/external).
              workflow_dispatch runs (head_branch: mcore-testing-*) go to the 'workflow_dispatch' queue only.
              PR runs (head_branch: pull-request/<n>) go to 'main' or 'others' queues only.
              """
              if target_branch == "workflow_dispatch":
                  if not is_workflow_dispatch_run(workflow_run):
                      return False
                  internal = is_internal_actor(workflow_run)
                  contributor_match = (contributor_type == "internal") == internal
                  if contributor_match:
                      actor = (workflow_run.get("triggering_actor") or workflow_run.get("actor") or {}).get("login", "unknown")
                      print(f"workflow_dispatch run by {actor}, contributor_type={contributor_type} (internal={internal})")
                  return contributor_match

              # PR queue: skip non-PR runs
              if not is_pr_run(workflow_run):
                  return False
              base_branch, pr_info = get_pr_base_branch(workflow_run)
              if base_branch is None:
                  return False
              # 'others' catches every PR base branch that is neither main nor dev.
              branch_match = (
                  (base_branch == target_branch) or
                  (base_branch != "main" and base_branch != "dev" and target_branch == "others")
              )
              if not branch_match:
                  return False
              internal = is_internal_contributor(pr_info)
              contributor_match = (contributor_type == "internal") == internal
              if contributor_match:
                  print(f"PR #{pr_info.get('number')} targets {target_branch}, contributor_type={contributor_type} (internal={internal})")
              return contributor_match

          def fetch_runs(status):
              """List workflow runs with the given status; empty list on API error."""
              # NOTE(review): response is not paginated here (API default is 30
              # runs/page) — confirm queue depth never exceeds one page.
              result = make_request(f"actions/runs?status={status}") or {}
              return result.get("workflow_runs", [])

          def log_and_filter(runs, label):
              """Keep only this cell's runs of WORKFLOW_NAME, logging each decision once."""
              cicd_runs = [r for r in runs if r["name"] == WORKFLOW_NAME]
              print(f"{label}: {len(runs)} total, {len(cicd_runs)} {WORKFLOW_NAME}")
              matched_runs = []
              for r in cicd_runs:
                  actor = (r.get("triggering_actor") or r.get("actor") or {}).get("login", "unknown")
                  # Evaluate once per run: matches_queue may hit the PR API.
                  matched = matches_queue(r, MATRIX_BRANCH, CONTRIBUTOR_TYPE)
                  print(f"  run={r['id']} head_branch={r.get('head_branch')} event={r.get('event')} actor={actor} -> matched={matched}")
                  if matched:
                      matched_runs.append(r)
              return matched_runs

          # Get current running and queued workflows
          print(f"\n=== Queue cell: branch={MATRIX_BRANCH}, contributor_type={CONTRIBUTOR_TYPE} ===")
          print("Fetching workflow runs...")
          queued_workflow_runs = log_and_filter(fetch_runs("queued"), "queued")
          in_progress_workflow_runs = log_and_filter(fetch_runs("in_progress"), "in_progress")

          # Count running and queued workflows
          queued_workflows = len(queued_workflow_runs)
          in_progress_workflows = len(in_progress_workflow_runs)
          total_workflows = queued_workflows + in_progress_workflows

          print(f"Current queued workflows (PRs targeting {MATRIX_BRANCH}, {CONTRIBUTOR_TYPE}): {queued_workflows}")
          print(f"Current running workflows (PRs targeting {MATRIX_BRANCH}, {CONTRIBUTOR_TYPE}): {in_progress_workflows}")
          print(f"Total workflows: {total_workflows}")
          print(f"Max concurrency: {MAX_CONCURRENCY}")

          if total_workflows >= MAX_CONCURRENCY:
              print("Maximum concurrency reached, no new approvals will be made")
              raise SystemExit(0)

          # Get waiting CI workflows for test environment
          print("Fetching waiting deployments...")
          pending_workflows = log_and_filter(fetch_runs("waiting"), "waiting")

          # Sort deployments by creation date (oldest first)
          print("Sorting workflows...")
          pending_workflows = sorted(pending_workflows, key=lambda x: x["created_at"])

          # Process each deployment
          print(f"Processing {len(pending_workflows)} pending workflows...")
          for workflow in pending_workflows:
              if total_workflows >= MAX_CONCURRENCY:
                  print("Maximum concurrency reached, stopping approvals")
                  break

              workflow_id = workflow["id"]
              workflow_name = workflow["display_title"]
              print(f"Approving workflow {workflow_name} with Run Id: {workflow_id}")

              deployment_url = f"actions/runs/{workflow_id}/pending_deployments"
              pending_deployments = make_request(deployment_url) or []
              if not pending_deployments:
                  # Run may have been approved/cancelled since we listed it.
                  print(f"No pending deployments for run {workflow_id}, skipping")
                  continue
              deployment = pending_deployments[0]
              environment_id = deployment["environment"]["id"]

              # Approve the deployment
              status_data = {
                  "environment_ids": [environment_id],
                  "state": "approved",
                  "comment": "Automatically approved by queue manager"
              }
              result = make_request(deployment_url, method="POST", data=status_data)
              if result:
                  total_workflows += 1
              else:
                  print(f"Failed to approve deployment {deployment['id']}")
                  raise SystemExit(1)

  notify:
    # Alert the CI Slack channel when any queue cell fails.
    if: failure()
    runs-on: ubuntu-latest
    needs: [approve-queue]
    steps:
      - name: Notify
        env:
          SLACK_WEBHOOK: ${{ secrets.SLACK_CI_CHANNEL_WEBHOOK }}
          SLACK_WEBHOOK_ADMIN: <!subteam^${{ secrets.SLACK_TEAM_GROUP_ID }}>
          GITHUB_RUN_ID: ${{ github.run_id }}
          GITHUB_REPOSITORY: ${{ github.repository }}
        run: |
          curl -X POST \
            -H 'Content-type: application/json' \
            --data "{\"text\":\":robot_joy: <https://github.com/${GITHUB_REPOSITORY}/actions/runs/${GITHUB_RUN_ID}|Test-queue-approval-bot workflow> failed. Please review manually.\n\ncc ${SLACK_WEBHOOK_ADMIN}\"}" \
            $SLACK_WEBHOOK