| 
 | 1 | +import io  | 
 | 2 | +import os  | 
 | 3 | +import unittest  | 
 | 4 | +import warnings  | 
 | 5 | +from test import support  | 
 | 6 | +from test.support import import_helper, os_helper, warnings_helper  | 
 | 7 | + | 
 | 8 | + | 
 | 9 | +_testcapi = import_helper.import_module('_testcapi')  | 
 | 10 | +_testlimitedcapi = import_helper.import_module('_testlimitedcapi')  | 
 | 11 | +_io = import_helper.import_module('_io')  | 
 | 12 | +NULL = None  | 
 | 13 | +STDOUT_FD = 1  | 
 | 14 | + | 
 | 15 | +with open(__file__, 'rb') as fp:  | 
 | 16 | +    FIRST_LINE = next(fp).decode()  | 
 | 17 | +FIRST_LINE_NORM = FIRST_LINE.rstrip() + '\n'  | 
 | 18 | + | 
 | 19 | + | 
 | 20 | +class CAPIFileTest(unittest.TestCase):  | 
 | 21 | +    def test_pyfile_fromfd(self):  | 
 | 22 | +        # Test PyFile_FromFd() which is a thin wrapper to _io.open()  | 
 | 23 | +        pyfile_fromfd = _testlimitedcapi.pyfile_fromfd  | 
 | 24 | +        filename = __file__  | 
 | 25 | +        with open(filename, "rb") as fp:  | 
 | 26 | +            fd = fp.fileno()  | 
 | 27 | + | 
 | 28 | +            # FileIO  | 
 | 29 | +            fp.seek(0)  | 
 | 30 | +            obj = pyfile_fromfd(fd, filename, "rb", 0, NULL, NULL, NULL, 0)  | 
 | 31 | +            try:  | 
 | 32 | +                self.assertIsInstance(obj, _io.FileIO)  | 
 | 33 | +                self.assertEqual(obj.readline(), FIRST_LINE.encode())  | 
 | 34 | +            finally:  | 
 | 35 | +                obj.close()  | 
 | 36 | + | 
 | 37 | +            # BufferedReader  | 
 | 38 | +            fp.seek(0)  | 
 | 39 | +            obj = pyfile_fromfd(fd, filename, "rb", 1024, NULL, NULL, NULL, 0)  | 
 | 40 | +            try:  | 
 | 41 | +                self.assertIsInstance(obj, _io.BufferedReader)  | 
 | 42 | +                self.assertEqual(obj.readline(), FIRST_LINE.encode())  | 
 | 43 | +            finally:  | 
 | 44 | +                obj.close()  | 
 | 45 | + | 
 | 46 | +            # TextIOWrapper  | 
 | 47 | +            fp.seek(0)  | 
 | 48 | +            obj = pyfile_fromfd(fd, filename, "r", 1,  | 
 | 49 | +                                "utf-8", "replace", NULL, 0)  | 
 | 50 | +            try:  | 
 | 51 | +                self.assertIsInstance(obj, _io.TextIOWrapper)  | 
 | 52 | +                self.assertEqual(obj.encoding, "utf-8")  | 
 | 53 | +                self.assertEqual(obj.errors, "replace")  | 
 | 54 | +                self.assertEqual(obj.readline(), FIRST_LINE_NORM)  | 
 | 55 | +            finally:  | 
 | 56 | +                obj.close()  | 
 | 57 | + | 
 | 58 | +    def test_pyfile_getline(self):  | 
 | 59 | +        # Test PyFile_GetLine(file, n): call file.readline()  | 
 | 60 | +        # and strip "\n" suffix if n < 0.  | 
 | 61 | +        pyfile_getline = _testlimitedcapi.pyfile_getline  | 
 | 62 | + | 
 | 63 | +        # Test Unicode  | 
 | 64 | +        with open(__file__, "r") as fp:  | 
 | 65 | +            fp.seek(0)  | 
 | 66 | +            self.assertEqual(pyfile_getline(fp, -1),  | 
 | 67 | +                             FIRST_LINE_NORM.rstrip('\n'))  | 
 | 68 | +            fp.seek(0)  | 
 | 69 | +            self.assertEqual(pyfile_getline(fp, 0),  | 
 | 70 | +                             FIRST_LINE_NORM)  | 
 | 71 | +            fp.seek(0)  | 
 | 72 | +            self.assertEqual(pyfile_getline(fp, 6),  | 
 | 73 | +                             FIRST_LINE_NORM[:6])  | 
 | 74 | + | 
 | 75 | +        # Test bytes  | 
 | 76 | +        with open(__file__, "rb") as fp:  | 
 | 77 | +            fp.seek(0)  | 
 | 78 | +            self.assertEqual(pyfile_getline(fp, -1),  | 
 | 79 | +                             FIRST_LINE.rstrip('\n').encode())  | 
 | 80 | +            fp.seek(0)  | 
 | 81 | +            self.assertEqual(pyfile_getline(fp, 0),  | 
 | 82 | +                             FIRST_LINE.encode())  | 
 | 83 | +            fp.seek(0)  | 
 | 84 | +            self.assertEqual(pyfile_getline(fp, 6),  | 
 | 85 | +                             FIRST_LINE.encode()[:6])  | 
 | 86 | + | 
 | 87 | +    def test_pyfile_writestring(self):  | 
 | 88 | +        # Test PyFile_WriteString(str, file): call file.write(str)  | 
 | 89 | +        writestr = _testlimitedcapi.pyfile_writestring  | 
 | 90 | + | 
 | 91 | +        with io.StringIO() as fp:  | 
 | 92 | +            self.assertEqual(writestr("a\xe9\u20ac\U0010FFFF".encode(), fp), 0)  | 
 | 93 | +            with self.assertRaises(UnicodeDecodeError):  | 
 | 94 | +                writestr(b"\xff", fp)  | 
 | 95 | +            with self.assertRaises(UnicodeDecodeError):  | 
 | 96 | +                writestr("\udc80".encode("utf-8", "surrogatepass"), fp)  | 
 | 97 | + | 
 | 98 | +            text = fp.getvalue()  | 
 | 99 | +            self.assertEqual(text, "a\xe9\u20ac\U0010FFFF")  | 
 | 100 | + | 
 | 101 | +        with self.assertRaises(SystemError):  | 
 | 102 | +            writestr(b"abc", NULL)  | 
 | 103 | + | 
 | 104 | +    def test_pyfile_writeobject(self):  | 
 | 105 | +        # Test PyFile_WriteObject(obj, file, flags):  | 
 | 106 | +        # - Call file.write(str(obj)) if flags equals Py_PRINT_RAW.  | 
 | 107 | +        # - Call file.write(repr(obj)) otherwise.  | 
 | 108 | +        writeobject = _testlimitedcapi.pyfile_writeobject  | 
 | 109 | +        Py_PRINT_RAW = 1  | 
 | 110 | + | 
 | 111 | +        with io.StringIO() as fp:  | 
 | 112 | +            # Test flags=Py_PRINT_RAW  | 
 | 113 | +            self.assertEqual(writeobject("raw", fp, Py_PRINT_RAW), 0)  | 
 | 114 | +            writeobject(NULL, fp, Py_PRINT_RAW)  | 
 | 115 | + | 
 | 116 | +            # Test flags=0  | 
 | 117 | +            self.assertEqual(writeobject("repr", fp, 0), 0)  | 
 | 118 | +            writeobject(NULL, fp, 0)  | 
 | 119 | + | 
 | 120 | +            text = fp.getvalue()  | 
 | 121 | +            self.assertEqual(text, "raw<NULL>'repr'<NULL>")  | 
 | 122 | + | 
 | 123 | +        # invalid file type  | 
 | 124 | +        for invalid_file in (123, "abc", object()):  | 
 | 125 | +            with self.subTest(file=invalid_file):  | 
 | 126 | +                with self.assertRaises(AttributeError):  | 
 | 127 | +                    writeobject("abc", invalid_file, Py_PRINT_RAW)  | 
 | 128 | + | 
 | 129 | +        with self.assertRaises(TypeError):  | 
 | 130 | +            writeobject("abc", NULL, 0)  | 
 | 131 | + | 
 | 132 | +    def test_pyobject_asfiledescriptor(self):  | 
 | 133 | +        # Test PyObject_AsFileDescriptor(obj):  | 
 | 134 | +        # - Return obj if obj is an integer.  | 
 | 135 | +        # - Return obj.fileno() otherwise.  | 
 | 136 | +        # File descriptor must be >= 0.  | 
 | 137 | +        asfd = _testlimitedcapi.pyobject_asfiledescriptor  | 
 | 138 | + | 
 | 139 | +        self.assertEqual(asfd(123), 123)  | 
 | 140 | +        self.assertEqual(asfd(0), 0)  | 
 | 141 | + | 
 | 142 | +        with open(__file__, "rb") as fp:  | 
 | 143 | +            self.assertEqual(asfd(fp), fp.fileno())  | 
 | 144 | + | 
 | 145 | +        # bool emits RuntimeWarning  | 
 | 146 | +        msg = r"bool is used as a file descriptor"  | 
 | 147 | +        with warnings_helper.check_warnings((msg, RuntimeWarning)):  | 
 | 148 | +            self.assertEqual(asfd(True), 1)  | 
 | 149 | + | 
 | 150 | +        class FakeFile:  | 
 | 151 | +            def __init__(self, fd):  | 
 | 152 | +                self.fd = fd  | 
 | 153 | +            def fileno(self):  | 
 | 154 | +                return self.fd  | 
 | 155 | + | 
 | 156 | +        # file descriptor must be positive  | 
 | 157 | +        with self.assertRaises(ValueError):  | 
 | 158 | +            asfd(-1)  | 
 | 159 | +        with self.assertRaises(ValueError):  | 
 | 160 | +            asfd(FakeFile(-1))  | 
 | 161 | + | 
 | 162 | +        # fileno() result must be an integer  | 
 | 163 | +        with self.assertRaises(TypeError):  | 
 | 164 | +            asfd(FakeFile("text"))  | 
 | 165 | + | 
 | 166 | +        # unsupported types  | 
 | 167 | +        for obj in ("string", ["list"], object()):  | 
 | 168 | +            with self.subTest(obj=obj):  | 
 | 169 | +                with self.assertRaises(TypeError):  | 
 | 170 | +                    asfd(obj)  | 
 | 171 | + | 
 | 172 | +        # CRASHES asfd(NULL)  | 
 | 173 | + | 
 | 174 | +    def test_pyfile_newstdprinter(self):  | 
 | 175 | +        # Test PyFile_NewStdPrinter()  | 
 | 176 | +        pyfile_newstdprinter = _testcapi.pyfile_newstdprinter  | 
 | 177 | + | 
 | 178 | +        file = pyfile_newstdprinter(STDOUT_FD)  | 
 | 179 | +        self.assertEqual(file.closed, False)  | 
 | 180 | +        self.assertIsNone(file.encoding)  | 
 | 181 | +        self.assertEqual(file.mode, "w")  | 
 | 182 | + | 
 | 183 | +        self.assertEqual(file.fileno(), STDOUT_FD)  | 
 | 184 | +        self.assertEqual(file.isatty(), os.isatty(STDOUT_FD))  | 
 | 185 | + | 
 | 186 | +        # flush() is a no-op  | 
 | 187 | +        self.assertIsNone(file.flush())  | 
 | 188 | + | 
 | 189 | +        # close() is a no-op  | 
 | 190 | +        self.assertIsNone(file.close())  | 
 | 191 | +        self.assertEqual(file.closed, False)  | 
 | 192 | + | 
 | 193 | +        support.check_disallow_instantiation(self, type(file))  | 
 | 194 | + | 
 | 195 | +    def test_pyfile_newstdprinter_write(self):  | 
 | 196 | +        # Test the write() method of PyFile_NewStdPrinter()  | 
 | 197 | +        pyfile_newstdprinter = _testcapi.pyfile_newstdprinter  | 
 | 198 | + | 
 | 199 | +        filename = os_helper.TESTFN  | 
 | 200 | +        self.addCleanup(os_helper.unlink, filename)  | 
 | 201 | + | 
 | 202 | +        try:  | 
 | 203 | +            old_stdout = os.dup(STDOUT_FD)  | 
 | 204 | +        except OSError as exc:  | 
 | 205 | +            # os.dup(STDOUT_FD) is not supported on WASI  | 
 | 206 | +            self.skipTest(f"os.dup() failed with {exc!r}")  | 
 | 207 | + | 
 | 208 | +        try:  | 
 | 209 | +            with open(filename, "wb") as fp:  | 
 | 210 | +                # PyFile_NewStdPrinter() only accepts fileno(stdout)  | 
 | 211 | +                # or fileno(stderr) file descriptor.  | 
 | 212 | +                fd = fp.fileno()  | 
 | 213 | +                os.dup2(fd, STDOUT_FD)  | 
 | 214 | + | 
 | 215 | +                file = pyfile_newstdprinter(STDOUT_FD)  | 
 | 216 | +                self.assertEqual(file.write("text"), 4)  | 
 | 217 | +                # The surrogate character is encoded with  | 
 | 218 | +                # the "surrogateescape" error handler  | 
 | 219 | +                self.assertEqual(file.write("[\udc80]"), 8)  | 
 | 220 | +        finally:  | 
 | 221 | +            os.dup2(old_stdout, STDOUT_FD)  | 
 | 222 | +            os.close(old_stdout)  | 
 | 223 | + | 
 | 224 | +        with open(filename, "r") as fp:  | 
 | 225 | +            self.assertEqual(fp.read(), "text[\\udc80]")  | 
 | 226 | + | 
 | 227 | +    # TODO: Test Py_UniversalNewlineFgets()  | 
 | 228 | + | 
 | 229 | +    # PyFile_SetOpenCodeHook() and PyFile_OpenCode() are tested by  | 
 | 230 | +    # test_embed.test_open_code_hook()  | 
 | 231 | + | 
 | 232 | + | 
 | 233 | +if __name__ == "__main__":  | 
 | 234 | +    unittest.main()  | 
0 commit comments