@@ -102,9 +102,15 @@ def validate_token(self, token: str) -> Dict[str, Any]:
102102 logger .error (f"Error validating token: { e } " )
103103 return {}
104104
105- def get_user_roles (self , token_info : dict ) -> list [str ]:
105+ def get_user_roles_from_token_info (self , token_info : dict ) -> list [str ]:
106106 """
107- Extract roles from a Keycloak token.
107+ Extract roles from token introspection data.
108+
109+ Args:
110+ token_info: Token introspection response
111+
112+ Returns:
113+ List of role names
108114 """
109115 roles : list [str ] = []
110116
@@ -122,19 +128,85 @@ def get_user_roles(self, token_info: dict) -> list[str]:
122128
123129 return roles
124130
125- def get_user_organizations (self , token_info : dict ) -> List [Dict [str , Any ]]:
131+ def get_user_organizations_from_token_info (self , token_info : dict ) -> List [Dict [str , Any ]]:
126132 """
127- Get the organizations a user belongs to from their token info.
133+ Get organizations from token introspection data.
134+
135+ Args:
136+ token_info: Token introspection response
137+
138+ Returns:
139+ List of organization information
140+ """
141+ try :
142+ # Get organization info from resource_access or attributes
143+ resource_access = token_info .get ("resource_access" , {})
144+ client_roles = resource_access .get (self .client_id , {}).get ("roles" , [])
145+
146+ # Extract organization info from roles
147+ organizations = []
148+ for role in client_roles :
149+ if role .startswith ("org_" ):
150+ parts = role .split ("_" )
151+ if len (parts ) >= 3 :
152+ org_id = parts [1 ]
153+ role_name = parts [2 ]
154+ organizations .append ({"organization_id" : org_id , "role" : role_name })
155+
156+ return organizations
157+ except Exception as e :
158+ logger .error (f"Error getting user organizations: { e } " )
159+ return []
160+
161+ def get_user_roles (self , token : str ) -> list [str ]:
162+ """
163+ Extract roles from a Keycloak token.
164+
165+ Args:
166+ token: The user's token
167+
168+ Returns:
169+ List of role names
170+ """
171+ try :
172+ # Decode the token to get user info
173+ token_info = self .keycloak_openid .decode_token (token )
174+
175+ roles : list [str ] = []
176+
177+ # Extract realm roles
178+ realm_access = token_info .get ("realm_access" , {})
179+ if realm_access and "roles" in realm_access :
180+ roles .extend (realm_access ["roles" ]) # type: ignore[no-any-return]
181+
182+ # Extract client roles
183+ resource_access = token_info .get ("resource_access" , {})
184+ client_id = settings .KEYCLOAK_CLIENT_ID
185+ if resource_access and client_id in resource_access :
186+ client_roles = resource_access [client_id ].get ("roles" , [])
187+ roles .extend (client_roles )
188+
189+ return roles
190+ except Exception as e :
191+ logger .error (f"Error getting user roles: { e } " )
192+ return []
193+
194+ def get_user_organizations (self , token : str ) -> List [Dict [str , Any ]]:
195+ """
196+ Get the organizations a user belongs to from their token.
128197 This assumes that organization information is stored in the token
129198 as client roles or in user attributes.
130199
131200 Args:
132- token_info : The decoded token information
201+ token : The user's token
133202
134203 Returns:
135204 List of organization information
136205 """
137206 try :
207+ # Decode the token to get user info
208+ token_info = self .keycloak_openid .decode_token (token )
209+
138210 # Get organization info from resource_access or attributes
139211 # This implementation depends on how organizations are represented in Keycloak
140212 # This is a simplified example - adjust based on your Keycloak configuration
0 commit comments