Skip to content

Commit b3860b8

Browse files
rohec00kiemon5ter
authored andcommitted
Added tests
1 parent a56db95 commit b3860b8

File tree

2 files changed

+210
-3
lines changed

2 files changed

+210
-3
lines changed

src/satosa/backends/idpy_oidc.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,17 @@
11
"""
22
OIDC/OAuth2 backend module.
33
"""
4-
import logging
54
from datetime import datetime
5+
import logging
66
from urllib.parse import urlparse
77

88
from idpyoidc.client.oauth2.stand_alone_client import StandAloneClient
99
from idpyoidc.server.user_authn.authn_context import UNSPECIFIED
1010

11-
import satosa.logging_util as lu
1211
from satosa.backends.base import BackendModule
1312
from satosa.internal import AuthenticationInformation
1413
from satosa.internal import InternalData
14+
import satosa.logging_util as lu
1515
from ..exception import SATOSAAuthenticationError
1616
from ..exception import SATOSAError
1717
from ..response import Redirect
@@ -74,7 +74,7 @@ def register_endpoints(self):
7474
if not redirect_path:
7575
raise SATOSAError("Missing path in redirect uri")
7676
redirect_path = urlparse(redirect_path[0]).path
77-
url_map.append(("^%s$" % redirect_path.lstrip("/"), self.response_endpoint))
77+
url_map.append((f"^{redirect_path.lstrip('/')}$", self.response_endpoint))
7878
return url_map
7979

8080
def response_endpoint(self, context, *args):
Lines changed: 207 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,207 @@
1+
import json
2+
import re
3+
import time
4+
from unittest.mock import Mock
5+
from urllib.parse import parse_qsl
6+
from urllib.parse import urlparse
7+
8+
from cryptojwt.key_jar import build_keyjar
9+
from idpyoidc.client.defaults import DEFAULT_KEY_DEFS
10+
from idpyoidc.client.oauth2.stand_alone_client import StandAloneClient
11+
from idpyoidc.message.oidc import AuthorizationResponse
12+
from idpyoidc.message.oidc import IdToken
13+
from oic.oic import AuthorizationRequest
14+
import pytest
15+
import responses
16+
17+
from satosa.backends.idpy_oidc import IdpyOIDCBackend
18+
from satosa.context import Context
19+
from satosa.internal import InternalData
20+
from satosa.response import Response
21+
22+
ISSUER = "https://provider.example.com"
23+
CLIENT_ID = "test_client"
24+
CLIENT_BASE_URL = "https://client.test.com"
25+
NONCE = "the nonce"
26+
27+
28+
class TestIdpyOIDCBackend(object):
29+
@pytest.fixture
30+
def backend_config(self):
31+
return {
32+
"client": {
33+
"base_url": CLIENT_BASE_URL,
34+
"client_id": CLIENT_ID,
35+
"client_type": "oidc",
36+
"client_secret": "ZJYCqe3GGRvdrudKyZS0XhGv_Z45DuKhCUk0gBR1vZk",
37+
"application_type": "web",
38+
"application_name": "SATOSA Test",
39+
"contacts": ["[email protected]"],
40+
"response_types_supported": ["code"],
41+
"response_type": "code id_token token",
42+
"scope": "openid foo",
43+
"key_conf": {"key_defs": DEFAULT_KEY_DEFS},
44+
"jwks_uri": f"{CLIENT_BASE_URL}/jwks.json",
45+
"provider_info": {
46+
"issuer": ISSUER,
47+
"authorization_endpoint": f"{ISSUER}/authn",
48+
"token_endpoint": f"{ISSUER}/token",
49+
"userinfo_endpoint": f"{ISSUER}/user",
50+
"jwks_uri": f"{ISSUER}/static/jwks"
51+
}
52+
}
53+
}
54+
55+
@pytest.fixture
56+
def internal_attributes(self):
57+
return {
58+
"attributes": {
59+
"givenname": {"openid": ["given_name"]},
60+
"mail": {"openid": ["email"]},
61+
"edupersontargetedid": {"openid": ["sub"]},
62+
"surname": {"openid": ["family_name"]}
63+
}
64+
}
65+
66+
@pytest.fixture(autouse=True)
67+
@responses.activate
68+
def create_backend(self, internal_attributes, backend_config):
69+
base_url = backend_config['client']['base_url']
70+
self.issuer_keys = build_keyjar(DEFAULT_KEY_DEFS)
71+
with responses.RequestsMock() as rsps:
72+
rsps.add(
73+
responses.GET,
74+
backend_config['client']['provider_info']['jwks_uri'],
75+
body=self.issuer_keys.export_jwks_as_json(),
76+
status=200,
77+
content_type="application/json")
78+
79+
self.oidc_backend = IdpyOIDCBackend(Mock(), internal_attributes, backend_config,
80+
base_url, "oidc")
81+
82+
@pytest.fixture
83+
def userinfo(self):
84+
return {
85+
"given_name": "Test",
86+
"family_name": "Devsson",
87+
"email": "[email protected]",
88+
"sub": "username"
89+
}
90+
91+
def test_client(self, backend_config):
92+
assert isinstance(self.oidc_backend.client, StandAloneClient)
93+
# 3 signing keys. One RSA, one EC and one symmetric
94+
assert len(self.oidc_backend.client.context.keyjar.get_signing_key()) == 3
95+
assert self.oidc_backend.client.context.jwks_uri == backend_config['client']['jwks_uri']
96+
97+
def assert_expected_attributes(self, attr_map, user_claims, actual_attributes):
98+
expected_attributes = {}
99+
for out_attr, in_mapping in attr_map["attributes"].items():
100+
expected_attributes[out_attr] = [user_claims[in_mapping["openid"][0]]]
101+
102+
assert actual_attributes == expected_attributes
103+
104+
def setup_token_endpoint(self, userinfo):
105+
_client = self.oidc_backend.client
106+
signing_key = self.issuer_keys.get_signing_key(key_type='RSA')[0]
107+
signing_key.alg = "RS256"
108+
id_token_claims = {
109+
"iss": ISSUER,
110+
"sub": userinfo["sub"],
111+
"aud": CLIENT_ID,
112+
"nonce": NONCE,
113+
"exp": time.time() + 3600,
114+
"iat": time.time()
115+
}
116+
id_token = IdToken(**id_token_claims).to_jwt([signing_key], algorithm=signing_key.alg)
117+
token_response = {
118+
"access_token": "SlAV32hkKG",
119+
"token_type": "Bearer",
120+
"refresh_token": "8xLOxBtZp8",
121+
"expires_in": 3600,
122+
"id_token": id_token
123+
}
124+
responses.add(responses.POST,
125+
_client.context.provider_info['token_endpoint'],
126+
body=json.dumps(token_response),
127+
status=200,
128+
content_type="application/json")
129+
130+
def setup_userinfo_endpoint(self, userinfo):
131+
responses.add(responses.GET,
132+
self.oidc_backend.client.context.provider_info['userinfo_endpoint'],
133+
body=json.dumps(userinfo),
134+
status=200,
135+
content_type="application/json")
136+
137+
@pytest.fixture
138+
def incoming_authn_response(self):
139+
_context = self.oidc_backend.client.context
140+
oidc_state = "my state"
141+
_uri = _context.claims.get_usage("redirect_uris")[0]
142+
_request = AuthorizationRequest(
143+
redirect_uri=_uri,
144+
response_type="code",
145+
client_id=_context.get_client_id(),
146+
scope=_context.claims.get_usage("scope"),
147+
nonce=NONCE
148+
)
149+
_context.cstate.set(oidc_state, {"iss": _context.issuer})
150+
_context.cstate.bind_key(NONCE, oidc_state)
151+
_context.cstate.update(oidc_state, _request)
152+
153+
response = AuthorizationResponse(
154+
code="F+R4uWbN46U+Bq9moQPC4lEvRd2De4o=",
155+
state=oidc_state,
156+
iss=_context.issuer,
157+
nonce=NONCE
158+
)
159+
return response.to_dict()
160+
161+
def test_register_endpoints(self):
162+
_uri = self.oidc_backend.client.context.claims.get_usage("redirect_uris")[0]
163+
redirect_uri_path = urlparse(_uri).path.lstrip('/')
164+
url_map = self.oidc_backend.register_endpoints()
165+
regex, callback = url_map[0]
166+
assert re.search(regex, redirect_uri_path)
167+
assert callback == self.oidc_backend.response_endpoint
168+
169+
def test_translate_response_to_internal_response(self, userinfo):
170+
internal_response = self.oidc_backend._translate_response(userinfo, ISSUER)
171+
assert internal_response.subject_id == userinfo["sub"]
172+
self.assert_expected_attributes(self.oidc_backend.internal_attributes, userinfo,
173+
internal_response.attributes)
174+
175+
@responses.activate
176+
def test_response_endpoint(self, context, userinfo, incoming_authn_response):
177+
self.setup_token_endpoint(userinfo)
178+
self.setup_userinfo_endpoint(userinfo)
179+
180+
response_context = Context()
181+
response_context.request = incoming_authn_response
182+
response_context.state = context.state
183+
184+
self.oidc_backend.response_endpoint(response_context)
185+
186+
args = self.oidc_backend.auth_callback_func.call_args[0]
187+
assert isinstance(args[0], Context)
188+
assert isinstance(args[1], InternalData)
189+
self.assert_expected_attributes(self.oidc_backend.internal_attributes, userinfo,
190+
args[1].attributes)
191+
192+
def test_start_auth_redirects_to_provider_authorization_endpoint(self, context):
193+
_client = self.oidc_backend.client
194+
auth_response = self.oidc_backend.start_auth(context, None)
195+
assert isinstance(auth_response, Response)
196+
197+
login_url = auth_response.message
198+
parsed = urlparse(login_url)
199+
assert login_url.startswith(_client.context.provider_info["authorization_endpoint"])
200+
auth_params = dict(parse_qsl(parsed.query))
201+
assert auth_params["scope"] == " ".join(_client.context.claims.get_usage("scope"))
202+
assert auth_params["response_type"] == _client.context.claims.get_usage("response_types")[0]
203+
assert auth_params["client_id"] == _client.client_id
204+
assert auth_params["redirect_uri"] == _client.context.claims.get_usage("redirect_uris")[0]
205+
assert "state" in auth_params
206+
assert "nonce" in auth_params
207+

0 commit comments

Comments
 (0)