-
-
Couldn't load subscription status.
- Fork 33.2k
gh-115952: Fix potential virtual memory allocation denial of service in the pickle module #119204
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
822230d
88f1461
048099b
d9d1d1d
6f6f765
d0e667e
3462d0e
becbd25
b257974
1e487ca
184984d
f0c0728
1f4e2f1
c72d095
e89bfea
a80106c
01bc6b9
20aa1bf
ab58869
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -74,6 +74,15 @@ def count_opcode(code, pickle): | |
| def identity(x): | ||
| return x | ||
|
|
||
| def itersize(start, stop): | ||
| # Produce a geometrically increasing sequence from start to stop | ||
| # (inclusively) for tests. | ||
| size = start | ||
| while size < stop: | ||
| yield size | ||
| size <<= 1 | ||
| yield stop | ||
|
|
||
|
|
||
| class UnseekableIO(io.BytesIO): | ||
| def peek(self, *args): | ||
|
|
@@ -853,9 +862,8 @@ def assert_is_copy(self, obj, objcopy, msg=None): | |
| self.assertEqual(getattr(obj, slot, None), | ||
| getattr(objcopy, slot, None), msg=msg) | ||
|
|
||
| def check_unpickling_error(self, errors, data): | ||
| with self.subTest(data=data), \ | ||
| self.assertRaises(errors): | ||
| def check_unpickling_error_strict(self, errors, data): | ||
| with self.assertRaises(errors): | ||
| try: | ||
| self.loads(data) | ||
| except BaseException as exc: | ||
|
|
@@ -864,6 +872,10 @@ def check_unpickling_error(self, errors, data): | |
| (data, exc.__class__.__name__, exc)) | ||
| raise | ||
|
|
||
| def check_unpickling_error(self, errors, data): | ||
| with self.subTest(data=data): | ||
| self.check_unpickling_error_strict(errors, data) | ||
|
|
||
| def test_load_from_data0(self): | ||
| self.assert_is_copy(self._testdata, self.loads(DATA0)) | ||
|
|
||
|
|
@@ -1135,6 +1147,141 @@ def test_negative_32b_binput(self): | |
| dumped = b'\x80\x03X\x01\x00\x00\x00ar\xff\xff\xff\xff.' | ||
| self.check_unpickling_error(ValueError, dumped) | ||
|
|
||
| def test_too_large_put(self): | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Can you add a comment explaining why this and the next test method result in ([], []) being returned no matter what, rather than an error, when the values are too large? (I suspect readers with a knowledge of the specific pickle protocol may understand, but it isn't obvious otherwise) |
||
| # Test that PUT with large id does not cause allocation of | ||
| # too large memo table. | ||
| data = lambda n: (b'((lp' + str(n).encode() + b'\n' + | ||
| b'g' + str(n).encode() + b'\nt.') | ||
| # 0: ( MARK | ||
| # 1: ( MARK | ||
| # 2: l LIST (MARK at 1) | ||
| # 3: p PUT 1000000000000 | ||
| # 18: g GET 1000000000000 | ||
| # 33: t TUPLE (MARK at 0) | ||
| # 34: . STOP | ||
| for idx in [10**6, 10**9, 10**12]: | ||
| if idx > sys.maxsize: | ||
| continue | ||
| self.assertEqual(self.loads(data(idx)), ([],)*2) | ||
|
|
||
| def test_too_large_long_binput(self): | ||
| # Test that LONG_BINPUT with large id does not cause allocation of | ||
| # too large memo table. | ||
| data = lambda n: (b'(]r' + struct.pack('<I', n) + | ||
| b'j' + struct.pack('<I', n) + b't.') | ||
| # 0: ( MARK | ||
| # 1: ] EMPTY_LIST | ||
| # 2: r LONG_BINPUT 4294967295 | ||
| # 7: j LONG_BINGET 4294967295 | ||
| # 12: t TUPLE (MARK at 0) | ||
| # 13: . STOP | ||
| for idx in itersize(1 << 20, min(sys.maxsize, (1 << 32) - 1)): | ||
| self.assertEqual(self.loads(data(idx)), ([],)*2) | ||
|
|
||
| def _test_truncated_data(self, dumped, expected_error=None): | ||
| # Test that instructions to read large data without providing | ||
| # such an amount of data do not cause large memory usage. | ||
| if expected_error is None: | ||
| expected_error = self.truncated_data_error | ||
| # BytesIO | ||
| with self.assertRaisesRegex(*expected_error): | ||
| self.loads(dumped) | ||
| if hasattr(self, 'unpickler'): | ||
| try: | ||
| with open(TESTFN, 'wb') as f: | ||
| f.write(dumped) | ||
| # buffered file | ||
| with open(TESTFN, 'rb') as f: | ||
| u = self.unpickler(f) | ||
| with self.assertRaisesRegex(*expected_error): | ||
| u.load() | ||
| # unbuffered file | ||
| with open(TESTFN, 'rb', buffering=0) as f: | ||
| u = self.unpickler(f) | ||
| with self.assertRaisesRegex(*expected_error): | ||
| u.load() | ||
| finally: | ||
| os_helper.unlink(TESTFN) | ||
|
|
||
| def test_truncated_large_binstring(self): | ||
| data = lambda size: b'T' + struct.pack('<I', size) + b'.' * 5 | ||
| # 0: T BINSTRING '....' | ||
| # 9: . STOP | ||
| self.assertEqual(self.loads(data(4)), '....') # self-testing | ||
| for size in itersize(1 << 10, min(sys.maxsize - 5, (1 << 31) - 1)): | ||
| self._test_truncated_data(data(size)) | ||
| self._test_truncated_data(data(1 << 31), | ||
| (pickle.UnpicklingError, 'truncated|exceeds|negative byte count')) | ||
|
|
||
| def test_truncated_large_binunicode(self): | ||
| data = lambda size: b'X' + struct.pack('<I', size) + b'.' * 5 | ||
| # 0: X BINUNICODE '....' | ||
| # 9: . STOP | ||
| self.assertEqual(self.loads(data(4)), '....') # self-testing | ||
| for size in itersize(1 << 10, min(sys.maxsize - 5, (1 << 32) - 1)): | ||
| self._test_truncated_data(data(size)) | ||
|
|
||
| def test_truncated_large_binbytes(self): | ||
| data = lambda size: b'B' + struct.pack('<I', size) + b'.' * 5 | ||
| # 0: B BINBYTES b'....' | ||
| # 9: . STOP | ||
| self.assertEqual(self.loads(data(4)), b'....') # self-testing | ||
| for size in itersize(1 << 10, min(sys.maxsize, 1 << 31)): | ||
| self._test_truncated_data(data(size)) | ||
|
|
||
| def test_truncated_large_long4(self): | ||
| data = lambda size: b'\x8b' + struct.pack('<I', size) + b'.' * 5 | ||
| # 0: \x8b LONG4 0x2e2e2e2e | ||
| # 9: . STOP | ||
| self.assertEqual(self.loads(data(4)), 0x2e2e2e2e) # self-testing | ||
| for size in itersize(1 << 10, min(sys.maxsize - 5, (1 << 31) - 1)): | ||
| self._test_truncated_data(data(size)) | ||
| self._test_truncated_data(data(1 << 31), | ||
| (pickle.UnpicklingError, 'LONG pickle has negative byte count')) | ||
|
|
||
| def test_truncated_large_frame(self): | ||
| data = lambda size: b'\x95' + struct.pack('<Q', size) + b'N.' | ||
| # 0: \x95 FRAME 2 | ||
| # 9: N NONE | ||
| # 10: . STOP | ||
| self.assertIsNone(self.loads(data(2))) # self-testing | ||
| for size in itersize(1 << 10, sys.maxsize - 9): | ||
| self._test_truncated_data(data(size)) | ||
| if sys.maxsize + 1 < 1 << 64: | ||
| self._test_truncated_data(data(sys.maxsize + 1), | ||
| ((OverflowError, ValueError), | ||
| 'FRAME length exceeds|frame size > sys.maxsize')) | ||
|
|
||
| def test_truncated_large_binunicode8(self): | ||
| data = lambda size: b'\x8d' + struct.pack('<Q', size) + b'.' * 5 | ||
| # 0: \x8d BINUNICODE8 '....' | ||
| # 13: . STOP | ||
| self.assertEqual(self.loads(data(4)), '....') # self-testing | ||
| for size in itersize(1 << 10, sys.maxsize - 9): | ||
| self._test_truncated_data(data(size)) | ||
| if sys.maxsize + 1 < 1 << 64: | ||
| self._test_truncated_data(data(sys.maxsize + 1), self.size_overflow_error) | ||
|
|
||
| def test_truncated_large_binbytes8(self): | ||
| data = lambda size: b'\x8e' + struct.pack('<Q', size) + b'.' * 5 | ||
| # 0: \x8e BINBYTES8 b'....' | ||
| # 13: . STOP | ||
| self.assertEqual(self.loads(data(4)), b'....') # self-testing | ||
| for size in itersize(1 << 10, sys.maxsize): | ||
| self._test_truncated_data(data(size)) | ||
| if sys.maxsize + 1 < 1 << 64: | ||
| self._test_truncated_data(data(sys.maxsize + 1), self.size_overflow_error) | ||
|
|
||
| def test_truncated_large_bytearray8(self): | ||
| data = lambda size: b'\x96' + struct.pack('<Q', size) + b'.' * 5 | ||
| # 0: \x96 BYTEARRAY8 bytearray(b'....') | ||
| # 13: . STOP | ||
| self.assertEqual(self.loads(data(4)), bytearray(b'....')) # self-testing | ||
| for size in itersize(1 << 10, sys.maxsize): | ||
| self._test_truncated_data(data(size)) | ||
| if sys.maxsize + 1 < 1 << 64: | ||
| self._test_truncated_data(data(sys.maxsize + 1), self.size_overflow_error) | ||
|
|
||
| def test_badly_escaped_string(self): | ||
| self.check_unpickling_error(ValueError, b"S'\\'\n.") | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,2 @@ | ||
:mod:`pickle` now avoids allocating an arbitrarily large amount of memory
based on small untrusted input when unpickling specially crafted data.
Uh oh!
There was an error while loading. Please reload this page.