diff --git a/Lib/test/test_io/test_general.py b/Lib/test/test_io/test_general.py index 5f645e3abbe230..c4a26719374450 100644 --- a/Lib/test/test_io/test_general.py +++ b/Lib/test/test_io/test_general.py @@ -6,7 +6,6 @@ import abc import array import errno -import locale import os import pickle import random @@ -28,23 +27,10 @@ from test.support.os_helper import FakePath from .utils import byteslike, CTestCase, PyTestCase -import codecs import io # C implementation of io import _pyio as pyio # Python implementation of io -def _default_chunk_size(): - """Get the default TextIOWrapper chunk size""" - with open(__file__, "r", encoding="latin-1") as f: - return f._CHUNK_SIZE - - - -class BadIndex: - def __index__(self): - 1/0 - - class IOTest: def setUp(self): @@ -2393,1640 +2379,6 @@ class PyBufferedRandomTest(BufferedRandomTest, PyTestCase): tp = pyio.BufferedRandom -# To fully exercise seek/tell, the StatefulIncrementalDecoder has these -# properties: -# - A single output character can correspond to many bytes of input. -# - The number of input bytes to complete the character can be -# undetermined until the last input byte is received. -# - The number of input bytes can vary depending on previous input. -# - A single input byte can correspond to many characters of output. -# - The number of output characters can be undetermined until the -# last input byte is received. -# - The number of output characters can vary depending on previous input. - -class StatefulIncrementalDecoder(codecs.IncrementalDecoder): - """ - For testing seek/tell behavior with a stateful, buffering decoder. - - Input is a sequence of words. Words may be fixed-length (length set - by input) or variable-length (period-terminated). In variable-length - mode, extra periods are ignored. Possible words are: - - 'i' followed by a number sets the input length, I (maximum 99). - When I is set to 0, words are space-terminated. - - 'o' followed by a number sets the output length, O (maximum 99). 
- - Any other word is converted into a word followed by a period on - the output. The output word consists of the input word truncated - or padded out with hyphens to make its length equal to O. If O - is 0, the word is output verbatim without truncating or padding. - I and O are initially set to 1. When I changes, any buffered input is - re-scanned according to the new I. EOF also terminates the last word. - """ - - def __init__(self, errors='strict'): - codecs.IncrementalDecoder.__init__(self, errors) - self.reset() - - def __repr__(self): - return '' % id(self) - - def reset(self): - self.i = 1 - self.o = 1 - self.buffer = bytearray() - - def getstate(self): - i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset() - return bytes(self.buffer), i*100 + o - - def setstate(self, state): - buffer, io = state - self.buffer = bytearray(buffer) - i, o = divmod(io, 100) - self.i, self.o = i ^ 1, o ^ 1 - - def decode(self, input, final=False): - output = '' - for b in input: - if self.i == 0: # variable-length, terminated with period - if b == ord('.'): - if self.buffer: - output += self.process_word() - else: - self.buffer.append(b) - else: # fixed-length, terminate after self.i bytes - self.buffer.append(b) - if len(self.buffer) == self.i: - output += self.process_word() - if final and self.buffer: # EOF terminates the last word - output += self.process_word() - return output - - def process_word(self): - output = '' - if self.buffer[0] == ord('i'): - self.i = min(99, int(self.buffer[1:] or 0)) # set input length - elif self.buffer[0] == ord('o'): - self.o = min(99, int(self.buffer[1:] or 0)) # set output length - else: - output = self.buffer.decode('ascii') - if len(output) < self.o: - output += '-'*self.o # pad out with hyphens - if self.o: - output = output[:self.o] # truncate to output length - output += '.' 
- self.buffer = bytearray() - return output - - codecEnabled = False - - -# bpo-41919: This method is separated from StatefulIncrementalDecoder to avoid a resource leak -# when registering codecs and cleanup functions. -def lookupTestDecoder(name): - if StatefulIncrementalDecoder.codecEnabled and name == 'test_decoder': - latin1 = codecs.lookup('latin-1') - return codecs.CodecInfo( - name='test_decoder', encode=latin1.encode, decode=None, - incrementalencoder=None, - streamreader=None, streamwriter=None, - incrementaldecoder=StatefulIncrementalDecoder) - - -class StatefulIncrementalDecoderTest(unittest.TestCase): - """ - Make sure the StatefulIncrementalDecoder actually works. - """ - - test_cases = [ - # I=1, O=1 (fixed-length input == fixed-length output) - (b'abcd', False, 'a.b.c.d.'), - # I=0, O=0 (variable-length input, variable-length output) - (b'oiabcd', True, 'abcd.'), - # I=0, O=0 (should ignore extra periods) - (b'oi...abcd...', True, 'abcd.'), - # I=0, O=6 (variable-length input, fixed-length output) - (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'), - # I=2, O=6 (fixed-length input < fixed-length output) - (b'i.i2.o6xyz', True, 'xy----.z-----.'), - # I=6, O=3 (fixed-length input > fixed-length output) - (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'), - # I=0, then 3; O=29, then 15 (with longer output) - (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True, - 'a----------------------------.' + - 'b----------------------------.' + - 'cde--------------------------.' + - 'abcdefghijabcde.' + - 'a.b------------.' + - '.c.------------.' + - 'd.e------------.' + - 'k--------------.' + - 'l--------------.' + - 'm--------------.') - ] - - def test_decoder(self): - # Try a few one-shot test cases. - for input, eof, output in self.test_cases: - d = StatefulIncrementalDecoder() - self.assertEqual(d.decode(input, eof), output) - - # Also test an unfinished decode, followed by forcing EOF. 
- d = StatefulIncrementalDecoder() - self.assertEqual(d.decode(b'oiabcd'), '') - self.assertEqual(d.decode(b'', 1), 'abcd.') - -class TextIOWrapperTest: - - def setUp(self): - self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n" - self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii") - os_helper.unlink(os_helper.TESTFN) - codecs.register(lookupTestDecoder) - self.addCleanup(codecs.unregister, lookupTestDecoder) - - def tearDown(self): - os_helper.unlink(os_helper.TESTFN) - - def test_constructor(self): - r = self.BytesIO(b"\xc3\xa9\n\n") - b = self.BufferedReader(r, 1000) - t = self.TextIOWrapper(b, encoding="utf-8") - t.__init__(b, encoding="latin-1", newline="\r\n") - self.assertEqual(t.encoding, "latin-1") - self.assertEqual(t.line_buffering, False) - t.__init__(b, encoding="utf-8", line_buffering=True) - self.assertEqual(t.encoding, "utf-8") - self.assertEqual(t.line_buffering, True) - self.assertEqual("\xe9\n", t.readline()) - invalid_type = TypeError if self.is_C else ValueError - with self.assertRaises(invalid_type): - t.__init__(b, encoding=42) - with self.assertRaises(UnicodeEncodeError): - t.__init__(b, encoding='\udcfe') - with self.assertRaises(ValueError): - t.__init__(b, encoding='utf-8\0') - with self.assertRaises(invalid_type): - t.__init__(b, encoding="utf-8", errors=42) - if support.Py_DEBUG or sys.flags.dev_mode or self.is_C: - with self.assertRaises(UnicodeEncodeError): - t.__init__(b, encoding="utf-8", errors='\udcfe') - if support.Py_DEBUG or sys.flags.dev_mode or self.is_C: - with self.assertRaises(ValueError): - t.__init__(b, encoding="utf-8", errors='replace\0') - with self.assertRaises(TypeError): - t.__init__(b, encoding="utf-8", newline=42) - with self.assertRaises(ValueError): - t.__init__(b, encoding="utf-8", newline='\udcfe') - with self.assertRaises(ValueError): - t.__init__(b, encoding="utf-8", newline='\n\0') - with self.assertRaises(ValueError): - t.__init__(b, encoding="utf-8", newline='xyzzy') - - def 
test_uninitialized(self): - t = self.TextIOWrapper.__new__(self.TextIOWrapper) - del t - t = self.TextIOWrapper.__new__(self.TextIOWrapper) - self.assertRaises(Exception, repr, t) - self.assertRaisesRegex((ValueError, AttributeError), - 'uninitialized|has no attribute', - t.read, 0) - t.__init__(self.MockRawIO(), encoding="utf-8") - self.assertEqual(t.read(0), '') - - def test_non_text_encoding_codecs_are_rejected(self): - # Ensure the constructor complains if passed a codec that isn't - # marked as a text encoding - # http://bugs.python.org/issue20404 - r = self.BytesIO() - b = self.BufferedWriter(r) - with self.assertRaisesRegex(LookupError, "is not a text encoding"): - self.TextIOWrapper(b, encoding="hex") - - def test_detach(self): - r = self.BytesIO() - b = self.BufferedWriter(r) - t = self.TextIOWrapper(b, encoding="ascii") - self.assertIs(t.detach(), b) - - t = self.TextIOWrapper(b, encoding="ascii") - t.write("howdy") - self.assertFalse(r.getvalue()) - t.detach() - self.assertEqual(r.getvalue(), b"howdy") - self.assertRaises(ValueError, t.detach) - - # Operations independent of the detached stream should still work - repr(t) - self.assertEqual(t.encoding, "ascii") - self.assertEqual(t.errors, "strict") - self.assertFalse(t.line_buffering) - self.assertFalse(t.write_through) - - def test_repr(self): - raw = self.BytesIO("hello".encode("utf-8")) - b = self.BufferedReader(raw) - t = self.TextIOWrapper(b, encoding="utf-8") - modname = self.TextIOWrapper.__module__ - self.assertRegex(repr(t), - r"<(%s\.)?TextIOWrapper encoding='utf-8'>" % modname) - raw.name = "dummy" - self.assertRegex(repr(t), - r"<(%s\.)?TextIOWrapper name='dummy' encoding='utf-8'>" % modname) - t.mode = "r" - self.assertRegex(repr(t), - r"<(%s\.)?TextIOWrapper name='dummy' mode='r' encoding='utf-8'>" % modname) - raw.name = b"dummy" - self.assertRegex(repr(t), - r"<(%s\.)?TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>" % modname) - - t.buffer.detach() - repr(t) # Should not raise an 
exception - - def test_recursive_repr(self): - # Issue #25455 - raw = self.BytesIO() - t = self.TextIOWrapper(raw, encoding="utf-8") - with support.swap_attr(raw, 'name', t), support.infinite_recursion(25): - with self.assertRaises(RuntimeError): - repr(t) # Should not crash - - def test_subclass_repr(self): - class TestSubclass(self.TextIOWrapper): - pass - - f = TestSubclass(self.StringIO()) - self.assertIn(TestSubclass.__name__, repr(f)) - - def test_line_buffering(self): - r = self.BytesIO() - b = self.BufferedWriter(r, 1000) - t = self.TextIOWrapper(b, encoding="utf-8", newline="\n", line_buffering=True) - t.write("X") - self.assertEqual(r.getvalue(), b"") # No flush happened - t.write("Y\nZ") - self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed - t.write("A\rB") - self.assertEqual(r.getvalue(), b"XY\nZA\rB") - - def test_reconfigure_line_buffering(self): - r = self.BytesIO() - b = self.BufferedWriter(r, 1000) - t = self.TextIOWrapper(b, encoding="utf-8", newline="\n", line_buffering=False) - t.write("AB\nC") - self.assertEqual(r.getvalue(), b"") - - t.reconfigure(line_buffering=True) # implicit flush - self.assertEqual(r.getvalue(), b"AB\nC") - t.write("DEF\nG") - self.assertEqual(r.getvalue(), b"AB\nCDEF\nG") - t.write("H") - self.assertEqual(r.getvalue(), b"AB\nCDEF\nG") - t.reconfigure(line_buffering=False) # implicit flush - self.assertEqual(r.getvalue(), b"AB\nCDEF\nGH") - t.write("IJ") - self.assertEqual(r.getvalue(), b"AB\nCDEF\nGH") - - # Keeping default value - t.reconfigure() - t.reconfigure(line_buffering=None) - self.assertEqual(t.line_buffering, False) - t.reconfigure(line_buffering=True) - t.reconfigure() - t.reconfigure(line_buffering=None) - self.assertEqual(t.line_buffering, True) - - @unittest.skipIf(sys.flags.utf8_mode, "utf-8 mode is enabled") - def test_default_encoding(self): - with os_helper.EnvironmentVarGuard() as env: - # try to get a user preferred encoding different than the current - # locale encoding to check that 
TextIOWrapper() uses the current - # locale encoding and not the user preferred encoding - env.unset('LC_ALL', 'LANG', 'LC_CTYPE') - - current_locale_encoding = locale.getencoding() - b = self.BytesIO() - with warnings.catch_warnings(): - warnings.simplefilter("ignore", EncodingWarning) - t = self.TextIOWrapper(b) - self.assertEqual(t.encoding, current_locale_encoding) - - def test_encoding(self): - # Check the encoding attribute is always set, and valid - b = self.BytesIO() - t = self.TextIOWrapper(b, encoding="utf-8") - self.assertEqual(t.encoding, "utf-8") - with warnings.catch_warnings(): - warnings.simplefilter("ignore", EncodingWarning) - t = self.TextIOWrapper(b) - self.assertIsNotNone(t.encoding) - codecs.lookup(t.encoding) - - def test_encoding_errors_reading(self): - # (1) default - b = self.BytesIO(b"abc\n\xff\n") - t = self.TextIOWrapper(b, encoding="ascii") - self.assertRaises(UnicodeError, t.read) - # (2) explicit strict - b = self.BytesIO(b"abc\n\xff\n") - t = self.TextIOWrapper(b, encoding="ascii", errors="strict") - self.assertRaises(UnicodeError, t.read) - # (3) ignore - b = self.BytesIO(b"abc\n\xff\n") - t = self.TextIOWrapper(b, encoding="ascii", errors="ignore") - self.assertEqual(t.read(), "abc\n\n") - # (4) replace - b = self.BytesIO(b"abc\n\xff\n") - t = self.TextIOWrapper(b, encoding="ascii", errors="replace") - self.assertEqual(t.read(), "abc\n\ufffd\n") - - def test_encoding_errors_writing(self): - # (1) default - b = self.BytesIO() - t = self.TextIOWrapper(b, encoding="ascii") - self.assertRaises(UnicodeError, t.write, "\xff") - # (2) explicit strict - b = self.BytesIO() - t = self.TextIOWrapper(b, encoding="ascii", errors="strict") - self.assertRaises(UnicodeError, t.write, "\xff") - # (3) ignore - b = self.BytesIO() - t = self.TextIOWrapper(b, encoding="ascii", errors="ignore", - newline="\n") - t.write("abc\xffdef\n") - t.flush() - self.assertEqual(b.getvalue(), b"abcdef\n") - # (4) replace - b = self.BytesIO() - t = 
self.TextIOWrapper(b, encoding="ascii", errors="replace", - newline="\n") - t.write("abc\xffdef\n") - t.flush() - self.assertEqual(b.getvalue(), b"abc?def\n") - - def test_newlines(self): - input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ] - - tests = [ - [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ], - [ '', input_lines ], - [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ], - [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ], - [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ], - ] - encodings = ( - 'utf-8', 'latin-1', - 'utf-16', 'utf-16-le', 'utf-16-be', - 'utf-32', 'utf-32-le', 'utf-32-be', - ) - - # Try a range of buffer sizes to test the case where \r is the last - # character in TextIOWrapper._pending_line. - for encoding in encodings: - # XXX: str.encode() should return bytes - data = bytes(''.join(input_lines).encode(encoding)) - for do_reads in (False, True): - for bufsize in range(1, 10): - for newline, exp_lines in tests: - bufio = self.BufferedReader(self.BytesIO(data), bufsize) - textio = self.TextIOWrapper(bufio, newline=newline, - encoding=encoding) - if do_reads: - got_lines = [] - while True: - c2 = textio.read(2) - if c2 == '': - break - self.assertEqual(len(c2), 2) - got_lines.append(c2 + textio.readline()) - else: - got_lines = list(textio) - - for got_line, exp_line in zip(got_lines, exp_lines): - self.assertEqual(got_line, exp_line) - self.assertEqual(len(got_lines), len(exp_lines)) - - def test_newlines_input(self): - testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG" - normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n") - for newline, expected in [ - (None, normalized.decode("ascii").splitlines(keepends=True)), - ("", testdata.decode("ascii").splitlines(keepends=True)), - ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]), - ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]), - ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", 
"\nFFF\r", "\nGGG"]), - ]: - buf = self.BytesIO(testdata) - txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline) - self.assertEqual(txt.readlines(), expected) - txt.seek(0) - self.assertEqual(txt.read(), "".join(expected)) - - def test_newlines_output(self): - testdict = { - "": b"AAA\nBBB\nCCC\nX\rY\r\nZ", - "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ", - "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ", - "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ", - } - tests = [(None, testdict[os.linesep])] + sorted(testdict.items()) - for newline, expected in tests: - buf = self.BytesIO() - txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline) - txt.write("AAA\nB") - txt.write("BB\nCCC\n") - txt.write("X\rY\r\nZ") - txt.flush() - self.assertEqual(buf.closed, False) - self.assertEqual(buf.getvalue(), expected) - - def test_destructor(self): - l = [] - base = self.BytesIO - class MyBytesIO(base): - def close(self): - l.append(self.getvalue()) - base.close(self) - b = MyBytesIO() - t = self.TextIOWrapper(b, encoding="ascii") - t.write("abc") - del t - support.gc_collect() - self.assertEqual([b"abc"], l) - - def test_override_destructor(self): - record = [] - class MyTextIO(self.TextIOWrapper): - def __del__(self): - record.append(1) - try: - f = super().__del__ - except AttributeError: - pass - else: - f() - def close(self): - record.append(2) - super().close() - def flush(self): - record.append(3) - super().flush() - b = self.BytesIO() - t = MyTextIO(b, encoding="ascii") - del t - support.gc_collect() - self.assertEqual(record, [1, 2, 3]) - - def test_error_through_destructor(self): - # Test that the exception state is not modified by a destructor, - # even if close() fails. 
- rawio = self.CloseFailureIO() - with support.catch_unraisable_exception() as cm: - with self.assertRaises(AttributeError): - self.TextIOWrapper(rawio, encoding="utf-8").xyzzy - - self.assertEqual(cm.unraisable.exc_type, OSError) - - # Systematic tests of the text I/O API - - def test_basic_io(self): - for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65): - for enc in "ascii", "latin-1", "utf-8" :# , "utf-16-be", "utf-16-le": - f = self.open(os_helper.TESTFN, "w+", encoding=enc) - f._CHUNK_SIZE = chunksize - self.assertEqual(f.write("abc"), 3) - f.close() - f = self.open(os_helper.TESTFN, "r+", encoding=enc) - f._CHUNK_SIZE = chunksize - self.assertEqual(f.tell(), 0) - self.assertEqual(f.read(), "abc") - cookie = f.tell() - self.assertEqual(f.seek(0), 0) - self.assertEqual(f.read(None), "abc") - f.seek(0) - self.assertEqual(f.read(2), "ab") - self.assertEqual(f.read(1), "c") - self.assertEqual(f.read(1), "") - self.assertEqual(f.read(), "") - self.assertEqual(f.tell(), cookie) - self.assertEqual(f.seek(0), 0) - self.assertEqual(f.seek(0, 2), cookie) - self.assertEqual(f.write("def"), 3) - self.assertEqual(f.seek(cookie), cookie) - self.assertEqual(f.read(), "def") - if enc.startswith("utf"): - self.multi_line_test(f, enc) - f.close() - - def multi_line_test(self, f, enc): - f.seek(0) - f.truncate() - sample = "s\xff\u0fff\uffff" - wlines = [] - for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000): - chars = [] - for i in range(size): - chars.append(sample[i % len(sample)]) - line = "".join(chars) + "\n" - wlines.append((f.tell(), line)) - f.write(line) - f.seek(0) - rlines = [] - while True: - pos = f.tell() - line = f.readline() - if not line: - break - rlines.append((pos, line)) - self.assertEqual(rlines, wlines) - - def test_telling(self): - f = self.open(os_helper.TESTFN, "w+", encoding="utf-8") - p0 = f.tell() - f.write("\xff\n") - p1 = f.tell() - f.write("\xff\n") - p2 = f.tell() - f.seek(0) - self.assertEqual(f.tell(), p0) - 
self.assertEqual(f.readline(), "\xff\n") - self.assertEqual(f.tell(), p1) - self.assertEqual(f.readline(), "\xff\n") - self.assertEqual(f.tell(), p2) - f.seek(0) - for line in f: - self.assertEqual(line, "\xff\n") - self.assertRaises(OSError, f.tell) - self.assertEqual(f.tell(), p2) - f.close() - - def test_seeking(self): - chunk_size = _default_chunk_size() - prefix_size = chunk_size - 2 - u_prefix = "a" * prefix_size - prefix = bytes(u_prefix.encode("utf-8")) - self.assertEqual(len(u_prefix), len(prefix)) - u_suffix = "\u8888\n" - suffix = bytes(u_suffix.encode("utf-8")) - line = prefix + suffix - with self.open(os_helper.TESTFN, "wb") as f: - f.write(line*2) - with self.open(os_helper.TESTFN, "r", encoding="utf-8") as f: - s = f.read(prefix_size) - self.assertEqual(s, str(prefix, "ascii")) - self.assertEqual(f.tell(), prefix_size) - self.assertEqual(f.readline(), u_suffix) - - def test_seeking_too(self): - # Regression test for a specific bug - data = b'\xe0\xbf\xbf\n' - with self.open(os_helper.TESTFN, "wb") as f: - f.write(data) - with self.open(os_helper.TESTFN, "r", encoding="utf-8") as f: - f._CHUNK_SIZE # Just test that it exists - f._CHUNK_SIZE = 2 - f.readline() - f.tell() - - def test_seek_and_tell(self): - #Test seek/tell using the StatefulIncrementalDecoder. 
- # Make test faster by doing smaller seeks - CHUNK_SIZE = 128 - - def test_seek_and_tell_with_data(data, min_pos=0): - """Tell/seek to various points within a data stream and ensure - that the decoded data returned by read() is consistent.""" - f = self.open(os_helper.TESTFN, 'wb') - f.write(data) - f.close() - f = self.open(os_helper.TESTFN, encoding='test_decoder') - f._CHUNK_SIZE = CHUNK_SIZE - decoded = f.read() - f.close() - - for i in range(min_pos, len(decoded) + 1): # seek positions - for j in [1, 5, len(decoded) - i]: # read lengths - f = self.open(os_helper.TESTFN, encoding='test_decoder') - self.assertEqual(f.read(i), decoded[:i]) - cookie = f.tell() - self.assertEqual(f.read(j), decoded[i:i + j]) - f.seek(cookie) - self.assertEqual(f.read(), decoded[i:]) - f.close() - - # Enable the test decoder. - StatefulIncrementalDecoder.codecEnabled = 1 - - # Run the tests. - try: - # Try each test case. - for input, _, _ in StatefulIncrementalDecoderTest.test_cases: - test_seek_and_tell_with_data(input) - - # Position each test case so that it crosses a chunk boundary. - for input, _, _ in StatefulIncrementalDecoderTest.test_cases: - offset = CHUNK_SIZE - len(input)//2 - prefix = b'.'*offset - # Don't bother seeking into the prefix (takes too long). - min_pos = offset*2 - test_seek_and_tell_with_data(prefix + input, min_pos) - - # Ensure our test decoder won't interfere with subsequent tests. 
- finally: - StatefulIncrementalDecoder.codecEnabled = 0 - - def test_multibyte_seek_and_tell(self): - f = self.open(os_helper.TESTFN, "w", encoding="euc_jp") - f.write("AB\n\u3046\u3048\n") - f.close() - - f = self.open(os_helper.TESTFN, "r", encoding="euc_jp") - self.assertEqual(f.readline(), "AB\n") - p0 = f.tell() - self.assertEqual(f.readline(), "\u3046\u3048\n") - p1 = f.tell() - f.seek(p0) - self.assertEqual(f.readline(), "\u3046\u3048\n") - self.assertEqual(f.tell(), p1) - f.close() - - def test_seek_with_encoder_state(self): - f = self.open(os_helper.TESTFN, "w", encoding="euc_jis_2004") - f.write("\u00e6\u0300") - p0 = f.tell() - f.write("\u00e6") - f.seek(p0) - f.write("\u0300") - f.close() - - f = self.open(os_helper.TESTFN, "r", encoding="euc_jis_2004") - self.assertEqual(f.readline(), "\u00e6\u0300\u0300") - f.close() - - def test_encoded_writes(self): - data = "1234567890" - tests = ("utf-16", - "utf-16-le", - "utf-16-be", - "utf-32", - "utf-32-le", - "utf-32-be") - for encoding in tests: - buf = self.BytesIO() - f = self.TextIOWrapper(buf, encoding=encoding) - # Check if the BOM is written only once (see issue1753). 
- f.write(data) - f.write(data) - f.seek(0) - self.assertEqual(f.read(), data * 2) - f.seek(0) - self.assertEqual(f.read(), data * 2) - self.assertEqual(buf.getvalue(), (data * 2).encode(encoding)) - - def test_unreadable(self): - class UnReadable(self.BytesIO): - def readable(self): - return False - txt = self.TextIOWrapper(UnReadable(), encoding="utf-8") - self.assertRaises(OSError, txt.read) - - def test_read_one_by_one(self): - txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"), encoding="utf-8") - reads = "" - while True: - c = txt.read(1) - if not c: - break - reads += c - self.assertEqual(reads, "AA\nBB") - - def test_readlines(self): - txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"), encoding="utf-8") - self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"]) - txt.seek(0) - self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"]) - txt.seek(0) - self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"]) - - # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128. - def test_read_by_chunk(self): - # make sure "\r\n" straddles 128 char boundary. 
- txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"), encoding="utf-8") - reads = "" - while True: - c = txt.read(128) - if not c: - break - reads += c - self.assertEqual(reads, "A"*127+"\nB") - - def test_writelines(self): - l = ['ab', 'cd', 'ef'] - buf = self.BytesIO() - txt = self.TextIOWrapper(buf, encoding="utf-8") - txt.writelines(l) - txt.flush() - self.assertEqual(buf.getvalue(), b'abcdef') - - def test_writelines_userlist(self): - l = UserList(['ab', 'cd', 'ef']) - buf = self.BytesIO() - txt = self.TextIOWrapper(buf, encoding="utf-8") - txt.writelines(l) - txt.flush() - self.assertEqual(buf.getvalue(), b'abcdef') - - def test_writelines_error(self): - txt = self.TextIOWrapper(self.BytesIO(), encoding="utf-8") - self.assertRaises(TypeError, txt.writelines, [1, 2, 3]) - self.assertRaises(TypeError, txt.writelines, None) - self.assertRaises(TypeError, txt.writelines, b'abc') - - def test_issue1395_1(self): - txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") - - # read one char at a time - reads = "" - while True: - c = txt.read(1) - if not c: - break - reads += c - self.assertEqual(reads, self.normalized) - - def test_issue1395_2(self): - txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") - txt._CHUNK_SIZE = 4 - - reads = "" - while True: - c = txt.read(4) - if not c: - break - reads += c - self.assertEqual(reads, self.normalized) - - def test_issue1395_3(self): - txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") - txt._CHUNK_SIZE = 4 - - reads = txt.read(4) - reads += txt.read(4) - reads += txt.readline() - reads += txt.readline() - reads += txt.readline() - self.assertEqual(reads, self.normalized) - - def test_issue1395_4(self): - txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") - txt._CHUNK_SIZE = 4 - - reads = txt.read(4) - reads += txt.read() - self.assertEqual(reads, self.normalized) - - def test_issue1395_5(self): - txt = 
self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") - txt._CHUNK_SIZE = 4 - - reads = txt.read(4) - pos = txt.tell() - txt.seek(0) - txt.seek(pos) - self.assertEqual(txt.read(4), "BBB\n") - - def test_issue2282(self): - buffer = self.BytesIO(self.testdata) - txt = self.TextIOWrapper(buffer, encoding="ascii") - - self.assertEqual(buffer.seekable(), txt.seekable()) - - def test_append_bom(self): - # The BOM is not written again when appending to a non-empty file - filename = os_helper.TESTFN - for charset in ('utf-8-sig', 'utf-16', 'utf-32'): - with self.open(filename, 'w', encoding=charset) as f: - f.write('aaa') - pos = f.tell() - with self.open(filename, 'rb') as f: - self.assertEqual(f.read(), 'aaa'.encode(charset)) - - with self.open(filename, 'a', encoding=charset) as f: - f.write('xxx') - with self.open(filename, 'rb') as f: - self.assertEqual(f.read(), 'aaaxxx'.encode(charset)) - - def test_seek_bom(self): - # Same test, but when seeking manually - filename = os_helper.TESTFN - for charset in ('utf-8-sig', 'utf-16', 'utf-32'): - with self.open(filename, 'w', encoding=charset) as f: - f.write('aaa') - pos = f.tell() - with self.open(filename, 'r+', encoding=charset) as f: - f.seek(pos) - f.write('zzz') - f.seek(0) - f.write('bbb') - with self.open(filename, 'rb') as f: - self.assertEqual(f.read(), 'bbbzzz'.encode(charset)) - - def test_seek_append_bom(self): - # Same test, but first seek to the start and then to the end - filename = os_helper.TESTFN - for charset in ('utf-8-sig', 'utf-16', 'utf-32'): - with self.open(filename, 'w', encoding=charset) as f: - f.write('aaa') - with self.open(filename, 'a', encoding=charset) as f: - f.seek(0) - f.seek(0, self.SEEK_END) - f.write('xxx') - with self.open(filename, 'rb') as f: - self.assertEqual(f.read(), 'aaaxxx'.encode(charset)) - - def test_errors_property(self): - with self.open(os_helper.TESTFN, "w", encoding="utf-8") as f: - self.assertEqual(f.errors, "strict") - with self.open(os_helper.TESTFN, 
"w", encoding="utf-8", errors="replace") as f: - self.assertEqual(f.errors, "replace") - - @support.no_tracing - @threading_helper.requires_working_threading() - def test_threads_write(self): - # Issue6750: concurrent writes could duplicate data - event = threading.Event() - with self.open(os_helper.TESTFN, "w", encoding="utf-8", buffering=1) as f: - def run(n): - text = "Thread%03d\n" % n - event.wait() - f.write(text) - threads = [threading.Thread(target=run, args=(x,)) - for x in range(20)] - with threading_helper.start_threads(threads, event.set): - time.sleep(0.02) - with self.open(os_helper.TESTFN, encoding="utf-8") as f: - content = f.read() - for n in range(20): - self.assertEqual(content.count("Thread%03d\n" % n), 1) - - def test_flush_error_on_close(self): - # Test that text file is closed despite failed flush - # and that flush() is called before file closed. - txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") - closed = [] - def bad_flush(): - closed[:] = [txt.closed, txt.buffer.closed] - raise OSError() - txt.flush = bad_flush - self.assertRaises(OSError, txt.close) # exception not swallowed - self.assertTrue(txt.closed) - self.assertTrue(txt.buffer.closed) - self.assertTrue(closed) # flush() called - self.assertFalse(closed[0]) # flush() called before file closed - self.assertFalse(closed[1]) - txt.flush = lambda: None # break reference loop - - def test_close_error_on_close(self): - buffer = self.BytesIO(self.testdata) - def bad_flush(): - raise OSError('flush') - def bad_close(): - raise OSError('close') - buffer.close = bad_close - txt = self.TextIOWrapper(buffer, encoding="ascii") - txt.flush = bad_flush - with self.assertRaises(OSError) as err: # exception not swallowed - txt.close() - self.assertEqual(err.exception.args, ('close',)) - self.assertIsInstance(err.exception.__context__, OSError) - self.assertEqual(err.exception.__context__.args, ('flush',)) - self.assertFalse(txt.closed) - - # Silence destructor error - 
buffer.close = lambda: None - txt.flush = lambda: None - - def test_nonnormalized_close_error_on_close(self): - # Issue #21677 - buffer = self.BytesIO(self.testdata) - def bad_flush(): - raise non_existing_flush - def bad_close(): - raise non_existing_close - buffer.close = bad_close - txt = self.TextIOWrapper(buffer, encoding="ascii") - txt.flush = bad_flush - with self.assertRaises(NameError) as err: # exception not swallowed - txt.close() - self.assertIn('non_existing_close', str(err.exception)) - self.assertIsInstance(err.exception.__context__, NameError) - self.assertIn('non_existing_flush', str(err.exception.__context__)) - self.assertFalse(txt.closed) - - # Silence destructor error - buffer.close = lambda: None - txt.flush = lambda: None - - def test_multi_close(self): - txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") - txt.close() - txt.close() - txt.close() - self.assertRaises(ValueError, txt.flush) - - def test_unseekable(self): - txt = self.TextIOWrapper(self.MockUnseekableIO(self.testdata), encoding="utf-8") - self.assertRaises(self.UnsupportedOperation, txt.tell) - self.assertRaises(self.UnsupportedOperation, txt.seek, 0) - - def test_readonly_attributes(self): - txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") - buf = self.BytesIO(self.testdata) - with self.assertRaises(AttributeError): - txt.buffer = buf - - def test_rawio(self): - # Issue #12591: TextIOWrapper must work with raw I/O objects, so - # that subprocess.Popen() can have the required unbuffered - # semantics with universal_newlines=True. 
- raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n']) - txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n') - # Reads - self.assertEqual(txt.read(4), 'abcd') - self.assertEqual(txt.readline(), 'efghi\n') - self.assertEqual(list(txt), ['jkl\n', 'opq\n']) - - def test_rawio_write_through(self): - # Issue #12591: with write_through=True, writes don't need a flush - raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n']) - txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n', - write_through=True) - txt.write('1') - txt.write('23\n4') - txt.write('5') - self.assertEqual(b''.join(raw._write_stack), b'123\n45') - - def test_bufio_write_through(self): - # Issue #21396: write_through=True doesn't force a flush() - # on the underlying binary buffered object. - flush_called, write_called = [], [] - class BufferedWriter(self.BufferedWriter): - def flush(self, *args, **kwargs): - flush_called.append(True) - return super().flush(*args, **kwargs) - def write(self, *args, **kwargs): - write_called.append(True) - return super().write(*args, **kwargs) - - rawio = self.BytesIO() - data = b"a" - bufio = BufferedWriter(rawio, len(data)*2) - textio = self.TextIOWrapper(bufio, encoding='ascii', - write_through=True) - # write to the buffered io but don't overflow the buffer - text = data.decode('ascii') - textio.write(text) - - # buffer.flush is not called with write_through=True - self.assertFalse(flush_called) - # buffer.write *is* called with write_through=True - self.assertTrue(write_called) - self.assertEqual(rawio.getvalue(), b"") # no flush - - write_called = [] # reset - textio.write(text * 10) # total content is larger than bufio buffer - self.assertTrue(write_called) - self.assertEqual(rawio.getvalue(), data * 11) # all flushed - - def test_reconfigure_write_through(self): - raw = self.MockRawIO([]) - t = self.TextIOWrapper(raw, encoding='ascii', newline='\n') - t.write('1') - t.reconfigure(write_through=True) # implied flush - 
self.assertEqual(t.write_through, True) - self.assertEqual(b''.join(raw._write_stack), b'1') - t.write('23') - self.assertEqual(b''.join(raw._write_stack), b'123') - t.reconfigure(write_through=False) - self.assertEqual(t.write_through, False) - t.write('45') - t.flush() - self.assertEqual(b''.join(raw._write_stack), b'12345') - # Keeping default value - t.reconfigure() - t.reconfigure(write_through=None) - self.assertEqual(t.write_through, False) - t.reconfigure(write_through=True) - t.reconfigure() - t.reconfigure(write_through=None) - self.assertEqual(t.write_through, True) - - def test_read_nonbytes(self): - # Issue #17106 - # Crash when underlying read() returns non-bytes - t = self.TextIOWrapper(self.StringIO('a'), encoding="utf-8") - self.assertRaises(TypeError, t.read, 1) - t = self.TextIOWrapper(self.StringIO('a'), encoding="utf-8") - self.assertRaises(TypeError, t.readline) - t = self.TextIOWrapper(self.StringIO('a'), encoding="utf-8") - self.assertRaises(TypeError, t.read) - - def test_illegal_encoder(self): - # Issue 31271: Calling write() while the return value of encoder's - # encode() is invalid shouldn't cause an assertion failure. 
- rot13 = codecs.lookup("rot13") - with support.swap_attr(rot13, '_is_text_encoding', True): - t = self.TextIOWrapper(self.BytesIO(b'foo'), encoding="rot13") - self.assertRaises(TypeError, t.write, 'bar') - - def test_illegal_decoder(self): - # Issue #17106 - # Bypass the early encoding check added in issue 20404 - def _make_illegal_wrapper(): - quopri = codecs.lookup("quopri") - quopri._is_text_encoding = True - try: - t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), - newline='\n', encoding="quopri") - finally: - quopri._is_text_encoding = False - return t - # Crash when decoder returns non-string - t = _make_illegal_wrapper() - self.assertRaises(TypeError, t.read, 1) - t = _make_illegal_wrapper() - self.assertRaises(TypeError, t.readline) - t = _make_illegal_wrapper() - self.assertRaises(TypeError, t.read) - - # Issue 31243: calling read() while the return value of decoder's - # getstate() is invalid should neither crash the interpreter nor - # raise a SystemError. - def _make_very_illegal_wrapper(getstate_ret_val): - class BadDecoder: - def getstate(self): - return getstate_ret_val - def _get_bad_decoder(dummy): - return BadDecoder() - quopri = codecs.lookup("quopri") - with support.swap_attr(quopri, 'incrementaldecoder', - _get_bad_decoder): - return _make_illegal_wrapper() - t = _make_very_illegal_wrapper(42) - self.assertRaises(TypeError, t.read, 42) - t = _make_very_illegal_wrapper(()) - self.assertRaises(TypeError, t.read, 42) - t = _make_very_illegal_wrapper((1, 2)) - self.assertRaises(TypeError, t.read, 42) - - def _check_create_at_shutdown(self, **kwargs): - # Issue #20037: creating a TextIOWrapper at shutdown - # shouldn't crash the interpreter. 
- iomod = self.io.__name__ - code = """if 1: - import codecs - import {iomod} as io - - # Avoid looking up codecs at shutdown - codecs.lookup('utf-8') - - class C: - def __del__(self): - io.TextIOWrapper(io.BytesIO(), **{kwargs}) - print("ok") - c = C() - """.format(iomod=iomod, kwargs=kwargs) - return assert_python_ok("-c", code) - - def test_create_at_shutdown_without_encoding(self): - rc, out, err = self._check_create_at_shutdown() - if err: - # Can error out with a RuntimeError if the module state - # isn't found. - self.assertIn(self.shutdown_error, err.decode()) - else: - self.assertEqual("ok", out.decode().strip()) - - def test_create_at_shutdown_with_encoding(self): - rc, out, err = self._check_create_at_shutdown(encoding='utf-8', - errors='strict') - self.assertFalse(err) - self.assertEqual("ok", out.decode().strip()) - - def test_read_byteslike(self): - r = MemviewBytesIO(b'Just some random string\n') - t = self.TextIOWrapper(r, 'utf-8') - - # TextIOwrapper will not read the full string, because - # we truncate it to a multiple of the native int size - # so that we can construct a more complex memoryview. 
- bytes_val = _to_memoryview(r.getvalue()).tobytes() - - self.assertEqual(t.read(200), bytes_val.decode('utf-8')) - - def test_issue22849(self): - class F(object): - def readable(self): return True - def writable(self): return True - def seekable(self): return True - - for i in range(10): - try: - self.TextIOWrapper(F(), encoding='utf-8') - except Exception: - pass - - F.tell = lambda x: 0 - t = self.TextIOWrapper(F(), encoding='utf-8') - - def test_reconfigure_locale(self): - wrapper = self.TextIOWrapper(self.BytesIO(b"test")) - wrapper.reconfigure(encoding="locale") - - def test_reconfigure_encoding_read(self): - # latin1 -> utf8 - # (latin1 can decode utf-8 encoded string) - data = 'abc\xe9\n'.encode('latin1') + 'd\xe9f\n'.encode('utf8') - raw = self.BytesIO(data) - txt = self.TextIOWrapper(raw, encoding='latin1', newline='\n') - self.assertEqual(txt.readline(), 'abc\xe9\n') - with self.assertRaises(self.UnsupportedOperation): - txt.reconfigure(encoding='utf-8') - with self.assertRaises(self.UnsupportedOperation): - txt.reconfigure(newline=None) - - def test_reconfigure_write_fromascii(self): - # ascii has a specific encodefunc in the C implementation, - # but utf-8-sig has not. Make sure that we get rid of the - # cached encodefunc when we switch encoders. 
- raw = self.BytesIO() - txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n') - txt.write('foo\n') - txt.reconfigure(encoding='utf-8-sig') - txt.write('\xe9\n') - txt.flush() - self.assertEqual(raw.getvalue(), b'foo\n\xc3\xa9\n') - - def test_reconfigure_write(self): - # latin -> utf8 - raw = self.BytesIO() - txt = self.TextIOWrapper(raw, encoding='latin1', newline='\n') - txt.write('abc\xe9\n') - txt.reconfigure(encoding='utf-8') - self.assertEqual(raw.getvalue(), b'abc\xe9\n') - txt.write('d\xe9f\n') - txt.flush() - self.assertEqual(raw.getvalue(), b'abc\xe9\nd\xc3\xa9f\n') - - # ascii -> utf-8-sig: ensure that no BOM is written in the middle of - # the file - raw = self.BytesIO() - txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n') - txt.write('abc\n') - txt.reconfigure(encoding='utf-8-sig') - txt.write('d\xe9f\n') - txt.flush() - self.assertEqual(raw.getvalue(), b'abc\nd\xc3\xa9f\n') - - def test_reconfigure_write_non_seekable(self): - raw = self.BytesIO() - raw.seekable = lambda: False - raw.seek = None - txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n') - txt.write('abc\n') - txt.reconfigure(encoding='utf-8-sig') - txt.write('d\xe9f\n') - txt.flush() - - # If the raw stream is not seekable, there'll be a BOM - self.assertEqual(raw.getvalue(), b'abc\n\xef\xbb\xbfd\xc3\xa9f\n') - - def test_reconfigure_defaults(self): - txt = self.TextIOWrapper(self.BytesIO(), 'ascii', 'replace', '\n') - txt.reconfigure(encoding=None) - self.assertEqual(txt.encoding, 'ascii') - self.assertEqual(txt.errors, 'replace') - txt.write('LF\n') - - txt.reconfigure(newline='\r\n') - self.assertEqual(txt.encoding, 'ascii') - self.assertEqual(txt.errors, 'replace') - - txt.reconfigure(errors='ignore') - self.assertEqual(txt.encoding, 'ascii') - self.assertEqual(txt.errors, 'ignore') - txt.write('CRLF\n') - - txt.reconfigure(encoding='utf-8', newline=None) - self.assertEqual(txt.errors, 'strict') - txt.seek(0) - self.assertEqual(txt.read(), 'LF\nCRLF\n') - - 
self.assertEqual(txt.detach().getvalue(), b'LF\nCRLF\r\n') - - def test_reconfigure_errors(self): - txt = self.TextIOWrapper(self.BytesIO(), 'ascii', 'replace', '\r') - with self.assertRaises(TypeError): # there was a crash - txt.reconfigure(encoding=42) - if self.is_C: - with self.assertRaises(UnicodeEncodeError): - txt.reconfigure(encoding='\udcfe') - with self.assertRaises(LookupError): - txt.reconfigure(encoding='locale\0') - # TODO: txt.reconfigure(encoding='utf-8\0') - # TODO: txt.reconfigure(encoding='nonexisting') - with self.assertRaises(TypeError): - txt.reconfigure(errors=42) - if self.is_C: - with self.assertRaises(UnicodeEncodeError): - txt.reconfigure(errors='\udcfe') - # TODO: txt.reconfigure(errors='ignore\0') - # TODO: txt.reconfigure(errors='nonexisting') - with self.assertRaises(TypeError): - txt.reconfigure(newline=42) - with self.assertRaises(ValueError): - txt.reconfigure(newline='\udcfe') - with self.assertRaises(ValueError): - txt.reconfigure(newline='xyz') - if not self.is_C: - # TODO: Should fail in C too. - with self.assertRaises(ValueError): - txt.reconfigure(newline='\n\0') - if self.is_C: - # TODO: Use __bool__(), not __index__(). 
- with self.assertRaises(ZeroDivisionError): - txt.reconfigure(line_buffering=BadIndex()) - with self.assertRaises(OverflowError): - txt.reconfigure(line_buffering=2**1000) - with self.assertRaises(ZeroDivisionError): - txt.reconfigure(write_through=BadIndex()) - with self.assertRaises(OverflowError): - txt.reconfigure(write_through=2**1000) - with self.assertRaises(ZeroDivisionError): # there was a crash - txt.reconfigure(line_buffering=BadIndex(), - write_through=BadIndex()) - self.assertEqual(txt.encoding, 'ascii') - self.assertEqual(txt.errors, 'replace') - self.assertIs(txt.line_buffering, False) - self.assertIs(txt.write_through, False) - - txt.reconfigure(encoding='latin1', errors='ignore', newline='\r\n', - line_buffering=True, write_through=True) - self.assertEqual(txt.encoding, 'latin1') - self.assertEqual(txt.errors, 'ignore') - self.assertIs(txt.line_buffering, True) - self.assertIs(txt.write_through, True) - - def test_reconfigure_newline(self): - raw = self.BytesIO(b'CR\rEOF') - txt = self.TextIOWrapper(raw, 'ascii', newline='\n') - txt.reconfigure(newline=None) - self.assertEqual(txt.readline(), 'CR\n') - raw = self.BytesIO(b'CR\rEOF') - txt = self.TextIOWrapper(raw, 'ascii', newline='\n') - txt.reconfigure(newline='') - self.assertEqual(txt.readline(), 'CR\r') - raw = self.BytesIO(b'CR\rLF\nEOF') - txt = self.TextIOWrapper(raw, 'ascii', newline='\r') - txt.reconfigure(newline='\n') - self.assertEqual(txt.readline(), 'CR\rLF\n') - raw = self.BytesIO(b'LF\nCR\rEOF') - txt = self.TextIOWrapper(raw, 'ascii', newline='\n') - txt.reconfigure(newline='\r') - self.assertEqual(txt.readline(), 'LF\nCR\r') - raw = self.BytesIO(b'CR\rCRLF\r\nEOF') - txt = self.TextIOWrapper(raw, 'ascii', newline='\r') - txt.reconfigure(newline='\r\n') - self.assertEqual(txt.readline(), 'CR\rCRLF\r\n') - - txt = self.TextIOWrapper(self.BytesIO(), 'ascii', newline='\r') - txt.reconfigure(newline=None) - txt.write('linesep\n') - txt.reconfigure(newline='') - txt.write('LF\n') - 
txt.reconfigure(newline='\n') - txt.write('LF\n') - txt.reconfigure(newline='\r') - txt.write('CR\n') - txt.reconfigure(newline='\r\n') - txt.write('CRLF\n') - expected = 'linesep' + os.linesep + 'LF\nLF\nCR\rCRLF\r\n' - self.assertEqual(txt.detach().getvalue().decode('ascii'), expected) - - def test_issue25862(self): - # Assertion failures occurred in tell() after read() and write(). - t = self.TextIOWrapper(self.BytesIO(b'test'), encoding='ascii') - t.read(1) - t.read() - t.tell() - t = self.TextIOWrapper(self.BytesIO(b'test'), encoding='ascii') - t.read(1) - t.write('x') - t.tell() - - def test_issue35928(self): - p = self.BufferedRWPair(self.BytesIO(b'foo\nbar\n'), self.BytesIO()) - f = self.TextIOWrapper(p) - res = f.readline() - self.assertEqual(res, 'foo\n') - f.write(res) - self.assertEqual(res + f.readline(), 'foo\nbar\n') - - def test_pickling_subclass(self): - global MyTextIO - class MyTextIO(self.TextIOWrapper): - def __init__(self, raw, tag): - super().__init__(raw) - self.tag = tag - def __getstate__(self): - return self.tag, self.buffer.getvalue() - def __setstate__(slf, state): - tag, value = state - slf.__init__(self.BytesIO(value), tag) - - raw = self.BytesIO(b'data') - txt = MyTextIO(raw, 'ham') - for proto in range(pickle.HIGHEST_PROTOCOL + 1): - with self.subTest(protocol=proto): - pickled = pickle.dumps(txt, proto) - newtxt = pickle.loads(pickled) - self.assertEqual(newtxt.buffer.getvalue(), b'data') - self.assertEqual(newtxt.tag, 'ham') - del MyTextIO - - @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()") - def test_read_non_blocking(self): - import os - r, w = os.pipe() - try: - os.set_blocking(r, False) - with self.io.open(r, 'rt') as textfile: - r = None - # Nothing has been written so a non-blocking read raises a BlockingIOError exception. 
- with self.assertRaises(BlockingIOError): - textfile.read() - finally: - if r is not None: - os.close(r) - os.close(w) - - -class MemviewBytesIO(io.BytesIO): - '''A BytesIO object whose read method returns memoryviews - rather than bytes''' - - def read1(self, len_): - return _to_memoryview(super().read1(len_)) - - def read(self, len_): - return _to_memoryview(super().read(len_)) - -def _to_memoryview(buf): - '''Convert bytes-object *buf* to a non-trivial memoryview''' - - arr = array.array('i') - idx = len(buf) - len(buf) % arr.itemsize - arr.frombytes(buf[:idx]) - return memoryview(arr) - - -class CTextIOWrapperTest(TextIOWrapperTest, CTestCase): - shutdown_error = "LookupError: unknown encoding: ascii" - - def test_initialization(self): - r = self.BytesIO(b"\xc3\xa9\n\n") - b = self.BufferedReader(r, 1000) - t = self.TextIOWrapper(b, encoding="utf-8") - self.assertRaises(ValueError, t.__init__, b, encoding="utf-8", newline='xyzzy') - self.assertRaises(ValueError, t.read) - - t = self.TextIOWrapper.__new__(self.TextIOWrapper) - self.assertRaises(Exception, repr, t) - - def test_garbage_collection(self): - # C TextIOWrapper objects are collected, and collecting them flushes - # all data to disk. - # The Python version has __del__, so it ends in gc.garbage instead. - with warnings.catch_warnings(): - warnings.simplefilter("ignore", ResourceWarning) - rawio = self.FileIO(os_helper.TESTFN, "wb") - b = self.BufferedWriter(rawio) - t = self.TextIOWrapper(b, encoding="ascii") - t.write("456def") - t.x = t - wr = weakref.ref(t) - del t - support.gc_collect() - self.assertIsNone(wr(), wr) - with self.open(os_helper.TESTFN, "rb") as f: - self.assertEqual(f.read(), b"456def") - - def test_rwpair_cleared_before_textio(self): - # Issue 13070: TextIOWrapper's finalization would crash when called - # after the reference to the underlying BufferedRWPair's writer got - # cleared by the GC. 
- for i in range(1000): - b1 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO()) - t1 = self.TextIOWrapper(b1, encoding="ascii") - b2 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO()) - t2 = self.TextIOWrapper(b2, encoding="ascii") - # circular references - t1.buddy = t2 - t2.buddy = t1 - support.gc_collect() - - def test_del__CHUNK_SIZE_SystemError(self): - t = self.TextIOWrapper(self.BytesIO(), encoding='ascii') - with self.assertRaises(AttributeError): - del t._CHUNK_SIZE - - def test_internal_buffer_size(self): - # bpo-43260: TextIOWrapper's internal buffer should not store - # data larger than chunk size. - chunk_size = 8192 # default chunk size, updated later - - class MockIO(self.MockRawIO): - def write(self, data): - if len(data) > chunk_size: - raise RuntimeError - return super().write(data) - - buf = MockIO() - t = self.TextIOWrapper(buf, encoding="ascii") - chunk_size = t._CHUNK_SIZE - t.write("abc") - t.write("def") - # default chunk size is 8192 bytes so t don't write data to buf. - self.assertEqual([], buf._write_stack) - - with self.assertRaises(RuntimeError): - t.write("x"*(chunk_size+1)) - - self.assertEqual([b"abcdef"], buf._write_stack) - t.write("ghi") - t.write("x"*chunk_size) - self.assertEqual([b"abcdef", b"ghi", b"x"*chunk_size], buf._write_stack) - - def test_issue119506(self): - chunk_size = 8192 - - class MockIO(self.MockRawIO): - written = False - def write(self, data): - if not self.written: - self.written = True - t.write("middle") - return super().write(data) - - buf = MockIO() - t = self.TextIOWrapper(buf) - t.write("abc") - t.write("def") - # writing data which size >= chunk_size cause flushing buffer before write. 
- t.write("g" * chunk_size) - t.flush() - - self.assertEqual([b"abcdef", b"middle", b"g"*chunk_size], - buf._write_stack) - - -class PyTextIOWrapperTest(TextIOWrapperTest, PyTestCase): - shutdown_error = "LookupError: unknown encoding: ascii" - - -class IncrementalNewlineDecoderTest: - - def check_newline_decoding_utf8(self, decoder): - # UTF-8 specific tests for a newline decoder - def _check_decode(b, s, **kwargs): - # We exercise getstate() / setstate() as well as decode() - state = decoder.getstate() - self.assertEqual(decoder.decode(b, **kwargs), s) - decoder.setstate(state) - self.assertEqual(decoder.decode(b, **kwargs), s) - - _check_decode(b'\xe8\xa2\x88', "\u8888") - - _check_decode(b'\xe8', "") - _check_decode(b'\xa2', "") - _check_decode(b'\x88', "\u8888") - - _check_decode(b'\xe8', "") - _check_decode(b'\xa2', "") - _check_decode(b'\x88', "\u8888") - - _check_decode(b'\xe8', "") - self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True) - - decoder.reset() - _check_decode(b'\n', "\n") - _check_decode(b'\r', "") - _check_decode(b'', "\n", final=True) - _check_decode(b'\r', "\n", final=True) - - _check_decode(b'\r', "") - _check_decode(b'a', "\na") - - _check_decode(b'\r\r\n', "\n\n") - _check_decode(b'\r', "") - _check_decode(b'\r', "\n") - _check_decode(b'\na', "\na") - - _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n") - _check_decode(b'\xe8\xa2\x88', "\u8888") - _check_decode(b'\n', "\n") - _check_decode(b'\xe8\xa2\x88\r', "\u8888") - _check_decode(b'\n', "\n") - - def check_newline_decoding(self, decoder, encoding): - result = [] - if encoding is not None: - encoder = codecs.getincrementalencoder(encoding)() - def _decode_bytewise(s): - # Decode one byte at a time - for b in encoder.encode(s): - result.append(decoder.decode(bytes([b]))) - else: - encoder = None - def _decode_bytewise(s): - # Decode one char at a time - for c in s: - result.append(decoder.decode(c)) - self.assertEqual(decoder.newlines, None) - _decode_bytewise("abc\n\r") - 
self.assertEqual(decoder.newlines, '\n') - _decode_bytewise("\nabc") - self.assertEqual(decoder.newlines, ('\n', '\r\n')) - _decode_bytewise("abc\r") - self.assertEqual(decoder.newlines, ('\n', '\r\n')) - _decode_bytewise("abc") - self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n')) - _decode_bytewise("abc\r") - self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc") - decoder.reset() - input = "abc" - if encoder is not None: - encoder.reset() - input = encoder.encode(input) - self.assertEqual(decoder.decode(input), "abc") - self.assertEqual(decoder.newlines, None) - - def test_newline_decoder(self): - encodings = ( - # None meaning the IncrementalNewlineDecoder takes unicode input - # rather than bytes input - None, 'utf-8', 'latin-1', - 'utf-16', 'utf-16-le', 'utf-16-be', - 'utf-32', 'utf-32-le', 'utf-32-be', - ) - for enc in encodings: - decoder = enc and codecs.getincrementaldecoder(enc)() - decoder = self.IncrementalNewlineDecoder(decoder, translate=True) - self.check_newline_decoding(decoder, enc) - decoder = codecs.getincrementaldecoder("utf-8")() - decoder = self.IncrementalNewlineDecoder(decoder, translate=True) - self.check_newline_decoding_utf8(decoder) - self.assertRaises(TypeError, decoder.setstate, 42) - - def test_newline_bytes(self): - # Issue 5433: Excessive optimization in IncrementalNewlineDecoder - def _check(dec): - self.assertEqual(dec.newlines, None) - self.assertEqual(dec.decode("\u0D00"), "\u0D00") - self.assertEqual(dec.newlines, None) - self.assertEqual(dec.decode("\u0A00"), "\u0A00") - self.assertEqual(dec.newlines, None) - dec = self.IncrementalNewlineDecoder(None, translate=False) - _check(dec) - dec = self.IncrementalNewlineDecoder(None, translate=True) - _check(dec) - - def test_translate(self): - # issue 35062 - for translate in (-2, -1, 1, 2): - decoder = codecs.getincrementaldecoder("utf-8")() - decoder = self.IncrementalNewlineDecoder(decoder, translate) - self.check_newline_decoding_utf8(decoder) - decoder = 
codecs.getincrementaldecoder("utf-8")() - decoder = self.IncrementalNewlineDecoder(decoder, translate=0) - self.assertEqual(decoder.decode(b"\r\r\n"), "\r\r\n") - -class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest, unittest.TestCase): - IncrementalNewlineDecoder = io.IncrementalNewlineDecoder - - @support.cpython_only - def test_uninitialized(self): - uninitialized = self.IncrementalNewlineDecoder.__new__( - self.IncrementalNewlineDecoder) - self.assertRaises(ValueError, uninitialized.decode, b'bar') - self.assertRaises(ValueError, uninitialized.getstate) - self.assertRaises(ValueError, uninitialized.setstate, (b'foo', 0)) - self.assertRaises(ValueError, uninitialized.reset) - - -class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest, unittest.TestCase): - IncrementalNewlineDecoder = pyio.IncrementalNewlineDecoder - - # XXX Tests for open() class MiscIOTest: diff --git a/Lib/test/test_io/test_textio.py b/Lib/test/test_io/test_textio.py new file mode 100644 index 00000000000000..d8d0928b4ba69b --- /dev/null +++ b/Lib/test/test_io/test_textio.py @@ -0,0 +1,1664 @@ +import array +import codecs +import locale +import os +import pickle +import sys +import threading +import time +import unittest +import warnings +import weakref +from collections import UserList +from test import support +from test.support import os_helper, threading_helper +from test.support.script_helper import assert_python_ok +from .utils import CTestCase, PyTestCase + +import io # C implementation of io +import _pyio as pyio # Python implementation of io + + +def _default_chunk_size(): + """Get the default TextIOWrapper chunk size""" + with open(__file__, "r", encoding="latin-1") as f: + return f._CHUNK_SIZE + + +class BadIndex: + def __index__(self): + 1/0 + + +# To fully exercise seek/tell, the StatefulIncrementalDecoder has these +# properties: +# - A single output character can correspond to many bytes of input. 
#   - The number of input bytes to complete the character can be
#     undetermined until the last input byte is received.
#   - The number of input bytes can vary depending on previous input.
#   - A single input byte can correspond to many characters of output.
#   - The number of output characters can be undetermined until the
#     last input byte is received.
#   - The number of output characters can vary depending on previous input.

class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
    """
    For testing seek/tell behavior with a stateful, buffering decoder.

    Input is a sequence of words.  Words may be fixed-length (length set
    by input) or variable-length (period-terminated).  In variable-length
    mode, extra periods are ignored.  Possible words are:
      - 'i' followed by a number sets the input length, I (maximum 99).
        When I is set to 0, words are period-terminated.
      - 'o' followed by a number sets the output length, O (maximum 99).
      - Any other word is converted into a word followed by a period on
        the output.  The output word consists of the input word truncated
        or padded out with hyphens to make its length equal to O.  If O
        is 0, the word is output verbatim without truncating or padding.
    I and O are initially set to 1.  When I changes, any buffered input is
    re-scanned according to the new I.  EOF also terminates the last word.
    """

    def __init__(self, errors='strict'):
        """Initialize the base decoder with *errors* and reset I, O and the buffer."""
        codecs.IncrementalDecoder.__init__(self, errors)
        self.reset()

    def __repr__(self):
        # The format string must contain a conversion for id(self);
        # an empty format string makes the % operator raise TypeError.
        return '<SID %x>' % id(self)

    def reset(self):
        """Restore the initial state: I = 1, O = 1, empty input buffer."""
        self.i = 1
        self.o = 1
        self.buffer = bytearray()

    def getstate(self):
        """Return (buffered_bytes, flags), packing I and O into one integer.

        I and O are each XORed with 1 before packing as ``i*100 + o`` so
        that the flags value is 0 right after reset().
        """
        i, o = self.i ^ 1, self.o ^ 1  # so that flags = 0 after reset()
        return bytes(self.buffer), i*100 + o

    def setstate(self, state):
        """Restore the buffer, I and O from a (buffer, flags) pair as
        produced by getstate()."""
        buffer, io = state
        self.buffer = bytearray(buffer)
        i, o = divmod(io, 100)
        # Undo the XOR applied by getstate().
        self.i, self.o = i ^ 1, o ^ 1

    def decode(self, input, final=False):
        """Consume the bytes of *input* and return the text of every word
        completed so far.

        When *final* is true, a non-empty buffer is flushed as a last word.
        """
        pieces = []
        for b in input:
            # self.i is re-read on every byte because process_word() may
            # change it while the input is being scanned.
            if self.i == 0:  # variable-length, terminated with period
                if b == ord('.'):
                    if self.buffer:  # extra periods are ignored
                        pieces.append(self.process_word())
                else:
                    self.buffer.append(b)
            else:  # fixed-length, terminate after self.i bytes
                self.buffer.append(b)
                if len(self.buffer) == self.i:
                    pieces.append(self.process_word())
        if final and self.buffer:  # EOF terminates the last word
            pieces.append(self.process_word())
        return ''.join(pieces)

    def process_word(self):
        """Interpret the buffered word, clear the buffer and return its output.

        'i<n>' and 'o<n>' words update I and O (capped at 99, a missing
        number means 0) and produce no output.  Any other word is returned
        padded/truncated to O characters (verbatim if O is 0) followed by
        a period.
        """
        output = ''
        if self.buffer[0] == ord('i'):
            self.i = min(99, int(self.buffer[1:] or 0))  # set input length
        elif self.buffer[0] == ord('o'):
            self.o = min(99, int(self.buffer[1:] or 0))  # set output length
        else:
            output = self.buffer.decode('ascii')
            if len(output) < self.o:
                # Over-pad here; the slice below trims to exactly O.
                output += '-'*self.o  # pad out with hyphens
            if self.o:
                output = output[:self.o]  # truncate to output length
            output += '.'
        self.buffer = bytearray()
        return output

    # Class-level switch; initially off.
    codecEnabled = False


# bpo-41919: This method is separated from StatefulIncrementalDecoder to avoid a resource leak
# when registering codecs and cleanup functions.
+def lookupTestDecoder(name): + if StatefulIncrementalDecoder.codecEnabled and name == 'test_decoder': + latin1 = codecs.lookup('latin-1') + return codecs.CodecInfo( + name='test_decoder', encode=latin1.encode, decode=None, + incrementalencoder=None, + streamreader=None, streamwriter=None, + incrementaldecoder=StatefulIncrementalDecoder) + + +class StatefulIncrementalDecoderTest(unittest.TestCase): + """ + Make sure the StatefulIncrementalDecoder actually works. + """ + + test_cases = [ + # I=1, O=1 (fixed-length input == fixed-length output) + (b'abcd', False, 'a.b.c.d.'), + # I=0, O=0 (variable-length input, variable-length output) + (b'oiabcd', True, 'abcd.'), + # I=0, O=0 (should ignore extra periods) + (b'oi...abcd...', True, 'abcd.'), + # I=0, O=6 (variable-length input, fixed-length output) + (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'), + # I=2, O=6 (fixed-length input < fixed-length output) + (b'i.i2.o6xyz', True, 'xy----.z-----.'), + # I=6, O=3 (fixed-length input > fixed-length output) + (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'), + # I=0, then 3; O=29, then 15 (with longer output) + (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True, + 'a----------------------------.' + + 'b----------------------------.' + + 'cde--------------------------.' + + 'abcdefghijabcde.' + + 'a.b------------.' + + '.c.------------.' + + 'd.e------------.' + + 'k--------------.' + + 'l--------------.' + + 'm--------------.') + ] + + def test_decoder(self): + # Try a few one-shot test cases. + for input, eof, output in self.test_cases: + d = StatefulIncrementalDecoder() + self.assertEqual(d.decode(input, eof), output) + + # Also test an unfinished decode, followed by forcing EOF. 
+ d = StatefulIncrementalDecoder() + self.assertEqual(d.decode(b'oiabcd'), '') + self.assertEqual(d.decode(b'', 1), 'abcd.') + +class TextIOWrapperTest: + + def setUp(self): + self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n" + self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii") + os_helper.unlink(os_helper.TESTFN) + codecs.register(lookupTestDecoder) + self.addCleanup(codecs.unregister, lookupTestDecoder) + + def tearDown(self): + os_helper.unlink(os_helper.TESTFN) + + def test_constructor(self): + r = self.BytesIO(b"\xc3\xa9\n\n") + b = self.BufferedReader(r, 1000) + t = self.TextIOWrapper(b, encoding="utf-8") + t.__init__(b, encoding="latin-1", newline="\r\n") + self.assertEqual(t.encoding, "latin-1") + self.assertEqual(t.line_buffering, False) + t.__init__(b, encoding="utf-8", line_buffering=True) + self.assertEqual(t.encoding, "utf-8") + self.assertEqual(t.line_buffering, True) + self.assertEqual("\xe9\n", t.readline()) + invalid_type = TypeError if self.is_C else ValueError + with self.assertRaises(invalid_type): + t.__init__(b, encoding=42) + with self.assertRaises(UnicodeEncodeError): + t.__init__(b, encoding='\udcfe') + with self.assertRaises(ValueError): + t.__init__(b, encoding='utf-8\0') + with self.assertRaises(invalid_type): + t.__init__(b, encoding="utf-8", errors=42) + if support.Py_DEBUG or sys.flags.dev_mode or self.is_C: + with self.assertRaises(UnicodeEncodeError): + t.__init__(b, encoding="utf-8", errors='\udcfe') + if support.Py_DEBUG or sys.flags.dev_mode or self.is_C: + with self.assertRaises(ValueError): + t.__init__(b, encoding="utf-8", errors='replace\0') + with self.assertRaises(TypeError): + t.__init__(b, encoding="utf-8", newline=42) + with self.assertRaises(ValueError): + t.__init__(b, encoding="utf-8", newline='\udcfe') + with self.assertRaises(ValueError): + t.__init__(b, encoding="utf-8", newline='\n\0') + with self.assertRaises(ValueError): + t.__init__(b, encoding="utf-8", newline='xyzzy') + + def 
test_uninitialized(self): + t = self.TextIOWrapper.__new__(self.TextIOWrapper) + del t + t = self.TextIOWrapper.__new__(self.TextIOWrapper) + self.assertRaises(Exception, repr, t) + self.assertRaisesRegex((ValueError, AttributeError), + 'uninitialized|has no attribute', + t.read, 0) + t.__init__(self.MockRawIO(), encoding="utf-8") + self.assertEqual(t.read(0), '') + + def test_non_text_encoding_codecs_are_rejected(self): + # Ensure the constructor complains if passed a codec that isn't + # marked as a text encoding + # http://bugs.python.org/issue20404 + r = self.BytesIO() + b = self.BufferedWriter(r) + with self.assertRaisesRegex(LookupError, "is not a text encoding"): + self.TextIOWrapper(b, encoding="hex") + + def test_detach(self): + r = self.BytesIO() + b = self.BufferedWriter(r) + t = self.TextIOWrapper(b, encoding="ascii") + self.assertIs(t.detach(), b) + + t = self.TextIOWrapper(b, encoding="ascii") + t.write("howdy") + self.assertFalse(r.getvalue()) + t.detach() + self.assertEqual(r.getvalue(), b"howdy") + self.assertRaises(ValueError, t.detach) + + # Operations independent of the detached stream should still work + repr(t) + self.assertEqual(t.encoding, "ascii") + self.assertEqual(t.errors, "strict") + self.assertFalse(t.line_buffering) + self.assertFalse(t.write_through) + + def test_repr(self): + raw = self.BytesIO("hello".encode("utf-8")) + b = self.BufferedReader(raw) + t = self.TextIOWrapper(b, encoding="utf-8") + modname = self.TextIOWrapper.__module__ + self.assertRegex(repr(t), + r"<(%s\.)?TextIOWrapper encoding='utf-8'>" % modname) + raw.name = "dummy" + self.assertRegex(repr(t), + r"<(%s\.)?TextIOWrapper name='dummy' encoding='utf-8'>" % modname) + t.mode = "r" + self.assertRegex(repr(t), + r"<(%s\.)?TextIOWrapper name='dummy' mode='r' encoding='utf-8'>" % modname) + raw.name = b"dummy" + self.assertRegex(repr(t), + r"<(%s\.)?TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>" % modname) + + t.buffer.detach() + repr(t) # Should not raise an 
exception + + def test_recursive_repr(self): + # Issue #25455 + raw = self.BytesIO() + t = self.TextIOWrapper(raw, encoding="utf-8") + with support.swap_attr(raw, 'name', t), support.infinite_recursion(25): + with self.assertRaises(RuntimeError): + repr(t) # Should not crash + + def test_subclass_repr(self): + class TestSubclass(self.TextIOWrapper): + pass + + f = TestSubclass(self.StringIO()) + self.assertIn(TestSubclass.__name__, repr(f)) + + def test_line_buffering(self): + r = self.BytesIO() + b = self.BufferedWriter(r, 1000) + t = self.TextIOWrapper(b, encoding="utf-8", newline="\n", line_buffering=True) + t.write("X") + self.assertEqual(r.getvalue(), b"") # No flush happened + t.write("Y\nZ") + self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed + t.write("A\rB") + self.assertEqual(r.getvalue(), b"XY\nZA\rB") + + def test_reconfigure_line_buffering(self): + r = self.BytesIO() + b = self.BufferedWriter(r, 1000) + t = self.TextIOWrapper(b, encoding="utf-8", newline="\n", line_buffering=False) + t.write("AB\nC") + self.assertEqual(r.getvalue(), b"") + + t.reconfigure(line_buffering=True) # implicit flush + self.assertEqual(r.getvalue(), b"AB\nC") + t.write("DEF\nG") + self.assertEqual(r.getvalue(), b"AB\nCDEF\nG") + t.write("H") + self.assertEqual(r.getvalue(), b"AB\nCDEF\nG") + t.reconfigure(line_buffering=False) # implicit flush + self.assertEqual(r.getvalue(), b"AB\nCDEF\nGH") + t.write("IJ") + self.assertEqual(r.getvalue(), b"AB\nCDEF\nGH") + + # Keeping default value + t.reconfigure() + t.reconfigure(line_buffering=None) + self.assertEqual(t.line_buffering, False) + t.reconfigure(line_buffering=True) + t.reconfigure() + t.reconfigure(line_buffering=None) + self.assertEqual(t.line_buffering, True) + + @unittest.skipIf(sys.flags.utf8_mode, "utf-8 mode is enabled") + def test_default_encoding(self): + with os_helper.EnvironmentVarGuard() as env: + # try to get a user preferred encoding different than the current + # locale encoding to check that 
TextIOWrapper() uses the current + # locale encoding and not the user preferred encoding + env.unset('LC_ALL', 'LANG', 'LC_CTYPE') + + current_locale_encoding = locale.getencoding() + b = self.BytesIO() + with warnings.catch_warnings(): + warnings.simplefilter("ignore", EncodingWarning) + t = self.TextIOWrapper(b) + self.assertEqual(t.encoding, current_locale_encoding) + + def test_encoding(self): + # Check the encoding attribute is always set, and valid + b = self.BytesIO() + t = self.TextIOWrapper(b, encoding="utf-8") + self.assertEqual(t.encoding, "utf-8") + with warnings.catch_warnings(): + warnings.simplefilter("ignore", EncodingWarning) + t = self.TextIOWrapper(b) + self.assertIsNotNone(t.encoding) + codecs.lookup(t.encoding) + + def test_encoding_errors_reading(self): + # (1) default + b = self.BytesIO(b"abc\n\xff\n") + t = self.TextIOWrapper(b, encoding="ascii") + self.assertRaises(UnicodeError, t.read) + # (2) explicit strict + b = self.BytesIO(b"abc\n\xff\n") + t = self.TextIOWrapper(b, encoding="ascii", errors="strict") + self.assertRaises(UnicodeError, t.read) + # (3) ignore + b = self.BytesIO(b"abc\n\xff\n") + t = self.TextIOWrapper(b, encoding="ascii", errors="ignore") + self.assertEqual(t.read(), "abc\n\n") + # (4) replace + b = self.BytesIO(b"abc\n\xff\n") + t = self.TextIOWrapper(b, encoding="ascii", errors="replace") + self.assertEqual(t.read(), "abc\n\ufffd\n") + + def test_encoding_errors_writing(self): + # (1) default + b = self.BytesIO() + t = self.TextIOWrapper(b, encoding="ascii") + self.assertRaises(UnicodeError, t.write, "\xff") + # (2) explicit strict + b = self.BytesIO() + t = self.TextIOWrapper(b, encoding="ascii", errors="strict") + self.assertRaises(UnicodeError, t.write, "\xff") + # (3) ignore + b = self.BytesIO() + t = self.TextIOWrapper(b, encoding="ascii", errors="ignore", + newline="\n") + t.write("abc\xffdef\n") + t.flush() + self.assertEqual(b.getvalue(), b"abcdef\n") + # (4) replace + b = self.BytesIO() + t = 
self.TextIOWrapper(b, encoding="ascii", errors="replace", + newline="\n") + t.write("abc\xffdef\n") + t.flush() + self.assertEqual(b.getvalue(), b"abc?def\n") + + def test_newlines(self): + input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ] + + tests = [ + [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ], + [ '', input_lines ], + [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ], + [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ], + [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ], + ] + encodings = ( + 'utf-8', 'latin-1', + 'utf-16', 'utf-16-le', 'utf-16-be', + 'utf-32', 'utf-32-le', 'utf-32-be', + ) + + # Try a range of buffer sizes to test the case where \r is the last + # character in TextIOWrapper._pending_line. + for encoding in encodings: + # XXX: str.encode() should return bytes + data = bytes(''.join(input_lines).encode(encoding)) + for do_reads in (False, True): + for bufsize in range(1, 10): + for newline, exp_lines in tests: + bufio = self.BufferedReader(self.BytesIO(data), bufsize) + textio = self.TextIOWrapper(bufio, newline=newline, + encoding=encoding) + if do_reads: + got_lines = [] + while True: + c2 = textio.read(2) + if c2 == '': + break + self.assertEqual(len(c2), 2) + got_lines.append(c2 + textio.readline()) + else: + got_lines = list(textio) + + for got_line, exp_line in zip(got_lines, exp_lines): + self.assertEqual(got_line, exp_line) + self.assertEqual(len(got_lines), len(exp_lines)) + + def test_newlines_input(self): + testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG" + normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n") + for newline, expected in [ + (None, normalized.decode("ascii").splitlines(keepends=True)), + ("", testdata.decode("ascii").splitlines(keepends=True)), + ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]), + ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]), + ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", 
"\nFFF\r", "\nGGG"]), + ]: + buf = self.BytesIO(testdata) + txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline) + self.assertEqual(txt.readlines(), expected) + txt.seek(0) + self.assertEqual(txt.read(), "".join(expected)) + + def test_newlines_output(self): + testdict = { + "": b"AAA\nBBB\nCCC\nX\rY\r\nZ", + "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ", + "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ", + "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ", + } + tests = [(None, testdict[os.linesep])] + sorted(testdict.items()) + for newline, expected in tests: + buf = self.BytesIO() + txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline) + txt.write("AAA\nB") + txt.write("BB\nCCC\n") + txt.write("X\rY\r\nZ") + txt.flush() + self.assertEqual(buf.closed, False) + self.assertEqual(buf.getvalue(), expected) + + def test_destructor(self): + l = [] + base = self.BytesIO + class MyBytesIO(base): + def close(self): + l.append(self.getvalue()) + base.close(self) + b = MyBytesIO() + t = self.TextIOWrapper(b, encoding="ascii") + t.write("abc") + del t + support.gc_collect() + self.assertEqual([b"abc"], l) + + def test_override_destructor(self): + record = [] + class MyTextIO(self.TextIOWrapper): + def __del__(self): + record.append(1) + try: + f = super().__del__ + except AttributeError: + pass + else: + f() + def close(self): + record.append(2) + super().close() + def flush(self): + record.append(3) + super().flush() + b = self.BytesIO() + t = MyTextIO(b, encoding="ascii") + del t + support.gc_collect() + self.assertEqual(record, [1, 2, 3]) + + def test_error_through_destructor(self): + # Test that the exception state is not modified by a destructor, + # even if close() fails. 
+ rawio = self.CloseFailureIO() + with support.catch_unraisable_exception() as cm: + with self.assertRaises(AttributeError): + self.TextIOWrapper(rawio, encoding="utf-8").xyzzy + + self.assertEqual(cm.unraisable.exc_type, OSError) + + # Systematic tests of the text I/O API + + def test_basic_io(self): + for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65): + for enc in "ascii", "latin-1", "utf-8" :# , "utf-16-be", "utf-16-le": + f = self.open(os_helper.TESTFN, "w+", encoding=enc) + f._CHUNK_SIZE = chunksize + self.assertEqual(f.write("abc"), 3) + f.close() + f = self.open(os_helper.TESTFN, "r+", encoding=enc) + f._CHUNK_SIZE = chunksize + self.assertEqual(f.tell(), 0) + self.assertEqual(f.read(), "abc") + cookie = f.tell() + self.assertEqual(f.seek(0), 0) + self.assertEqual(f.read(None), "abc") + f.seek(0) + self.assertEqual(f.read(2), "ab") + self.assertEqual(f.read(1), "c") + self.assertEqual(f.read(1), "") + self.assertEqual(f.read(), "") + self.assertEqual(f.tell(), cookie) + self.assertEqual(f.seek(0), 0) + self.assertEqual(f.seek(0, 2), cookie) + self.assertEqual(f.write("def"), 3) + self.assertEqual(f.seek(cookie), cookie) + self.assertEqual(f.read(), "def") + if enc.startswith("utf"): + self.multi_line_test(f, enc) + f.close() + + def multi_line_test(self, f, enc): + f.seek(0) + f.truncate() + sample = "s\xff\u0fff\uffff" + wlines = [] + for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000): + chars = [] + for i in range(size): + chars.append(sample[i % len(sample)]) + line = "".join(chars) + "\n" + wlines.append((f.tell(), line)) + f.write(line) + f.seek(0) + rlines = [] + while True: + pos = f.tell() + line = f.readline() + if not line: + break + rlines.append((pos, line)) + self.assertEqual(rlines, wlines) + + def test_telling(self): + f = self.open(os_helper.TESTFN, "w+", encoding="utf-8") + p0 = f.tell() + f.write("\xff\n") + p1 = f.tell() + f.write("\xff\n") + p2 = f.tell() + f.seek(0) + self.assertEqual(f.tell(), p0) + 
self.assertEqual(f.readline(), "\xff\n") + self.assertEqual(f.tell(), p1) + self.assertEqual(f.readline(), "\xff\n") + self.assertEqual(f.tell(), p2) + f.seek(0) + for line in f: + self.assertEqual(line, "\xff\n") + self.assertRaises(OSError, f.tell) + self.assertEqual(f.tell(), p2) + f.close() + + def test_seeking(self): + chunk_size = _default_chunk_size() + prefix_size = chunk_size - 2 + u_prefix = "a" * prefix_size + prefix = bytes(u_prefix.encode("utf-8")) + self.assertEqual(len(u_prefix), len(prefix)) + u_suffix = "\u8888\n" + suffix = bytes(u_suffix.encode("utf-8")) + line = prefix + suffix + with self.open(os_helper.TESTFN, "wb") as f: + f.write(line*2) + with self.open(os_helper.TESTFN, "r", encoding="utf-8") as f: + s = f.read(prefix_size) + self.assertEqual(s, str(prefix, "ascii")) + self.assertEqual(f.tell(), prefix_size) + self.assertEqual(f.readline(), u_suffix) + + def test_seeking_too(self): + # Regression test for a specific bug + data = b'\xe0\xbf\xbf\n' + with self.open(os_helper.TESTFN, "wb") as f: + f.write(data) + with self.open(os_helper.TESTFN, "r", encoding="utf-8") as f: + f._CHUNK_SIZE # Just test that it exists + f._CHUNK_SIZE = 2 + f.readline() + f.tell() + + def test_seek_and_tell(self): + #Test seek/tell using the StatefulIncrementalDecoder. 
+ # Make test faster by doing smaller seeks + CHUNK_SIZE = 128 + + def test_seek_and_tell_with_data(data, min_pos=0): + """Tell/seek to various points within a data stream and ensure + that the decoded data returned by read() is consistent.""" + f = self.open(os_helper.TESTFN, 'wb') + f.write(data) + f.close() + f = self.open(os_helper.TESTFN, encoding='test_decoder') + f._CHUNK_SIZE = CHUNK_SIZE + decoded = f.read() + f.close() + + for i in range(min_pos, len(decoded) + 1): # seek positions + for j in [1, 5, len(decoded) - i]: # read lengths + f = self.open(os_helper.TESTFN, encoding='test_decoder') + self.assertEqual(f.read(i), decoded[:i]) + cookie = f.tell() + self.assertEqual(f.read(j), decoded[i:i + j]) + f.seek(cookie) + self.assertEqual(f.read(), decoded[i:]) + f.close() + + # Enable the test decoder. + StatefulIncrementalDecoder.codecEnabled = 1 + + # Run the tests. + try: + # Try each test case. + for input, _, _ in StatefulIncrementalDecoderTest.test_cases: + test_seek_and_tell_with_data(input) + + # Position each test case so that it crosses a chunk boundary. + for input, _, _ in StatefulIncrementalDecoderTest.test_cases: + offset = CHUNK_SIZE - len(input)//2 + prefix = b'.'*offset + # Don't bother seeking into the prefix (takes too long). + min_pos = offset*2 + test_seek_and_tell_with_data(prefix + input, min_pos) + + # Ensure our test decoder won't interfere with subsequent tests. 
+ finally: + StatefulIncrementalDecoder.codecEnabled = 0 + + def test_multibyte_seek_and_tell(self): + f = self.open(os_helper.TESTFN, "w", encoding="euc_jp") + f.write("AB\n\u3046\u3048\n") + f.close() + + f = self.open(os_helper.TESTFN, "r", encoding="euc_jp") + self.assertEqual(f.readline(), "AB\n") + p0 = f.tell() + self.assertEqual(f.readline(), "\u3046\u3048\n") + p1 = f.tell() + f.seek(p0) + self.assertEqual(f.readline(), "\u3046\u3048\n") + self.assertEqual(f.tell(), p1) + f.close() + + def test_seek_with_encoder_state(self): + f = self.open(os_helper.TESTFN, "w", encoding="euc_jis_2004") + f.write("\u00e6\u0300") + p0 = f.tell() + f.write("\u00e6") + f.seek(p0) + f.write("\u0300") + f.close() + + f = self.open(os_helper.TESTFN, "r", encoding="euc_jis_2004") + self.assertEqual(f.readline(), "\u00e6\u0300\u0300") + f.close() + + def test_encoded_writes(self): + data = "1234567890" + tests = ("utf-16", + "utf-16-le", + "utf-16-be", + "utf-32", + "utf-32-le", + "utf-32-be") + for encoding in tests: + buf = self.BytesIO() + f = self.TextIOWrapper(buf, encoding=encoding) + # Check if the BOM is written only once (see issue1753). 
+ f.write(data) + f.write(data) + f.seek(0) + self.assertEqual(f.read(), data * 2) + f.seek(0) + self.assertEqual(f.read(), data * 2) + self.assertEqual(buf.getvalue(), (data * 2).encode(encoding)) + + def test_unreadable(self): + class UnReadable(self.BytesIO): + def readable(self): + return False + txt = self.TextIOWrapper(UnReadable(), encoding="utf-8") + self.assertRaises(OSError, txt.read) + + def test_read_one_by_one(self): + txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"), encoding="utf-8") + reads = "" + while True: + c = txt.read(1) + if not c: + break + reads += c + self.assertEqual(reads, "AA\nBB") + + def test_readlines(self): + txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"), encoding="utf-8") + self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"]) + txt.seek(0) + self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"]) + txt.seek(0) + self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"]) + + # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128. + def test_read_by_chunk(self): + # make sure "\r\n" straddles 128 char boundary. 
+ txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"), encoding="utf-8") + reads = "" + while True: + c = txt.read(128) + if not c: + break + reads += c + self.assertEqual(reads, "A"*127+"\nB") + + def test_writelines(self): + l = ['ab', 'cd', 'ef'] + buf = self.BytesIO() + txt = self.TextIOWrapper(buf, encoding="utf-8") + txt.writelines(l) + txt.flush() + self.assertEqual(buf.getvalue(), b'abcdef') + + def test_writelines_userlist(self): + l = UserList(['ab', 'cd', 'ef']) + buf = self.BytesIO() + txt = self.TextIOWrapper(buf, encoding="utf-8") + txt.writelines(l) + txt.flush() + self.assertEqual(buf.getvalue(), b'abcdef') + + def test_writelines_error(self): + txt = self.TextIOWrapper(self.BytesIO(), encoding="utf-8") + self.assertRaises(TypeError, txt.writelines, [1, 2, 3]) + self.assertRaises(TypeError, txt.writelines, None) + self.assertRaises(TypeError, txt.writelines, b'abc') + + def test_issue1395_1(self): + txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") + + # read one char at a time + reads = "" + while True: + c = txt.read(1) + if not c: + break + reads += c + self.assertEqual(reads, self.normalized) + + def test_issue1395_2(self): + txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") + txt._CHUNK_SIZE = 4 + + reads = "" + while True: + c = txt.read(4) + if not c: + break + reads += c + self.assertEqual(reads, self.normalized) + + def test_issue1395_3(self): + txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") + txt._CHUNK_SIZE = 4 + + reads = txt.read(4) + reads += txt.read(4) + reads += txt.readline() + reads += txt.readline() + reads += txt.readline() + self.assertEqual(reads, self.normalized) + + def test_issue1395_4(self): + txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") + txt._CHUNK_SIZE = 4 + + reads = txt.read(4) + reads += txt.read() + self.assertEqual(reads, self.normalized) + + def test_issue1395_5(self): + txt = 
self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") + txt._CHUNK_SIZE = 4 + + reads = txt.read(4) + pos = txt.tell() + txt.seek(0) + txt.seek(pos) + self.assertEqual(txt.read(4), "BBB\n") + + def test_issue2282(self): + buffer = self.BytesIO(self.testdata) + txt = self.TextIOWrapper(buffer, encoding="ascii") + + self.assertEqual(buffer.seekable(), txt.seekable()) + + def test_append_bom(self): + # The BOM is not written again when appending to a non-empty file + filename = os_helper.TESTFN + for charset in ('utf-8-sig', 'utf-16', 'utf-32'): + with self.open(filename, 'w', encoding=charset) as f: + f.write('aaa') + pos = f.tell() + with self.open(filename, 'rb') as f: + self.assertEqual(f.read(), 'aaa'.encode(charset)) + + with self.open(filename, 'a', encoding=charset) as f: + f.write('xxx') + with self.open(filename, 'rb') as f: + self.assertEqual(f.read(), 'aaaxxx'.encode(charset)) + + def test_seek_bom(self): + # Same test, but when seeking manually + filename = os_helper.TESTFN + for charset in ('utf-8-sig', 'utf-16', 'utf-32'): + with self.open(filename, 'w', encoding=charset) as f: + f.write('aaa') + pos = f.tell() + with self.open(filename, 'r+', encoding=charset) as f: + f.seek(pos) + f.write('zzz') + f.seek(0) + f.write('bbb') + with self.open(filename, 'rb') as f: + self.assertEqual(f.read(), 'bbbzzz'.encode(charset)) + + def test_seek_append_bom(self): + # Same test, but first seek to the start and then to the end + filename = os_helper.TESTFN + for charset in ('utf-8-sig', 'utf-16', 'utf-32'): + with self.open(filename, 'w', encoding=charset) as f: + f.write('aaa') + with self.open(filename, 'a', encoding=charset) as f: + f.seek(0) + f.seek(0, self.SEEK_END) + f.write('xxx') + with self.open(filename, 'rb') as f: + self.assertEqual(f.read(), 'aaaxxx'.encode(charset)) + + def test_errors_property(self): + with self.open(os_helper.TESTFN, "w", encoding="utf-8") as f: + self.assertEqual(f.errors, "strict") + with self.open(os_helper.TESTFN, 
"w", encoding="utf-8", errors="replace") as f: + self.assertEqual(f.errors, "replace") + + @support.no_tracing + @threading_helper.requires_working_threading() + def test_threads_write(self): + # Issue6750: concurrent writes could duplicate data + event = threading.Event() + with self.open(os_helper.TESTFN, "w", encoding="utf-8", buffering=1) as f: + def run(n): + text = "Thread%03d\n" % n + event.wait() + f.write(text) + threads = [threading.Thread(target=run, args=(x,)) + for x in range(20)] + with threading_helper.start_threads(threads, event.set): + time.sleep(0.02) + with self.open(os_helper.TESTFN, encoding="utf-8") as f: + content = f.read() + for n in range(20): + self.assertEqual(content.count("Thread%03d\n" % n), 1) + + def test_flush_error_on_close(self): + # Test that text file is closed despite failed flush + # and that flush() is called before file closed. + txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") + closed = [] + def bad_flush(): + closed[:] = [txt.closed, txt.buffer.closed] + raise OSError() + txt.flush = bad_flush + self.assertRaises(OSError, txt.close) # exception not swallowed + self.assertTrue(txt.closed) + self.assertTrue(txt.buffer.closed) + self.assertTrue(closed) # flush() called + self.assertFalse(closed[0]) # flush() called before file closed + self.assertFalse(closed[1]) + txt.flush = lambda: None # break reference loop + + def test_close_error_on_close(self): + buffer = self.BytesIO(self.testdata) + def bad_flush(): + raise OSError('flush') + def bad_close(): + raise OSError('close') + buffer.close = bad_close + txt = self.TextIOWrapper(buffer, encoding="ascii") + txt.flush = bad_flush + with self.assertRaises(OSError) as err: # exception not swallowed + txt.close() + self.assertEqual(err.exception.args, ('close',)) + self.assertIsInstance(err.exception.__context__, OSError) + self.assertEqual(err.exception.__context__.args, ('flush',)) + self.assertFalse(txt.closed) + + # Silence destructor error + 
buffer.close = lambda: None + txt.flush = lambda: None + + def test_nonnormalized_close_error_on_close(self): + # Issue #21677 + buffer = self.BytesIO(self.testdata) + def bad_flush(): + raise non_existing_flush + def bad_close(): + raise non_existing_close + buffer.close = bad_close + txt = self.TextIOWrapper(buffer, encoding="ascii") + txt.flush = bad_flush + with self.assertRaises(NameError) as err: # exception not swallowed + txt.close() + self.assertIn('non_existing_close', str(err.exception)) + self.assertIsInstance(err.exception.__context__, NameError) + self.assertIn('non_existing_flush', str(err.exception.__context__)) + self.assertFalse(txt.closed) + + # Silence destructor error + buffer.close = lambda: None + txt.flush = lambda: None + + def test_multi_close(self): + txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") + txt.close() + txt.close() + txt.close() + self.assertRaises(ValueError, txt.flush) + + def test_unseekable(self): + txt = self.TextIOWrapper(self.MockUnseekableIO(self.testdata), encoding="utf-8") + self.assertRaises(self.UnsupportedOperation, txt.tell) + self.assertRaises(self.UnsupportedOperation, txt.seek, 0) + + def test_readonly_attributes(self): + txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") + buf = self.BytesIO(self.testdata) + with self.assertRaises(AttributeError): + txt.buffer = buf + + def test_rawio(self): + # Issue #12591: TextIOWrapper must work with raw I/O objects, so + # that subprocess.Popen() can have the required unbuffered + # semantics with universal_newlines=True. 
+ raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n']) + txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n') + # Reads + self.assertEqual(txt.read(4), 'abcd') + self.assertEqual(txt.readline(), 'efghi\n') + self.assertEqual(list(txt), ['jkl\n', 'opq\n']) + + def test_rawio_write_through(self): + # Issue #12591: with write_through=True, writes don't need a flush + raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n']) + txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n', + write_through=True) + txt.write('1') + txt.write('23\n4') + txt.write('5') + self.assertEqual(b''.join(raw._write_stack), b'123\n45') + + def test_bufio_write_through(self): + # Issue #21396: write_through=True doesn't force a flush() + # on the underlying binary buffered object. + flush_called, write_called = [], [] + class BufferedWriter(self.BufferedWriter): + def flush(self, *args, **kwargs): + flush_called.append(True) + return super().flush(*args, **kwargs) + def write(self, *args, **kwargs): + write_called.append(True) + return super().write(*args, **kwargs) + + rawio = self.BytesIO() + data = b"a" + bufio = BufferedWriter(rawio, len(data)*2) + textio = self.TextIOWrapper(bufio, encoding='ascii', + write_through=True) + # write to the buffered io but don't overflow the buffer + text = data.decode('ascii') + textio.write(text) + + # buffer.flush is not called with write_through=True + self.assertFalse(flush_called) + # buffer.write *is* called with write_through=True + self.assertTrue(write_called) + self.assertEqual(rawio.getvalue(), b"") # no flush + + write_called = [] # reset + textio.write(text * 10) # total content is larger than bufio buffer + self.assertTrue(write_called) + self.assertEqual(rawio.getvalue(), data * 11) # all flushed + + def test_reconfigure_write_through(self): + raw = self.MockRawIO([]) + t = self.TextIOWrapper(raw, encoding='ascii', newline='\n') + t.write('1') + t.reconfigure(write_through=True) # implied flush + 
self.assertEqual(t.write_through, True) + self.assertEqual(b''.join(raw._write_stack), b'1') + t.write('23') + self.assertEqual(b''.join(raw._write_stack), b'123') + t.reconfigure(write_through=False) + self.assertEqual(t.write_through, False) + t.write('45') + t.flush() + self.assertEqual(b''.join(raw._write_stack), b'12345') + # Keeping default value + t.reconfigure() + t.reconfigure(write_through=None) + self.assertEqual(t.write_through, False) + t.reconfigure(write_through=True) + t.reconfigure() + t.reconfigure(write_through=None) + self.assertEqual(t.write_through, True) + + def test_read_nonbytes(self): + # Issue #17106 + # Crash when underlying read() returns non-bytes + t = self.TextIOWrapper(self.StringIO('a'), encoding="utf-8") + self.assertRaises(TypeError, t.read, 1) + t = self.TextIOWrapper(self.StringIO('a'), encoding="utf-8") + self.assertRaises(TypeError, t.readline) + t = self.TextIOWrapper(self.StringIO('a'), encoding="utf-8") + self.assertRaises(TypeError, t.read) + + def test_illegal_encoder(self): + # Issue 31271: Calling write() while the return value of encoder's + # encode() is invalid shouldn't cause an assertion failure. 
+ rot13 = codecs.lookup("rot13") + with support.swap_attr(rot13, '_is_text_encoding', True): + t = self.TextIOWrapper(self.BytesIO(b'foo'), encoding="rot13") + self.assertRaises(TypeError, t.write, 'bar') + + def test_illegal_decoder(self): + # Issue #17106 + # Bypass the early encoding check added in issue 20404 + def _make_illegal_wrapper(): + quopri = codecs.lookup("quopri") + quopri._is_text_encoding = True + try: + t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), + newline='\n', encoding="quopri") + finally: + quopri._is_text_encoding = False + return t + # Crash when decoder returns non-string + t = _make_illegal_wrapper() + self.assertRaises(TypeError, t.read, 1) + t = _make_illegal_wrapper() + self.assertRaises(TypeError, t.readline) + t = _make_illegal_wrapper() + self.assertRaises(TypeError, t.read) + + # Issue 31243: calling read() while the return value of decoder's + # getstate() is invalid should neither crash the interpreter nor + # raise a SystemError. + def _make_very_illegal_wrapper(getstate_ret_val): + class BadDecoder: + def getstate(self): + return getstate_ret_val + def _get_bad_decoder(dummy): + return BadDecoder() + quopri = codecs.lookup("quopri") + with support.swap_attr(quopri, 'incrementaldecoder', + _get_bad_decoder): + return _make_illegal_wrapper() + t = _make_very_illegal_wrapper(42) + self.assertRaises(TypeError, t.read, 42) + t = _make_very_illegal_wrapper(()) + self.assertRaises(TypeError, t.read, 42) + t = _make_very_illegal_wrapper((1, 2)) + self.assertRaises(TypeError, t.read, 42) + + def _check_create_at_shutdown(self, **kwargs): + # Issue #20037: creating a TextIOWrapper at shutdown + # shouldn't crash the interpreter. 
+ iomod = self.io.__name__ + code = """if 1: + import codecs + import {iomod} as io + + # Avoid looking up codecs at shutdown + codecs.lookup('utf-8') + + class C: + def __del__(self): + io.TextIOWrapper(io.BytesIO(), **{kwargs}) + print("ok") + c = C() + """.format(iomod=iomod, kwargs=kwargs) + return assert_python_ok("-c", code) + + def test_create_at_shutdown_without_encoding(self): + rc, out, err = self._check_create_at_shutdown() + if err: + # Can error out with a RuntimeError if the module state + # isn't found. + self.assertIn(self.shutdown_error, err.decode()) + else: + self.assertEqual("ok", out.decode().strip()) + + def test_create_at_shutdown_with_encoding(self): + rc, out, err = self._check_create_at_shutdown(encoding='utf-8', + errors='strict') + self.assertFalse(err) + self.assertEqual("ok", out.decode().strip()) + + def test_read_byteslike(self): + r = MemviewBytesIO(b'Just some random string\n') + t = self.TextIOWrapper(r, 'utf-8') + + # TextIOwrapper will not read the full string, because + # we truncate it to a multiple of the native int size + # so that we can construct a more complex memoryview. 
+ bytes_val = _to_memoryview(r.getvalue()).tobytes() + + self.assertEqual(t.read(200), bytes_val.decode('utf-8')) + + def test_issue22849(self): + class F(object): + def readable(self): return True + def writable(self): return True + def seekable(self): return True + + for i in range(10): + try: + self.TextIOWrapper(F(), encoding='utf-8') + except Exception: + pass + + F.tell = lambda x: 0 + t = self.TextIOWrapper(F(), encoding='utf-8') + + def test_reconfigure_locale(self): + wrapper = self.TextIOWrapper(self.BytesIO(b"test")) + wrapper.reconfigure(encoding="locale") + + def test_reconfigure_encoding_read(self): + # latin1 -> utf8 + # (latin1 can decode utf-8 encoded string) + data = 'abc\xe9\n'.encode('latin1') + 'd\xe9f\n'.encode('utf8') + raw = self.BytesIO(data) + txt = self.TextIOWrapper(raw, encoding='latin1', newline='\n') + self.assertEqual(txt.readline(), 'abc\xe9\n') + with self.assertRaises(self.UnsupportedOperation): + txt.reconfigure(encoding='utf-8') + with self.assertRaises(self.UnsupportedOperation): + txt.reconfigure(newline=None) + + def test_reconfigure_write_fromascii(self): + # ascii has a specific encodefunc in the C implementation, + # but utf-8-sig has not. Make sure that we get rid of the + # cached encodefunc when we switch encoders. 
+ raw = self.BytesIO() + txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n') + txt.write('foo\n') + txt.reconfigure(encoding='utf-8-sig') + txt.write('\xe9\n') + txt.flush() + self.assertEqual(raw.getvalue(), b'foo\n\xc3\xa9\n') + + def test_reconfigure_write(self): + # latin -> utf8 + raw = self.BytesIO() + txt = self.TextIOWrapper(raw, encoding='latin1', newline='\n') + txt.write('abc\xe9\n') + txt.reconfigure(encoding='utf-8') + self.assertEqual(raw.getvalue(), b'abc\xe9\n') + txt.write('d\xe9f\n') + txt.flush() + self.assertEqual(raw.getvalue(), b'abc\xe9\nd\xc3\xa9f\n') + + # ascii -> utf-8-sig: ensure that no BOM is written in the middle of + # the file + raw = self.BytesIO() + txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n') + txt.write('abc\n') + txt.reconfigure(encoding='utf-8-sig') + txt.write('d\xe9f\n') + txt.flush() + self.assertEqual(raw.getvalue(), b'abc\nd\xc3\xa9f\n') + + def test_reconfigure_write_non_seekable(self): + raw = self.BytesIO() + raw.seekable = lambda: False + raw.seek = None + txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n') + txt.write('abc\n') + txt.reconfigure(encoding='utf-8-sig') + txt.write('d\xe9f\n') + txt.flush() + + # If the raw stream is not seekable, there'll be a BOM + self.assertEqual(raw.getvalue(), b'abc\n\xef\xbb\xbfd\xc3\xa9f\n') + + def test_reconfigure_defaults(self): + txt = self.TextIOWrapper(self.BytesIO(), 'ascii', 'replace', '\n') + txt.reconfigure(encoding=None) + self.assertEqual(txt.encoding, 'ascii') + self.assertEqual(txt.errors, 'replace') + txt.write('LF\n') + + txt.reconfigure(newline='\r\n') + self.assertEqual(txt.encoding, 'ascii') + self.assertEqual(txt.errors, 'replace') + + txt.reconfigure(errors='ignore') + self.assertEqual(txt.encoding, 'ascii') + self.assertEqual(txt.errors, 'ignore') + txt.write('CRLF\n') + + txt.reconfigure(encoding='utf-8', newline=None) + self.assertEqual(txt.errors, 'strict') + txt.seek(0) + self.assertEqual(txt.read(), 'LF\nCRLF\n') + + 
self.assertEqual(txt.detach().getvalue(), b'LF\nCRLF\r\n') + + def test_reconfigure_errors(self): + txt = self.TextIOWrapper(self.BytesIO(), 'ascii', 'replace', '\r') + with self.assertRaises(TypeError): # there was a crash + txt.reconfigure(encoding=42) + if self.is_C: + with self.assertRaises(UnicodeEncodeError): + txt.reconfigure(encoding='\udcfe') + with self.assertRaises(LookupError): + txt.reconfigure(encoding='locale\0') + # TODO: txt.reconfigure(encoding='utf-8\0') + # TODO: txt.reconfigure(encoding='nonexisting') + with self.assertRaises(TypeError): + txt.reconfigure(errors=42) + if self.is_C: + with self.assertRaises(UnicodeEncodeError): + txt.reconfigure(errors='\udcfe') + # TODO: txt.reconfigure(errors='ignore\0') + # TODO: txt.reconfigure(errors='nonexisting') + with self.assertRaises(TypeError): + txt.reconfigure(newline=42) + with self.assertRaises(ValueError): + txt.reconfigure(newline='\udcfe') + with self.assertRaises(ValueError): + txt.reconfigure(newline='xyz') + if not self.is_C: + # TODO: Should fail in C too. + with self.assertRaises(ValueError): + txt.reconfigure(newline='\n\0') + if self.is_C: + # TODO: Use __bool__(), not __index__(). 
+ with self.assertRaises(ZeroDivisionError): + txt.reconfigure(line_buffering=BadIndex()) + with self.assertRaises(OverflowError): + txt.reconfigure(line_buffering=2**1000) + with self.assertRaises(ZeroDivisionError): + txt.reconfigure(write_through=BadIndex()) + with self.assertRaises(OverflowError): + txt.reconfigure(write_through=2**1000) + with self.assertRaises(ZeroDivisionError): # there was a crash + txt.reconfigure(line_buffering=BadIndex(), + write_through=BadIndex()) + self.assertEqual(txt.encoding, 'ascii') + self.assertEqual(txt.errors, 'replace') + self.assertIs(txt.line_buffering, False) + self.assertIs(txt.write_through, False) + + txt.reconfigure(encoding='latin1', errors='ignore', newline='\r\n', + line_buffering=True, write_through=True) + self.assertEqual(txt.encoding, 'latin1') + self.assertEqual(txt.errors, 'ignore') + self.assertIs(txt.line_buffering, True) + self.assertIs(txt.write_through, True) + + def test_reconfigure_newline(self): + raw = self.BytesIO(b'CR\rEOF') + txt = self.TextIOWrapper(raw, 'ascii', newline='\n') + txt.reconfigure(newline=None) + self.assertEqual(txt.readline(), 'CR\n') + raw = self.BytesIO(b'CR\rEOF') + txt = self.TextIOWrapper(raw, 'ascii', newline='\n') + txt.reconfigure(newline='') + self.assertEqual(txt.readline(), 'CR\r') + raw = self.BytesIO(b'CR\rLF\nEOF') + txt = self.TextIOWrapper(raw, 'ascii', newline='\r') + txt.reconfigure(newline='\n') + self.assertEqual(txt.readline(), 'CR\rLF\n') + raw = self.BytesIO(b'LF\nCR\rEOF') + txt = self.TextIOWrapper(raw, 'ascii', newline='\n') + txt.reconfigure(newline='\r') + self.assertEqual(txt.readline(), 'LF\nCR\r') + raw = self.BytesIO(b'CR\rCRLF\r\nEOF') + txt = self.TextIOWrapper(raw, 'ascii', newline='\r') + txt.reconfigure(newline='\r\n') + self.assertEqual(txt.readline(), 'CR\rCRLF\r\n') + + txt = self.TextIOWrapper(self.BytesIO(), 'ascii', newline='\r') + txt.reconfigure(newline=None) + txt.write('linesep\n') + txt.reconfigure(newline='') + txt.write('LF\n') + 
txt.reconfigure(newline='\n') + txt.write('LF\n') + txt.reconfigure(newline='\r') + txt.write('CR\n') + txt.reconfigure(newline='\r\n') + txt.write('CRLF\n') + expected = 'linesep' + os.linesep + 'LF\nLF\nCR\rCRLF\r\n' + self.assertEqual(txt.detach().getvalue().decode('ascii'), expected) + + def test_issue25862(self): + # Assertion failures occurred in tell() after read() and write(). + t = self.TextIOWrapper(self.BytesIO(b'test'), encoding='ascii') + t.read(1) + t.read() + t.tell() + t = self.TextIOWrapper(self.BytesIO(b'test'), encoding='ascii') + t.read(1) + t.write('x') + t.tell() + + def test_issue35928(self): + p = self.BufferedRWPair(self.BytesIO(b'foo\nbar\n'), self.BytesIO()) + f = self.TextIOWrapper(p) + res = f.readline() + self.assertEqual(res, 'foo\n') + f.write(res) + self.assertEqual(res + f.readline(), 'foo\nbar\n') + + def test_pickling_subclass(self): + global MyTextIO + class MyTextIO(self.TextIOWrapper): + def __init__(self, raw, tag): + super().__init__(raw) + self.tag = tag + def __getstate__(self): + return self.tag, self.buffer.getvalue() + def __setstate__(slf, state): + tag, value = state + slf.__init__(self.BytesIO(value), tag) + + raw = self.BytesIO(b'data') + txt = MyTextIO(raw, 'ham') + for proto in range(pickle.HIGHEST_PROTOCOL + 1): + with self.subTest(protocol=proto): + pickled = pickle.dumps(txt, proto) + newtxt = pickle.loads(pickled) + self.assertEqual(newtxt.buffer.getvalue(), b'data') + self.assertEqual(newtxt.tag, 'ham') + del MyTextIO + + @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()") + def test_read_non_blocking(self): + import os + r, w = os.pipe() + try: + os.set_blocking(r, False) + with self.io.open(r, 'rt') as textfile: + r = None + # Nothing has been written so a non-blocking read raises a BlockingIOError exception. 
+ with self.assertRaises(BlockingIOError): + textfile.read() + finally: + if r is not None: + os.close(r) + os.close(w) + + +class MemviewBytesIO(io.BytesIO): + '''A BytesIO object whose read method returns memoryviews + rather than bytes''' + + def read1(self, len_): + return _to_memoryview(super().read1(len_)) + + def read(self, len_): + return _to_memoryview(super().read(len_)) + +def _to_memoryview(buf): + '''Convert bytes-object *buf* to a non-trivial memoryview''' + + arr = array.array('i') + idx = len(buf) - len(buf) % arr.itemsize + arr.frombytes(buf[:idx]) + return memoryview(arr) + + +class CTextIOWrapperTest(TextIOWrapperTest, CTestCase): + shutdown_error = "LookupError: unknown encoding: ascii" + + def test_initialization(self): + r = self.BytesIO(b"\xc3\xa9\n\n") + b = self.BufferedReader(r, 1000) + t = self.TextIOWrapper(b, encoding="utf-8") + self.assertRaises(ValueError, t.__init__, b, encoding="utf-8", newline='xyzzy') + self.assertRaises(ValueError, t.read) + + t = self.TextIOWrapper.__new__(self.TextIOWrapper) + self.assertRaises(Exception, repr, t) + + def test_garbage_collection(self): + # C TextIOWrapper objects are collected, and collecting them flushes + # all data to disk. + # The Python version has __del__, so it ends in gc.garbage instead. + with warnings.catch_warnings(): + warnings.simplefilter("ignore", ResourceWarning) + rawio = self.FileIO(os_helper.TESTFN, "wb") + b = self.BufferedWriter(rawio) + t = self.TextIOWrapper(b, encoding="ascii") + t.write("456def") + t.x = t + wr = weakref.ref(t) + del t + support.gc_collect() + self.assertIsNone(wr(), wr) + with self.open(os_helper.TESTFN, "rb") as f: + self.assertEqual(f.read(), b"456def") + + def test_rwpair_cleared_before_textio(self): + # Issue 13070: TextIOWrapper's finalization would crash when called + # after the reference to the underlying BufferedRWPair's writer got + # cleared by the GC. 
+ for i in range(1000): + b1 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO()) + t1 = self.TextIOWrapper(b1, encoding="ascii") + b2 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO()) + t2 = self.TextIOWrapper(b2, encoding="ascii") + # circular references + t1.buddy = t2 + t2.buddy = t1 + support.gc_collect() + + def test_del__CHUNK_SIZE_SystemError(self): + t = self.TextIOWrapper(self.BytesIO(), encoding='ascii') + with self.assertRaises(AttributeError): + del t._CHUNK_SIZE + + def test_internal_buffer_size(self): + # bpo-43260: TextIOWrapper's internal buffer should not store + # data larger than chunk size. + chunk_size = 8192 # default chunk size, updated later + + class MockIO(self.MockRawIO): + def write(self, data): + if len(data) > chunk_size: + raise RuntimeError + return super().write(data) + + buf = MockIO() + t = self.TextIOWrapper(buf, encoding="ascii") + chunk_size = t._CHUNK_SIZE + t.write("abc") + t.write("def") + # default chunk size is 8192 bytes so t don't write data to buf. + self.assertEqual([], buf._write_stack) + + with self.assertRaises(RuntimeError): + t.write("x"*(chunk_size+1)) + + self.assertEqual([b"abcdef"], buf._write_stack) + t.write("ghi") + t.write("x"*chunk_size) + self.assertEqual([b"abcdef", b"ghi", b"x"*chunk_size], buf._write_stack) + + def test_issue119506(self): + chunk_size = 8192 + + class MockIO(self.MockRawIO): + written = False + def write(self, data): + if not self.written: + self.written = True + t.write("middle") + return super().write(data) + + buf = MockIO() + t = self.TextIOWrapper(buf) + t.write("abc") + t.write("def") + # writing data which size >= chunk_size cause flushing buffer before write. 
+ t.write("g" * chunk_size) + t.flush() + + self.assertEqual([b"abcdef", b"middle", b"g"*chunk_size], + buf._write_stack) + + +class PyTextIOWrapperTest(TextIOWrapperTest, PyTestCase): + shutdown_error = "LookupError: unknown encoding: ascii" + + +class IncrementalNewlineDecoderTest: + + def check_newline_decoding_utf8(self, decoder): + # UTF-8 specific tests for a newline decoder + def _check_decode(b, s, **kwargs): + # We exercise getstate() / setstate() as well as decode() + state = decoder.getstate() + self.assertEqual(decoder.decode(b, **kwargs), s) + decoder.setstate(state) + self.assertEqual(decoder.decode(b, **kwargs), s) + + _check_decode(b'\xe8\xa2\x88', "\u8888") + + _check_decode(b'\xe8', "") + _check_decode(b'\xa2', "") + _check_decode(b'\x88', "\u8888") + + _check_decode(b'\xe8', "") + _check_decode(b'\xa2', "") + _check_decode(b'\x88', "\u8888") + + _check_decode(b'\xe8', "") + self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True) + + decoder.reset() + _check_decode(b'\n', "\n") + _check_decode(b'\r', "") + _check_decode(b'', "\n", final=True) + _check_decode(b'\r', "\n", final=True) + + _check_decode(b'\r', "") + _check_decode(b'a', "\na") + + _check_decode(b'\r\r\n', "\n\n") + _check_decode(b'\r', "") + _check_decode(b'\r', "\n") + _check_decode(b'\na', "\na") + + _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n") + _check_decode(b'\xe8\xa2\x88', "\u8888") + _check_decode(b'\n', "\n") + _check_decode(b'\xe8\xa2\x88\r', "\u8888") + _check_decode(b'\n', "\n") + + def check_newline_decoding(self, decoder, encoding): + result = [] + if encoding is not None: + encoder = codecs.getincrementalencoder(encoding)() + def _decode_bytewise(s): + # Decode one byte at a time + for b in encoder.encode(s): + result.append(decoder.decode(bytes([b]))) + else: + encoder = None + def _decode_bytewise(s): + # Decode one char at a time + for c in s: + result.append(decoder.decode(c)) + self.assertEqual(decoder.newlines, None) + _decode_bytewise("abc\n\r") + 
self.assertEqual(decoder.newlines, '\n') + _decode_bytewise("\nabc") + self.assertEqual(decoder.newlines, ('\n', '\r\n')) + _decode_bytewise("abc\r") + self.assertEqual(decoder.newlines, ('\n', '\r\n')) + _decode_bytewise("abc") + self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n')) + _decode_bytewise("abc\r") + self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc") + decoder.reset() + input = "abc" + if encoder is not None: + encoder.reset() + input = encoder.encode(input) + self.assertEqual(decoder.decode(input), "abc") + self.assertEqual(decoder.newlines, None) + + def test_newline_decoder(self): + encodings = ( + # None meaning the IncrementalNewlineDecoder takes unicode input + # rather than bytes input + None, 'utf-8', 'latin-1', + 'utf-16', 'utf-16-le', 'utf-16-be', + 'utf-32', 'utf-32-le', 'utf-32-be', + ) + for enc in encodings: + decoder = enc and codecs.getincrementaldecoder(enc)() + decoder = self.IncrementalNewlineDecoder(decoder, translate=True) + self.check_newline_decoding(decoder, enc) + decoder = codecs.getincrementaldecoder("utf-8")() + decoder = self.IncrementalNewlineDecoder(decoder, translate=True) + self.check_newline_decoding_utf8(decoder) + self.assertRaises(TypeError, decoder.setstate, 42) + + def test_newline_bytes(self): + # Issue 5433: Excessive optimization in IncrementalNewlineDecoder + def _check(dec): + self.assertEqual(dec.newlines, None) + self.assertEqual(dec.decode("\u0D00"), "\u0D00") + self.assertEqual(dec.newlines, None) + self.assertEqual(dec.decode("\u0A00"), "\u0A00") + self.assertEqual(dec.newlines, None) + dec = self.IncrementalNewlineDecoder(None, translate=False) + _check(dec) + dec = self.IncrementalNewlineDecoder(None, translate=True) + _check(dec) + + def test_translate(self): + # issue 35062 + for translate in (-2, -1, 1, 2): + decoder = codecs.getincrementaldecoder("utf-8")() + decoder = self.IncrementalNewlineDecoder(decoder, translate) + self.check_newline_decoding_utf8(decoder) + decoder = 
codecs.getincrementaldecoder("utf-8")() + decoder = self.IncrementalNewlineDecoder(decoder, translate=0) + self.assertEqual(decoder.decode(b"\r\r\n"), "\r\r\n") + +class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest, unittest.TestCase): + IncrementalNewlineDecoder = io.IncrementalNewlineDecoder + + @support.cpython_only + def test_uninitialized(self): + uninitialized = self.IncrementalNewlineDecoder.__new__( + self.IncrementalNewlineDecoder) + self.assertRaises(ValueError, uninitialized.decode, b'bar') + self.assertRaises(ValueError, uninitialized.getstate) + self.assertRaises(ValueError, uninitialized.setstate, (b'foo', 0)) + self.assertRaises(ValueError, uninitialized.reset) + + +class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest, unittest.TestCase): + IncrementalNewlineDecoder = pyio.IncrementalNewlineDecoder