Skip to content
This repository was archived by the owner on Jun 1, 2023. It is now read-only.

Commit edf331c

Browse files
committed
Updated tests.
Added tests for Context class.
1 parent 30134bd commit edf331c

File tree

5 files changed

+120
-40
lines changed

5 files changed

+120
-40
lines changed

tests/test_04_message.py

Lines changed: 22 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,19 @@
1+
import json
12
from urllib.parse import parse_qs
23
from urllib.parse import urlparse
34

4-
import json
55
import pytest
66
from cryptojwt.exception import HeaderError
7-
87
from cryptojwt.jwk.hmac import SYMKey
98
from cryptojwt.jwk.rsa import new_rsa_key
109
from cryptojwt.jws.exception import NoSuitableSigningKeys
11-
1210
from cryptojwt.key_bundle import KeyBundle
13-
from cryptojwt.key_jar import build_keyjar
1411
from cryptojwt.key_jar import KeyJar
12+
from cryptojwt.key_jar import build_keyjar
1513

1614
from oidcmsg.exception import DecodeError
1715
from oidcmsg.exception import MessageException
1816
from oidcmsg.exception import OidcMsgError
19-
from oidcmsg.exception import WrongEncryptionAlgorithm
20-
21-
from oidcmsg.message import json_deserializer, msg_ser
22-
from oidcmsg.message import json_serializer
2317
from oidcmsg.message import OPTIONAL_LIST_OF_MESSAGES
2418
from oidcmsg.message import OPTIONAL_LIST_OF_STRINGS
2519
from oidcmsg.message import OPTIONAL_MESSAGE
@@ -28,8 +22,10 @@
2822
from oidcmsg.message import SINGLE_OPTIONAL_JSON
2923
from oidcmsg.message import SINGLE_OPTIONAL_STRING
3024
from oidcmsg.message import SINGLE_REQUIRED_STRING
25+
from oidcmsg.message import json_deserializer
26+
from oidcmsg.message import json_serializer
27+
from oidcmsg.message import msg_ser
3128
from oidcmsg.message import sp_sep_list_deserializer
32-
3329
from oidcmsg.oauth2 import Message
3430

3531
__author__ = 'Roland Hedberg'
@@ -41,25 +37,25 @@
4137
{"type": "RSA", "use": ["enc"]},
4238
{"type": "EC", "crv": "P-256", "use": ["sig"]},
4339
{"type": "EC", "crv": "P-256", "use": ["enc"]},
44-
]
40+
]
4541

4642
keym = [
4743
{"type": "RSA", "use": ["sig"]},
4844
{"type": "RSA", "use": ["sig"]},
4945
{"type": "RSA", "use": ["sig"]},
50-
]
46+
]
5147

5248
KEYJAR = build_keyjar(keys)
5349

5450
IKEYJAR = build_keyjar(keys)
55-
IKEYJAR.issuer_keys['issuer'] = IKEYJAR.issuer_keys['']
56-
del IKEYJAR.issuer_keys['']
51+
IKEYJAR.import_jwks(IKEYJAR.export_jwks(private=True), 'issuer')
52+
del IKEYJAR['']
5753

5854
KEYJARS = {}
5955
for iss in ['A', 'B', 'C']:
6056
_kj = build_keyjar(keym)
61-
_kj.issuer_keys[iss] = _kj.issuer_keys['']
62-
del _kj.issuer_keys['']
57+
_kj.import_jwks(_kj.export_jwks(private=True), iss)
58+
del _kj['']
6359
KEYJARS[iss] = _kj
6460

6561

@@ -121,7 +117,7 @@ class DummyMessage(Message):
121117
"opt_str_list": OPTIONAL_LIST_OF_STRINGS,
122118
"req_str_list": REQUIRED_LIST_OF_STRINGS,
123119
"opt_json": SINGLE_OPTIONAL_JSON
124-
}
120+
}
125121

126122

127123
class TestMessage(object):
@@ -296,7 +292,7 @@ def test_int_instead_of_string(self):
296292
@pytest.mark.parametrize("keytype,alg", [
297293
('RSA', 'RS256'),
298294
('EC', 'ES256')
299-
])
295+
])
300296
def test_to_jwt(keytype, alg):
301297
msg = Message(a='foo', b='bar', c='tjoho')
302298
_jwt = msg.to_jwt(KEYJAR.get_signing_key(keytype, ''), alg)
@@ -307,7 +303,7 @@ def test_to_jwt(keytype, alg):
307303
@pytest.mark.parametrize("keytype,alg,enc", [
308304
('RSA', 'RSA1_5', 'A128CBC-HS256'),
309305
('EC', 'ECDH-ES', 'A128GCM'),
310-
])
306+
])
311307
def test_to_jwe(keytype, alg, enc):
312308
msg = Message(a='foo', b='bar', c='tjoho')
313309
_jwe = msg.to_jwe(KEYJAR.get_encrypt_key(keytype, ''), alg=alg, enc=enc)
@@ -334,7 +330,7 @@ class MsgMessage(Message):
334330
c_param = {
335331
"msg": OPTIONAL_MESSAGE,
336332
"opt_str": SINGLE_OPTIONAL_STRING,
337-
}
333+
}
338334

339335
_dict = {
340336
"req_str": "Fair", "req_str_list": ["spike", "lee"],
@@ -359,7 +355,7 @@ class MsgMessage(Message):
359355
c_param = {
360356
"msgs": OPTIONAL_LIST_OF_MESSAGES,
361357
"opt_str": SINGLE_OPTIONAL_STRING,
362-
}
358+
}
363359

364360
_dict = {
365361
"req_str": "Fair", "req_str_list": ["spike", "lee"],
@@ -385,7 +381,7 @@ class MsgMessage(Message):
385381
c_param = {
386382
"msgs": OPTIONAL_LIST_OF_MESSAGES,
387383
"opt_str": SINGLE_OPTIONAL_STRING,
388-
}
384+
}
389385

390386
_dict = {
391387
"req_str": "Fair", "req_str_list": ["spike", "lee"],
@@ -409,7 +405,7 @@ class MsgMessage(Message):
409405
c_param = {
410406
"msgs": OPTIONAL_LIST_OF_MESSAGES,
411407
"opt_str": SINGLE_OPTIONAL_STRING,
412-
}
408+
}
413409

414410
_dict = {
415411
"req_str": "Fair", "req_str_list": ["spike", "lee"],
@@ -446,7 +442,7 @@ def test_json_type_error():
446442
@pytest.mark.parametrize("keytype,alg,enc", [
447443
('RSA', 'RSA1_5', 'A128CBC-HS256'),
448444
('EC', 'ECDH-ES', 'A128GCM'),
449-
])
445+
])
450446
def test_to_jwe(keytype, alg, enc):
451447
msg = Message(a='foo', b='bar', c='tjoho')
452448
_jwe = msg.to_jwe(KEYJAR.get_encrypt_key(keytype, ''), alg=alg,
@@ -469,8 +465,7 @@ def test_no_suitable_keys():
469465
keytype = 'RSA'
470466
alg = 'RS256'
471467
msg = Message(a='foo', b='bar', c='tjoho')
472-
_jwt = msg.to_jwt(NEW_KEYJAR.get_signing_key(keytype, '', kid=NEW_KID),
473-
alg)
468+
_jwt = msg.to_jwt(NEW_KEYJAR.get_signing_key(keytype, '', kid=NEW_KID), alg)
474469
with pytest.raises(NoSuitableSigningKeys):
475470
Message().from_jwt(_jwt, KEYJAR)
476471

@@ -495,9 +490,9 @@ def test_weed():
495490
def test_msg_ser():
496491
assert msg_ser('a.b.c', 'dict') == 'a.b.c'
497492
with pytest.raises(MessageException):
498-
msg_ser([1,2], 'dict')
493+
msg_ser([1, 2], 'dict')
499494
with pytest.raises(OidcMsgError):
500-
msg_ser([1,2], 'list')
495+
msg_ser([1, 2], 'list')
501496

502497

503498
def test_error_description():

tests/test_05_oauth2.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,14 +42,14 @@
4242

4343
KEYJAR = build_keyjar(keys)
4444
IKEYJAR = build_keyjar(keys)
45-
IKEYJAR.issuer_keys['issuer'] = IKEYJAR.issuer_keys['']
46-
del IKEYJAR.issuer_keys['']
45+
IKEYJAR.import_jwks(IKEYJAR.export_jwks(private=True), 'issuer')
46+
del IKEYJAR['']
4747

4848
KEYJARS = {}
4949
for iss in ['A', 'B', 'C']:
5050
_kj = build_keyjar(keym)
51-
_kj.issuer_keys[iss] = _kj.issuer_keys['']
52-
del _kj.issuer_keys['']
51+
_kj.import_jwks(_kj.export_jwks(private=True) ,iss)
52+
del _kj['']
5353
KEYJARS[iss] = _kj
5454

5555

tests/test_06_oidc.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -767,10 +767,9 @@ def test_ok_idtoken(self):
767767
idts = IdToken(**idval)
768768
keyjar = KeyJar()
769769
keyjar.add_symmetric('', "SomeTestPassword")
770-
keyjar.add_symmetric('https://alpha.cloud.nds.rub.de',
771-
"SomeTestPassword")
772-
_signed_jwt = idts.to_jwt(key=keyjar.get_signing_key('oct'),
773-
algorithm="HS256", lifetime=300)
770+
keyjar.add_symmetric('https://alpha.cloud.nds.rub.de', "SomeTestPassword")
771+
_signed_jwt = idts.to_jwt(key=keyjar.get_signing_key('oct'), algorithm="HS256",
772+
lifetime=300)
774773

775774
_info = {
776775
"access_token": "accessTok", "id_token": _signed_jwt,

tests/test_07_session.py

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -49,11 +49,11 @@ def full_path(local_file):
4949

5050
CLI_KEY = init_key_jar(public_path=full_path('pub_client.jwks'),
5151
private_path=full_path('priv_client.jwks'),
52-
key_defs=KEYDEF, owner=CLIENT_ID)
52+
key_defs=KEYDEF, issuer_id=CLIENT_ID)
5353

5454
ISS_KEY = init_key_jar(public_path=full_path('pub_iss.jwks'),
5555
private_path=full_path('priv_iss.jwks'),
56-
key_defs=KEYDEF, owner=ISS)
56+
key_defs=KEYDEF, issuer_id=ISS)
5757

5858
ISS_KEY.import_jwks_as_json(open(full_path('pub_client.jwks')).read(), CLIENT_ID)
5959
CLI_KEY.import_jwks_as_json(open(full_path('pub_iss.jwks')).read(),ISS)
@@ -325,7 +325,7 @@ def test_back_channel_logout_request():
325325
}
326326
lt = LogoutToken(**val)
327327
signer = JWS(lt.to_json(), alg='ES256')
328-
_jws = signer.sign_compact(keys=ISS_KEY.get_signing_key(owner=ISS))
328+
_jws = signer.sign_compact(keys=ISS_KEY.get_signing_key(issuer_id=ISS))
329329

330330
bclr = BackChannelLogoutRequest(logout_token=_jws)
331331

@@ -336,8 +336,7 @@ def test_back_channel_logout_request():
336336

337337
assert 'logout_token' in _request
338338

339-
_verified = _request.verify(keyjar=CLI_KEY, iss=ISS,
340-
aud=CLIENT_ID, skew=30)
339+
_verified = _request.verify(keyjar=CLI_KEY, iss=ISS, aud=CLIENT_ID, skew=30)
341340

342341
assert _verified
343342
assert set(_request.keys()) == {'logout_token', '__verified_logout_token'}

tests/test_12_context.py

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
import copy
2+
import shutil
3+
4+
import pytest
5+
6+
from oidcmsg.context import OidcContext
7+
8+
KEYDEF = [
9+
{"type": "EC", "crv": "P-256", "use": ["sig"]},
10+
{"type": "EC", "crv": "P-256", "use": ["enc"]}
11+
]
12+
13+
JWKS = {
14+
"keys": [
15+
{
16+
"n":
17+
'zkpUgEgXICI54blf6iWiD2RbMDCOO1jV0VSff1MFFnujM4othfMsad7H1kRo50YM5S'
18+
'_X9TdvrpdOfpz5aBaKFhT6Ziv0nhtcekq1eRl8mjBlvGKCE5XGk-0LFSDwvqgkJoFY'
19+
'Inq7bu0a4JEzKs5AyJY75YlGh879k1Uu2Sv3ZZOunfV1O1Orta-NvS-aG_jN5cstVb'
20+
'CGWE20H0vFVrJKNx0Zf-u-aA-syM4uX7wdWgQ-owoEMHge0GmGgzso2lwOYf_4znan'
21+
'LwEuO3p5aabEaFoKNR4K6GjQcjBcYmDEE4CtfRU9AEmhcD1kleiTB9TjPWkgDmT9MX'
22+
'sGxBHf3AKT5w',
23+
"e": "AQAB", "kty": "RSA", "kid": "rsa1"
24+
},
25+
{
26+
"k":
27+
'YTEyZjBlMDgxMGI4YWU4Y2JjZDFiYTFlZTBjYzljNDU3YWM0ZWNiNzhmNmFlYTNkNTY0NzMzYjE',
28+
"kty": "oct"
29+
},
30+
]
31+
}
32+
33+
34+
def test_context():
35+
c = OidcContext({})
36+
assert c.keyjar is not None
37+
38+
39+
class TestContext(object):
40+
@pytest.fixture(autouse=True)
41+
def setup(self):
42+
try:
43+
shutil.rmtree('db')
44+
except FileNotFoundError:
45+
pass
46+
47+
self.conf = {
48+
'issuer': 'https://example.com',
49+
'db_conf': {
50+
'abstract_storage_cls': 'abstorage.extension.LabeledAbstractStorage',
51+
'keyjar': {
52+
'handler': 'abstorage.storages.abfile.AbstractFileSystem',
53+
'fdir': 'db/keyjar',
54+
'key_conv': 'abstorage.converter.QPKey',
55+
'value_conv': 'cryptojwt.serialize.item.KeyIssuer',
56+
'label': 'keyjar'
57+
},
58+
'default': {
59+
'handler': 'abstorage.storages.abfile.AbstractFileSystem',
60+
'fdir': 'db',
61+
'key_conv': 'abstorage.converter.QPKey',
62+
'value_conv': 'abstorage.converter.JSON'
63+
}
64+
}
65+
}
66+
67+
def test_context_with_entity_id_no_keys(self):
68+
c = OidcContext(self.conf, entity_id='https://example.com')
69+
assert c.keyjar.owners() == []
70+
71+
def test_context_with_entity_id_and_keys(self):
72+
conf = copy.deepcopy(self.conf)
73+
conf['keys'] = {'key_defs': KEYDEF}
74+
75+
c = OidcContext(conf, entity_id='https://example.com')
76+
assert set(c.keyjar.owners()) == {'', 'https://example.com'}
77+
78+
def test_context_with_entity_id_and_jwks(self):
79+
conf = copy.deepcopy(self.conf)
80+
conf['jwks'] = JWKS
81+
82+
c = OidcContext(conf, entity_id='https://example.com')
83+
assert set(c.keyjar.owners()) == {'', 'https://example.com'}
84+
assert len(c.keyjar.get('sig', 'RSA')) == 1
85+
assert len(c.keyjar.get('sig', 'RSA', issuer_id='https://example.com')) == 1
86+
assert len(c.keyjar.get('sig', 'oct')) == 1
87+
assert len(c.keyjar.get('sig', 'oct', issuer_id='https://example.com')) == 1

0 commit comments

Comments
 (0)