1
1
"""
2
2
Tests for the SAML frontend module src/frontends/saml2.py.
3
3
"""
4
+ import copy
4
5
import json
5
6
from base64 import urlsafe_b64encode
6
7
from collections import Counter
28
29
BASE_URL = "https://op.example.com"
29
30
CLIENT_ID = "client1"
30
31
CLIENT_SECRET = "client_secret"
31
-
32
+ EXTRA_CLAIMS = {
33
+ "eduPersonScopedAffiliation" : {
34
+ "saml" : ["eduPersonScopedAffiliation" ],
35
+ "openid" : ["eduperson_scoped_affiliation" ],
36
+ },
37
+ "eduPersonPrincipalName" : {
38
+ "saml" : ["eduPersonPrincipalName" ],
39
+ "openid" : ["eduperson_principal_name" ],
40
+ },
41
+ }
42
+ EXTRA_SCOPES = {
43
+ "eduperson" : ["eduperson_scoped_affiliation" , "eduperson_principal_name" ]
44
+ }
32
45
33
46
class TestOpenIDConnectFrontend (object ):
34
47
@pytest .fixture
@@ -43,17 +56,48 @@ def frontend_config(self, signing_key_path):
43
56
44
57
return config
45
58
59
+ @pytest .fixture
60
+ def frontend_config_with_extra_scopes (self , signing_key_path ):
61
+ config = {
62
+ "signing_key_path" : signing_key_path ,
63
+ "provider" : {
64
+ "response_types_supported" : ["code" , "id_token" , "code id_token token" ],
65
+ "scopes_supported" : ["openid" , "email" ],
66
+ "extra_scopes" : EXTRA_SCOPES ,
67
+ },
68
+ }
69
+
70
+ return config
71
+
46
72
def create_frontend (self , frontend_config ):
47
73
# will use in-memory storage
48
74
instance = OpenIDConnectFrontend (lambda ctx , req : None , INTERNAL_ATTRIBUTES ,
49
75
frontend_config , BASE_URL , "oidc_frontend" )
50
76
instance .register_endpoints (["foo_backend" ])
51
77
return instance
52
78
79
+ def create_frontend_with_extra_scopes (self , frontend_config_with_extra_scopes ):
80
+ # will use in-memory storage
81
+ internal_attributes_with_extra_scopes = copy .deepcopy (INTERNAL_ATTRIBUTES )
82
+ internal_attributes_with_extra_scopes ["attributes" ].update (EXTRA_CLAIMS )
83
+ instance = OpenIDConnectFrontend (
84
+ lambda ctx , req : None ,
85
+ internal_attributes_with_extra_scopes ,
86
+ frontend_config_with_extra_scopes ,
87
+ BASE_URL ,
88
+ "oidc_frontend_with_extra_scopes" ,
89
+ )
90
+ instance .register_endpoints (["foo_backend" ])
91
+ return instance
92
+
53
93
@pytest .fixture
54
94
def frontend (self , frontend_config ):
55
95
return self .create_frontend (frontend_config )
56
96
97
+ @pytest .fixture
98
+ def frontend_with_extra_scopes (self , frontend_config_with_extra_scopes ):
99
+ return self .create_frontend_with_extra_scopes (frontend_config_with_extra_scopes )
100
+
57
101
@pytest .fixture
58
102
def authn_req (self ):
59
103
state = "my_state"
@@ -65,6 +109,23 @@ def authn_req(self):
65
109
nonce = nonce , claims = claims_req )
66
110
return req
67
111
112
+ @pytest .fixture
113
+ def authn_req_with_extra_scopes (self ):
114
+ state = "my_state"
115
+ nonce = "nonce"
116
+ redirect_uri = "https://client.example.com"
117
+ claims_req = ClaimsRequest (id_token = Claims (email = None ))
118
+ req = AuthorizationRequest (
119
+ client_id = CLIENT_ID ,
120
+ state = state ,
121
+ scope = "openid email eduperson" ,
122
+ response_type = "id_token" ,
123
+ redirect_uri = redirect_uri ,
124
+ nonce = nonce ,
125
+ claims = claims_req ,
126
+ )
127
+ return req
128
+
68
129
def insert_client_in_client_db (self , frontend , redirect_uri , extra_metadata = {}):
69
130
frontend .provider .clients = {
70
131
CLIENT_ID : {"response_types" : ["code" , "id_token" ],
@@ -73,7 +134,12 @@ def insert_client_in_client_db(self, frontend, redirect_uri, extra_metadata={}):
73
134
frontend .provider .clients [CLIENT_ID ].update (extra_metadata )
74
135
75
136
def insert_user_in_user_db (self , frontend , user_id ):
76
- frontend .
user_db [
user_id ]
= {
"email" :
"[email protected] " }
137
+ user_attributes = AttributeMapper (frontend .internal_attributes ).to_internal (
138
+ "saml" , USERS ["testuser1" ]
139
+ )
140
+ frontend .user_db [user_id ] = frontend .converter .from_internal (
141
+ "openid" , user_attributes
142
+ )
77
143
78
144
def create_access_token (self , frontend , user_id , auth_req ):
79
145
sub = frontend .provider .authz_state .get_subject_identifier ('pairwise' , user_id , 'client1.example.com' )
@@ -84,9 +150,13 @@ def create_access_token(self, frontend, user_id, auth_req):
84
150
def setup_for_authn_response (self , context , frontend , auth_req ):
85
151
context .state [frontend .name ] = {"oidc_request" : auth_req .to_urlencoded ()}
86
152
87
- auth_info = AuthenticationInformation (PASSWORD , "2015-09-30T12:21:37Z" , "unittest_idp.xml" )
153
+ auth_info = AuthenticationInformation (
154
+ PASSWORD , "2015-09-30T12:21:37Z" , "unittest_idp.xml"
155
+ )
88
156
internal_response = InternalData (auth_info = auth_info )
89
- internal_response .attributes = AttributeMapper (INTERNAL_ATTRIBUTES ).to_internal ("saml" , USERS ["testuser1" ])
157
+ internal_response .attributes = AttributeMapper (
158
+ frontend .internal_attributes
159
+ ).to_internal ("saml" , USERS ["testuser1" ])
90
160
internal_response .subject_id = USERS ["testuser1" ]["eduPersonTargetedID" ][0 ]
91
161
92
162
return internal_response
@@ -123,6 +193,28 @@ def test_handle_authn_request(self, context, frontend, authn_req):
123
193
assert internal_req .subject_type == 'pairwise'
124
194
assert internal_req .attributes == ["mail" ]
125
195
196
+ def test_handle_authn_request_with_extra_scopes (
197
+ self , context , frontend_with_extra_scopes , authn_req_with_extra_scopes
198
+ ):
199
+ client_name = "test client"
200
+ self .insert_client_in_client_db (
201
+ frontend_with_extra_scopes ,
202
+ authn_req_with_extra_scopes ["redirect_uri" ],
203
+ {"client_name" : client_name },
204
+ )
205
+
206
+ context .request = dict (parse_qsl (authn_req_with_extra_scopes .to_urlencoded ()))
207
+ frontend_with_extra_scopes .handle_authn_request (context )
208
+ internal_req = frontend_with_extra_scopes ._handle_authn_request (context )
209
+ assert internal_req .requester == authn_req_with_extra_scopes ["client_id" ]
210
+ assert internal_req .requester_name == [{"lang" : "en" , "text" : client_name }]
211
+ assert internal_req .subject_type == "pairwise"
212
+ assert sorted (internal_req .attributes ) == [
213
+ "eduPersonPrincipalName" ,
214
+ "eduPersonScopedAffiliation" ,
215
+ "mail" ,
216
+ ]
217
+
126
218
def test_get_approved_attributes (self , frontend ):
127
219
claims_req = ClaimsRequest (id_token = Claims (email = None ), userinfo = Claims (userinfo_claim = None ))
128
220
req = AuthorizationRequest (scope = "openid profile" , claims = claims_req )
@@ -199,9 +291,64 @@ def test_provider_configuration_endpoint(self, context, frontend):
199
291
200
292
provider_config_dict = provider_config .to_dict ()
201
293
scopes_supported = provider_config_dict .pop ("scopes_supported" )
294
+ assert "eduperson" not in scopes_supported
202
295
assert all (scope in scopes_supported for scope in ["openid" , "email" ])
203
296
assert provider_config_dict == expected_capabilities
204
297
298
+ def test_provider_configuration_endpoint_with_extra_scopes (
299
+ self , context , frontend_with_extra_scopes
300
+ ):
301
+ expected_capabilities = {
302
+ "response_types_supported" : ["code" , "id_token" , "code id_token token" ],
303
+ "jwks_uri" : "{}/{}/jwks" .format (BASE_URL , frontend_with_extra_scopes .name ),
304
+ "authorization_endpoint" : "{}/foo_backend/{}/authorization" .format (
305
+ BASE_URL , frontend_with_extra_scopes .name
306
+ ),
307
+ "token_endpoint" : "{}/{}/token" .format (
308
+ BASE_URL , frontend_with_extra_scopes .name
309
+ ),
310
+ "userinfo_endpoint" : "{}/{}/userinfo" .format (
311
+ BASE_URL , frontend_with_extra_scopes .name
312
+ ),
313
+ "id_token_signing_alg_values_supported" : ["RS256" ],
314
+ "response_modes_supported" : ["fragment" , "query" ],
315
+ "subject_types_supported" : ["pairwise" ],
316
+ "claim_types_supported" : ["normal" ],
317
+ "claims_parameter_supported" : True ,
318
+ "request_parameter_supported" : False ,
319
+ "request_uri_parameter_supported" : False ,
320
+ "claims_supported" : [
321
+ "email" ,
322
+ "eduperson_scoped_affiliation" ,
323
+ "eduperson_principal_name" ,
324
+ ],
325
+ "grant_types_supported" : ["authorization_code" , "implicit" ],
326
+ "issuer" : BASE_URL ,
327
+ "require_request_uri_registration" : False ,
328
+ "token_endpoint_auth_methods_supported" : ["client_secret_basic" ],
329
+ "version" : "3.0" ,
330
+ }
331
+
332
+ http_response = frontend_with_extra_scopes .provider_config (context )
333
+ provider_config = ProviderConfigurationResponse ().deserialize (
334
+ http_response .message , "json"
335
+ )
336
+
337
+ provider_config_dict = provider_config .to_dict ()
338
+ scopes_supported = provider_config_dict .pop ("scopes_supported" )
339
+ assert all (
340
+ scope in scopes_supported for scope in ["openid" , "email" , "eduperson" ]
341
+ )
342
+
343
+ # FIXME why is this needed?
344
+ expected_capabilities ["claims_supported" ] = set (
345
+ expected_capabilities ["claims_supported" ]
346
+ )
347
+ provider_config_dict ["claims_supported" ] = set (
348
+ provider_config_dict ["claims_supported" ]
349
+ )
350
+ assert provider_config_dict == expected_capabilities
351
+
205
352
def test_jwks (self , context , frontend ):
206
353
http_response = frontend .jwks (context )
207
354
jwks = json .loads (http_response .message )
@@ -307,7 +454,7 @@ def test_token_endpoint_with_invalid_code(self, context, frontend, authn_req):
307
454
assert parsed_message ["error" ] == "invalid_grant"
308
455
309
456
def test_userinfo_endpoint (self , context , frontend , authn_req ):
310
- user_id = "user1"
457
+ user_id = USERS [ "testuser1" ][ "eduPersonTargetedID" ][ 0 ]
311
458
self .insert_client_in_client_db (frontend , authn_req ["redirect_uri" ])
312
459
self .insert_user_in_user_db (frontend , user_id )
313
460
@@ -317,14 +464,28 @@ def test_userinfo_endpoint(self, context, frontend, authn_req):
317
464
context .request_authorization = "Bearer {}" .format (token )
318
465
response = frontend .userinfo_endpoint (context )
319
466
parsed = OpenIDSchema ().deserialize (response .message , "json" )
320
- assert parsed [
"email" ]
== "[email protected] "
321
-
322
- def test_userinfo_without_token (self , context , frontend ):
467
+ assert parsed [
"email" ]
== "[email protected] "
468
+
469
+ def test_userinfo_endpoint_with_extra_scopes (
470
+ self , context , frontend_with_extra_scopes , authn_req_with_extra_scopes
471
+ ):
472
+ user_id = USERS ["testuser1" ]["eduPersonTargetedID" ][0 ]
473
+ self .insert_client_in_client_db (
474
+ frontend_with_extra_scopes , authn_req_with_extra_scopes ["redirect_uri" ]
475
+ )
476
+ self .insert_user_in_user_db (frontend_with_extra_scopes , user_id )
477
+
478
+ token = self .create_access_token (
479
+ frontend_with_extra_scopes , user_id , authn_req_with_extra_scopes
480
+ )
323
481
context .request = {}
324
- context .request_authorization = ""
325
-
326
- response = frontend .userinfo_endpoint (context )
327
- assert response .status == "401 Unauthorized"
482
+ context .request_authorization = "Bearer {}" .format (token )
483
+ response = frontend_with_extra_scopes .userinfo_endpoint (context )
484
+ parsed = OpenIDSchema ().deserialize (response .message , "json" )
485
+ assert parsed [
"email" ]
== "[email protected] "
486
+ # TODO
487
+ assert parsed [
"eduperson_scoped_affiliation" ]
== [
"[email protected] " ]
488
+ assert parsed [
"eduperson_principal_name" ]
== [
"[email protected] " ]
328
489
329
490
def test_userinfo_with_invalid_token (self , context , frontend ):
330
491
context .request = {}
@@ -333,32 +494,41 @@ def test_userinfo_with_invalid_token(self, context, frontend):
333
494
response = frontend .userinfo_endpoint (context )
334
495
assert response .status == "401 Unauthorized"
335
496
336
- def test_full_flow (self , context , frontend ):
497
+ def test_full_flow (self , context , frontend_with_extra_scopes ):
337
498
redirect_uri = "https://client.example.com/redirect"
338
499
response_type = "code id_token token"
339
500
mock_callback = Mock ()
340
- frontend .auth_req_callback_func = mock_callback
501
+ frontend_with_extra_scopes .auth_req_callback_func = mock_callback
341
502
# discovery
342
- http_response = frontend .provider_config (context )
503
+ http_response = frontend_with_extra_scopes .provider_config (context )
343
504
provider_config = ProviderConfigurationResponse ().deserialize (http_response .message , "json" )
344
505
345
506
# client registration
346
507
registration_request = RegistrationRequest (redirect_uris = [redirect_uri ], response_types = [response_type ])
347
508
context .request = registration_request .to_dict ()
348
- http_response = frontend .client_registration (context )
509
+ http_response = frontend_with_extra_scopes .client_registration (context )
349
510
registration_response = RegistrationResponse ().deserialize (http_response .message , "json" )
350
511
351
512
# authentication request
352
- authn_req = AuthorizationRequest (redirect_uri = redirect_uri , client_id = registration_response ["client_id" ],
353
- response_type = response_type , scope = "openid email" , state = "state" ,
354
- nonce = "nonce" )
513
+ authn_req = AuthorizationRequest (
514
+ redirect_uri = redirect_uri ,
515
+ client_id = registration_response ["client_id" ],
516
+ response_type = response_type ,
517
+ scope = "openid email eduperson" ,
518
+ state = "state" ,
519
+ nonce = "nonce" ,
520
+ )
355
521
context .request = dict (parse_qsl (authn_req .to_urlencoded ()))
356
- frontend .handle_authn_request (context )
522
+ frontend_with_extra_scopes .handle_authn_request (context )
357
523
assert mock_callback .call_count == 1
358
524
359
525
# fake authentication response from backend
360
- internal_response = self .setup_for_authn_response (context , frontend , authn_req )
361
- http_response = frontend .handle_authn_response (context , internal_response )
526
+ internal_response = self .setup_for_authn_response (
527
+ context , frontend_with_extra_scopes , authn_req
528
+ )
529
+ http_response = frontend_with_extra_scopes .handle_authn_response (
530
+ context , internal_response
531
+ )
362
532
authn_resp = AuthorizationResponse ().deserialize (urlparse (http_response .message ).fragment , "urlencoded" )
363
533
assert "code" in authn_resp
364
534
assert "access_token" in authn_resp
@@ -370,14 +540,16 @@ def test_full_flow(self, context, frontend):
370
540
basic_auth = urlsafe_b64encode (credentials .encode ("utf-8" )).decode ("utf-8" )
371
541
context .request_authorization = "Basic {}" .format (basic_auth )
372
542
373
- http_response = frontend .token_endpoint (context )
543
+ http_response = frontend_with_extra_scopes .token_endpoint (context )
374
544
parsed = AccessTokenResponse ().deserialize (http_response .message , "json" )
375
545
assert "access_token" in parsed
376
546
assert "id_token" in parsed
377
547
378
548
# userinfo request
379
549
context .request = {}
380
550
context .request_authorization = "Bearer {}" .format (parsed ["access_token" ])
381
- http_response = frontend .userinfo_endpoint (context )
551
+ http_response = frontend_with_extra_scopes .userinfo_endpoint (context )
382
552
parsed = OpenIDSchema ().deserialize (http_response .message , "json" )
383
553
assert "email" in parsed
554
+ assert "eduperson_principal_name" in parsed
555
+ assert "eduperson_scoped_affiliation" in parsed
0 commit comments