From 491cb893e5cc6743fbe4789c4d56563b79f8a5fd Mon Sep 17 00:00:00 2001 From: Victor Stinner Date: Tue, 30 Sep 2025 17:41:03 +0200 Subject: [PATCH 1/4] gh-139322: Create test_os package * Move test_posix.py and test_os.py to Lib/test/test_os/ * Split Windows specific test cases to a new test_windows.py file. --- Lib/test/libregrtest/findtests.py | 3 +- Lib/test/test_os/__init__.py | 6 + Lib/test/{ => test_os}/test_os.py | 628 +-------------------------- Lib/test/{ => test_os}/test_posix.py | 0 4 files changed, 9 insertions(+), 628 deletions(-) create mode 100644 Lib/test/test_os/__init__.py rename Lib/test/{ => test_os}/test_os.py (88%) rename Lib/test/{ => test_os}/test_posix.py (100%) diff --git a/Lib/test/libregrtest/findtests.py b/Lib/test/libregrtest/findtests.py index 79afaf9083ae59..6c0e50846a466b 100644 --- a/Lib/test/libregrtest/findtests.py +++ b/Lib/test/libregrtest/findtests.py @@ -25,10 +25,11 @@ "test_gdb", "test_inspect", "test_io", - "test_pydoc", "test_multiprocessing_fork", "test_multiprocessing_forkserver", "test_multiprocessing_spawn", + "test_os", + "test_pydoc", } diff --git a/Lib/test/test_os/__init__.py b/Lib/test/test_os/__init__.py new file mode 100644 index 00000000000000..bc502ef32d2916 --- /dev/null +++ b/Lib/test/test_os/__init__.py @@ -0,0 +1,6 @@ +import os.path +from test.support import load_package_tests + + +def load_tests(*args): + return load_package_tests(os.path.dirname(__file__), *args) diff --git a/Lib/test/test_os.py b/Lib/test/test_os/test_os.py similarity index 88% rename from Lib/test/test_os.py rename to Lib/test/test_os/test_os.py index 1180e27a7a5310..07db67f07a8acd 100644 --- a/Lib/test/test_os.py +++ b/Lib/test/test_os/test_os.py @@ -37,6 +37,7 @@ from test.support import infinite_recursion from test.support import warnings_helper from platform import win32_is_iot +from .utils import create_file try: import resource @@ -46,10 +47,6 @@ import fcntl except ImportError: fcntl = None -try: - import _winapi -except ImportError: - _winapi = None try: import pwd all_users = [u.pw_uid for u in pwd.getpwall()] @@ -93,11 +90,6 @@ def requires_os_func(name): return unittest.skipUnless(hasattr(os, name), 'requires os.%s' % name) -def create_file(filename, content=b'content'): - with open(filename, "xb", 0) as fp: - fp.write(content) - - # bpo-41625: On AIX, splice() only works with a socket, not with a pipe. requires_splice_pipe = unittest.skipIf(sys.platform.startswith("aix"), 'on AIX, splice() only accepts sockets') @@ -2466,42 +2458,6 @@ def test_execve_with_empty_path(self): self.fail('No OSError raised') -@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") -class Win32ErrorTests(unittest.TestCase): - def setUp(self): - try: - os.stat(os_helper.TESTFN) - except FileNotFoundError: - exists = False - except OSError as exc: - exists = True - self.fail("file %s must not exist; os.stat failed with %s" - % (os_helper.TESTFN, exc)) - else: - self.fail("file %s must not exist" % os_helper.TESTFN) - - def test_rename(self): - self.assertRaises(OSError, os.rename, os_helper.TESTFN, os_helper.TESTFN+".bak") - - def test_remove(self): - self.assertRaises(OSError, os.remove, os_helper.TESTFN) - - def test_chdir(self): - self.assertRaises(OSError, os.chdir, os_helper.TESTFN) - - def test_mkdir(self): - self.addCleanup(os_helper.unlink, os_helper.TESTFN) - - with open(os_helper.TESTFN, "x") as f: - self.assertRaises(OSError, os.mkdir, os_helper.TESTFN) - - def test_utime(self): - self.assertRaises(OSError, os.utime, os_helper.TESTFN, None) - - def test_chmod(self): - self.assertRaises(OSError, os.chmod, os_helper.TESTFN, 0) - - @unittest.skipIf(support.is_wasi, "Cannot create invalid FD on WASI.") class TestInvalidFD(unittest.TestCase): singles = ["fchdir", "dup", "fstat", "fstatvfs", "tcgetpgrp", "ttyname"] @@ -2836,224 +2792,6 @@ def test_stat(self): for fn in self.unicodefn: os.stat(os.path.join(self.dir, fn)) -@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") -class Win32KillTests(unittest.TestCase): - def _kill(self, sig): - # Start sys.executable as a subprocess and communicate from the - # subprocess to the parent that the interpreter is ready. When it - # becomes ready, send *sig* via os.kill to the subprocess and check - # that the return code is equal to *sig*. - import ctypes - from ctypes import wintypes - import msvcrt - - # Since we can't access the contents of the process' stdout until the - # process has exited, use PeekNamedPipe to see what's inside stdout - # without waiting. This is done so we can tell that the interpreter - # is started and running at a point where it could handle a signal. - PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe - PeekNamedPipe.restype = wintypes.BOOL - PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle - ctypes.POINTER(ctypes.c_char), # stdout buf - wintypes.DWORD, # Buffer size - ctypes.POINTER(wintypes.DWORD), # bytes read - ctypes.POINTER(wintypes.DWORD), # bytes avail - ctypes.POINTER(wintypes.DWORD)) # bytes left - msg = "running" - proc = subprocess.Popen([sys.executable, "-c", - "import sys;" - "sys.stdout.write('{}');" - "sys.stdout.flush();" - "input()".format(msg)], - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - stdin=subprocess.PIPE) - self.addCleanup(proc.stdout.close) - self.addCleanup(proc.stderr.close) - self.addCleanup(proc.stdin.close) - - count, max = 0, 100 - while count < max and proc.poll() is None: - # Create a string buffer to store the result of stdout from the pipe - buf = ctypes.create_string_buffer(len(msg)) - # Obtain the text currently in proc.stdout - # Bytes read/avail/left are left as NULL and unused - rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()), - buf, ctypes.sizeof(buf), None, None, None) - self.assertNotEqual(rslt, 0, "PeekNamedPipe failed") - if buf.value: - self.assertEqual(msg, buf.value.decode()) - break - time.sleep(0.1) - count += 1 - else: - self.fail("Did not receive communication from the subprocess") - - os.kill(proc.pid, sig) - self.assertEqual(proc.wait(), sig) - - def test_kill_sigterm(self): - # SIGTERM doesn't mean anything special, but make sure it works - self._kill(signal.SIGTERM) - - def test_kill_int(self): - # os.kill on Windows can take an int which gets set as the exit code - self._kill(100) - - @unittest.skipIf(mmap is None, "requires mmap") - def _kill_with_event(self, event, name): - tagname = "test_os_%s" % uuid.uuid1() - m = mmap.mmap(-1, 1, tagname) - m[0] = 0 - - # Run a script which has console control handling enabled. - script = os.path.join(os.path.dirname(__file__), - "win_console_handler.py") - cmd = [sys.executable, script, tagname] - proc = subprocess.Popen(cmd, - creationflags=subprocess.CREATE_NEW_PROCESS_GROUP) - - with proc: - # Let the interpreter startup before we send signals. See #3137. - for _ in support.sleeping_retry(support.SHORT_TIMEOUT): - if proc.poll() is None: - break - else: - # Forcefully kill the process if we weren't able to signal it. - proc.kill() - self.fail("Subprocess didn't finish initialization") - - os.kill(proc.pid, event) - - try: - # proc.send_signal(event) could also be done here. - # Allow time for the signal to be passed and the process to exit. - proc.wait(timeout=support.SHORT_TIMEOUT) - except subprocess.TimeoutExpired: - # Forcefully kill the process if we weren't able to signal it. - proc.kill() - self.fail("subprocess did not stop on {}".format(name)) - - @unittest.skip("subprocesses aren't inheriting Ctrl+C property") - @support.requires_subprocess() - def test_CTRL_C_EVENT(self): - from ctypes import wintypes - import ctypes - - # Make a NULL value by creating a pointer with no argument. - NULL = ctypes.POINTER(ctypes.c_int)() - SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler - SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int), - wintypes.BOOL) - SetConsoleCtrlHandler.restype = wintypes.BOOL - - # Calling this with NULL and FALSE causes the calling process to - # handle Ctrl+C, rather than ignore it. This property is inherited - # by subprocesses. - SetConsoleCtrlHandler(NULL, 0) - - self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT") - - @support.requires_subprocess() - def test_CTRL_BREAK_EVENT(self): - self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT") - - -@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") -class Win32ListdirTests(unittest.TestCase): - """Test listdir on Windows.""" - - def setUp(self): - self.created_paths = [] - for i in range(2): - dir_name = 'SUB%d' % i - dir_path = os.path.join(os_helper.TESTFN, dir_name) - file_name = 'FILE%d' % i - file_path = os.path.join(os_helper.TESTFN, file_name) - os.makedirs(dir_path) - with open(file_path, 'w', encoding='utf-8') as f: - f.write("I'm %s and proud of it. Blame test_os.\n" % file_path) - self.created_paths.extend([dir_name, file_name]) - self.created_paths.sort() - - def tearDown(self): - shutil.rmtree(os_helper.TESTFN) - - def test_listdir_no_extended_path(self): - """Test when the path is not an "extended" path.""" - # unicode - self.assertEqual( - sorted(os.listdir(os_helper.TESTFN)), - self.created_paths) - - # bytes - self.assertEqual( - sorted(os.listdir(os.fsencode(os_helper.TESTFN))), - [os.fsencode(path) for path in self.created_paths]) - - def test_listdir_extended_path(self): - """Test when the path starts with '\\\\?\\'.""" - # See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx#maxpath - # unicode - path = '\\\\?\\' + os.path.abspath(os_helper.TESTFN) - self.assertEqual( - sorted(os.listdir(path)), - self.created_paths) - - # bytes - path = b'\\\\?\\' + os.fsencode(os.path.abspath(os_helper.TESTFN)) - self.assertEqual( - sorted(os.listdir(path)), - [os.fsencode(path) for path in self.created_paths]) - - -@unittest.skipUnless(os.name == "nt", "NT specific tests") -class Win32ListdriveTests(unittest.TestCase): - """Test listdrive, listmounts and listvolume on Windows.""" - - def setUp(self): - # Get drives and volumes from fsutil - out = subprocess.check_output( - ["fsutil.exe", "volume", "list"], - cwd=os.path.join(os.getenv("SystemRoot", "\\Windows"), "System32"), - encoding="mbcs", - errors="ignore", - ) - lines = out.splitlines() - self.known_volumes = {l for l in lines if l.startswith('\\\\?\\')} - self.known_drives = {l for l in lines if l[1:] == ':\\'} - self.known_mounts = {l for l in lines if l[1:3] == ':\\'} - - def test_listdrives(self): - drives = os.listdrives() - self.assertIsInstance(drives, list) - self.assertSetEqual( - self.known_drives, - self.known_drives & set(drives), - ) - - def test_listvolumes(self): - volumes = os.listvolumes() - self.assertIsInstance(volumes, list) - self.assertSetEqual( - self.known_volumes, - self.known_volumes & set(volumes), - ) - - def test_listmounts(self): - for volume in os.listvolumes(): - try: - mounts = os.listmounts(volume) - except OSError as ex: - if support.verbose: - print("Skipping", volume, "because of", ex) - else: - self.assertIsInstance(mounts, list) - self.assertSetEqual( - set(mounts), - self.known_mounts & set(mounts), - ) - @unittest.skipUnless(hasattr(os, 'readlink'), 'needs os.readlink()') class ReadlinkTests(unittest.TestCase): @@ -3116,370 +2854,6 @@ def test_bytes(self): self.assertIsInstance(path, bytes) -@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") -@os_helper.skip_unless_symlink -class Win32SymlinkTests(unittest.TestCase): - filelink = 'filelinktest' - filelink_target = os.path.abspath(__file__) - dirlink = 'dirlinktest' - dirlink_target = os.path.dirname(filelink_target) - missing_link = 'missing link' - - def setUp(self): - assert os.path.exists(self.dirlink_target) - assert os.path.exists(self.filelink_target) - assert not os.path.exists(self.dirlink) - assert not os.path.exists(self.filelink) - assert not os.path.exists(self.missing_link) - - def tearDown(self): - if os.path.exists(self.filelink): - os.remove(self.filelink) - if os.path.exists(self.dirlink): - os.rmdir(self.dirlink) - if os.path.lexists(self.missing_link): - os.remove(self.missing_link) - - def test_directory_link(self): - os.symlink(self.dirlink_target, self.dirlink) - self.assertTrue(os.path.exists(self.dirlink)) - self.assertTrue(os.path.isdir(self.dirlink)) - self.assertTrue(os.path.islink(self.dirlink)) - self.check_stat(self.dirlink, self.dirlink_target) - - def test_file_link(self): - os.symlink(self.filelink_target, self.filelink) - self.assertTrue(os.path.exists(self.filelink)) - self.assertTrue(os.path.isfile(self.filelink)) - self.assertTrue(os.path.islink(self.filelink)) - self.check_stat(self.filelink, self.filelink_target) - - def _create_missing_dir_link(self): - 'Create a "directory" link to a non-existent target' - linkname = self.missing_link - if os.path.lexists(linkname): - os.remove(linkname) - target = r'c:\\target does not exist.29r3c740' - assert not os.path.exists(target) - target_is_dir = True - os.symlink(target, linkname, target_is_dir) - - def test_remove_directory_link_to_missing_target(self): - self._create_missing_dir_link() - # For compatibility with Unix, os.remove will check the - # directory status and call RemoveDirectory if the symlink - # was created with target_is_dir==True. - os.remove(self.missing_link) - - def test_isdir_on_directory_link_to_missing_target(self): - self._create_missing_dir_link() - self.assertFalse(os.path.isdir(self.missing_link)) - - def test_rmdir_on_directory_link_to_missing_target(self): - self._create_missing_dir_link() - os.rmdir(self.missing_link) - - def check_stat(self, link, target): - self.assertEqual(os.stat(link), os.stat(target)) - self.assertNotEqual(os.lstat(link), os.stat(link)) - - bytes_link = os.fsencode(link) - self.assertEqual(os.stat(bytes_link), os.stat(target)) - self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link)) - - def test_12084(self): - level1 = os.path.abspath(os_helper.TESTFN) - level2 = os.path.join(level1, "level2") - level3 = os.path.join(level2, "level3") - self.addCleanup(os_helper.rmtree, level1) - - os.mkdir(level1) - os.mkdir(level2) - os.mkdir(level3) - - file1 = os.path.abspath(os.path.join(level1, "file1")) - create_file(file1) - - orig_dir = os.getcwd() - try: - os.chdir(level2) - link = os.path.join(level2, "link") - os.symlink(os.path.relpath(file1), "link") - self.assertIn("link", os.listdir(os.getcwd())) - - # Check os.stat calls from the same dir as the link - self.assertEqual(os.stat(file1), os.stat("link")) - - # Check os.stat calls from a dir below the link - os.chdir(level1) - self.assertEqual(os.stat(file1), - os.stat(os.path.relpath(link))) - - # Check os.stat calls from a dir above the link - os.chdir(level3) - self.assertEqual(os.stat(file1), - os.stat(os.path.relpath(link))) - finally: - os.chdir(orig_dir) - - @unittest.skipUnless(os.path.lexists(r'C:\Users\All Users') - and os.path.exists(r'C:\ProgramData'), - 'Test directories not found') - def test_29248(self): - # os.symlink() calls CreateSymbolicLink, which creates - # the reparse data buffer with the print name stored - # first, so the offset is always 0. CreateSymbolicLink - # stores the "PrintName" DOS path (e.g. "C:\") first, - # with an offset of 0, followed by the "SubstituteName" - # NT path (e.g. "\??\C:\"). The "All Users" link, on - # the other hand, seems to have been created manually - # with an inverted order. - target = os.readlink(r'C:\Users\All Users') - self.assertTrue(os.path.samefile(target, r'C:\ProgramData')) - - def test_buffer_overflow(self): - # Older versions would have a buffer overflow when detecting - # whether a link source was a directory. This test ensures we - # no longer crash, but does not otherwise validate the behavior - segment = 'X' * 27 - path = os.path.join(*[segment] * 10) - test_cases = [ - # overflow with absolute src - ('\\' + path, segment), - # overflow dest with relative src - (segment, path), - # overflow when joining src - (path[:180], path[:180]), - ] - for src, dest in test_cases: - try: - os.symlink(src, dest) - except FileNotFoundError: - pass - else: - try: - os.remove(dest) - except OSError: - pass - # Also test with bytes, since that is a separate code path. - try: - os.symlink(os.fsencode(src), os.fsencode(dest)) - except FileNotFoundError: - pass - else: - try: - os.remove(dest) - except OSError: - pass - - def test_appexeclink(self): - root = os.path.expandvars(r'%LOCALAPPDATA%\Microsoft\WindowsApps') - if not os.path.isdir(root): - self.skipTest("test requires a WindowsApps directory") - - aliases = [os.path.join(root, a) - for a in fnmatch.filter(os.listdir(root), '*.exe')] - - for alias in aliases: - if support.verbose: - print() - print("Testing with", alias) - st = os.lstat(alias) - self.assertEqual(st, os.stat(alias)) - self.assertFalse(stat.S_ISLNK(st.st_mode)) - self.assertEqual(st.st_reparse_tag, stat.IO_REPARSE_TAG_APPEXECLINK) - self.assertTrue(os.path.isfile(alias)) - # testing the first one we see is sufficient - break - else: - self.skipTest("test requires an app execution alias") - -@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") -class Win32JunctionTests(unittest.TestCase): - junction = 'junctiontest' - junction_target = os.path.dirname(os.path.abspath(__file__)) - - def setUp(self): - assert os.path.exists(self.junction_target) - assert not os.path.lexists(self.junction) - - def tearDown(self): - if os.path.lexists(self.junction): - os.unlink(self.junction) - - def test_create_junction(self): - _winapi.CreateJunction(self.junction_target, self.junction) - self.assertTrue(os.path.lexists(self.junction)) - self.assertTrue(os.path.exists(self.junction)) - self.assertTrue(os.path.isdir(self.junction)) - self.assertNotEqual(os.stat(self.junction), os.lstat(self.junction)) - self.assertEqual(os.stat(self.junction), os.stat(self.junction_target)) - - # bpo-37834: Junctions are not recognized as links. - self.assertFalse(os.path.islink(self.junction)) - self.assertEqual(os.path.normcase("\\\\?\\" + self.junction_target), - os.path.normcase(os.readlink(self.junction))) - - def test_unlink_removes_junction(self): - _winapi.CreateJunction(self.junction_target, self.junction) - self.assertTrue(os.path.exists(self.junction)) - self.assertTrue(os.path.lexists(self.junction)) - - os.unlink(self.junction) - self.assertFalse(os.path.exists(self.junction)) - -@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") -class Win32NtTests(unittest.TestCase): - def test_getfinalpathname_handles(self): - nt = import_helper.import_module('nt') - ctypes = import_helper.import_module('ctypes') - # Ruff false positive -- it thinks we're redefining `ctypes` here - import ctypes.wintypes # noqa: F811 - - kernel = ctypes.WinDLL('Kernel32.dll', use_last_error=True) - kernel.GetCurrentProcess.restype = ctypes.wintypes.HANDLE - - kernel.GetProcessHandleCount.restype = ctypes.wintypes.BOOL - kernel.GetProcessHandleCount.argtypes = (ctypes.wintypes.HANDLE, - ctypes.wintypes.LPDWORD) - - # This is a pseudo-handle that doesn't need to be closed - hproc = kernel.GetCurrentProcess() - - handle_count = ctypes.wintypes.DWORD() - ok = kernel.GetProcessHandleCount(hproc, ctypes.byref(handle_count)) - self.assertEqual(1, ok) - - before_count = handle_count.value - - # The first two test the error path, __file__ tests the success path - filenames = [ - r'\\?\C:', - r'\\?\NUL', - r'\\?\CONIN', - __file__, - ] - - for _ in range(10): - for name in filenames: - try: - nt._getfinalpathname(name) - except Exception: - # Failure is expected - pass - try: - os.stat(name) - except Exception: - pass - - ok = kernel.GetProcessHandleCount(hproc, ctypes.byref(handle_count)) - self.assertEqual(1, ok) - - handle_delta = handle_count.value - before_count - - self.assertEqual(0, handle_delta) - - @support.requires_subprocess() - def test_stat_unlink_race(self): - # bpo-46785: the implementation of os.stat() falls back to reading - # the parent directory if CreateFileW() fails with a permission - # error. If reading the parent directory fails because the file or - # directory are subsequently unlinked, or because the volume or - # share are no longer available, then the original permission error - # should not be restored. - filename = os_helper.TESTFN - self.addCleanup(os_helper.unlink, filename) - deadline = time.time() + 5 - command = textwrap.dedent("""\ - import os - import sys - import time - - filename = sys.argv[1] - deadline = float(sys.argv[2]) - - while time.time() < deadline: - try: - with open(filename, "w") as f: - pass - except OSError: - pass - try: - os.remove(filename) - except OSError: - pass - """) - - with subprocess.Popen([sys.executable, '-c', command, filename, str(deadline)]) as proc: - while time.time() < deadline: - try: - os.stat(filename) - except FileNotFoundError as e: - assert e.winerror == 2 # ERROR_FILE_NOT_FOUND - try: - proc.wait(1) - except subprocess.TimeoutExpired: - proc.terminate() - - @support.requires_subprocess() - def test_stat_inaccessible_file(self): - filename = os_helper.TESTFN - ICACLS = os.path.expandvars(r"%SystemRoot%\System32\icacls.exe") - - with open(filename, "wb") as f: - f.write(b'Test data') - - stat1 = os.stat(filename) - - try: - # Remove all permissions from the file - subprocess.check_output([ICACLS, filename, "/inheritance:r"], - stderr=subprocess.STDOUT) - except subprocess.CalledProcessError as ex: - if support.verbose: - print(ICACLS, filename, "/inheritance:r", "failed.") - print(ex.stdout.decode("oem", "replace").rstrip()) - try: - os.unlink(filename) - except OSError: - pass - self.skipTest("Unable to create inaccessible file") - - def cleanup(): - # Give delete permission to the owner (us) - subprocess.check_output([ICACLS, filename, "/grant", "*WD:(D)"], - stderr=subprocess.STDOUT) - os.unlink(filename) - - self.addCleanup(cleanup) - - if support.verbose: - print("File:", filename) - print("stat with access:", stat1) - - # First test - we shouldn't raise here, because we still have access to - # the directory and can extract enough information from its metadata. - stat2 = os.stat(filename) - - if support.verbose: - print(" without access:", stat2) - - # We may not get st_dev/st_ino, so ensure those are 0 or match - self.assertIn(stat2.st_dev, (0, stat1.st_dev)) - self.assertIn(stat2.st_ino, (0, stat1.st_ino)) - - # st_mode and st_size should match (for a normal file, at least) - self.assertEqual(stat1.st_mode, stat2.st_mode) - self.assertEqual(stat1.st_size, stat2.st_size) - - # st_ctime and st_mtime should be the same - self.assertEqual(stat1.st_ctime, stat2.st_ctime) - self.assertEqual(stat1.st_mtime, stat2.st_mtime) - - # st_atime should be the same or later - self.assertGreaterEqual(stat1.st_atime, stat2.st_atime) - - @os_helper.skip_unless_symlink class NonLocalSymlinkTests(unittest.TestCase): diff --git a/Lib/test/test_posix.py b/Lib/test/test_os/test_posix.py similarity index 100% rename from Lib/test/test_posix.py rename to Lib/test/test_os/test_posix.py From 852e6a1ebb1d53f338a9ed5626e63f2005cb21aa Mon Sep 17 00:00:00 2001 From: Victor Stinner Date: Wed, 1 Oct 2025 08:58:55 +0200 Subject: [PATCH 2/4] Add missing files --- Lib/test/test_os/test_windows.py | 640 +++++++++++++++++++++++++++++++ Lib/test/test_os/utils.py | 3 + 2 files changed, 643 insertions(+) create mode 100644 Lib/test/test_os/test_windows.py create mode 100644 Lib/test/test_os/utils.py diff --git a/Lib/test/test_os/test_windows.py b/Lib/test/test_os/test_windows.py new file mode 100644 index 00000000000000..b306a037399fc8 --- /dev/null +++ b/Lib/test/test_os/test_windows.py @@ -0,0 +1,640 @@ +import sys +import unittest + +if sys.platform != "win32": + raise unittest.SkipTest("Win32 specific tests") + +import _winapi +import fnmatch +import mmap +import os +import shutil +import signal +import stat +import subprocess +import textwrap +import time +import uuid +from test import support +from test.support import import_helper +from test.support import os_helper +from .utils import create_file + + +class Win32ErrorTests(unittest.TestCase): + def setUp(self): + try: + os.stat(os_helper.TESTFN) + except FileNotFoundError: + exists = False + except OSError as exc: + exists = True + self.fail("file %s must not exist; os.stat failed with %s" + % (os_helper.TESTFN, exc)) + else: + self.fail("file %s must not exist" % os_helper.TESTFN) + + def test_rename(self): + self.assertRaises(OSError, os.rename, os_helper.TESTFN, os_helper.TESTFN+".bak") + + def test_remove(self): + self.assertRaises(OSError, os.remove, os_helper.TESTFN) + + def test_chdir(self): + self.assertRaises(OSError, os.chdir, os_helper.TESTFN) + + def test_mkdir(self): + self.addCleanup(os_helper.unlink, os_helper.TESTFN) + + with open(os_helper.TESTFN, "x") as f: + self.assertRaises(OSError, os.mkdir, os_helper.TESTFN) + + def test_utime(self): + self.assertRaises(OSError, os.utime, os_helper.TESTFN, None) + + def test_chmod(self): + self.assertRaises(OSError, os.chmod, os_helper.TESTFN, 0) + + +class Win32KillTests(unittest.TestCase): + def _kill(self, sig): + # Start sys.executable as a subprocess and communicate from the + # subprocess to the parent that the interpreter is ready. When it + # becomes ready, send *sig* via os.kill to the subprocess and check + # that the return code is equal to *sig*. + import ctypes + from ctypes import wintypes + import msvcrt + + # Since we can't access the contents of the process' stdout until the + # process has exited, use PeekNamedPipe to see what's inside stdout + # without waiting. This is done so we can tell that the interpreter + # is started and running at a point where it could handle a signal. + PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe + PeekNamedPipe.restype = wintypes.BOOL + PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle + ctypes.POINTER(ctypes.c_char), # stdout buf + wintypes.DWORD, # Buffer size + ctypes.POINTER(wintypes.DWORD), # bytes read + ctypes.POINTER(wintypes.DWORD), # bytes avail + ctypes.POINTER(wintypes.DWORD)) # bytes left + msg = "running" + proc = subprocess.Popen([sys.executable, "-c", + "import sys;" + "sys.stdout.write('{}');" + "sys.stdout.flush();" + "input()".format(msg)], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + stdin=subprocess.PIPE) + self.addCleanup(proc.stdout.close) + self.addCleanup(proc.stderr.close) + self.addCleanup(proc.stdin.close) + + count, max = 0, 100 + while count < max and proc.poll() is None: + # Create a string buffer to store the result of stdout from the pipe + buf = ctypes.create_string_buffer(len(msg)) + # Obtain the text currently in proc.stdout + # Bytes read/avail/left are left as NULL and unused + rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()), + buf, ctypes.sizeof(buf), None, None, None) + self.assertNotEqual(rslt, 0, "PeekNamedPipe failed") + if buf.value: + self.assertEqual(msg, buf.value.decode()) + break + time.sleep(0.1) + count += 1 + else: + self.fail("Did not receive communication from the subprocess") + + os.kill(proc.pid, sig) + self.assertEqual(proc.wait(), sig) + + def test_kill_sigterm(self): + # SIGTERM doesn't mean anything special, but make sure it works + self._kill(signal.SIGTERM) + + def test_kill_int(self): + # os.kill on Windows can take an int which gets set as the exit code + self._kill(100) + + @unittest.skipIf(mmap is None, "requires mmap") + def _kill_with_event(self, event, name): + tagname = "test_os_%s" % uuid.uuid1() + m = mmap.mmap(-1, 1, tagname) + m[0] = 0 + + # Run a script which has console control handling enabled. + script = os.path.join(os.path.dirname(__file__), + "win_console_handler.py") + cmd = [sys.executable, script, tagname] + proc = subprocess.Popen(cmd, + creationflags=subprocess.CREATE_NEW_PROCESS_GROUP) + + with proc: + # Let the interpreter startup before we send signals. See #3137. + for _ in support.sleeping_retry(support.SHORT_TIMEOUT): + if proc.poll() is None: + break + else: + # Forcefully kill the process if we weren't able to signal it. + proc.kill() + self.fail("Subprocess didn't finish initialization") + + os.kill(proc.pid, event) + + try: + # proc.send_signal(event) could also be done here. + # Allow time for the signal to be passed and the process to exit. + proc.wait(timeout=support.SHORT_TIMEOUT) + except subprocess.TimeoutExpired: + # Forcefully kill the process if we weren't able to signal it. + proc.kill() + self.fail("subprocess did not stop on {}".format(name)) + + @unittest.skip("subprocesses aren't inheriting Ctrl+C property") + @support.requires_subprocess() + def test_CTRL_C_EVENT(self): + from ctypes import wintypes + import ctypes + + # Make a NULL value by creating a pointer with no argument. + NULL = ctypes.POINTER(ctypes.c_int)() + SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler + SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int), + wintypes.BOOL) + SetConsoleCtrlHandler.restype = wintypes.BOOL + + # Calling this with NULL and FALSE causes the calling process to + # handle Ctrl+C, rather than ignore it. This property is inherited + # by subprocesses. + SetConsoleCtrlHandler(NULL, 0) + + self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT") + + @support.requires_subprocess() + def test_CTRL_BREAK_EVENT(self): + self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT") + + +class Win32ListdirTests(unittest.TestCase): + """Test listdir on Windows.""" + + def setUp(self): + self.created_paths = [] + for i in range(2): + dir_name = 'SUB%d' % i + dir_path = os.path.join(os_helper.TESTFN, dir_name) + file_name = 'FILE%d' % i + file_path = os.path.join(os_helper.TESTFN, file_name) + os.makedirs(dir_path) + with open(file_path, 'w', encoding='utf-8') as f: + f.write("I'm %s and proud of it. Blame test_os.\n" % file_path) + self.created_paths.extend([dir_name, file_name]) + self.created_paths.sort() + + def tearDown(self): + shutil.rmtree(os_helper.TESTFN) + + def test_listdir_no_extended_path(self): + """Test when the path is not an "extended" path.""" + # unicode + self.assertEqual( + sorted(os.listdir(os_helper.TESTFN)), + self.created_paths) + + # bytes + self.assertEqual( + sorted(os.listdir(os.fsencode(os_helper.TESTFN))), + [os.fsencode(path) for path in self.created_paths]) + + def test_listdir_extended_path(self): + """Test when the path starts with '\\\\?\\'.""" + # See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx#maxpath + # unicode + path = '\\\\?\\' + os.path.abspath(os_helper.TESTFN) + self.assertEqual( + sorted(os.listdir(path)), + self.created_paths) + + # bytes + path = b'\\\\?\\' + os.fsencode(os.path.abspath(os_helper.TESTFN)) + self.assertEqual( + sorted(os.listdir(path)), + [os.fsencode(path) for path in self.created_paths]) + + +@unittest.skipUnless(os.name == "nt", "NT specific tests") +class Win32ListdriveTests(unittest.TestCase): + """Test listdrive, listmounts and listvolume on Windows.""" + + def setUp(self): + # Get drives and volumes from fsutil + out = subprocess.check_output( + ["fsutil.exe", "volume", "list"], + cwd=os.path.join(os.getenv("SystemRoot", "\\Windows"), "System32"), + encoding="mbcs", + errors="ignore", + ) + lines = out.splitlines() + self.known_volumes = {l for l in lines if l.startswith('\\\\?\\')} + self.known_drives = {l for l in lines if l[1:] == ':\\'} + self.known_mounts = {l for l in lines if l[1:3] == ':\\'} + + def test_listdrives(self): + drives = os.listdrives() + self.assertIsInstance(drives, list) + self.assertSetEqual( + self.known_drives, + self.known_drives & set(drives), + ) + + def test_listvolumes(self): + volumes = os.listvolumes() + self.assertIsInstance(volumes, list) + self.assertSetEqual( + self.known_volumes, + self.known_volumes & set(volumes), + ) + + def test_listmounts(self): + for volume in os.listvolumes(): + try: + mounts = os.listmounts(volume) + except OSError as ex: + if support.verbose: + print("Skipping", volume, "because of", ex) + else: + self.assertIsInstance(mounts, list) + self.assertSetEqual( + set(mounts), + self.known_mounts & set(mounts), + ) + + +@os_helper.skip_unless_symlink +class Win32SymlinkTests(unittest.TestCase): + filelink = 'filelinktest' + filelink_target = os.path.abspath(__file__) + dirlink = 'dirlinktest' + dirlink_target = os.path.dirname(filelink_target) + missing_link = 'missing link' + + def setUp(self): + assert os.path.exists(self.dirlink_target) + assert os.path.exists(self.filelink_target) + assert not os.path.exists(self.dirlink) + assert not os.path.exists(self.filelink) + assert not os.path.exists(self.missing_link) + + def tearDown(self): + if os.path.exists(self.filelink): + os.remove(self.filelink) + if os.path.exists(self.dirlink): + os.rmdir(self.dirlink) + if os.path.lexists(self.missing_link): + os.remove(self.missing_link) + + def test_directory_link(self): + os.symlink(self.dirlink_target, self.dirlink) + self.assertTrue(os.path.exists(self.dirlink)) + self.assertTrue(os.path.isdir(self.dirlink)) + self.assertTrue(os.path.islink(self.dirlink)) + self.check_stat(self.dirlink, self.dirlink_target) + + def test_file_link(self): + os.symlink(self.filelink_target, self.filelink) + self.assertTrue(os.path.exists(self.filelink)) + self.assertTrue(os.path.isfile(self.filelink)) + self.assertTrue(os.path.islink(self.filelink)) + self.check_stat(self.filelink, self.filelink_target) + + def _create_missing_dir_link(self): + 'Create a "directory" link to a non-existent target' + linkname = self.missing_link + if os.path.lexists(linkname): + os.remove(linkname) + target = r'c:\\target does not exist.29r3c740' + assert not os.path.exists(target) + target_is_dir = True + os.symlink(target, linkname, target_is_dir) + + def test_remove_directory_link_to_missing_target(self): + self._create_missing_dir_link() + # For compatibility with Unix, os.remove will check the + # directory status and call RemoveDirectory if the symlink + # was created with target_is_dir==True. + os.remove(self.missing_link) + + def test_isdir_on_directory_link_to_missing_target(self): + self._create_missing_dir_link() + self.assertFalse(os.path.isdir(self.missing_link)) + + def test_rmdir_on_directory_link_to_missing_target(self): + self._create_missing_dir_link() + os.rmdir(self.missing_link) + + def check_stat(self, link, target): + self.assertEqual(os.stat(link), os.stat(target)) + self.assertNotEqual(os.lstat(link), os.stat(link)) + + bytes_link = os.fsencode(link) + self.assertEqual(os.stat(bytes_link), os.stat(target)) + self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link)) + + def test_12084(self): + level1 = os.path.abspath(os_helper.TESTFN) + level2 = os.path.join(level1, "level2") + level3 = os.path.join(level2, "level3") + self.addCleanup(os_helper.rmtree, level1) + + os.mkdir(level1) + os.mkdir(level2) + os.mkdir(level3) + + file1 = os.path.abspath(os.path.join(level1, "file1")) + create_file(file1) + + orig_dir = os.getcwd() + try: + os.chdir(level2) + link = os.path.join(level2, "link") + os.symlink(os.path.relpath(file1), "link") + self.assertIn("link", os.listdir(os.getcwd())) + + # Check os.stat calls from the same dir as the link + self.assertEqual(os.stat(file1), os.stat("link")) + + # Check os.stat calls from a dir below the link + os.chdir(level1) + self.assertEqual(os.stat(file1), + os.stat(os.path.relpath(link))) + + # Check os.stat calls from a dir above the link + os.chdir(level3) + self.assertEqual(os.stat(file1), + os.stat(os.path.relpath(link))) + finally: + os.chdir(orig_dir) + + @unittest.skipUnless(os.path.lexists(r'C:\Users\All Users') + and os.path.exists(r'C:\ProgramData'), + 'Test directories not found') + def test_29248(self): + # os.symlink() calls CreateSymbolicLink, which creates + # the reparse data buffer with the print name stored + # first, so the offset is always 0. CreateSymbolicLink + # stores the "PrintName" DOS path (e.g. "C:\") first, + # with an offset of 0, followed by the "SubstituteName" + # NT path (e.g. "\??\C:\"). The "All Users" link, on + # the other hand, seems to have been created manually + # with an inverted order. + target = os.readlink(r'C:\Users\All Users') + self.assertTrue(os.path.samefile(target, r'C:\ProgramData')) + + def test_buffer_overflow(self): + # Older versions would have a buffer overflow when detecting + # whether a link source was a directory. This test ensures we + # no longer crash, but does not otherwise validate the behavior + segment = 'X' * 27 + path = os.path.join(*[segment] * 10) + test_cases = [ + # overflow with absolute src + ('\\' + path, segment), + # overflow dest with relative src + (segment, path), + # overflow when joining src + (path[:180], path[:180]), + ] + for src, dest in test_cases: + try: + os.symlink(src, dest) + except FileNotFoundError: + pass + else: + try: + os.remove(dest) + except OSError: + pass + # Also test with bytes, since that is a separate code path. + try: + os.symlink(os.fsencode(src), os.fsencode(dest)) + except FileNotFoundError: + pass + else: + try: + os.remove(dest) + except OSError: + pass + + def test_appexeclink(self): + root = os.path.expandvars(r'%LOCALAPPDATA%\Microsoft\WindowsApps') + if not os.path.isdir(root): + self.skipTest("test requires a WindowsApps directory") + + aliases = [os.path.join(root, a) + for a in fnmatch.filter(os.listdir(root), '*.exe')] + + for alias in aliases: + if support.verbose: + print() + print("Testing with", alias) + st = os.lstat(alias) + self.assertEqual(st, os.stat(alias)) + self.assertFalse(stat.S_ISLNK(st.st_mode)) + self.assertEqual(st.st_reparse_tag, stat.IO_REPARSE_TAG_APPEXECLINK) + self.assertTrue(os.path.isfile(alias)) + # testing the first one we see is sufficient + break + else: + self.skipTest("test requires an app execution alias") + + +class Win32JunctionTests(unittest.TestCase): + junction = 'junctiontest' + junction_target = os.path.dirname(os.path.abspath(__file__)) + + def setUp(self): + assert os.path.exists(self.junction_target) + assert not os.path.lexists(self.junction) + + def tearDown(self): + if os.path.lexists(self.junction): + os.unlink(self.junction) + + def test_create_junction(self): + _winapi.CreateJunction(self.junction_target, self.junction) + self.assertTrue(os.path.lexists(self.junction)) + self.assertTrue(os.path.exists(self.junction)) + self.assertTrue(os.path.isdir(self.junction)) + self.assertNotEqual(os.stat(self.junction), os.lstat(self.junction)) + self.assertEqual(os.stat(self.junction), os.stat(self.junction_target)) + + # bpo-37834: Junctions are not recognized as links. + self.assertFalse(os.path.islink(self.junction)) + self.assertEqual(os.path.normcase("\\\\?\\" + self.junction_target), + os.path.normcase(os.readlink(self.junction))) + + def test_unlink_removes_junction(self): + _winapi.CreateJunction(self.junction_target, self.junction) + self.assertTrue(os.path.exists(self.junction)) + self.assertTrue(os.path.lexists(self.junction)) + + os.unlink(self.junction) + self.assertFalse(os.path.exists(self.junction)) + + +class Win32NtTests(unittest.TestCase): + def test_getfinalpathname_handles(self): + nt = import_helper.import_module('nt') + ctypes = import_helper.import_module('ctypes') + # Ruff false positive -- it thinks we're redefining `ctypes` here + import ctypes.wintypes # noqa: F811 + + kernel = ctypes.WinDLL('Kernel32.dll', use_last_error=True) + kernel.GetCurrentProcess.restype = ctypes.wintypes.HANDLE + + kernel.GetProcessHandleCount.restype = ctypes.wintypes.BOOL + kernel.GetProcessHandleCount.argtypes = (ctypes.wintypes.HANDLE, + ctypes.wintypes.LPDWORD) + + # This is a pseudo-handle that doesn't need to be closed + hproc = kernel.GetCurrentProcess() + + handle_count = ctypes.wintypes.DWORD() + ok = kernel.GetProcessHandleCount(hproc, ctypes.byref(handle_count)) + self.assertEqual(1, ok) + + before_count = handle_count.value + + # The first two test the error path, __file__ tests the success path + filenames = [ + r'\\?\C:', + r'\\?\NUL', + r'\\?\CONIN', + __file__, + ] + + for _ in range(10): + for name in filenames: + try: + nt._getfinalpathname(name) + except Exception: + # Failure is expected + pass + try: + os.stat(name) + except Exception: + pass + + ok = kernel.GetProcessHandleCount(hproc, ctypes.byref(handle_count)) + self.assertEqual(1, ok) + + handle_delta = handle_count.value - before_count + + self.assertEqual(0, handle_delta) + + @support.requires_subprocess() + def test_stat_unlink_race(self): + # bpo-46785: the implementation of os.stat() falls back to reading + # the parent directory if CreateFileW() fails with a permission + # error. If reading the parent directory fails because the file or + # directory are subsequently unlinked, or because the volume or + # share are no longer available, then the original permission error + # should not be restored. + filename = os_helper.TESTFN + self.addCleanup(os_helper.unlink, filename) + deadline = time.time() + 5 + command = textwrap.dedent("""\ + import os + import sys + import time + + filename = sys.argv[1] + deadline = float(sys.argv[2]) + + while time.time() < deadline: + try: + with open(filename, "w") as f: + pass + except OSError: + pass + try: + os.remove(filename) + except OSError: + pass + """) + + with subprocess.Popen([sys.executable, '-c', command, filename, str(deadline)]) as proc: + while time.time() < deadline: + try: + os.stat(filename) + except FileNotFoundError as e: + assert e.winerror == 2 # ERROR_FILE_NOT_FOUND + try: + proc.wait(1) + except subprocess.TimeoutExpired: + proc.terminate() + + @support.requires_subprocess() + def test_stat_inaccessible_file(self): + filename = os_helper.TESTFN + ICACLS = os.path.expandvars(r"%SystemRoot%\System32\icacls.exe") + + with open(filename, "wb") as f: + f.write(b'Test data') + + stat1 = os.stat(filename) + + try: + # Remove all permissions from the file + subprocess.check_output([ICACLS, filename, "/inheritance:r"], + stderr=subprocess.STDOUT) + except subprocess.CalledProcessError as ex: + if support.verbose: + print(ICACLS, filename, "/inheritance:r", "failed.") + print(ex.stdout.decode("oem", "replace").rstrip()) + try: + os.unlink(filename) + except OSError: + pass + self.skipTest("Unable to create inaccessible file") + + def cleanup(): + # Give delete permission to the owner (us) + subprocess.check_output([ICACLS, filename, "/grant", "*WD:(D)"], + stderr=subprocess.STDOUT) + os.unlink(filename) + + self.addCleanup(cleanup) + + if support.verbose: + print("File:", filename) + print("stat with access:", stat1) + + # First test - we shouldn't raise here, because we still have access to + # the directory and can extract enough information from its metadata. + stat2 = os.stat(filename) + + if support.verbose: + print(" without access:", stat2) + + # We may not get st_dev/st_ino, so ensure those are 0 or match + self.assertIn(stat2.st_dev, (0, stat1.st_dev)) + self.assertIn(stat2.st_ino, (0, stat1.st_ino)) + + # st_mode and st_size should match (for a normal file, at least) + self.assertEqual(stat1.st_mode, stat2.st_mode) + self.assertEqual(stat1.st_size, stat2.st_size) + + # st_ctime and st_mtime should be the same + self.assertEqual(stat1.st_ctime, stat2.st_ctime) + self.assertEqual(stat1.st_mtime, stat2.st_mtime) + + # st_atime should be the same or later + self.assertGreaterEqual(stat1.st_atime, stat2.st_atime) + + +if __name__ == "__main__": + unittest.main() diff --git a/Lib/test/test_os/utils.py b/Lib/test/test_os/utils.py new file mode 100644 index 00000000000000..e0c39598c316c8 --- /dev/null +++ b/Lib/test/test_os/utils.py @@ -0,0 +1,3 @@ +def create_file(filename, content=b'content'): + with open(filename, "xb", 0) as fp: + fp.write(content) From 5bc5f5781835f0d06ab2a6f131042721ebf06937 Mon Sep 17 00:00:00 2001 From: Victor Stinner Date: Wed, 1 Oct 2025 09:01:06 +0200 Subject: [PATCH 3/4] Remove unused imports --- Lib/test/test_os/test_os.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/Lib/test/test_os/test_os.py b/Lib/test/test_os/test_os.py index 07db67f07a8acd..623d05235835c2 100644 --- a/Lib/test/test_os/test_os.py +++ b/Lib/test/test_os/test_os.py @@ -7,7 +7,6 @@ import contextlib import decimal import errno -import fnmatch import fractions import itertools import locale @@ -31,7 +30,6 @@ import uuid import warnings from test import support -from test.support import import_helper from test.support import os_helper from test.support import socket_helper from test.support import infinite_recursion From f3dcf72855290ee789514f1ebcd4666f5d442fe7 Mon Sep 17 00:00:00 2001 From: Victor Stinner Date: Wed, 1 Oct 2025 16:04:27 +0200 Subject: [PATCH 4/4] Add test_os package to TESTSUBDIRS --- Makefile.pre.in | 1 + 1 file changed, 1 insertion(+) diff --git a/Makefile.pre.in b/Makefile.pre.in index eedccc3ffe6a49..6651b093e20c8d 100644 --- a/Makefile.pre.in +++ b/Makefile.pre.in @@ -2682,6 +2682,7 @@ TESTSUBDIRS= idlelib/idle_test \ test/test_multiprocessing_fork \ test/test_multiprocessing_forkserver \ test/test_multiprocessing_spawn \ + test/test_os \ test/test_pathlib \ test/test_pathlib/support \ test/test_peg_generator \