@@ -65,7 +65,9 @@ class JWT(object):
65
65
def __init__ (self , key_jar = None , iss = '' , lifetime = 0 ,
66
66
sign = True , sign_alg = 'RS256' , encrypt = False ,
67
67
enc_enc = "A128CBC-HS256" , enc_alg = "RSA1_5" , msg_cls = None ,
68
- iss2msg_cls = None , skew = 15 ):
68
+ iss2msg_cls = None , skew = 15 ,
69
+ allowed_sign_algs = None , allowed_enc_algs = None ,
70
+ allowed_enc_encs = None ):
69
71
self .key_jar = key_jar # KeyJar instance
70
72
self .iss = iss # My identifier
71
73
self .lifetime = lifetime # default life time of the signature
@@ -80,6 +82,10 @@ def __init__(self, key_jar=None, iss='', lifetime=0,
80
82
self .iss2msg_cls = iss2msg_cls or {}
81
83
# Allowed time skew
82
84
self .skew = skew
85
+ # When verifying/decrypting
86
+ self .allowed_sign_algs = allowed_sign_algs
87
+ self .allowed_enc_algs = allowed_enc_algs
88
+ self .allowed_enc_encs = allowed_enc_encs
83
89
84
90
def receiver_keys (self , recv , use ):
85
91
return self .key_jar .get (use , owner = recv )
@@ -271,18 +277,17 @@ def unpack(self, token):
271
277
if not token :
272
278
raise KeyError
273
279
274
- _content_type = 'jwt'
275
280
_jwe_header = _jws_header = None
276
281
277
282
# Check if it's an encrypted JWT
278
283
_decryptor = jwe_factory (token )
279
284
if _decryptor :
280
285
# check headers
281
286
darg = {}
282
- if self .enc_enc :
283
- darg ['enc' ] = self .enc_enc
284
- if self .enc_alg :
285
- darg ['alg' ] = self .enc_alg
287
+ if self .allowed_enc_encs :
288
+ darg ['enc' ] = self .allowed_enc_encs
289
+ if self .allowed_enc_algs :
290
+ darg ['alg' ] = self .allowed_enc_algs
286
291
287
292
if _decryptor .jwt .verify_headers (** darg ) is False :
288
293
raise HeaderError ('Wrong alg or enc' )
@@ -294,19 +299,22 @@ def unpack(self, token):
294
299
try :
295
300
_content_type = _decryptor .jwt .headers ['cty' ]
296
301
except KeyError :
297
- pass
302
+ _content_type = ''
298
303
else :
304
+ _content_type = 'jwt'
299
305
_info = token
300
306
301
307
# If I have reason to believe the information I have is a signed JWT
302
308
if _content_type .lower () == 'jwt' :
303
309
# Check that is a signed JWT
304
310
_verifier = jws_factory (_info )
305
311
if _verifier :
306
- if self .alg and not _verifier .jwt .verify_headers (alg = self .alg ):
312
+ if self .allowed_sign_algs and not _verifier .jwt .verify_headers (
313
+ alg = self .allowed_sign_algs ):
307
314
raise HeaderError (
308
315
'Wrong signing algorithm: "{}" expected "{}"' .format (
309
- _verifier .jwt .headers ['alg' ], self .alg ))
316
+ _verifier .jwt .headers ['alg' ],
317
+ self .allowed_sign_algs ))
310
318
_info = self ._verify (_verifier , _info )
311
319
else :
312
320
raise Exception ()
0 commit comments