2
2
from base64 import b64decode
3
3
from base64 import b64encode
4
4
5
- from Cryptodome import Random
6
- from Cryptodome .Cipher import AES
5
+ from cryptography .hazmat .backends import default_backend
6
+ from cryptography .hazmat .primitives .ciphers import Cipher
7
+ from cryptography .hazmat .primitives .ciphers import algorithms
8
+ from cryptography .hazmat .primitives .ciphers import modes
7
9
8
10
9
11
POSTFIX_MODE = {
10
- 'cbc' : AES . MODE_CBC ,
11
- 'cfb' : AES . MODE_CFB ,
12
- 'ecb' : AES . MODE_CFB ,
12
+ 'cbc' : modes . CBC ,
13
+ 'cfb' : modes . CFB ,
14
+ 'ecb' : modes . ECB ,
13
15
}
14
16
15
- BLOCK_SIZE = 16
17
+ AES_BLOCK_SIZE = int ( algorithms . AES . block_size / 8 )
16
18
17
19
18
20
class AESCipher (object ):
@@ -31,29 +33,38 @@ def build_cipher(self, iv=None, alg='aes_128_cbc'):
31
33
:param alg: cipher algorithm
32
34
:return: A Cipher instance
33
35
"""
34
- typ , bits , cmode = alg .split ('_' )
36
+ typ , bits , cmode = alg .lower ().split ('_' )
37
+ bits = int (bits )
35
38
36
39
if not iv :
37
- iv = self .iv if self .iv else Random .new ().read (AES .block_size )
40
+ if self .iv :
41
+ iv = self .iv
42
+ else :
43
+ iv = os .urandom (AES_BLOCK_SIZE )
38
44
39
- if len (iv ) != AES . block_size :
40
- raise Exception ('Wrong iv size' )
45
+ if len (iv ) != AES_BLOCK_SIZE :
46
+ raise Exception ('Wrong iv size: {}' . format ( len ( iv )) )
41
47
42
- if bits not in [ '128' , '192' , '256' ] :
43
- raise Exception ('Unsupported key length' )
48
+ if bits not in algorithms . AES . key_sizes :
49
+ raise Exception ('Unsupported key length: {}' . format ( bits ) )
44
50
45
- if len (self .key ) != int ( bits ) >> 3 :
46
- raise Exception ('Wrong Key length' )
51
+ if len (self .key ) != bits / 8 :
52
+ raise Exception ('Wrong Key length: {}' . format ( len ( self . key )) )
47
53
48
54
try :
49
- result = AES . new ( self . key , POSTFIX_MODE [cmode ], iv )
55
+ mode = POSTFIX_MODE [cmode ]
50
56
except KeyError :
51
- raise Exception ('Unsupported chaining mode' )
52
- else :
53
- return result , iv
57
+ raise Exception ('Unsupported chaining mode: {}' .format (cmode ))
58
+
59
+ cipher = Cipher (
60
+ algorithms .AES (self .key ),
61
+ mode (iv ),
62
+ backend = default_backend ())
63
+
64
+ return cipher , iv
54
65
55
66
def encrypt (self , msg , iv = None , alg = 'aes_128_cbc' , padding = 'PKCS#7' ,
56
- b64enc = True , block_size = BLOCK_SIZE ):
67
+ b64enc = True , block_size = AES_BLOCK_SIZE ):
57
68
"""
58
69
:param key: The encryption key
59
70
:param iv: init vector
@@ -73,11 +84,12 @@ def encrypt(self, msg, iv=None, alg='aes_128_cbc', padding='PKCS#7',
73
84
74
85
if _block_size :
75
86
plen = _block_size - (len (msg ) % _block_size )
76
- c = chr (plen )
87
+ c = chr (plen ). encode ()
77
88
msg += c * plen
78
89
79
90
cipher , iv = self .build_cipher (iv , alg )
80
- cmsg = iv + cipher .encrypt (msg )
91
+ encryptor = cipher .encryptor ()
92
+ cmsg = iv + encryptor .update (msg ) + encryptor .finalize ()
81
93
82
94
if b64enc :
83
95
enc_msg = b64encode (cmsg )
@@ -96,26 +108,38 @@ def decrypt(self, msg, iv=None, alg='aes_128_cbc', padding='PKCS#7',
96
108
"""
97
109
data = b64decode (msg ) if b64dec else msg
98
110
99
- _iv = data [:AES . block_size ]
111
+ _iv = data [:AES_BLOCK_SIZE ]
100
112
if iv :
101
113
assert iv == _iv
102
114
cipher , iv = self .build_cipher (iv , alg = alg )
103
- res = cipher .decrypt (data )[AES .block_size :]
115
+ decryptor = cipher .decryptor ()
116
+ res = decryptor .update (data )[AES_BLOCK_SIZE :] + decryptor .finalize ()
104
117
if padding in ['PKCS#5' , 'PKCS#7' ]:
105
- res = res [:- ord (res [- 1 ])]
118
+ idx = bytearray (res )[- 1 ]
119
+ res = res [:- idx ]
106
120
return res
107
121
108
122
109
- if __name__ == '__main__' :
110
- key_ = '1234523451234545' # 16 byte key
123
+ def run_test ():
124
+ key = b'1234523451234545' # 16 byte key
125
+ iv = os .urandom (AES_BLOCK_SIZE )
111
126
# Iff padded, the message doesn't have to be multiple of 16 in length
112
- msg_ = 'ToBeOrNotTobe W.S.'
113
- aes = AESCipher (key_ )
114
- iv_ = os .urandom (16 )
115
- encrypted_msg = aes .encrypt (key_ , msg_ , iv_ )
116
- txt = aes .decrypt (key_ , encrypted_msg , iv_ )
117
- assert txt == msg_
118
-
119
- encrypted_msg = aes .encrypt (key_ , msg_ , 0 )
120
- txt = aes .decrypt (key_ , encrypted_msg , 0 )
121
- assert txt == msg_
127
+ original_msg = b'ToBeOrNotTobe W.S.'
128
+ aes = AESCipher (key )
129
+
130
+ encrypted_msg = aes .encrypt (original_msg , iv )
131
+ decrypted_msg = aes .decrypt (encrypted_msg , iv )
132
+ assert decrypted_msg == original_msg
133
+
134
+ encrypted_msg = aes .encrypt (original_msg )
135
+ decrypted_msg = aes .decrypt (encrypted_msg )
136
+ assert decrypted_msg == original_msg
137
+
138
+ aes = AESCipher (key , iv )
139
+ encrypted_msg = aes .encrypt (original_msg )
140
+ decrypted_msg = aes .decrypt (encrypted_msg )
141
+ assert decrypted_msg == original_msg
142
+
143
+
144
+ if __name__ == '__main__' :
145
+ run_test ()
0 commit comments