@@ -40,13 +40,12 @@ def test_auth(authenticated_user: AuthenticatedUser = Depends(authenticate_user)
40
40
from fastapi_oidc import discovery
41
41
from fastapi_oidc .types import IDToken
42
42
43
+ # class AuthBearer(HTTPBearer):
44
+ # async def __call__(self, request: Request):
45
+ # return await super().__call__(request)
43
46
44
- class AuthBearer (HTTPBearer ):
45
- async def __call__ (self , request : Request ):
46
- return await super ().__call__ (request )
47
47
48
-
49
- class EmptyOAuth2 (OAuth2 ):
48
+ class OAuth2Facade (OAuth2 ):
50
49
async def __call__ (self , request : Request ) -> Optional [str ]:
51
50
return None
52
51
@@ -92,13 +91,15 @@ def __init__(
92
91
)
93
92
94
93
self .oidc_scheme = OpenIdConnect (
95
- openIdConnectUrl = openid_connect_url , auto_error = auto_error
94
+ openIdConnectUrl = openid_connect_url ,
95
+ auto_error = False ,
96
96
)
97
97
self .password_scheme = OAuth2PasswordBearer (
98
98
tokenUrl = self .discover .token_url (oidc_discoveries ),
99
99
scopes = scopes ,
100
+ auto_error = False ,
100
101
)
101
- self .implicit_scheme = EmptyOAuth2 (
102
+ self .implicit_scheme = OAuth2Facade (
102
103
flows = OAuthFlows (
103
104
implicit = {
104
105
"authorizationUrl" : self .discover .authorization_url (
@@ -108,22 +109,17 @@ def __init__(
108
109
}
109
110
),
110
111
scheme_name = "OAuth2ImplicitBearer" ,
111
- auto_error = auto_error ,
112
+ auto_error = False ,
112
113
)
113
114
self .authcode_scheme = OAuth2AuthorizationCodeBearer (
114
115
authorizationUrl = self .discover .authorization_url (oidc_discoveries ),
115
116
tokenUrl = self .discover .token_url (oidc_discoveries ),
116
117
# refreshUrl=self.discover.refresh_url(oidc_discoveries),
117
118
scopes = scopes ,
119
+ auto_error = False ,
118
120
)
119
121
120
- def authenticate_user (
121
- self ,
122
- security_scopes : SecurityScopes ,
123
- authorization_credentials : Optional [HTTPAuthorizationCredentials ] = Depends (
124
- AuthBearer (auto_error = False )
125
- ),
126
- ) -> Optional [IDToken ]:
122
+ def authenticate_user (self , auto_error = None ):
127
123
"""Validate and parse OIDC ID token against issuer in config.
128
124
Note this function caches the signatures and algorithms of the issuing server
129
125
for signature_cache_ttl seconds.
@@ -133,53 +129,64 @@ def authenticate_user(
133
129
scenes by Depends.
134
130
135
131
Return:
136
- Dict : Dictionary with IDToken information
132
+ IDToken : Dictionary with IDToken information
137
133
138
134
raises:
139
135
HTTPException(status_code=401, detail=f"Unauthorized: {err}")
140
136
"""
141
137
142
- if authorization_credentials is None :
143
- if self .auto_error :
144
- raise HTTPException (
145
- status .HTTP_401_UNAUTHORIZED , detail = "Missing bearer token"
146
- )
147
- else :
148
- return None
138
+ if auto_error is None :
139
+ auto_error = self .auto_error
149
140
150
- oidc_discoveries = self .discover .auth_server (
151
- openid_connect_url = self .openid_connect_url
152
- )
153
- key = self .discover .public_keys (oidc_discoveries )
154
- algorithms = self .discover .signing_algos (oidc_discoveries )
155
-
156
- try :
157
- id_token = jwt .decode (
158
- authorization_credentials .credentials ,
159
- key ,
160
- algorithms ,
161
- audience = self .audience ,
162
- issuer = self .issuer ,
163
- options = {
164
- # Disabled at_hash check since we aren't using the access token
165
- "verify_at_hash" : False ,
166
- "verify_iss" : self .issuer is not None ,
167
- "verify_aud" : self .audience is not None ,
168
- },
141
+ def authenticate_user_ (
142
+ security_scopes : SecurityScopes ,
143
+ authorization_credentials : Optional [HTTPAuthorizationCredentials ] = Depends (
144
+ HTTPBearer (auto_error = auto_error )
145
+ ),
146
+ ) -> Optional [IDToken ]:
147
+ if authorization_credentials is None :
148
+ if auto_error :
149
+ raise HTTPException (
150
+ status .HTTP_401_UNAUTHORIZED , detail = "Missing bearer token"
151
+ )
152
+ else :
153
+ return None
154
+
155
+ oidc_discoveries = self .discover .auth_server (
156
+ openid_connect_url = self .openid_connect_url
169
157
)
170
- except (ExpiredSignatureError , JWTError , JWTClaimsError ) as err :
171
- if self .auto_error :
172
- raise HTTPException (status_code = 401 , detail = f"Unauthorized: { err } " )
173
- else :
174
- return None
175
-
176
- if not set (security_scopes .scopes ).issubset (id_token ["scope" ].split (" " )):
177
- if self .auto_error :
178
- raise HTTPException (
179
- status .HTTP_401_UNAUTHORIZED ,
180
- detail = f"""Missing scope token, only have { id_token ["scopes" ]} """ ,
158
+ key = self .discover .public_keys (oidc_discoveries )
159
+ algorithms = self .discover .signing_algos (oidc_discoveries )
160
+
161
+ try :
162
+ id_token = jwt .decode (
163
+ authorization_credentials .credentials ,
164
+ key ,
165
+ algorithms ,
166
+ audience = self .audience ,
167
+ issuer = self .issuer ,
168
+ options = {
169
+ # Disabled at_hash check since we aren't using the access token
170
+ "verify_at_hash" : False ,
171
+ "verify_iss" : self .issuer is not None ,
172
+ "verify_aud" : self .audience is not None ,
173
+ },
181
174
)
182
- else :
183
- return None
184
-
185
- return self .idtoken_model (** id_token )
175
+ except (ExpiredSignatureError , JWTError , JWTClaimsError ) as err :
176
+ if auto_error :
177
+ raise HTTPException (status_code = 401 , detail = f"Unauthorized: { err } " )
178
+ else :
179
+ return None
180
+
181
+ if not set (security_scopes .scopes ).issubset (id_token ["scope" ].split (" " )):
182
+ if auto_error :
183
+ raise HTTPException (
184
+ status .HTTP_401_UNAUTHORIZED ,
185
+ detail = f"""Missing scope token, only have { id_token ["scopes" ]} """ ,
186
+ )
187
+ else :
188
+ return None
189
+
190
+ return self .idtoken_model (** id_token )
191
+
192
+ return authenticate_user_
0 commit comments