55import pytest
66from cryptojwt import JWT
77from cryptojwt import as_unicode
8+ from cryptojwt .key_jar import build_keyjar
89from cryptojwt .utils import as_bytes
10+ from oidcmsg .oauth2 import TokenIntrospectionRequest
911from oidcmsg .oidc import AccessTokenRequest
10-
11- from oidcendpoint . oidc . token_coop import TokenCoop
12+ from oidcmsg . oidc import AuthorizationRequest
13+ from oidcmsg . time_util import utc_time_sans_frac
1214
1315from oidcendpoint .client_authn import ClientSecretPost
1416from oidcendpoint .client_authn import UnknownOrNoAuthnMethod
1517from oidcendpoint .client_authn import WrongAuthnMethod
1618from oidcendpoint .client_authn import verify_client
1719from oidcendpoint .endpoint_context import EndpointContext
18- from oidcendpoint .oauth2 .introspection import Introspection
1920from oidcendpoint .oauth2 .authorization import Authorization
21+ from oidcendpoint .oauth2 .introspection import Introspection
22+ from oidcendpoint .oidc .token_coop import TokenCoop
2023from oidcendpoint .session import setup_session
2124from oidcendpoint .user_authn .authn_context import INTERNETPROTOCOLPASSWORD
2225from oidcendpoint .user_info import UserInfo
23- from oidcmsg .oauth2 import TokenIntrospectionRequest
24- from oidcmsg .oidc import AuthorizationRequest
2526
2627KEYDEFS = [
2728 {"type" : "RSA" , "key" : "" , "use" : ["sig" ]},
@@ -252,7 +253,7 @@ def test_do_response(self):
252253 "iss" ,
253254 "jti" ,
254255 }
255- assert _payload ["active" ] == True
256+ assert _payload ["active" ] is True
256257
257258 def test_do_response_no_token (self ):
258259 _context = self .introspection_endpoint .endpoint_context
@@ -294,3 +295,89 @@ def test_access_token(self):
294295 assert "sub" in _resp_args
295296 assert _resp_args ["active" ]
296297 assert _resp_args ["scope" ] == "openid"
298+
299+ def test_jwt_unknown_key (self ):
300+ _keyjar = build_keyjar (KEYDEFS )
301+
302+ _jwt = JWT (
303+ _keyjar ,
304+ iss = self .introspection_endpoint .endpoint_context .issuer ,
305+ lifetime = 3600
306+ )
307+
308+ _jwt .with_jti = True
309+
310+ _payload = {"sub" : "subject_id" }
311+ _token = _jwt .pack (_payload , aud = "client_1" )
312+ _context = self .introspection_endpoint .endpoint_context
313+
314+ _req = self .introspection_endpoint .parse_request (
315+ {
316+ "token" : _token ,
317+ "client_id" : "client_1" ,
318+ "client_secret" : _context .cdb ["client_1" ]["client_secret" ],
319+ }
320+ )
321+
322+ _req = self .introspection_endpoint .parse_request (_req )
323+ _resp = self .introspection_endpoint .process_request (_req )
324+ assert _resp ["response_args" ]["active" ] is False
325+
326+ def test_expired_access_token (self ):
327+ _context = self .introspection_endpoint .endpoint_context
328+
329+ session_id = setup_session (
330+ _context ,
331+ AUTH_REQ ,
332+ uid = "user" ,
333+ acr = INTERNETPROTOCOLPASSWORD ,
334+ )
335+ _token_request = TOKEN_REQ_DICT .copy ()
336+ _token_request ["code" ] = _context .sdb [session_id ]["code" ]
337+ _context .sdb .update (session_id , user = "diana" )
338+
339+ _req = self .token_endpoint .parse_request (_token_request )
340+ _resp = self .token_endpoint .process_request (request = _req )
341+
342+ _info = self .token_endpoint .endpoint_context .sdb [_resp ["response_args" ]["access_token" ]]
343+ _info ['expires_at' ] = utc_time_sans_frac () - 1000
344+ self .token_endpoint .endpoint_context .sdb [_resp ["response_args" ]["access_token" ]] = _info
345+
346+ _req = self .introspection_endpoint .parse_request (
347+ {
348+ "token" : _resp ["response_args" ]["access_token" ],
349+ "client_id" : "client_1" ,
350+ "client_secret" : _context .cdb ["client_1" ]["client_secret" ],
351+ }
352+ )
353+ _resp = self .introspection_endpoint .process_request (_req )
354+ assert _resp ["response_args" ]["active" ] is False
355+
356+ def test_revoked_access_token (self ):
357+ _context = self .introspection_endpoint .endpoint_context
358+
359+ session_id = setup_session (
360+ _context ,
361+ AUTH_REQ ,
362+ uid = "user" ,
363+ acr = INTERNETPROTOCOLPASSWORD ,
364+ )
365+ _token_request = TOKEN_REQ_DICT .copy ()
366+ _token_request ["code" ] = _context .sdb [session_id ]["code" ]
367+ _context .sdb .update (session_id , user = "diana" )
368+
369+ _req = self .token_endpoint .parse_request (_token_request )
370+ _resp = self .token_endpoint .process_request (request = _req )
371+
372+ self .token_endpoint .endpoint_context .sdb .revoke_session (
373+ token = _resp ["response_args" ]["access_token" ])
374+
375+ _req = self .introspection_endpoint .parse_request (
376+ {
377+ "token" : _resp ["response_args" ]["access_token" ],
378+ "client_id" : "client_1" ,
379+ "client_secret" : _context .cdb ["client_1" ]["client_secret" ],
380+ }
381+ )
382+ _resp = self .introspection_endpoint .process_request (_req )
383+ assert _resp ["response_args" ]["active" ] is False
0 commit comments