|
2 | 2 | # The .NET Foundation licenses this file to you under the Apache 2.0 License. |
3 | 3 | # See the LICENSE file in the project root for more information. |
4 | 4 |
|
| 5 | +import errno |
5 | 6 | import os |
6 | 7 | import sys |
7 | 8 | import unittest |
8 | 9 | import _thread |
9 | 10 |
|
10 | 11 | CP16623_LOCK = _thread.allocate_lock() |
11 | 12 |
|
12 | | -from iptest import IronPythonTestCase, is_cli, is_cpython, is_netcoreapp, is_posix, run_test, skipUnlessIronPython |
| 13 | +from iptest import IronPythonTestCase, is_cli, is_cpython, is_netcoreapp, is_posix, is_windows, run_test, skipUnlessIronPython |
13 | 14 |
|
14 | 15 | class FileTest(IronPythonTestCase): |
15 | 16 |
|
@@ -505,12 +506,12 @@ def test_file_manager_leak(self): |
505 | 506 | # the number of iterations should be larger than Microsoft.Scripting.Utils.HybridMapping.SIZE (currently 4K) |
506 | 507 | N = 5000 |
507 | 508 | for i in range(N): |
508 | | - fd = os.open(self.temp_file, os.O_WRONLY | os.O_CREAT) |
| 509 | + fd = os.open(self.temp_file, os.O_WRONLY | os.O_CREAT | os.O_TRUNC) |
509 | 510 | f = os.fdopen(fd, 'w', closefd=True) |
510 | 511 | f.close() |
511 | 512 |
|
512 | 513 | for i in range(N): |
513 | | - fd = os.open(self.temp_file, os.O_WRONLY | os.O_CREAT) |
| 514 | + fd = os.open(self.temp_file, os.O_WRONLY | os.O_CREAT | os.O_TRUNC) |
514 | 515 | f = os.fdopen(fd, 'w', closefd=False) |
515 | 516 | g = os.fdopen(f.fileno(), 'w', closefd=True) |
516 | 517 | g.close() |
@@ -624,7 +625,7 @@ def test_modes(self): |
624 | 625 | self.assertRaisesMessage(ValueError, "invalid mode: 'p'", open, 'abc', 'p') |
625 | 626 |
|
626 | 627 | # allow anything w/ U but r and w |
627 | | - err_msg = "mode U cannot be combined with 'x', 'w', 'a', or '+'" if is_cli or sys.version_info >= (3,7) else "mode U cannot be combined with x', 'w', 'a', or '+'" if sys.version_info >= (3,6) else "can't use U and writing mode at once" |
| 628 | + err_msg = "mode U cannot be combined with 'x', 'w', 'a', or '+'" if is_cli or sys.version_info >= (3,7,4) else "mode U cannot be combined with x', 'w', 'a', or '+'" if sys.version_info >= (3,6) else "can't use U and writing mode at once" |
628 | 629 | self.assertRaisesMessage(ValueError, err_msg, open, 'abc', 'Uw') |
629 | 630 | self.assertRaisesMessage(ValueError, err_msg, open, 'abc', 'Ua') |
630 | 631 | self.assertRaisesMessage(ValueError, err_msg, open, 'abc', 'Uw+') |
@@ -701,7 +702,7 @@ def test_errors(self): |
701 | 702 |
|
702 | 703 | with self.assertRaises(OSError) as cm: |
703 | 704 | open('path_too_long' * 100) |
704 | | - self.assertEqual(cm.exception.errno, (36 if is_posix else 22) if is_netcoreapp and not is_posix or sys.version_info >= (3,6) else 2) |
| 705 | + self.assertEqual(cm.exception.errno, (errno.ENAMETOOLONG if is_posix else errno.EINVAL) if is_netcoreapp and not is_posix or sys.version_info >= (3,6) else errno.ENOENT) |
705 | 706 |
|
706 | 707 | def test_write_bytes(self): |
707 | 708 | fname = self.temp_file |
@@ -756,6 +757,39 @@ def test_open_with_BOM(self): |
756 | 757 | with open(fileName, "rb") as f: |
757 | 758 | self.assertEqual(f.read(), b"\xef\xbb\xbf\x42\xc3\x93\x4d\x0d\x0a") |
758 | 759 |
|
| 760 | + |
| 761 | + def test_open_flags(self): |
| 762 | + test_data = { |
| 763 | + 'rb': os.O_RDONLY, |
| 764 | + 'wb': os.O_WRONLY | os.O_CREAT | os.O_TRUNC, |
| 765 | + 'ab': os.O_WRONLY | os.O_CREAT | os.O_APPEND, |
| 766 | + 'xb': os.O_WRONLY | os.O_CREAT | os.O_EXCL, |
| 767 | + 'rb+': os.O_RDWR, |
| 768 | + 'wb+': os.O_RDWR | os.O_CREAT | os.O_TRUNC, |
| 769 | + 'ab+': os.O_RDWR | os.O_CREAT | os.O_APPEND, |
| 770 | + 'xb+': os.O_RDWR | os.O_CREAT | os.O_EXCL, |
| 771 | + } |
| 772 | + extra_flags = 0 |
| 773 | + if is_posix: |
| 774 | + extra_flags |= os.O_CLOEXEC |
| 775 | + elif is_windows: |
| 776 | + extra_flags |= os.O_NOINHERIT | os.O_BINARY |
| 777 | + test_data = {k: v | extra_flags for k, v in test_data.items()} |
| 778 | + |
| 779 | + flags_received = None |
| 780 | + def test_open(name, flags): |
| 781 | + nonlocal flags_received |
| 782 | + flags_received = flags |
| 783 | + if mode[0] == 'x': |
| 784 | + os.unlink(name) |
| 785 | + return os.open(name, flags) |
| 786 | + |
| 787 | + for mode in sorted(test_data): |
| 788 | + with self.subTest(mode=mode): |
| 789 | + with open(self.temp_file, mode, opener=test_open): pass |
| 790 | + self.assertEqual(flags_received, test_data[mode]) |
| 791 | + |
| 792 | + |
759 | 793 | def test_opener(self): |
760 | 794 | data = "test message\n" |
761 | 795 | with open(self.temp_file, "w", opener=os.open) as f: |
|
0 commit comments