@@ -45,31 +45,35 @@ def _list_s3_keys(file_key_prefix: str) -> list[str]:
4545 return keys
4646
4747
48- def _get_perms_from_s3 (file_key : str ) -> list [ str ] :
48+ def _get_perms_from_s3 (file_key : str ) -> str | None :
4949 s3 = _get_s3_client ()
5050
51- item = s3 .get_object (Bucket = nrl_auth_bucket_name , Key = file_key )
51+ try :
52+ item = s3 .get_object (Bucket = nrl_auth_bucket_name , Key = file_key )
53+ except s3 .exceptions .NoSuchKey :
54+ print (f"Permissions file { file_key } does not exist in the bucket." )
55+ return None
5256
53- if not item :
54- print (f"No permissions found for { file_key } ." )
55- return []
57+ if "Body" not in item :
58+ print (f"No body found for permissions file { file_key } ." )
59+ return None
5660
5761 return item ["Body" ].read ().decode ("utf-8" )
5862
5963
60- def list_apps () -> set [str ]:
64+ def list_apps () -> list [str ]:
6165 keys = _list_s3_keys ("" )
62- apps = set ( [key .split ("/" )[0 ] for key in keys ])
66+ apps = [key .split ("/" )[0 ] for key in keys ]
6367
6468 if not apps :
6569 print ("No applications found in the bucket." )
66- return set ()
70+ return []
6771
6872 print (f"Listing all { len (apps )} apps in bucket..." )
6973 return apps
7074
7175
72- def list_orgs (app_id : str ) -> set [str ]:
76+ def list_orgs (app_id : str ) -> list [str ]:
7377 keys = _list_s3_keys (f"{ app_id } /" )
7478 orgs = [
7579 key .split ("/" , maxsplit = 2 )[1 ].removesuffix (".json" )
@@ -79,13 +83,13 @@ def list_orgs(app_id: str) -> set[str]:
7983
8084 if not orgs :
8185 print (f"No organizations found for app { app_id } ." )
82- return set ()
86+ return []
8387
8488 print (f"Listing { len (orgs )} organizations for { app_id } ..." )
8589 return orgs
8690
8791
88- def get_perms (app_id : str , org_ods : str ) -> list [str ]:
92+ def get (app_id : str , org_ods : str ) -> list [str ]:
8993 perms = _get_perms_from_s3 (f"{ app_id } /{ org_ods } .json" )
9094
9195 if not perms :
@@ -112,10 +116,39 @@ def get_perms(app_id: str, org_ods: str) -> list[str]:
112116 return types
113117
114118
115- def set_perms (app_id : str , org_ods : str , pointer_types : list [str ]) -> list [str ]:
116- # This function would contain the logic to set permissions
117- print (f"Setting permissions for { app_id } /{ org_ods } to { pointer_types } ..." )
118- return []
119+ def set (app_id : str , org_ods : str , * pointer_types : str ) -> list [str ]:
120+ if not pointer_types :
121+ print (
122+ "No pointer types provided. Please specify at least one pointer type or use clear_perms command."
123+ )
124+ return []
125+
126+ unknown_types = [pt for pt in pointer_types if pt not in TYPE_ATTRIBUTES ]
127+ if unknown_types :
128+ print (f"Warning: Unknown pointer types provided: { ', ' .join (unknown_types )} " )
129+ print ()
130+
131+ permissions_content = json .dumps (pointer_types , indent = 4 )
132+ s3 = _get_s3_client ()
133+ s3 .put_object (
134+ Bucket = nrl_auth_bucket_name ,
135+ Key = f"{ app_id } /{ org_ods } .json" ,
136+ Body = permissions_content ,
137+ ContentType = "application/json" ,
138+ )
139+
140+ return get (app_id , org_ods )
141+
142+
143+ def clear (app_id : str , org_ods : str ) -> None :
144+ s3 = _get_s3_client ()
145+ s3 .put_object (
146+ Bucket = nrl_auth_bucket_name ,
147+ Key = f"{ app_id } /{ org_ods } .json" ,
148+ Body = "[]" ,
149+ ContentType = "application/json" ,
150+ )
151+ print (f"Cleared permissions for { app_id } /{ org_ods } ." )
119152
120153
121154if __name__ == "__main__" :
0 commit comments