|  | 
|  | 1 | +import io | 
|  | 2 | +import os | 
|  | 3 | +import unittest | 
|  | 4 | +import warnings | 
|  | 5 | +from test import support | 
|  | 6 | +from test.support import import_helper, os_helper, warnings_helper | 
|  | 7 | + | 
|  | 8 | + | 
|  | 9 | +_testcapi = import_helper.import_module('_testcapi') | 
|  | 10 | +_testlimitedcapi = import_helper.import_module('_testlimitedcapi') | 
|  | 11 | +_io = import_helper.import_module('_io') | 
|  | 12 | +NULL = None | 
|  | 13 | +STDOUT_FD = 1 | 
|  | 14 | + | 
|  | 15 | +with open(__file__, 'rb') as fp: | 
|  | 16 | +    FIRST_LINE = next(fp).decode() | 
|  | 17 | +FIRST_LINE_NORM = FIRST_LINE.rstrip() + '\n' | 
|  | 18 | + | 
|  | 19 | + | 
|  | 20 | +class CAPIFileTest(unittest.TestCase): | 
|  | 21 | +    def test_pyfile_fromfd(self): | 
|  | 22 | +        # Test PyFile_FromFd() which is a thin wrapper to _io.open() | 
|  | 23 | +        pyfile_fromfd = _testlimitedcapi.pyfile_fromfd | 
|  | 24 | +        filename = __file__ | 
|  | 25 | +        with open(filename, "rb") as fp: | 
|  | 26 | +            fd = fp.fileno() | 
|  | 27 | + | 
|  | 28 | +            # FileIO | 
|  | 29 | +            fp.seek(0) | 
|  | 30 | +            obj = pyfile_fromfd(fd, filename, "rb", 0, NULL, NULL, NULL, 0) | 
|  | 31 | +            try: | 
|  | 32 | +                self.assertIsInstance(obj, _io.FileIO) | 
|  | 33 | +                self.assertEqual(obj.readline(), FIRST_LINE.encode()) | 
|  | 34 | +            finally: | 
|  | 35 | +                obj.close() | 
|  | 36 | + | 
|  | 37 | +            # BufferedReader | 
|  | 38 | +            fp.seek(0) | 
|  | 39 | +            obj = pyfile_fromfd(fd, filename, "rb", 1024, NULL, NULL, NULL, 0) | 
|  | 40 | +            try: | 
|  | 41 | +                self.assertIsInstance(obj, _io.BufferedReader) | 
|  | 42 | +                self.assertEqual(obj.readline(), FIRST_LINE.encode()) | 
|  | 43 | +            finally: | 
|  | 44 | +                obj.close() | 
|  | 45 | + | 
|  | 46 | +            # TextIOWrapper | 
|  | 47 | +            fp.seek(0) | 
|  | 48 | +            obj = pyfile_fromfd(fd, filename, "r", 1, | 
|  | 49 | +                                "utf-8", "replace", NULL, 0) | 
|  | 50 | +            try: | 
|  | 51 | +                self.assertIsInstance(obj, _io.TextIOWrapper) | 
|  | 52 | +                self.assertEqual(obj.encoding, "utf-8") | 
|  | 53 | +                self.assertEqual(obj.errors, "replace") | 
|  | 54 | +                self.assertEqual(obj.readline(), FIRST_LINE_NORM) | 
|  | 55 | +            finally: | 
|  | 56 | +                obj.close() | 
|  | 57 | + | 
|  | 58 | +    def test_pyfile_getline(self): | 
|  | 59 | +        # Test PyFile_GetLine(file, n): call file.readline() | 
|  | 60 | +        # and strip "\n" suffix if n < 0. | 
|  | 61 | +        pyfile_getline = _testlimitedcapi.pyfile_getline | 
|  | 62 | + | 
|  | 63 | +        # Test Unicode | 
|  | 64 | +        with open(__file__, "r") as fp: | 
|  | 65 | +            fp.seek(0) | 
|  | 66 | +            self.assertEqual(pyfile_getline(fp, -1), | 
|  | 67 | +                             FIRST_LINE_NORM.rstrip('\n')) | 
|  | 68 | +            fp.seek(0) | 
|  | 69 | +            self.assertEqual(pyfile_getline(fp, 0), | 
|  | 70 | +                             FIRST_LINE_NORM) | 
|  | 71 | +            fp.seek(0) | 
|  | 72 | +            self.assertEqual(pyfile_getline(fp, 6), | 
|  | 73 | +                             FIRST_LINE_NORM[:6]) | 
|  | 74 | + | 
|  | 75 | +        # Test bytes | 
|  | 76 | +        with open(__file__, "rb") as fp: | 
|  | 77 | +            fp.seek(0) | 
|  | 78 | +            self.assertEqual(pyfile_getline(fp, -1), | 
|  | 79 | +                             FIRST_LINE.rstrip('\n').encode()) | 
|  | 80 | +            fp.seek(0) | 
|  | 81 | +            self.assertEqual(pyfile_getline(fp, 0), | 
|  | 82 | +                             FIRST_LINE.encode()) | 
|  | 83 | +            fp.seek(0) | 
|  | 84 | +            self.assertEqual(pyfile_getline(fp, 6), | 
|  | 85 | +                             FIRST_LINE.encode()[:6]) | 
|  | 86 | + | 
|  | 87 | +    def test_pyfile_writestring(self): | 
|  | 88 | +        # Test PyFile_WriteString(str, file): call file.write(str) | 
|  | 89 | +        writestr = _testlimitedcapi.pyfile_writestring | 
|  | 90 | + | 
|  | 91 | +        with io.StringIO() as fp: | 
|  | 92 | +            self.assertEqual(writestr("a\xe9\u20ac\U0010FFFF".encode(), fp), 0) | 
|  | 93 | +            with self.assertRaises(UnicodeDecodeError): | 
|  | 94 | +                writestr(b"\xff", fp) | 
|  | 95 | +            with self.assertRaises(UnicodeDecodeError): | 
|  | 96 | +                writestr("\udc80".encode("utf-8", "surrogatepass"), fp) | 
|  | 97 | + | 
|  | 98 | +            text = fp.getvalue() | 
|  | 99 | +            self.assertEqual(text, "a\xe9\u20ac\U0010FFFF") | 
|  | 100 | + | 
|  | 101 | +        with self.assertRaises(SystemError): | 
|  | 102 | +            writestr(b"abc", NULL) | 
|  | 103 | + | 
|  | 104 | +    def test_pyfile_writeobject(self): | 
|  | 105 | +        # Test PyFile_WriteObject(obj, file, flags): | 
|  | 106 | +        # - Call file.write(str(obj)) if flags equals Py_PRINT_RAW. | 
|  | 107 | +        # - Call file.write(repr(obj)) otherwise. | 
|  | 108 | +        writeobject = _testlimitedcapi.pyfile_writeobject | 
|  | 109 | +        Py_PRINT_RAW = 1 | 
|  | 110 | + | 
|  | 111 | +        with io.StringIO() as fp: | 
|  | 112 | +            # Test flags=Py_PRINT_RAW | 
|  | 113 | +            self.assertEqual(writeobject("raw", fp, Py_PRINT_RAW), 0) | 
|  | 114 | +            writeobject(NULL, fp, Py_PRINT_RAW) | 
|  | 115 | + | 
|  | 116 | +            # Test flags=0 | 
|  | 117 | +            self.assertEqual(writeobject("repr", fp, 0), 0) | 
|  | 118 | +            writeobject(NULL, fp, 0) | 
|  | 119 | + | 
|  | 120 | +            text = fp.getvalue() | 
|  | 121 | +            self.assertEqual(text, "raw<NULL>'repr'<NULL>") | 
|  | 122 | + | 
|  | 123 | +        # invalid file type | 
|  | 124 | +        for invalid_file in (123, "abc", object()): | 
|  | 125 | +            with self.subTest(file=invalid_file): | 
|  | 126 | +                with self.assertRaises(AttributeError): | 
|  | 127 | +                    writeobject("abc", invalid_file, Py_PRINT_RAW) | 
|  | 128 | + | 
|  | 129 | +        with self.assertRaises(TypeError): | 
|  | 130 | +            writeobject("abc", NULL, 0) | 
|  | 131 | + | 
|  | 132 | +    def test_pyobject_asfiledescriptor(self): | 
|  | 133 | +        # Test PyObject_AsFileDescriptor(obj): | 
|  | 134 | +        # - Return obj if obj is an integer. | 
|  | 135 | +        # - Return obj.fileno() otherwise. | 
|  | 136 | +        # File descriptor must be >= 0. | 
|  | 137 | +        asfd = _testlimitedcapi.pyobject_asfiledescriptor | 
|  | 138 | + | 
|  | 139 | +        self.assertEqual(asfd(123), 123) | 
|  | 140 | +        self.assertEqual(asfd(0), 0) | 
|  | 141 | + | 
|  | 142 | +        with open(__file__, "rb") as fp: | 
|  | 143 | +            self.assertEqual(asfd(fp), fp.fileno()) | 
|  | 144 | + | 
|  | 145 | +        # bool emits RuntimeWarning | 
|  | 146 | +        msg = r"bool is used as a file descriptor" | 
|  | 147 | +        with warnings_helper.check_warnings((msg, RuntimeWarning)): | 
|  | 148 | +            self.assertEqual(asfd(True), 1) | 
|  | 149 | + | 
|  | 150 | +        class FakeFile: | 
|  | 151 | +            def __init__(self, fd): | 
|  | 152 | +                self.fd = fd | 
|  | 153 | +            def fileno(self): | 
|  | 154 | +                return self.fd | 
|  | 155 | + | 
|  | 156 | +        # file descriptor must be positive | 
|  | 157 | +        with self.assertRaises(ValueError): | 
|  | 158 | +            asfd(-1) | 
|  | 159 | +        with self.assertRaises(ValueError): | 
|  | 160 | +            asfd(FakeFile(-1)) | 
|  | 161 | + | 
|  | 162 | +        # fileno() result must be an integer | 
|  | 163 | +        with self.assertRaises(TypeError): | 
|  | 164 | +            asfd(FakeFile("text")) | 
|  | 165 | + | 
|  | 166 | +        # unsupported types | 
|  | 167 | +        for obj in ("string", ["list"], object()): | 
|  | 168 | +            with self.subTest(obj=obj): | 
|  | 169 | +                with self.assertRaises(TypeError): | 
|  | 170 | +                    asfd(obj) | 
|  | 171 | + | 
|  | 172 | +        # CRASHES asfd(NULL) | 
|  | 173 | + | 
|  | 174 | +    def test_pyfile_newstdprinter(self): | 
|  | 175 | +        # Test PyFile_NewStdPrinter() | 
|  | 176 | +        pyfile_newstdprinter = _testcapi.pyfile_newstdprinter | 
|  | 177 | + | 
|  | 178 | +        file = pyfile_newstdprinter(STDOUT_FD) | 
|  | 179 | +        self.assertEqual(file.closed, False) | 
|  | 180 | +        self.assertIsNone(file.encoding) | 
|  | 181 | +        self.assertEqual(file.mode, "w") | 
|  | 182 | + | 
|  | 183 | +        self.assertEqual(file.fileno(), STDOUT_FD) | 
|  | 184 | +        self.assertEqual(file.isatty(), os.isatty(STDOUT_FD)) | 
|  | 185 | + | 
|  | 186 | +        # flush() is a no-op | 
|  | 187 | +        self.assertIsNone(file.flush()) | 
|  | 188 | + | 
|  | 189 | +        # close() is a no-op | 
|  | 190 | +        self.assertIsNone(file.close()) | 
|  | 191 | +        self.assertEqual(file.closed, False) | 
|  | 192 | + | 
|  | 193 | +        support.check_disallow_instantiation(self, type(file)) | 
|  | 194 | + | 
|  | 195 | +    def test_pyfile_newstdprinter_write(self): | 
|  | 196 | +        # Test the write() method of PyFile_NewStdPrinter() | 
|  | 197 | +        pyfile_newstdprinter = _testcapi.pyfile_newstdprinter | 
|  | 198 | + | 
|  | 199 | +        filename = os_helper.TESTFN | 
|  | 200 | +        self.addCleanup(os_helper.unlink, filename) | 
|  | 201 | + | 
|  | 202 | +        try: | 
|  | 203 | +            old_stdout = os.dup(STDOUT_FD) | 
|  | 204 | +        except OSError as exc: | 
|  | 205 | +            # os.dup(STDOUT_FD) is not supported on WASI | 
|  | 206 | +            self.skipTest(f"os.dup() failed with {exc!r}") | 
|  | 207 | + | 
|  | 208 | +        try: | 
|  | 209 | +            with open(filename, "wb") as fp: | 
|  | 210 | +                # PyFile_NewStdPrinter() only accepts fileno(stdout) | 
|  | 211 | +                # or fileno(stderr) file descriptor. | 
|  | 212 | +                fd = fp.fileno() | 
|  | 213 | +                os.dup2(fd, STDOUT_FD) | 
|  | 214 | + | 
|  | 215 | +                file = pyfile_newstdprinter(STDOUT_FD) | 
|  | 216 | +                self.assertEqual(file.write("text"), 4) | 
|  | 217 | +                # The surrogate character is encoded with | 
|  | 218 | +                # the "surrogateescape" error handler | 
|  | 219 | +                self.assertEqual(file.write("[\udc80]"), 8) | 
|  | 220 | +        finally: | 
|  | 221 | +            os.dup2(old_stdout, STDOUT_FD) | 
|  | 222 | +            os.close(old_stdout) | 
|  | 223 | + | 
|  | 224 | +        with open(filename, "r") as fp: | 
|  | 225 | +            self.assertEqual(fp.read(), "text[\\udc80]") | 
|  | 226 | + | 
|  | 227 | +    # TODO: Test Py_UniversalNewlineFgets() | 
|  | 228 | + | 
|  | 229 | +    # PyFile_SetOpenCodeHook() and PyFile_OpenCode() are tested by | 
|  | 230 | +    # test_embed.test_open_code_hook() | 
|  | 231 | + | 
|  | 232 | + | 
|  | 233 | +if __name__ == "__main__": | 
|  | 234 | +    unittest.main() | 
0 commit comments