22from oidcmsg .message import SINGLE_REQUIRED_STRING
33from oidcmsg .oidc import AuthorizationResponse
44
5- from oidcservice .oidc .pkce import add_code_challenge
6- from oidcservice .oidc .pkce import add_code_verifier
7- from oidcservice .oidc .pkce import put_state_in_post_args
5+ from oidcservice .oidc .add_on .pkce import add_code_challenge
6+ from oidcservice .oidc .add_on .pkce import add_code_verifier
7+ from oidcservice .oidc .add_on .pkce import add_pkce_support
8+ from oidcservice .oidc .add_on .pkce import put_state_in_post_args
89from oidcservice .service import Service
910from oidcservice .service import init_services
1011from oidcservice .service_context import ServiceContext
@@ -36,12 +37,12 @@ def test_add_code_challenge_default_values():
3637 service = DummyService (service_context , state_db = InMemoryStateDataBase ())
3738 _state = State (iss = 'Issuer' )
3839 service .state_db .set ('state' , _state .to_json ())
39- spec = add_code_challenge ({'state' : 'state' }, service )
40+ request_args , _ = add_code_challenge ({'state' : 'state' }, service )
4041
4142 # default values are length:64 method:S256
42- assert set (spec .keys ()) == {'code_challenge' , 'code_challenge_method' ,
43+ assert set (request_args .keys ()) == {'code_challenge' , 'code_challenge_method' ,
4344 'state' }
44- assert spec ['code_challenge_method' ] == 'S256'
45+ assert request_args ['code_challenge_method' ] == 'S256'
4546
4647 request_args = add_code_verifier ({}, service , state = 'state' )
4748 assert len (request_args ['code_verifier' ]) == 64
@@ -61,10 +62,10 @@ def test_add_code_challenge_spec_values():
6162 _state = State (iss = 'Issuer' )
6263 service .state_db .set ('state' , _state .to_json ())
6364
64- spec = add_code_challenge ({'state' : 'state' }, service )
65- assert set (spec .keys ()) == {'code_challenge' , 'code_challenge_method' ,
65+ request_args , _ = add_code_challenge ({'state' : 'state' }, service )
66+ assert set (request_args .keys ()) == {'code_challenge' , 'code_challenge_method' ,
6667 'state' }
67- assert spec ['code_challenge_method' ] == 'S384'
68+ assert request_args ['code_challenge_method' ] == 'S384'
6869
6970 request_args = add_code_verifier ({}, service , state = 'state' )
7071 assert len (request_args ['code_verifier' ]) == 128
@@ -82,7 +83,7 @@ def test_authorization_and_pkce():
8283 state_db = InMemoryStateDataBase (),
8384 service_context = service_context )
8485 service .post_construct .append (add_code_challenge )
85- request = service .construct_request ()
86+ request , _ = service .construct_request ()
8687 assert set (request .keys ()) == {'client_id' , 'code_challenge' ,
8788 'code_challenge_method' , 'state' , 'nonce' ,
8889 'redirect_uri' , 'response_type' , 'scope' }
@@ -102,7 +103,7 @@ def test_access_token_and_pkce():
102103 authz_service = service_factory ('Authorization' , ['oidc' ], state_db = db ,
103104 service_context = service_context )
104105 authz_service .post_construct .append (add_code_challenge )
105- request = authz_service .construct_request ()
106+ request , _ = authz_service .construct_request ()
106107 _state = request ['state' ]
107108
108109 auth_response = AuthorizationResponse (code = 'access code' )
@@ -134,28 +135,17 @@ def test_pkce_config():
134135 service_definitions = {
135136 'authorization' : {
136137 'class' : 'oidcservice.oidc.authorization.Authorization' ,
137- 'kwargs' : {},
138- 'post_functions' : [
139- {
140- 'function' : 'oidcservice.oidc.pkce.add_code_challenge'
141- }
142- ]
138+ 'kwargs' : {}
143139 },
144140 'access_token' : {
145141 'class' : 'oidcservice.oidc.access_token.AccessToken' ,
146- 'kwargs' : {},
147- 'pre_functions' : [
148- {
149- 'function' : 'oidcservice.oidc.pkce.put_state_in_post_args'
150- }
151- ],
152- 'post_functions' : [
153- {'function' : 'oidcservice.oidc.pkce.add_code_verifier' }
154- ]
142+ 'kwargs' : {}
155143 }
156144 }
157145 service = init_services (service_definitions , service_context , db )
158146
147+ add_pkce_support (service , 64 , 'S256' )
148+
159149 request = service ['authorization' ].construct_request ()
160150 _state = request ['state' ]
161151
@@ -166,49 +156,3 @@ def test_pkce_config():
166156 assert set (request .keys ()) == {'client_id' , 'redirect_uri' , 'grant_type' ,
167157 'client_secret' , 'code_verifier' , 'code' ,
168158 'state' }
169-
170- # class TestPKCE(object):
171- # def test_pkce_create(self):
172- # _cli = Client(
173- # config={'code_challenge': {'method': 'S256', 'length': 64}})
174- # args, cv = _cli.add_code_challenge()
175- # assert args['code_challenge_method'] == 'S256'
176- # assert _eq(list(args.keys()),
177- # ['code_challenge_method', 'code_challenge'])
178- #
179- # def test_pkce_verify_256(self, session_db_factory):
180- # _cli = Client(
181- # config={'code_challenge': {'method': 'S256', 'length': 64}})
182- # args, cv = _cli.add_code_challenge()
183- #
184- # authn_broker = AuthnBroker()
185- # authn_broker.add("UNDEFINED", DummyAuthn(None, "username"))
186- # _prov = Provider("as",
187- # session_db_factory('https://connect-op.heroku.com'),
188- # {},
189- # authn_broker, Implicit(), verify_client)
190- #
191- # assert _prov.verify_code_challenge(cv, args['code_challenge']) is True
192- # assert _prov.verify_code_challenge(cv, args['code_challenge'],
193- # 'S256') is True
194- # resp = _prov.verify_code_challenge('XXX', args['code_challenge'])
195- # assert isinstance(resp, Response)
196- # assert resp.info()['status_code'] == 401
197- #
198- # def test_pkce_verify_512(self, session_db_factory):
199- # _cli = Client(
200- # config={'code_challenge': {'method': 'S512', 'length': 96}})
201- # args, cv = _cli.add_code_challenge()
202- #
203- # authn_broker = AuthnBroker()
204- # authn_broker.add("UNDEFINED", DummyAuthn(None, "username"))
205- # _prov = Provider("as",
206- # session_db_factory('https://connect-op.heroku.com'),
207- # {},
208- # authn_broker, Implicit(), verify_client)
209- #
210- # assert _prov.verify_code_challenge(cv, args['code_challenge'],
211- # 'S512') is True
212- # resp = _prov.verify_code_challenge('XXX', args['code_challenge'])
213- # assert isinstance(resp, Response)
214- # assert resp.info()['status_code'] == 401
0 commit comments