Skip to content

Commit 4fbbb91

Browse files
Merge pull request #978 from NHSDigital/feature/made14-NRL-1351-perms-manager-cli
NRL-1351 Add Permissions Manager CLI script
2 parents f5da0e9 + 2ac7c6b commit 4fbbb91

File tree

1 file changed

+257
-0
lines changed

1 file changed

+257
-0
lines changed

scripts/manage_permissions.py

Lines changed: 257 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,257 @@
1+
#!/usr/bin/env python
2+
3+
import json
4+
import os
5+
6+
import fire
7+
from aws_session_assume import get_boto_session
8+
9+
from nrlf.core.constants import TYPE_ATTRIBUTES
10+
11+
nrl_env = os.getenv("ENV", "dev")
12+
nrl_auth_bucket_name = os.getenv(
13+
"NRL_AUTH_BUCKET_NAME", f"nhsd-nrlf--{nrl_env}-authorization-store"
14+
)
15+
16+
COMPARE_AND_CONFIRM = (
17+
True
18+
if nrl_env == "prod"
19+
else os.getenv("COMPARE_AND_CONFIRM", "false").lower() == "true"
20+
)
21+
22+
print(f"Using NRL environment: {nrl_env}")
23+
print(f"Using NRL auth bucket: {nrl_auth_bucket_name}")
24+
print(f"Compare and confirm mode: {COMPARE_AND_CONFIRM}")
25+
print()
26+
27+
28+
def _get_s3_client():
29+
boto_session = get_boto_session(nrl_env)
30+
return boto_session.client("s3")
31+
32+
33+
def _list_s3_keys(file_key_prefix: str) -> list[str]:
34+
s3 = _get_s3_client()
35+
paginator = s3.get_paginator("list_objects_v2")
36+
37+
params = {
38+
"Bucket": nrl_auth_bucket_name,
39+
"Prefix": file_key_prefix,
40+
}
41+
42+
page_iterator = paginator.paginate(**params)
43+
keys: list[str] = []
44+
for page in page_iterator:
45+
if "Contents" in page:
46+
keys.extend([item["Key"] for item in page["Contents"]])
47+
48+
if not keys:
49+
print(f"No files found with prefix: {file_key_prefix}")
50+
return []
51+
52+
return keys
53+
54+
55+
def _get_perms_from_s3(file_key: str) -> str | None:
56+
s3 = _get_s3_client()
57+
58+
try:
59+
item = s3.get_object(Bucket=nrl_auth_bucket_name, Key=file_key)
60+
except s3.exceptions.NoSuchKey:
61+
print(f"Permissions file {file_key} does not exist in the bucket.")
62+
return None
63+
64+
if "Body" not in item:
65+
print(f"No body found for permissions file {file_key}.")
66+
return None
67+
68+
return item["Body"].read().decode("utf-8")
69+
70+
71+
def list_apps() -> None:
72+
"""
73+
List all applications in the NRL environment.
74+
"""
75+
keys = _list_s3_keys("")
76+
apps = {key.split("/")[0] for key in keys}
77+
78+
if not apps:
79+
print("No applications found in the bucket.")
80+
return
81+
82+
print(f"There are {len(apps)} apps in {nrl_env} env:")
83+
for app in apps:
84+
print(f"- {app}")
85+
86+
87+
def list_orgs(app_id: str) -> None:
88+
"""
89+
List all organizations for a specific application.
90+
"""
91+
keys = _list_s3_keys(f"{app_id}/")
92+
orgs = [
93+
key.split("/", maxsplit=2)[1].removesuffix(".json")
94+
for key in keys
95+
if key and key.endswith(".json")
96+
]
97+
98+
if not orgs:
99+
print(f"No organizations found for app {app_id}.")
100+
101+
print(f"There are {len(orgs)} organizations for app {app_id}:")
102+
for org in orgs:
103+
print(f"- {org}")
104+
105+
106+
def list_allowed_types() -> None:
107+
"""
108+
List all pointer types that can be used in permissions.
109+
"""
110+
print("The following pointer-types are allowed:")
111+
112+
for pointer_type, attributes in TYPE_ATTRIBUTES.items():
113+
print("- %-45s (%s)" % (pointer_type, attributes["display"][:45]))
114+
115+
116+
def show_perms(app_id: str, org_ods: str) -> None:
117+
"""
118+
Show the permissions for a specific application and organization.
119+
"""
120+
perms = _get_perms_from_s3(f"{app_id}/{org_ods}.json")
121+
122+
if not perms:
123+
print(f"No permissions file found for {app_id}/{org_ods}.")
124+
return
125+
126+
pointertype_perms = json.loads(perms)
127+
if not pointertype_perms:
128+
print(f"No pointer-types found in permission file for {app_id}/{org_ods}.")
129+
return
130+
131+
type_data = {
132+
pointertype_perm: TYPE_ATTRIBUTES.get(
133+
pointertype_perm, {"display": "Unknown type"}
134+
)
135+
for pointertype_perm in pointertype_perms
136+
}
137+
types = [
138+
"%-45s (%s)" % (type_data[pointertype_perm]["display"][:44], pointertype_perm)
139+
for pointertype_perm in pointertype_perms
140+
]
141+
142+
print(f"{app_id}/{org_ods} is allowed to access these pointer-types:")
143+
for type_display in types:
144+
print(f"- {type_display}")
145+
146+
147+
def set_perms(app_id: str, org_ods: str, *pointer_types: str) -> None:
148+
"""
149+
Set permissions for an application and organization to access specific pointer types.
150+
"""
151+
if not pointer_types:
152+
print(
153+
"No pointer types provided. Please specify at least one pointer type or use clear_perms command."
154+
)
155+
return
156+
157+
if len(pointer_types) == 1 and pointer_types[0] == "all":
158+
print("Setting permissions for access to all pointer types.")
159+
pointer_types = tuple(TYPE_ATTRIBUTES.keys())
160+
161+
unknown_types = [pt for pt in pointer_types if pt not in TYPE_ATTRIBUTES]
162+
if unknown_types:
163+
print(f"Error: Unknown pointer types provided: {', '.join(unknown_types)}")
164+
print()
165+
return
166+
167+
permissions_content = json.dumps(pointer_types, indent=4)
168+
169+
if COMPARE_AND_CONFIRM:
170+
current_perms = _get_perms_from_s3(f"{app_id}/{org_ods}.json")
171+
if current_perms == permissions_content:
172+
print(
173+
f"No changes needed for {app_id}/{org_ods}. Current permissions match the new ones."
174+
)
175+
return
176+
177+
print()
178+
print(f"Current permissions for {app_id}/{org_ods}:")
179+
print(current_perms if current_perms else "No permissions set.")
180+
181+
print()
182+
print("New permissions to be set to:")
183+
print(f"{permissions_content}")
184+
185+
print()
186+
confirm = (
187+
input("Do you want to proceed with these changes? (yes/NO): ")
188+
.strip()
189+
.lower()
190+
)
191+
if confirm != "yes":
192+
print("Operation cancelled at user request.")
193+
return
194+
195+
s3 = _get_s3_client()
196+
s3.put_object(
197+
Bucket=nrl_auth_bucket_name,
198+
Key=f"{app_id}/{org_ods}.json",
199+
Body=permissions_content,
200+
ContentType="application/json",
201+
)
202+
203+
print()
204+
print(f"Set permissions for {app_id}/{org_ods}")
205+
206+
print()
207+
show_perms(app_id, org_ods)
208+
209+
210+
def clear_perms(app_id: str, org_ods: str) -> None:
211+
"""
212+
Clear permissions for an application and organization.
213+
This will remove all permissions for the specified app and org.
214+
"""
215+
if COMPARE_AND_CONFIRM:
216+
current_perms = _get_perms_from_s3(f"{app_id}/{org_ods}.json")
217+
if not current_perms or current_perms == "[]":
218+
print(
219+
f"No need to clear permissions for {app_id}/{org_ods} as it currently has no permissions set."
220+
)
221+
return
222+
223+
print()
224+
print(f"Current permissions for {app_id}/{org_ods}:")
225+
print(current_perms)
226+
227+
print()
228+
confirm = (
229+
input("Are you SURE you want to clear these permissions? (yes/NO): ")
230+
.strip()
231+
.lower()
232+
)
233+
if confirm != "yes":
234+
print("Operation cancelled at user request.")
235+
return
236+
237+
s3 = _get_s3_client()
238+
s3.put_object(
239+
Bucket=nrl_auth_bucket_name,
240+
Key=f"{app_id}/{org_ods}.json",
241+
Body="[]",
242+
ContentType="application/json",
243+
)
244+
print(f"Cleared permissions for {app_id}/{org_ods}.")
245+
246+
247+
if __name__ == "__main__":
248+
fire.Fire(
249+
{
250+
"list_apps": list_apps,
251+
"list_orgs": list_orgs,
252+
"list_allowed_types": list_allowed_types,
253+
"show_perms": show_perms,
254+
"set_perms": set_perms,
255+
"clear_perms": clear_perms,
256+
}
257+
)

0 commit comments

Comments
 (0)