|
1 | 1 | #!/usr/bin/env python3 |
2 | 2 |
|
3 | 3 | """ |
4 | | -This script manages a team in each organization that the user running this owns |
| 4 | +Manages a team in each organization that the user running this owns |
5 | 5 | within the enterprise. It is meant to be run after `org-admin-promote.py` to |
6 | 6 | create/update a team with the "security manager" role. |
7 | 7 |
|
8 | 8 | Inputs: |
9 | | -- GitHub API endpoint |
10 | | -- PAT with `enterprise:admin` scope, read from a file called `token.txt` |
| 9 | +- GitHub API endpoint (defaults to https://api.github.com) |
| 10 | +- PAT with `admin:enterprise` and `admin:org` scope, read from a named file (or GITHUB_TOKEN if that is not provided) |
11 | 11 | - `all_orgs.csv` file from `org-admin-promote.py` |
12 | 12 | - Team name for the security manager team |
13 | 13 | - List of security manager team members by handle |
14 | 14 |
|
15 | 15 | Outputs: |
16 | | -- Prints the org names the enterprise admin was removed from to `stdout` |
| 16 | +- Prints the members that were added to and removed from the security managers team |
17 | 17 | """ |
18 | 18 |
|
19 | | -# Imports |
| 19 | +from argparse import ArgumentParser |
| 20 | +from typing import Any |
20 | 21 | from defusedcsv import csv |
21 | | -from src import teams, organizations |
22 | | - |
23 | | -# Inputs |
24 | | -api_url = "https://api.github.com" # for GHEC |
25 | | -# api_url = "https://GHES-HOSTNAME-HERE/api/v3" # for GHES/GHAE |
26 | | - |
27 | | -# github_pat = "GITHUB-PAT-HERE" # if you want to set that manually |
28 | | -with open("token.txt", "r") as f: |
29 | | - github_pat = f.read().strip() |
30 | | - f.close() |
31 | | - |
32 | | -# List of organizations (filename) |
33 | | -org_list = "all_orgs.csv" |
34 | | - |
35 | | -# Team name for the security manager team |
36 | | -sec_team_name = "security-managers" |
37 | | - |
38 | | -# List of security manager team members by handle |
39 | | -sec_team_members = ["teammate1", "teammate2", "teammate3"] |
40 | | - |
41 | | -# Read in the org list |
42 | | -with open(org_list, "r") as f: |
43 | | - orgs = list(csv.DictReader(f)) |
44 | | - |
45 | | -# Set up the headers |
46 | | -headers = { |
47 | | - "Authorization": "token {}".format(github_pat), |
48 | | - "Accept": "application/vnd.github.v3+json", |
49 | | -} |
50 | | - |
| 22 | +from src import teams, organizations, github_token |
| 23 | +import logging |
| 24 | + |
| 25 | +LOG = logging.getLogger(__name__) |
| 26 | + |
| 27 | + |
| 28 | +def add_args(parser) -> None: |
| 29 | + """Add arguments to the command line parser.""" |
| 30 | + parser.add_argument( |
| 31 | + "--api-url", |
| 32 | + default="https://api.github.com", |
| 33 | + help="GitHub API URL (https://github-hostname-here/api/v3/ for GHES, EMU or data residency)", |
| 34 | + ) |
| 35 | + parser.add_argument( |
| 36 | + "--token-file", required=False, help="GitHub Personal Access Token file (or use GITHUB_TOKEN)" |
| 37 | + ) |
| 38 | + parser.add_argument( |
| 39 | + "--org-list", default="all_orgs.csv", help="CSV file of organizations" |
| 40 | + ) |
| 41 | + parser.add_argument( |
| 42 | + "--sec-team-name", default="security-managers", help="Security team name" |
| 43 | + ) |
| 44 | + parser.add_argument("--sec-team-members", nargs="*", help="Security team members") |
| 45 | + parser.add_argument("--sec-team-members-file", required=False, help="Security team members file") |
| 46 | + parser.add_argument("--legacy", action="store_true", help="Use legacy API endpoints to manage the security managers") |
| 47 | + parser.add_argument( |
| 48 | + "--debug", |
| 49 | + "-d", |
| 50 | + action="store_true", |
| 51 | + help="Enable debug logging", |
| 52 | + ) |
| 53 | + |
| 54 | + |
| 55 | +def make_security_managers_team( |
| 56 | + org_name: str, sec_team_name: str, api_url: str, headers: dict[str, str], legacy=False |
| 57 | +) -> None: |
| 58 | + """Create or update the security managers team in the specified organization.""" |
| 59 | + security_manager_role_id: str | None = None |
| 60 | + |
| 61 | + if not legacy: |
| 62 | + org_roles: dict[str, Any] = organizations.list_org_roles(api_url, headers, org_name) |
| 63 | + |
| 64 | + # Check if the "security manager" role exists |
| 65 | + if "roles" not in org_roles: |
| 66 | + LOG.error("⨯ Malformed response from GitHub API") |
| 67 | + return |
| 68 | + |
| 69 | + security_manager_role_id_list = [role["id"] for role in org_roles["roles"] if role["name"] == "security_manager"] |
| 70 | + if not security_manager_role_id_list: |
| 71 | + LOG.error("⨯ Organization {} does not have a security manager role".format(org_name)) |
| 72 | + return |
| 73 | + security_manager_role_id = security_manager_role_id_list[0] |
| 74 | + |
| 75 | + # Get the list of teams |
| 76 | + teams_info = teams.list_teams(api_url, headers, org_name) |
| 77 | + teams_list = [team["name"] for team in teams_info] |
| 78 | + |
| 79 | + # Create the team if it doesn't exist |
| 80 | + if sec_team_name not in teams_list: |
| 81 | + LOG.info("Creating team {}".format(sec_team_name)) |
| 82 | + try: |
| 83 | + teams.create_team(api_url, headers, org_name, sec_team_name) |
| 84 | + except Exception as e: |
| 85 | + LOG.error("⨯ Failed to create team {}: {}".format(sec_team_name, e)) |
| 86 | + |
| 87 | + # Update that team to have the "security manager" role |
| 88 | + try: |
| 89 | + # only update it if the team does not already have the role |
| 90 | + if not teams.has_team_role(api_url, headers, org_name, sec_team_name, security_manager_role_id, legacy=legacy): |
| 91 | + teams.change_team_role(api_url, headers, org_name, sec_team_name, security_manager_role_id, legacy=legacy) |
| 92 | + LOG.info( |
| 93 | + "✓ Team {} updated as a security manager for {}".format(sec_team_name, org_name) |
| 94 | + ) |
| 95 | + else: |
| 96 | + LOG.debug("✓ Team {} already has the security manager role for {}".format(sec_team_name, org_name)) |
| 97 | + except Exception as e: |
| 98 | + LOG.error("⨯ Failed to update team {}: {}".format(sec_team_name, e)) |
| 99 | + if LOG.getEffectiveLevel() == logging.DEBUG: |
| 100 | + raise e |
| 101 | + |
| 102 | + |
| 103 | +def add_security_managers_to_team( |
| 104 | + org_name: str, |
| 105 | + sec_team_name: str, |
| 106 | + sec_team_members: list[str], |
| 107 | + api_url: str, |
| 108 | + headers: dict[str, str], |
| 109 | +) -> None: |
| 110 | + """Add security managers to the specified team in the organization.""" |
| 111 | + # Get the list of org members, adding the missing ones to the org |
| 112 | + org_members = organizations.list_org_users(api_url, headers, org_name) |
| 113 | + org_members_list = [member["login"] for member in org_members] |
| 114 | + for username in sec_team_members: |
| 115 | + if username not in org_members_list: |
| 116 | + LOG.info("Adding {} to {}".format(username, org_name)) |
| 117 | + try: |
| 118 | + organizations.add_org_user(api_url, headers, org_name, username) |
| 119 | + except Exception as e: |
| 120 | + LOG.error("⨯ Failed to add user {} to org {}: {}".format(username, org_name, e)) |
| 121 | + return |
| 122 | + |
| 123 | + # Get the list of team members, adding the missing ones to the team and removing the extra ones |
| 124 | + team_members = teams.list_team_members(api_url, headers, org_name, sec_team_name) |
| 125 | + team_members_list = [member["login"] for member in team_members] |
| 126 | + for username in team_members_list: |
| 127 | + if username not in sec_team_members: |
| 128 | + LOG.info("Removing {} from {}".format(username, sec_team_name)) |
| 129 | + try: |
| 130 | + teams.remove_team_member( |
| 131 | + api_url, headers, org_name, sec_team_name, username |
| 132 | + ) |
| 133 | + except Exception as e: |
| 134 | + LOG.error("⨯ Failed to remove user {} from team {}: {}".format(username, sec_team_name, e)) |
| 135 | + return |
| 136 | + for username in sec_team_members: |
| 137 | + if username not in team_members_list: |
| 138 | + LOG.info("Adding {} to {}".format(username, sec_team_name)) |
| 139 | + try: |
| 140 | + teams.add_team_member(api_url, headers, org_name, sec_team_name, username) |
| 141 | + except Exception as e: |
| 142 | + LOG.error("⨯ Failed to add user {} to team {}: {}".format(username, sec_team_name, e)) |
| 143 | + return |
| 144 | + else: |
| 145 | + LOG.debug("✓ User {} is already a member of {}".format(username, sec_team_name)) |
| 146 | + |
| 147 | + |
| 148 | +def main() -> None: |
| 149 | + """Command line entrypoint.""" |
| 150 | + parser = ArgumentParser(description=__doc__) |
| 151 | + add_args(parser) |
| 152 | + args = parser.parse_args() |
| 153 | + |
| 154 | + logging.basicConfig(level=logging.DEBUG if args.debug else logging.INFO) |
| 155 | + |
| 156 | + # Read in the org list |
| 157 | + with open(args.org_list, "r") as f: |
| 158 | + orgs = list(csv.DictReader(f)) |
| 159 | + |
| 160 | + github_pat = github_token.read_token(args.token_file) |
| 161 | + |
| 162 | + if not github_pat: |
| 163 | + LOG.error("⨯ GitHub Personal Access Token not found") |
| 164 | + return |
| 165 | + |
| 166 | + if args.sec_team_members and args.sec_team_members_file: |
| 167 | + LOG.error("⨯ Please use either --sec-team-members or --sec-team-members-file") |
| 168 | + return |
| 169 | + |
| 170 | + sec_team_members = [] |
| 171 | + if args.sec_team_members_file: |
| 172 | + with open(args.sec_team_members_file, "r") as f: |
| 173 | + sec_team_members = [line.strip() for line in f if line.strip()] |
| 174 | + elif args.sec_team_members: |
| 175 | + sec_team_members = args.sec_team_members |
| 176 | + else: |
| 177 | + LOG.error("⨯ Please provide either --sec-team-members or --sec-team-members-file") |
| 178 | + return |
| 179 | + |
| 180 | + # Set up the headers |
| 181 | + headers = { |
| 182 | + "Authorization": "token {}".format(github_pat), |
| 183 | + } |
51 | 184 |
|
52 | | -if __name__ == "__main__": |
53 | 185 | # For each organization, do |
54 | 186 | for org in orgs: |
55 | 187 | org_name = org["login"] |
56 | 188 |
|
57 | | - # Get the list of teams |
58 | | - teams_info = teams.list_teams(api_url, headers, org_name) |
59 | | - teams_list = [team["name"] for team in teams_info] |
60 | | - |
61 | | - # Create the team if it doesn't exist |
62 | | - if sec_team_name not in teams_list: |
63 | | - print("Creating team {}".format(sec_team_name)) |
64 | | - teams.create_team(api_url, headers, org_name, sec_team_name) |
65 | | - |
66 | | - # Update that team to have the "security manager" role |
67 | | - teams.change_team_role(api_url, headers, org_name, sec_team_name) |
68 | | - print( |
69 | | - "Team {} updated as a security manager for {}!".format( |
70 | | - sec_team_name, org_name |
71 | | - ) |
| 189 | + make_security_managers_team(org_name, args.sec_team_name, args.api_url, headers, legacy=args.legacy) |
| 190 | + add_security_managers_to_team( |
| 191 | + org_name, args.sec_team_name, sec_team_members, args.api_url, headers |
72 | 192 | ) |
73 | 193 |
|
74 | | - # Get the list of org members, adding the missing ones to the org |
75 | | - org_members = organizations.list_org_users(api_url, headers, org_name) |
76 | | - org_members_list = [member["login"] for member in org_members] |
77 | | - for username in sec_team_members: |
78 | | - if username not in org_members_list: |
79 | | - print("Adding {} to {}".format(username, org_name)) |
80 | | - organizations.add_org_user(api_url, headers, org_name, username) |
81 | 194 |
|
82 | | - # Get the list of team members, adding the missing ones to the team and removing the extra ones |
83 | | - team_members = teams.list_team_members( |
84 | | - api_url, headers, org_name, sec_team_name |
85 | | - ) |
86 | | - team_members_list = [member["login"] for member in team_members] |
87 | | - for username in team_members_list: |
88 | | - if username not in sec_team_members: |
89 | | - print("Removing {} from {}".format(username, sec_team_name)) |
90 | | - teams.remove_team_member( |
91 | | - api_url, headers, org_name, sec_team_name, username |
92 | | - ) |
93 | | - for username in sec_team_members: |
94 | | - if username not in team_members_list: |
95 | | - print("Adding {} to {}".format(username, sec_team_name)) |
96 | | - teams.add_team_member( |
97 | | - api_url, headers, org_name, sec_team_name, username |
98 | | - ) |
| 195 | +if __name__ == "__main__": |
| 196 | + main() |
0 commit comments