Skip to content

Commit 4246c2f

Browse files
committed
Taking a cert bundle for verification from the args
1 parent 7875288 commit 4246c2f

File tree

8 files changed

+171
-27
lines changed

8 files changed

+171
-27
lines changed

README.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,3 +98,15 @@ Removing ghe-admin from security-managers
9898
- [requests](https://pypi.org/project/requests/) is a simple and popular HTTP library.
9999
- [defusedcsv](https://github.com/raphaelm/defusedcsv) is used over `csv` to mitigate spreadsheet application exploitation in older versions.
100100
- The `.csv` files and `.txt` files are in the `.gitignore` file to avoid accidental commits into the repo.
101+
102+
## TLS / custom certificates
103+
104+
If you are running against a GitHub Enterprise Server instance that uses a self‑signed certificate or an internal certificate authority, you can provide a custom root CA (or certificate bundle) with the `--ca-bundle` argument on all three scripts:
105+
106+
```shell
107+
./org-admin-promote.py ENTERPRISE_SLUG --github-url https://ghe.example.com --ca-bundle /path/to/internal-ca.pem
108+
./manage-sec-team.py --github-url https://ghe.example.com --ca-bundle /path/to/internal-ca.pem --sec-team-members alice bob
109+
./org-admin-demote.py ENTERPRISE_SLUG --github-url https://ghe.example.com --ca-bundle /path/to/internal-ca.pem
110+
```
111+
112+
The value passed must be a readable PEM file containing the certificate(s). If omitted, the Python default trust store is used.

manage-sec-team.py

Lines changed: 26 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,11 @@ def add_args(parser) -> None:
6868
action="store_true",
6969
help="Enable debug logging",
7070
)
71+
parser.add_argument(
72+
"--ca-bundle",
73+
required=False,
74+
help="Path to a custom CA certificate or bundle (PEM) to trust for TLS (self-signed/internal CAs)",
75+
)
7176

7277

7378
def make_security_managers_team(
@@ -77,13 +82,14 @@ def make_security_managers_team(
7782
headers: dict[str, str],
7883
legacy=False,
7984
progress=False,
85+
verify: str | bool | None = True,
8086
) -> None:
8187
"""Create or update the security managers team in the specified organization."""
8288
security_manager_role_id: str | None = None
8389

8490
if not legacy:
8591
org_roles: dict[str, Any] = organizations.list_org_roles(
86-
api_url, headers, org_name
92+
api_url, headers, org_name, verify=verify
8793
)
8894

8995
# Check if the "security manager" role exists
@@ -106,15 +112,15 @@ def make_security_managers_team(
106112
security_manager_role_id = security_manager_role_id_list[0]
107113

108114
# Get the list of teams
109-
teams_info = teams.list_teams(api_url, headers, org_name)
115+
teams_info = teams.list_teams(api_url, headers, org_name, verify=verify)
110116
teams_list = [team["name"] for team in teams_info]
111117

112118
# Create the team if it doesn't exist
113119
if sec_team_name not in teams_list:
114120
if progress:
115121
LOG.info("Creating team {}".format(sec_team_name))
116122
try:
117-
teams.create_team(api_url, headers, org_name, sec_team_name)
123+
teams.create_team(api_url, headers, org_name, sec_team_name, verify=verify)
118124
except Exception as e:
119125
LOG.error("⨯ Failed to create team {}: {}".format(sec_team_name, e))
120126

@@ -128,6 +134,7 @@ def make_security_managers_team(
128134
sec_team_name,
129135
security_manager_role_id,
130136
legacy=legacy,
137+
verify=verify,
131138
):
132139
teams.change_team_role(
133140
api_url,
@@ -136,6 +143,7 @@ def make_security_managers_team(
136143
sec_team_name,
137144
security_manager_role_id,
138145
legacy=legacy,
146+
verify=verify,
139147
)
140148
if progress:
141149
LOG.info(
@@ -162,17 +170,18 @@ def add_security_managers_to_team(
162170
api_url: str,
163171
headers: dict[str, str],
164172
progress: bool = False,
173+
verify: str | bool | None = True,
165174
) -> None:
166175
"""Add security managers to the specified team in the organization."""
167176
# Get the list of org members, adding the missing ones to the org
168-
org_members = organizations.list_org_users(api_url, headers, org_name)
177+
org_members = organizations.list_org_users(api_url, headers, org_name, verify=verify)
169178
org_members_list = [member["login"] for member in org_members]
170179
for username in sec_team_members:
171180
if username not in org_members_list:
172181
if progress:
173182
LOG.info("Adding {} to {}".format(username, org_name))
174183
try:
175-
organizations.add_org_user(api_url, headers, org_name, username)
184+
organizations.add_org_user(api_url, headers, org_name, username, verify=verify)
176185
except Exception as e:
177186
LOG.error(
178187
"⨯ Failed to add user {} to org {}: {}".format(
@@ -182,15 +191,15 @@ def add_security_managers_to_team(
182191
return
183192

184193
# Get the list of team members, adding the missing ones to the team and removing the extra ones
185-
team_members = teams.list_team_members(api_url, headers, org_name, sec_team_name)
194+
team_members = teams.list_team_members(api_url, headers, org_name, sec_team_name, verify=verify)
186195
team_members_list = [member["login"] for member in team_members]
187196
for username in team_members_list:
188197
if username not in sec_team_members:
189198
if progress:
190199
LOG.info("Removing {} from {}".format(username, sec_team_name))
191200
try:
192201
teams.remove_team_member(
193-
api_url, headers, org_name, sec_team_name, username
202+
api_url, headers, org_name, sec_team_name, username, verify=verify
194203
)
195204
except Exception as e:
196205
LOG.error(
@@ -205,7 +214,7 @@ def add_security_managers_to_team(
205214
LOG.info("Adding {} to {}".format(username, sec_team_name))
206215
try:
207216
teams.add_team_member(
208-
api_url, headers, org_name, sec_team_name, username
217+
api_url, headers, org_name, sec_team_name, username, verify=verify
209218
)
210219
except Exception as e:
211220
LOG.error(
@@ -260,6 +269,13 @@ def main() -> None:
260269

261270
api_url = util.rest_api_url_from_server_url(args.github_url)
262271

272+
# Optional custom CA bundle / cert file
273+
verify: str | bool | None = True
274+
try:
275+
verify = util.validate_ca_bundle(args.ca_bundle)
276+
except FileNotFoundError:
277+
return
278+
263279
# Set up the headers
264280
headers = {
265281
"Authorization": "token {}".format(github_pat),
@@ -276,6 +292,7 @@ def main() -> None:
276292
headers,
277293
legacy=args.legacy,
278294
progress=args.progress,
295+
verify=verify,
279296
)
280297
add_security_managers_to_team(
281298
org_name,
@@ -284,6 +301,7 @@ def main() -> None:
284301
api_url,
285302
headers,
286303
progress=args.progress,
304+
verify=verify,
287305
)
288306

289307

org-admin-demote.py

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,11 @@ def add_args(parser: ArgumentParser) -> None:
5757
action="store_true",
5858
help="Enable debug logging",
5959
)
60+
parser.add_argument(
61+
"--ca-bundle",
62+
required=False,
63+
help="Path to a custom CA certificate or bundle (PEM) for TLS verification (self-signed/internal roots)",
64+
)
6065

6166

6267
def demote_admin(
@@ -65,6 +70,7 @@ def demote_admin(
6570
enterprise_id: str,
6671
org_ids: Iterable[str],
6772
progress: bool = False,
73+
verify: str | bool | None = True,
6874
) -> None:
6975
"""Demote the enterprise admin from each organization ID provided."""
7076
org_ids_list = list(org_ids)
@@ -77,7 +83,7 @@ def demote_admin(
7783
)
7884
)
7985
enterprises.promote_admin(
80-
api_url, headers, enterprise_id, org_id, "UNAFFILIATED"
86+
api_url, headers, enterprise_id, org_id, "UNAFFILIATED", verify=verify
8187
)
8288

8389

@@ -101,8 +107,15 @@ def main() -> None:
101107
"Authorization": f"token {github_pat}",
102108
}
103109

110+
# Optional custom CA bundle / cert file
111+
verify: str | bool | None = True
112+
try:
113+
verify = util.validate_ca_bundle(args.ca_bundle)
114+
except FileNotFoundError:
115+
return
116+
104117
enterprise_id = enterprises.get_enterprise_id(
105-
api_url, args.enterprise_slug, headers
118+
api_url, args.enterprise_slug, headers, verify=verify
106119
)
107120

108121
unmanaged_orgs = util.read_lines(args.unmanaged_orgs)
@@ -111,7 +124,9 @@ def main() -> None:
111124
LOG.error("⨯ No unmanaged organizations found to demote admin from")
112125
return
113126

114-
demote_admin(api_url, headers, enterprise_id, unmanaged_orgs, args.progress)
127+
demote_admin(
128+
api_url, headers, enterprise_id, unmanaged_orgs, args.progress, verify=verify
129+
)
115130

116131

117132
if __name__ == "__main__": # pragma: no cover

org-admin-promote.py

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,11 @@ def add_args(parser: ArgumentParser) -> None:
7777
action="store_true",
7878
help="Enable debug logging",
7979
)
80+
parser.add_argument(
81+
"--ca-bundle",
82+
required=False,
83+
help="Path to a custom CA certificate or bundle (PEM) for TLS verification (self-signed/internal roots)",
84+
)
8085

8186

8287
def write_unmanaged_orgs(path: str, unmanaged_org_ids: List[str]) -> None:
@@ -95,17 +100,20 @@ def promote_all(
95100
orgs_subset: set[str] | None,
96101
unmanaged_out: str,
97102
progress: bool = False,
103+
verify: str | bool | None = True,
98104
) -> List[str] | None:
99105
"""
100106
Promote the enterprise admin to owner on all unmanaged organizations.
101107
102108
If a subset of organizations is provided, only attempt to promote on those; otherwise, try on all organizations.
103109
"""
104-
total_org_count = organizations.get_total_count(api_url, enterprise_slug, headers)
110+
total_org_count = organizations.get_total_count(
111+
api_url, enterprise_slug, headers, verify=verify
112+
)
105113
if total_org_count == 0:
106114
LOG.warning("⚠️ No organizations found.")
107115
return []
108-
orgs = organizations.list_orgs(api_url, enterprise_slug, headers)
116+
orgs = organizations.list_orgs(api_url, enterprise_slug, headers, verify=verify)
109117
if len(orgs) != total_org_count:
110118
LOG.error(
111119
"⨯ Total count of organizations returned by the query is different from the expected count"
@@ -118,7 +126,9 @@ def promote_all(
118126

119127
LOG.info("Organizations in scope: {}".format(len(orgs)))
120128

121-
enterprise_id = enterprises.get_enterprise_id(api_url, enterprise_slug, headers)
129+
enterprise_id = enterprises.get_enterprise_id(
130+
api_url, enterprise_slug, headers, verify=verify
131+
)
122132
unmanaged_orgs = [
123133
org["node"]["id"] for org in orgs if not org["node"]["viewerCanAdminister"]
124134
]
@@ -134,7 +144,9 @@ def promote_all(
134144
org_id, i + 1, len(unmanaged_orgs)
135145
)
136146
)
137-
enterprises.promote_admin(api_url, headers, enterprise_id, org_id, "OWNER")
147+
enterprises.promote_admin(
148+
api_url, headers, enterprise_id, org_id, "OWNER", verify=verify
149+
)
138150
write_unmanaged_orgs(unmanaged_out, unmanaged_orgs)
139151
LOG.info("Promoted on organizations: {}".format(len(unmanaged_orgs)))
140152
return unmanaged_orgs
@@ -159,6 +171,13 @@ def main() -> None:
159171
"Authorization": f"token {github_pat}",
160172
}
161173

174+
# Optional custom CA bundle / cert file
175+
verify: str | bool | None = True
176+
try:
177+
verify = util.validate_ca_bundle(args.ca_bundle)
178+
except FileNotFoundError:
179+
return
180+
162181
orgs_subset_list: list[str] | None = (
163182
args.orgs or util.read_lines(args.orgs_file) or None
164183
)
@@ -174,14 +193,17 @@ def main() -> None:
174193
orgs_subset,
175194
args.unmanaged_orgs,
176195
args.progress,
196+
verify=verify,
177197
)
178198
is None
179199
):
180200
LOG.error("⨯ Promotion failed")
181201
return
182202

183203
# Refresh and write all orgs CSV after promotions
184-
orgs = organizations.list_orgs(api_url, args.enterprise_slug, headers)
204+
orgs = organizations.list_orgs(
205+
api_url, args.enterprise_slug, headers, verify=verify
206+
)
185207

186208
# Filter by the list of orgs, if provided
187209
if orgs_subset is not None:

src/enterprises.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,10 @@
1010

1111

1212
def get_enterprise_id(
13-
api_endpoint: str, enterprise_slug: str, headers: dict[str, str]
13+
api_endpoint: str,
14+
enterprise_slug: str,
15+
headers: dict[str, str],
16+
verify: str | bool | None = True,
1417
) -> str:
1518
"""
1619
Get the ID of an enterprise by its slug.
@@ -28,6 +31,7 @@ def get_enterprise_id(
2831
api_endpoint,
2932
json={"query": enterprise_query},
3033
headers=add_request_headers(headers),
34+
verify=verify,
3135
)
3236
response.raise_for_status()
3337
return response.json()["data"]["enterprise"]["id"]
@@ -58,6 +62,7 @@ def promote_admin(
5862
enterprise_id: str,
5963
org_id: str,
6064
role: str,
65+
verify: str | bool | None = True,
6166
) -> dict[str, Any]:
6267
"""
6368
Promote an enterprise admin to an organization owner.
@@ -67,6 +72,7 @@ def promote_admin(
6772
api_endpoint,
6873
json={"query": promote_query},
6974
headers=add_request_headers(headers),
75+
verify=verify,
7076
)
7177
response.raise_for_status()
7278
return response.json()

0 commit comments

Comments
 (0)