gh-115952: Fix potential virtual memory allocation denial of service in the pickle module #119204
base: main
Changes from 12 commits
```diff
@@ -853,9 +853,8 @@ def assert_is_copy(self, obj, objcopy, msg=None):
                 self.assertEqual(getattr(obj, slot, None),
                                  getattr(objcopy, slot, None), msg=msg)

-    def check_unpickling_error(self, errors, data):
-        with self.subTest(data=data), \
-             self.assertRaises(errors):
+    def check_unpickling_error_strict(self, errors, data):
+        with self.assertRaises(errors):
             try:
                 self.loads(data)
             except BaseException as exc:
```
```diff
@@ -864,6 +863,10 @@ def check_unpickling_error(self, errors, data):
                           (data, exc.__class__.__name__, exc))
                 raise

+    def check_unpickling_error(self, errors, data):
+        with self.subTest(data=data):
+            self.check_unpickling_error_strict(errors, data)
+
     def test_load_from_data0(self):
         self.assert_is_copy(self._testdata, self.loads(DATA0))
```
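The split above separates the assertion from the `subTest` bookkeeping: the wrapper isolates each payload in its own subtest, while the `_strict` variant can be called from code that already manages its own `subTest`. A minimal standalone sketch of that usage pattern (the class name, payloads, and test names here are hypothetical, not code from this PR):

```python
import pickle
import unittest


class CheckUnpicklingErrorSketch(unittest.TestCase):
    """Hypothetical standalone analogue of the two helpers added above."""

    def loads(self, data):
        return pickle.loads(data)

    def check_unpickling_error_strict(self, errors, data):
        # Only assert that unpickling *data* raises one of *errors*.
        with self.assertRaises(errors):
            self.loads(data)

    def check_unpickling_error(self, errors, data):
        # Thin wrapper: run the strict check inside its own subTest so a
        # failure for one payload still lets the remaining payloads run
        # and be reported individually.
        with self.subTest(data=data):
            self.check_unpickling_error_strict(errors, data)

    def test_invalid_opcodes(self):
        # 0xfd, 0xfe and 0xff are not assigned pickle opcodes, so each
        # payload fails with UnpicklingError ("invalid load key").
        for payload in (b'\xfd', b'\xfe', b'\xff'):
            self.check_unpickling_error(pickle.UnpicklingError, payload)


if __name__ == '__main__':
    unittest.main()
```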
```diff
@@ -1115,6 +1118,112 @@ def test_negative_32b_binput(self):
         dumped = b'\x80\x03X\x01\x00\x00\x00ar\xff\xff\xff\xff.'
         self.check_unpickling_error(ValueError, dumped)

+    def itersize(self, start, stop):
+        # Produce geometrical increasing sequence from start to stop
+        # (inclusively) for tests.
+        size = start
+        while size < stop:
+            yield size
+            size <<= 1
+        yield stop
+
+    def test_too_large_put(self):
```
A review comment on this test asks:

> Can you add a comment explaining why this and the next test method result in `([], [])` being returned no matter what, rather than an error, when the values are too large? (I suspect readers with a knowledge of the specific pickle protocol may understand, but it isn't obvious otherwise.)
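One way to see why the expected value is always `([], [])`: the crafted stream builds a single empty list, stores it in the memo under a large index with `PUT`, reads it back with `GET`, and wraps the two references in a tuple, so a huge memo index should only ever cost the memo slot itself, never an allocation proportional to the index. A standalone sketch with a modest index (illustrative only, not part of the diff):

```python
import pickle
import pickletools

# Same opcode sequence as in test_too_large_put, with a small memo index:
# MARK, MARK, LIST, PUT 1000, GET 1000, TUPLE, STOP.
payload = b'((lp1000\ng1000\nt.'

pickletools.dis(payload)

result = pickle.loads(payload)
print(result)                   # ([], [])
print(result[0] is result[1])   # True: GET pushed the same object PUT stored
```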
```diff
+        data = lambda n: (b'((lp' + str(n).encode() + b'\n' +
+                          b'g' + str(n).encode() + b'\nt.')
+        for idx in [10**6, 10**9, 10**12]:
+            self.assertEqual(self.loads(data(idx)), ([],)*2)
+
+    def test_too_large_long_binput(self):
+        data = lambda n: (b'(]r' + struct.pack('<I', n) +
+                          b'j' + struct.pack('<I', n) + b't.')
+        for idx in self.itersize(1 << 20, min(sys.maxsize, (1 << 32) - 1)):
+            self.assertEqual(self.loads(data(idx)), ([],)*2)
+
+    def _test_truncated_data(self, dumped, expected_error=None):
+        if expected_error is None:
+            expected_error = self.truncated_data_error
+        # BytesIO
+        with self.assertRaisesRegex(*expected_error):
+            self.loads(dumped)
+        if hasattr(self, 'unpickler'):
+            try:
+                with open(TESTFN, 'wb') as f:
+                    f.write(dumped)
+                # buffered file
+                with open(TESTFN, 'rb') as f:
+                    u = self.unpickler(f)
+                    with self.assertRaisesRegex(*expected_error):
+                        u.load()
+                # unbuffered file
+                with open(TESTFN, 'rb', buffering=0) as f:
+                    u = self.unpickler(f)
+                    with self.assertRaisesRegex(*expected_error):
+                        u.load()
+            finally:
+                os_helper.unlink(TESTFN)
+
+    def test_truncated_large_binstring(self):
+        data = lambda size: b'T' + struct.pack('<I', size) + b'.' * 5
+        self.assertEqual(self.loads(data(4)), '....')  # self-testing
+        for size in self.itersize(1 << 10, min(sys.maxsize - 5, (1 << 31) - 1)):
+            self._test_truncated_data(data(size))
+        self._test_truncated_data(data(1 << 31),
+            (pickle.UnpicklingError, 'truncated|exceeds|negative byte count'))
+
+    def test_truncated_large_binunicode(self):
+        data = lambda size: b'X' + struct.pack('<I', size) + b'.' * 5
+        self.assertEqual(self.loads(data(4)), '....')  # self-testing
+        for size in self.itersize(1 << 10, min(sys.maxsize - 5, (1 << 32) - 1)):
+            self._test_truncated_data(data(size))
+
+    def test_truncated_large_binbytes(self):
+        data = lambda size: b'B' + struct.pack('<I', size) + b'.' * 5
+        self.assertEqual(self.loads(data(4)), b'....')  # self-testing
+        for size in self.itersize(1 << 10, min(sys.maxsize, 1 << 31)):
+            self._test_truncated_data(data(size))
+
+    def test_truncated_large_long4(self):
+        data = lambda size: b'\x8b' + struct.pack('<I', size) + b'.' * 5
+        self.assertEqual(self.loads(data(4)), 0x2e2e2e2e)  # self-testing
+        for size in self.itersize(1 << 10, min(sys.maxsize - 5, (1 << 31) - 1)):
+            self._test_truncated_data(data(size))
+        self._test_truncated_data(data(1 << 31),
+            (pickle.UnpicklingError, 'LONG pickle has negative byte count'))
+
+    def test_truncated_large_frame(self):
+        data = lambda size: b'\x95' + struct.pack('<Q', size) + b'N.'
+        self.assertIsNone(self.loads(data(2)))  # self-testing
+        for size in self.itersize(1 << 10, sys.maxsize - 9):
+            self._test_truncated_data(data(size))
+        if sys.maxsize + 1 < 1 << 64:
+            self._test_truncated_data(data(sys.maxsize + 1),
+                ((OverflowError, ValueError),
+                 'FRAME length exceeds|frame size > sys.maxsize'))
+
+    def test_truncated_large_binunicode8(self):
+        data = lambda size: b'\x8d' + struct.pack('<Q', size) + b'.' * 5
+        self.assertEqual(self.loads(data(4)), '....')  # self-testing
+        for size in self.itersize(1 << 10, sys.maxsize - 9):
+            self._test_truncated_data(data(size))
+        if sys.maxsize + 1 < 1 << 64:
+            self._test_truncated_data(data(sys.maxsize + 1), self.size_overflow_error)
+
+    def test_truncated_large_binbytes8(self):
+        data = lambda size: b'\x8e' + struct.pack('<Q', size) + b'.' * 5
+        self.assertEqual(self.loads(data(4)), b'....')  # self-testing
+        for size in self.itersize(1 << 10, sys.maxsize):
+            self._test_truncated_data(data(size))
+        if sys.maxsize + 1 < 1 << 64:
+            self._test_truncated_data(data(sys.maxsize + 1), self.size_overflow_error)
+
+    def test_truncated_large_bytearray8(self):
+        data = lambda size: b'\x96' + struct.pack('<Q', size) + b'.' * 5
+        self.assertEqual(self.loads(data(4)), bytearray(b'....'))  # self-testing
+        for size in self.itersize(1 << 10, sys.maxsize):
+            self._test_truncated_data(data(size))
+        if sys.maxsize + 1 < 1 << 64:
+            self._test_truncated_data(data(sys.maxsize + 1), self.size_overflow_error)
+
     def test_badly_escaped_string(self):
         self.check_unpickling_error(ValueError, b"S'\\'\n.")
```
The PR also adds a two-line news entry:

```diff
@@ -0,0 +1,2 @@
+:mod:`pickle` now avoids allocating an arbitrarily large amount of memory
+based on small untrusted input when unpickling specially crafted data.
```
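For context, a minimal sketch of the kind of input the news entry refers to, assuming a BINBYTES8 header that promises far more data than the stream contains (illustrative only, not code from this PR):

```python
import pickle
import struct

# BINBYTES8 (opcode 0x8e) announces an 8-byte little-endian length followed
# by that many payload bytes.  This header claims 4 GiB, but only five bytes
# follow, so the whole stream is 14 bytes long.
claimed_size = 1 << 32
crafted = b'\x8e' + struct.pack('<Q', claimed_size) + b'.....'

try:
    pickle.loads(crafted)
except Exception as exc:
    # With this change the unpickler rejects the stream as truncated before
    # committing to a large buffer; older interpreters could first try to
    # reserve memory proportional to the claimed size, which may instead
    # surface as a MemoryError.
    print(type(exc).__name__, exc)
```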