|
1 | 1 | #!/usr/bin/env python |
2 | 2 |
|
| 3 | +import json |
3 | 4 | import os |
4 | 5 |
|
5 | 6 | import fire |
6 | 7 | from aws_session_assume import get_boto_session |
7 | 8 |
|
| 9 | +from nrlf.core.constants import TYPE_ATTRIBUTES |
| 10 | + |
8 | 11 | nrl_env = os.getenv("ENV", "dev") |
9 | 12 | nrl_auth_bucket_name = os.getenv( |
10 | 13 | "NRL_AUTH_BUCKET_NAME", f"nhsd-nrlf--{nrl_env}-authorization-store" |
11 | 14 | ) |
12 | 15 |
|
| 16 | +print(f"Using NRL environment: {nrl_env}") |
| 17 | +print(f"Using NRL auth bucket: {nrl_auth_bucket_name}") |
| 18 | +print() |
13 | 19 |
|
14 | | -def _list_s3_files(file_key_prefix: str) -> list[str]: |
15 | | - # This function would contain the logic to list files in S3 |
16 | | - print(f"Listing files in S3 with prefix {file_key_prefix}...") |
17 | | - return [] |
| 20 | + |
| 21 | +def _get_s3_client(): |
| 22 | + boto_session = get_boto_session(nrl_env) |
| 23 | + return boto_session.client("s3") |
| 24 | + |
| 25 | + |
| 26 | +def _list_s3_keys(file_key_prefix: str) -> list[str]: |
| 27 | + s3 = _get_s3_client() |
| 28 | + paginator = s3.get_paginator("list_objects_v2") |
| 29 | + |
| 30 | + params = { |
| 31 | + "Bucket": nrl_auth_bucket_name, |
| 32 | + "Prefix": file_key_prefix, |
| 33 | + } |
| 34 | + |
| 35 | + page_iterator = paginator.paginate(**params) |
| 36 | + keys = [] |
| 37 | + for page in page_iterator: |
| 38 | + if "Contents" in page: |
| 39 | + keys.extend([item["Key"] for item in page["Contents"]]) |
| 40 | + |
| 41 | + if not keys: |
| 42 | + print(f"No files found with prefix: {file_key_prefix}") |
| 43 | + return [] |
| 44 | + |
| 45 | + return keys |
18 | 46 |
|
19 | 47 |
|
20 | 48 | def _get_perms_from_s3(file_key: str) -> list[str]: |
21 | | - # This function would contain the logic to get permissions from S3 |
22 | | - print(f"Getting permissions from S3 for {file_key}...") |
23 | | - return [] |
| 49 | + s3 = _get_s3_client() |
24 | 50 |
|
| 51 | + item = s3.get_object(Bucket=nrl_auth_bucket_name, Key=file_key) |
25 | 52 |
|
26 | | -def list_apps() -> list[str]: |
27 | | - # This function would contain the logic to list apps |
28 | | - print("Listing all apps...") |
29 | | - return [] |
| 53 | + if not item: |
| 54 | + print(f"No permissions found for {file_key}.") |
| 55 | + return [] |
30 | 56 |
|
| 57 | + return item["Body"].read().decode("utf-8") |
31 | 58 |
|
32 | | -def list_orgs(app_id: str) -> list[str]: |
33 | | - # This function would contain the logic to list organizations |
34 | | - print(f"Listing organizations for {app_id}...") |
35 | | - return [] |
36 | 59 |
|
| 60 | +def list_apps() -> set[str]: |
| 61 | + keys = _list_s3_keys("") |
| 62 | + apps = set([key.split("/")[0] for key in keys]) |
| 63 | + |
| 64 | + if not apps: |
| 65 | + print("No applications found in the bucket.") |
| 66 | + return set() |
| 67 | + |
| 68 | + print(f"Listing all {len(apps)} apps in bucket...") |
| 69 | + return apps |
| 70 | + |
| 71 | + |
| 72 | +def list_orgs(app_id: str) -> set[str]: |
| 73 | + keys = _list_s3_keys(f"{app_id}/") |
| 74 | + orgs = [ |
| 75 | + key.split("/", maxsplit=2)[1].removesuffix(".json") |
| 76 | + for key in keys |
| 77 | + if key and key.endswith(".json") |
| 78 | + ] |
| 79 | + |
| 80 | + if not orgs: |
| 81 | + print(f"No organizations found for app {app_id}.") |
| 82 | + return set() |
| 83 | + |
| 84 | + print(f"Listing {len(orgs)} organizations for {app_id}...") |
| 85 | + return orgs |
| 86 | + |
| 87 | + |
| 88 | +def get_perms(app_id: str, org_ods: str) -> list[str]: |
| 89 | + perms = _get_perms_from_s3(f"{app_id}/{org_ods}.json") |
| 90 | + |
| 91 | + if not perms: |
| 92 | + print(f"No permissions file found for {app_id}/{org_ods}.") |
| 93 | + return [] |
| 94 | + |
| 95 | + pointertype_perms = json.loads(perms) |
| 96 | + if not pointertype_perms: |
| 97 | + print(f"No pointer-types found in permission file for {app_id}/{org_ods}.") |
| 98 | + return [] |
| 99 | + |
| 100 | + type_data = { |
| 101 | + pointertype_perm: TYPE_ATTRIBUTES.get( |
| 102 | + pointertype_perm, {"display": "Unknown type"} |
| 103 | + ) |
| 104 | + for pointertype_perm in pointertype_perms |
| 105 | + } |
| 106 | + types = [ |
| 107 | + f"{type_data[pointertype_perm]['display']} ({pointertype_perm})" |
| 108 | + for pointertype_perm in pointertype_perms |
| 109 | + ] |
37 | 110 |
|
38 | | -def get(app_id: str, org_ods: str) -> list[str]: |
39 | | - # This function would contain the logic to show current permissions |
40 | 111 | print(f"The current permissions for {app_id}/{org_ods} are:") |
41 | | - return [] |
| 112 | + return types |
42 | 113 |
|
43 | 114 |
|
44 | | -def set(app_id: str, org_ods: str, pointer_types: list[str]) -> list[str]: |
| 115 | +def set_perms(app_id: str, org_ods: str, pointer_types: list[str]) -> list[str]: |
45 | 116 | # This function would contain the logic to set permissions |
46 | 117 | print(f"Setting permissions for {app_id}/{org_ods} to {pointer_types}...") |
47 | 118 | return [] |
|