1+
2+ import importlib
13import json
4+ import logging
25from typing import Dict , List
36from urllib .parse import quote
47
1013from ...tools import prepare_url_prefix
1114from ..access_control import Role , User , UserAlreadyExists
1215
16+ try :
17+ jmespath = importlib .import_module ("jmespath" )
18+ except ModuleNotFoundError :
19+ logging .error ("Module 'jmespath' is not installed." )
20+
21+ logger = logging .getLogger ('services.oauth2' )
22+
1323
1424class OAuth2 (SSOAuth ):
1525 LOGIN_URL = 'auth/oauth2/login'
1626 LOGOUT_URL = 'auth/oauth2/logout'
1727 sso = True
1828
1929 class OAuth2Config (BaseAuth .Config ):
20- pass
30+ roles_path : str
31+
32+ def __init__ (self , roles_path = None ):
33+ self .roles_path = roles_path
34+
35+ def get_roles_path (self ):
36+ return self .roles_path
2137
2238 @staticmethod
2339 def enabled ():
2440 return mgr .get_module_option ('sso_oauth2' )
2541
26- def to_dict (self ) -> 'BaseAuth.Config' :
27- return self .OAuth2Config ()
42+ def to_dict (self ) -> 'OAuth2Config' :
43+ return {
44+ 'roles_path' : self .roles_path
45+ }
2846
2947 @classmethod
3048 def from_dict (cls , s_dict : OAuth2Config ) -> 'OAuth2' :
31- # pylint: disable=unused-argument
32- return OAuth2 ()
49+ try :
50+ return OAuth2 (s_dict ['roles_path' ])
51+ except KeyError :
52+ return OAuth2 ({})
3353
3454 @classmethod
3555 def get_auth_name (cls ):
@@ -66,25 +86,30 @@ def set_token_payload(cls, token):
6686
6787 @classmethod
6888 def get_user_roles (cls ):
69- roles : List [Role ] = []
89+ roles : List [str ] = []
7090 user_roles : List [Role ] = []
7191 try :
7292 jwt_payload = cherrypy .request .jwt_payload
7393 except AttributeError :
74- raise cherrypy .HTTPError ()
75-
76- # check for client roes
77- if 'resource_access' in jwt_payload :
78- # Find the first value where the key is not 'account'
79- roles = next ((value ['roles' ] for key , value in jwt_payload ['resource_access' ].items ()
80- if key != "account" ), user_roles )
81- # check for global roles
82- elif 'realm_access' in jwt_payload :
83- roles = next ((value ['roles' ] for _ , value in jwt_payload ['realm_access' ].items ()),
84- user_roles )
94+ raise cherrypy .HTTPError (401 )
95+
96+ if jmespath and hasattr (mgr .SSO_DB .config , 'roles_path' ):
97+ logger .debug ("Using 'roles_path' to fetch roles" )
98+ roles = jmespath .search (mgr .SSO_DB .config .roles_path , jwt_payload )
99+ # e.g Keycloak
100+ elif 'resource_access' in jwt_payload or 'realm_access' in jwt_payload :
101+ logger .debug ("Using 'resource_access' or 'realm_access' to fetch roles" )
102+ roles = jmespath .search (
103+ "resource_access.*[?@!='account'].roles[] || realm_access.roles[]" ,
104+ jwt_payload )
105+ elif 'roles' in jwt_payload :
106+ logger .debug ("Using 'roles' to fetch roles" )
107+ roles = jwt_payload ['roles' ]
108+ if isinstance (roles , str ):
109+ roles = [roles ]
85110 else :
86- raise cherrypy .HTTPError ()
87- user_roles = Role .map_to_system_roles (roles )
111+ raise cherrypy .HTTPError (403 )
112+ user_roles = Role .map_to_system_roles (roles or [] )
88113 return user_roles
89114
90115 @classmethod
@@ -106,6 +131,7 @@ def _create_user(cls):
106131 user = mgr .ACCESS_CTRL_DB .create_user (
107132 jwt_payload ['sub' ], None , jwt_payload ['name' ], jwt_payload ['email' ])
108133 except UserAlreadyExists :
134+ logger .debug ("User already exists" )
109135 user = mgr .ACCESS_CTRL_DB .get_user (jwt_payload ['sub' ])
110136 user .set_roles (cls .get_user_roles ())
111137 # set user last update to token time issued
0 commit comments