Skip to content

Commit cd48afe

Browse files
Merge pull request #515 from c00kiemon5ter/fix-example-dependencies
Use cryptography in place of Cryptodome for aes
2 parents f6b48bf + d8ad5f7 commit cd48afe

File tree

2 files changed

+79
-57
lines changed

2 files changed

+79
-57
lines changed

src/saml2/aes.py

Lines changed: 74 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -1,64 +1,70 @@
1-
#!/usr/bin/env python
21
import os
3-
from base64 import b64encode
42
from base64 import b64decode
3+
from base64 import b64encode
54

6-
from Cryptodome import Random
7-
from Cryptodome.Cipher import AES
5+
from cryptography.hazmat.backends import default_backend
6+
from cryptography.hazmat.primitives.ciphers import Cipher
7+
from cryptography.hazmat.primitives.ciphers import algorithms
8+
from cryptography.hazmat.primitives.ciphers import modes
89

9-
__author__ = 'rolandh'
1010

1111
POSTFIX_MODE = {
12-
"cbc": AES.MODE_CBC,
13-
"cfb": AES.MODE_CFB,
14-
"ecb": AES.MODE_CFB,
12+
'cbc': modes.CBC,
13+
'cfb': modes.CFB,
14+
'ecb': modes.ECB,
1515
}
1616

17-
BLOCK_SIZE = 16
17+
AES_BLOCK_SIZE = int(algorithms.AES.block_size / 8)
1818

1919

2020
class AESCipher(object):
21-
def __init__(self, key, iv=""):
21+
def __init__(self, key, iv=None):
2222
"""
23-
2423
:param key: The encryption key
2524
:param iv: Init vector
2625
:return: AESCipher instance
2726
"""
2827
self.key = key
2928
self.iv = iv
3029

31-
def build_cipher(self, iv="", alg="aes_128_cbc"):
30+
def build_cipher(self, iv=None, alg='aes_128_cbc'):
3231
"""
3332
:param iv: init vector
3433
:param alg: cipher algorithm
3534
:return: A Cipher instance
3635
"""
37-
typ, bits, cmode = alg.split("_")
36+
typ, bits, cmode = alg.lower().split('_')
37+
bits = int(bits)
3838

3939
if not iv:
4040
if self.iv:
4141
iv = self.iv
4242
else:
43-
iv = Random.new().read(AES.block_size)
44-
else:
45-
assert len(iv) == AES.block_size
43+
iv = os.urandom(AES_BLOCK_SIZE)
4644

47-
if bits not in ["128", "192", "256"]:
48-
raise Exception("Unsupported key length")
49-
try:
50-
assert len(self.key) == int(bits) >> 3
51-
except AssertionError:
52-
raise Exception("Wrong Key length")
45+
if len(iv) != AES_BLOCK_SIZE:
46+
raise Exception('Wrong iv size: {}'.format(len(iv)))
47+
48+
if bits not in algorithms.AES.key_sizes:
49+
raise Exception('Unsupported key length: {}'.format(bits))
50+
51+
if len(self.key) != bits / 8:
52+
raise Exception('Wrong Key length: {}'.format(len(self.key)))
5353

5454
try:
55-
return AES.new(self.key, POSTFIX_MODE[cmode], iv), iv
55+
mode = POSTFIX_MODE[cmode]
5656
except KeyError:
57-
raise Exception("Unsupported chaining mode")
57+
raise Exception('Unsupported chaining mode: {}'.format(cmode))
58+
59+
cipher = Cipher(
60+
algorithms.AES(self.key),
61+
mode(iv),
62+
backend=default_backend())
5863

64+
return cipher, iv
5965

60-
def encrypt(self, msg, iv=None, alg="aes_128_cbc", padding="PKCS#7",
61-
b64enc=True, block_size=BLOCK_SIZE):
66+
def encrypt(self, msg, iv=None, alg='aes_128_cbc', padding='PKCS#7',
67+
b64enc=True, block_size=AES_BLOCK_SIZE):
6268
"""
6369
:param key: The encryption key
6470
:param iv: init vector
@@ -69,57 +75,71 @@ def encrypt(self, msg, iv=None, alg="aes_128_cbc", padding="PKCS#7",
6975
:return: The encrypted message
7076
"""
7177

72-
if padding == "PKCS#7":
78+
if padding == 'PKCS#7':
7379
_block_size = block_size
74-
elif padding == "PKCS#5":
80+
elif padding == 'PKCS#5':
7581
_block_size = 8
7682
else:
7783
_block_size = 0
7884

7985
if _block_size:
8086
plen = _block_size - (len(msg) % _block_size)
81-
c = chr(plen)
82-
msg += c*plen
87+
c = chr(plen).encode()
88+
msg += c * plen
8389

8490
cipher, iv = self.build_cipher(iv, alg)
85-
cmsg = iv + cipher.encrypt(msg)
91+
encryptor = cipher.encryptor()
92+
cmsg = iv + encryptor.update(msg) + encryptor.finalize()
93+
8694
if b64enc:
87-
return b64encode(cmsg)
95+
enc_msg = b64encode(cmsg)
8896
else:
89-
return cmsg
97+
enc_msg = cmsg
9098

99+
return enc_msg
91100

92-
def decrypt(self, msg, iv=None, alg="aes_128_cbc", padding="PKCS#7", b64dec=True):
101+
def decrypt(self, msg, iv=None, alg='aes_128_cbc', padding='PKCS#7',
102+
b64dec=True):
93103
"""
94104
:param key: The encryption key
95105
:param iv: init vector
96106
:param msg: Base64 encoded message to be decrypted
97107
:return: The decrypted message
98108
"""
99-
if b64dec:
100-
data = b64decode(msg)
101-
else:
102-
data = msg
109+
data = b64decode(msg) if b64dec else msg
103110

104-
_iv = data[:AES.block_size]
111+
_iv = data[:AES_BLOCK_SIZE]
105112
if iv:
106113
assert iv == _iv
107114
cipher, iv = self.build_cipher(iv, alg=alg)
108-
res = cipher.decrypt(data)[AES.block_size:]
109-
if padding in ["PKCS#5", "PKCS#7"]:
110-
res = res[:-ord(res[-1])]
115+
decryptor = cipher.decryptor()
116+
res = decryptor.update(data)[AES_BLOCK_SIZE:] + decryptor.finalize()
117+
if padding in ['PKCS#5', 'PKCS#7']:
118+
idx = bytearray(res)[-1]
119+
res = res[:-idx]
111120
return res
112121

113-
if __name__ == "__main__":
114-
key_ = "1234523451234545" # 16 byte key
122+
123+
def run_test():
124+
key = b'1234523451234545' # 16 byte key
125+
iv = os.urandom(AES_BLOCK_SIZE)
115126
# Iff padded, the message doesn't have to be multiple of 16 in length
116-
msg_ = "ToBeOrNotTobe W.S."
117-
aes = AESCipher(key_)
118-
iv_ = os.urandom(16)
119-
encrypted_msg = aes.encrypt(key_, msg_, iv_)
120-
txt = aes.decrypt(key_, encrypted_msg, iv_)
121-
assert txt == msg_
122-
123-
encrypted_msg = aes.encrypt(key_, msg_, 0)
124-
txt = aes.decrypt(key_, encrypted_msg, 0)
125-
assert txt == msg_
127+
original_msg = b'ToBeOrNotTobe W.S.'
128+
aes = AESCipher(key)
129+
130+
encrypted_msg = aes.encrypt(original_msg, iv)
131+
decrypted_msg = aes.decrypt(encrypted_msg, iv)
132+
assert decrypted_msg == original_msg
133+
134+
encrypted_msg = aes.encrypt(original_msg)
135+
decrypted_msg = aes.decrypt(encrypted_msg)
136+
assert decrypted_msg == original_msg
137+
138+
aes = AESCipher(key, iv)
139+
encrypted_msg = aes.encrypt(original_msg)
140+
decrypted_msg = aes.decrypt(encrypted_msg)
141+
assert decrypted_msg == original_msg
142+
143+
144+
if __name__ == '__main__':
145+
run_test()

src/saml2/authn.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ def __init__(self, srv, mako_template, template_lookup, pwd, return_to):
120120
self.return_to = return_to
121121
self.active = {}
122122
self.query_param = "upm_answer"
123-
self.aes = AESCipher(self.srv.symkey, srv.iv)
123+
self.aes = AESCipher(self.srv.symkey.encode(), srv.iv)
124124

125125
def __call__(self, cookie=None, policy_url=None, logo_url=None,
126126
query="", **kwargs):
@@ -171,7 +171,8 @@ def verify(self, request, **kwargs):
171171
try:
172172
self._verify(_dict["password"][0], _dict["login"][0])
173173
timestamp = str(int(time.mktime(time.gmtime())))
174-
info = self.aes.encrypt("::".join([_dict["login"][0], timestamp]))
174+
msg = "::".join([_dict["login"][0], timestamp])
175+
info = self.aes.encrypt(msg.encode())
175176
self.active[info] = timestamp
176177
cookie = make_cookie(self.cookie_name, info, self.srv.seed)
177178
return_to = create_return_url(self.return_to, _dict["query"][0],
@@ -191,7 +192,8 @@ def authenticated_as(self, cookie=None, **kwargs):
191192
info, timestamp = parse_cookie(self.cookie_name,
192193
self.srv.seed, cookie)
193194
if self.active[info] == timestamp:
194-
uid, _ts = self.aes.decrypt(info).split("::")
195+
msg = self.aes.decrypt(info).decode()
196+
uid, _ts = msg.split("::")
195197
if timestamp == _ts:
196198
return {"uid": uid}
197199
except Exception:

0 commit comments

Comments
 (0)