27
27
from .encoding import to_bytes
28
28
from .solve import (
29
29
solve , Fail , OracleFunc , ResultType ,
30
- convert_to_bytes , remove_padding )
30
+ convert_to_bytes , remove_padding , add_padding )
31
31
32
32
__all__ = [
33
33
'padding_oracle' ,
@@ -41,6 +41,7 @@ def padding_oracle(payload: Union[bytes, str],
41
41
null_byte : bytes = b' ' ,
42
42
return_raw : bool = False ,
43
43
mode : Union [bool , str ] = 'decrypt' ,
44
+ pad_payload : bool = True
44
45
) -> Union [bytes , List [int ]]:
45
46
'''
46
47
Run padding oracle attack to decrypt ciphertext given a function to check
@@ -58,7 +59,9 @@ def padding_oracle(payload: Union[bytes, str],
58
59
set (default: None)
59
60
return_raw (bool) do not convert plaintext into bytes and
60
61
unpad (default: False)
61
- mode (bool|str) encrypt the payload (defaut: False/'decrypt')
62
+ mode (str) encrypt the payload (defaut: 'decrypt')
63
+ pad_payload (bool) PKCS#7 pad the supplied payload before
64
+ encryption (default: True)
62
65
63
66
64
67
Returns:
@@ -72,19 +75,18 @@ def padding_oracle(payload: Union[bytes, str],
72
75
raise TypeError ('payload should have type bytes' )
73
76
if not isinstance (block_size , int ):
74
77
raise TypeError ('block_size should have type int' )
75
- if not len (payload ) % block_size == 0 :
76
- raise ValueError ('payload length should be multiple of block size' )
77
78
if not 1 <= num_threads <= 1000 :
78
79
raise ValueError ('num_threads should be in [1, 1000]' )
79
80
if not isinstance (null_byte , (bytes , str )):
80
81
raise TypeError ('expect null with type bytes or str' )
81
82
if not len (null_byte ) == 1 :
82
83
raise ValueError ('null byte should have length of 1' )
83
- if not isinstance (mode , ( bool , str ) ):
84
- raise TypeError ('expect mode with type bool or str' )
84
+ if not isinstance (mode , str ):
85
+ raise TypeError ('expect mode with type str' )
85
86
if isinstance (mode , str ) and mode not in ('encrypt' , 'decrypt' ):
86
87
raise ValueError ('mode must be either encrypt or decrypt' )
87
-
88
+ if (mode == 'decrypt' ) and not (len (payload ) % block_size == 0 ):
89
+ raise ValueError ('for decryption payload length should be multiple of block size' )
88
90
logger = get_logger ()
89
91
logger .setLevel (log_level )
90
92
@@ -93,8 +95,8 @@ def padding_oracle(payload: Union[bytes, str],
93
95
94
96
95
97
# Does the user want the encryption routine
96
- if (mode == 'encrypt' ) or ( mode == True ) :
97
- return encrypt (payload , block_size , oracle , num_threads , null_byte , logger )
98
+ if (mode == 'encrypt' ):
99
+ return encrypt (payload , block_size , oracle , num_threads , null_byte , pad_payload , logger )
98
100
99
101
# If not continue with decryption as normal
100
102
return decrypt (payload , block_size , oracle , num_threads , null_byte , return_raw , logger )
@@ -126,11 +128,11 @@ def plaintext_callback(plaintext: bytes):
126
128
if not return_raw :
127
129
plaintext = convert_to_bytes (plaintext , null_byte )
128
130
plaintext = remove_padding (plaintext )
129
-
131
+
130
132
return plaintext
131
133
132
134
133
- def encrypt (payload , block_size , oracle , num_threads , null_byte , logger ):
135
+ def encrypt (payload , block_size , oracle , num_threads , null_byte , pad_payload , logger ):
134
136
# Wrapper to handle exceptions from the oracle function
135
137
def wrapped_oracle (ciphertext : bytes ):
136
138
try :
@@ -148,24 +150,40 @@ def result_callback(result: ResultType):
148
150
logger .error (result .message )
149
151
150
152
def plaintext_callback (plaintext : bytes ):
151
- plaintext = convert_to_bytes (plaintext , null_byte )
152
- logger .info (f'plaintext: { plaintext } ' )
153
+ plaintext = convert_to_bytes (plaintext , null_byte ).strip (null_byte )
154
+ bytes_done = str (len (plaintext )).rjust (len (str (block_size )), ' ' )
155
+ blocks_done = solve_index .rjust (len (block_total ), ' ' )
156
+ printout = "{0}/{1} bytes encrypted in block {2}/{3}" .format (bytes_done , block_size , blocks_done , block_total )
157
+ logger .info (printout )
153
158
154
159
def blocks (data : bytes ):
155
160
return [data [index :(index + block_size )] for index in range (0 , len (data ), block_size )]
156
161
157
162
def bytes_xor (byte_string_1 : bytes , byte_string_2 : bytes ):
158
163
return bytes ([_a ^ _b for _a , _b in zip (byte_string_1 , byte_string_2 )])
159
164
165
+ if pad_payload :
166
+ payload = add_padding (payload , block_size )
167
+
168
+ if len (payload ) % block_size != 0 :
169
+ raise ValueError ('''For encryption payload length must be a multiple of blocksize. Perhaps you meant to
170
+ pad the payload (inbuilt PKCS#7 padding can be enabled by setting pad_payload=True)''' )
171
+
160
172
plaintext_blocks = blocks (payload )
161
173
ciphertext_blocks = [null_byte * block_size for _ in range (len (plaintext_blocks )+ 1 )]
162
174
175
+ solve_index = '1'
176
+ block_total = str (len (plaintext_blocks ))
177
+
163
178
for index in range (len (plaintext_blocks )- 1 , - 1 , - 1 ):
164
179
plaintext = solve (b'\x00 ' * block_size + ciphertext_blocks [index + 1 ], block_size , wrapped_oracle ,
165
180
num_threads , result_callback , plaintext_callback )
166
181
ciphertext_blocks [index ] = bytes_xor (plaintext_blocks [index ], plaintext )
182
+ solve_index = str (int (solve_index )+ 1 )
167
183
168
184
ciphertext = b'' .join (ciphertext_blocks )
185
+ logger .info (f"forged ciphertext: { ciphertext } " )
186
+
169
187
return ciphertext
170
188
171
189
def get_logger ():
0 commit comments