diff --git a/Lib/pathlib/__init__.py b/Lib/pathlib/__init__.py index bc39a30c6538ce..8a892102cc00ea 100644 --- a/Lib/pathlib/__init__.py +++ b/Lib/pathlib/__init__.py @@ -14,7 +14,9 @@ from errno import * from glob import _StringGlobber, _no_recurse_symlinks from itertools import chain -from stat import S_ISDIR, S_ISREG, S_ISSOCK, S_ISBLK, S_ISCHR, S_ISFIFO +from stat import ( + S_IMODE, S_ISDIR, S_ISREG, S_ISLNK, S_ISSOCK, S_ISBLK, S_ISCHR, S_ISFIFO, +) from _collections_abc import Sequence try: @@ -27,10 +29,9 @@ grp = None from pathlib._os import ( - PathInfo, DirEntryInfo, vfsopen, vfspath, ensure_different_files, ensure_distinct_paths, - copyfile2, copyfileobj, copy_info, + copyfile2, copyfileobj, ) @@ -612,6 +613,247 @@ class PureWindowsPath(PurePath): __slots__ = () +class _Info: + __slots__ = ('_path',) + + def __init__(self, path): + self._path = path + + def __repr__(self): + path_type = "WindowsPath" if os.name == "nt" else "PosixPath" + return f"<{path_type}.info>" + + def _stat(self, *, follow_symlinks=True): + """Return the status as an os.stat_result.""" + raise NotImplementedError + + def _posix_permissions(self, *, follow_symlinks=True): + """Return the POSIX file permissions.""" + return S_IMODE(self._stat(follow_symlinks=follow_symlinks).st_mode) + + def _file_id(self, *, follow_symlinks=True): + """Returns the identifier of the file.""" + st = self._stat(follow_symlinks=follow_symlinks) + return st.st_dev, st.st_ino + + def _access_time_ns(self, *, follow_symlinks=True): + """Return the access time in nanoseconds.""" + return self._stat(follow_symlinks=follow_symlinks).st_atime_ns + + def _mod_time_ns(self, *, follow_symlinks=True): + """Return the modify time in nanoseconds.""" + return self._stat(follow_symlinks=follow_symlinks).st_mtime_ns + + if hasattr(os.stat_result, 'st_flags'): + def _bsd_flags(self, *, follow_symlinks=True): + """Return the flags.""" + return self._stat(follow_symlinks=follow_symlinks).st_flags + + if hasattr(os, 'listxattr'): + def _xattrs(self, *, follow_symlinks=True): + """Return the xattrs as a list of (attr, value) pairs, or an empty + list if extended attributes aren't supported.""" + try: + return [ + (attr, os.getxattr(self._path, attr, follow_symlinks=follow_symlinks)) + for attr in os.listxattr(self._path, follow_symlinks=follow_symlinks)] + except OSError as err: + if err.errno not in (EPERM, ENOTSUP, ENODATA, EINVAL, EACCES): + raise + return [] + + +_STAT_RESULT_ERROR = [] # falsy sentinel indicating stat() failed. + + +class _StatResultInfo(_Info): + """Implementation of pathlib.types.PathInfo that provides status + information by querying a wrapped os.stat_result object. Don't try to + construct it yourself.""" + __slots__ = ('_stat_result', '_lstat_result') + + def __init__(self, path): + super().__init__(path) + self._stat_result = None + self._lstat_result = None + + def _stat(self, *, follow_symlinks=True): + """Return the status as an os.stat_result.""" + if follow_symlinks: + if not self._stat_result: + try: + self._stat_result = os.stat(self._path) + except (OSError, ValueError): + self._stat_result = _STAT_RESULT_ERROR + raise + return self._stat_result + else: + if not self._lstat_result: + try: + self._lstat_result = os.lstat(self._path) + except (OSError, ValueError): + self._lstat_result = _STAT_RESULT_ERROR + raise + return self._lstat_result + + def exists(self, *, follow_symlinks=True): + """Whether this path exists.""" + if follow_symlinks: + if self._stat_result is _STAT_RESULT_ERROR: + return False + else: + if self._lstat_result is _STAT_RESULT_ERROR: + return False + try: + self._stat(follow_symlinks=follow_symlinks) + except (OSError, ValueError): + return False + return True + + def is_dir(self, *, follow_symlinks=True): + """Whether this path is a directory.""" + if follow_symlinks: + if self._stat_result is _STAT_RESULT_ERROR: + return False + else: + if self._lstat_result is _STAT_RESULT_ERROR: + return False + try: + st = self._stat(follow_symlinks=follow_symlinks) + except (OSError, ValueError): + return False + return S_ISDIR(st.st_mode) + + def is_file(self, *, follow_symlinks=True): + """Whether this path is a regular file.""" + if follow_symlinks: + if self._stat_result is _STAT_RESULT_ERROR: + return False + else: + if self._lstat_result is _STAT_RESULT_ERROR: + return False + try: + st = self._stat(follow_symlinks=follow_symlinks) + except (OSError, ValueError): + return False + return S_ISREG(st.st_mode) + + def is_symlink(self): + """Whether this path is a symbolic link.""" + if self._lstat_result is _STAT_RESULT_ERROR: + return False + try: + st = self._stat(follow_symlinks=False) + except (OSError, ValueError): + return False + return S_ISLNK(st.st_mode) + + +class _DirEntryInfo(_Info): + """Implementation of pathlib.types.PathInfo that provides status + information by querying a wrapped os.DirEntry object. Don't try to + construct it yourself.""" + __slots__ = ('_entry',) + + def __init__(self, entry): + super().__init__(entry.path) + self._entry = entry + + def _stat(self, *, follow_symlinks=True): + """Return the status as an os.stat_result.""" + return self._entry.stat(follow_symlinks=follow_symlinks) + + def exists(self, *, follow_symlinks=True): + """Whether this path exists.""" + if not follow_symlinks: + return True + try: + self._stat(follow_symlinks=follow_symlinks) + except OSError: + return False + return True + + def is_dir(self, *, follow_symlinks=True): + """Whether this path is a directory.""" + try: + return self._entry.is_dir(follow_symlinks=follow_symlinks) + except OSError: + return False + + def is_file(self, *, follow_symlinks=True): + """Whether this path is a regular file.""" + try: + return self._entry.is_file(follow_symlinks=follow_symlinks) + except OSError: + return False + + def is_symlink(self): + """Whether this path is a symbolic link.""" + try: + return self._entry.is_symlink() + except OSError: + return False + + +def _copy_info(info, target, follow_symlinks=True): + """Copy metadata from the given PathInfo to the given local path.""" + copy_times_ns = ( + hasattr(info, '_access_time_ns') and + hasattr(info, '_mod_time_ns') and + (follow_symlinks or os.utime in os.supports_follow_symlinks)) + if copy_times_ns: + t0 = info._access_time_ns(follow_symlinks=follow_symlinks) + t1 = info._mod_time_ns(follow_symlinks=follow_symlinks) + os.utime(target, ns=(t0, t1), follow_symlinks=follow_symlinks) + + # We must copy extended attributes before the file is (potentially) + # chmod()'ed read-only, otherwise setxattr() will error with -EACCES. + copy_xattrs = ( + hasattr(info, '_xattrs') and + hasattr(os, 'setxattr') and + (follow_symlinks or os.setxattr in os.supports_follow_symlinks)) + if copy_xattrs: + xattrs = info._xattrs(follow_symlinks=follow_symlinks) + for attr, value in xattrs: + try: + os.setxattr(target, attr, value, follow_symlinks=follow_symlinks) + except OSError as e: + if e.errno not in (EPERM, ENOTSUP, ENODATA, EINVAL, EACCES): + raise + + copy_posix_permissions = ( + hasattr(info, '_posix_permissions') and + (follow_symlinks or os.chmod in os.supports_follow_symlinks)) + if copy_posix_permissions: + posix_permissions = info._posix_permissions(follow_symlinks=follow_symlinks) + try: + os.chmod(target, posix_permissions, follow_symlinks=follow_symlinks) + except NotImplementedError: + # if we got a NotImplementedError, it's because + # * follow_symlinks=False, + # * lchown() is unavailable, and + # * either + # * fchownat() is unavailable or + # * fchownat() doesn't implement AT_SYMLINK_NOFOLLOW. + # (it returned ENOSUP.) + # therefore we're out of options--we simply cannot chown the + # symlink. give up, suppress the error. + # (which is what shutil always did in this circumstance.) + pass + + copy_bsd_flags = ( + hasattr(info, '_bsd_flags') and + hasattr(os, 'chflags') and + (follow_symlinks or os.chflags in os.supports_follow_symlinks)) + if copy_bsd_flags: + bsd_flags = info._bsd_flags(follow_symlinks=follow_symlinks) + try: + os.chflags(target, bsd_flags, follow_symlinks=follow_symlinks) + except OSError as why: + if why.errno not in (EOPNOTSUPP, ENOTSUP): + raise + + class Path(PurePath): """PurePath subclass that can make system calls. @@ -637,7 +879,7 @@ def info(self): try: return self._info except AttributeError: - self._info = PathInfo(self) + self._info = _StatResultInfo(str(self)) return self._info def stat(self, *, follow_symlinks=True): @@ -817,7 +1059,7 @@ def _filter_trailing_slash(self, paths): def _from_dir_entry(self, dir_entry, path_str): path = self.with_segments(path_str) path._str = path_str - path._info = DirEntryInfo(dir_entry) + path._info = _DirEntryInfo(dir_entry) return path def iterdir(self): @@ -1123,7 +1365,7 @@ def _copy_from(self, source, follow_symlinks=True, preserve_metadata=False): self.joinpath(child.name)._copy_from( child, follow_symlinks, preserve_metadata) if preserve_metadata: - copy_info(source.info, self) + _copy_info(source.info, self) else: self._copy_from_file(source, preserve_metadata) @@ -1133,7 +1375,7 @@ def _copy_from_file(self, source, preserve_metadata=False): with open(self, 'wb') as target_f: copyfileobj(source_f, target_f) if preserve_metadata: - copy_info(source.info, self) + _copy_info(source.info, self) if copyfile2: # Use fast OS routine for local file copying where available. @@ -1155,12 +1397,12 @@ def _copy_from_file(self, source, preserve_metadata=False): def _copy_from_symlink(self, source, preserve_metadata=False): os.symlink(vfspath(source.readlink()), self, source.info.is_dir()) if preserve_metadata: - copy_info(source.info, self, follow_symlinks=False) + _copy_info(source.info, self, follow_symlinks=False) else: def _copy_from_symlink(self, source, preserve_metadata=False): os.symlink(vfspath(source.readlink()), self) if preserve_metadata: - copy_info(source.info, self, follow_symlinks=False) + _copy_info(source.info, self, follow_symlinks=False) def move(self, target): """ diff --git a/Lib/pathlib/_os.py b/Lib/pathlib/_os.py index 6508a9bca0d72b..79a1969d5f83d6 100644 --- a/Lib/pathlib/_os.py +++ b/Lib/pathlib/_os.py @@ -4,7 +4,6 @@ from errno import * from io import TextIOWrapper, text_encoding -from stat import S_ISDIR, S_ISREG, S_ISLNK, S_IMODE import os import sys try: @@ -302,281 +301,3 @@ def ensure_different_files(source, target): err.filename = vfspath(source) err.filename2 = vfspath(target) raise err - - -def copy_info(info, target, follow_symlinks=True): - """Copy metadata from the given PathInfo to the given local path.""" - copy_times_ns = ( - hasattr(info, '_access_time_ns') and - hasattr(info, '_mod_time_ns') and - (follow_symlinks or os.utime in os.supports_follow_symlinks)) - if copy_times_ns: - t0 = info._access_time_ns(follow_symlinks=follow_symlinks) - t1 = info._mod_time_ns(follow_symlinks=follow_symlinks) - os.utime(target, ns=(t0, t1), follow_symlinks=follow_symlinks) - - # We must copy extended attributes before the file is (potentially) - # chmod()'ed read-only, otherwise setxattr() will error with -EACCES. - copy_xattrs = ( - hasattr(info, '_xattrs') and - hasattr(os, 'setxattr') and - (follow_symlinks or os.setxattr in os.supports_follow_symlinks)) - if copy_xattrs: - xattrs = info._xattrs(follow_symlinks=follow_symlinks) - for attr, value in xattrs: - try: - os.setxattr(target, attr, value, follow_symlinks=follow_symlinks) - except OSError as e: - if e.errno not in (EPERM, ENOTSUP, ENODATA, EINVAL, EACCES): - raise - - copy_posix_permissions = ( - hasattr(info, '_posix_permissions') and - (follow_symlinks or os.chmod in os.supports_follow_symlinks)) - if copy_posix_permissions: - posix_permissions = info._posix_permissions(follow_symlinks=follow_symlinks) - try: - os.chmod(target, posix_permissions, follow_symlinks=follow_symlinks) - except NotImplementedError: - # if we got a NotImplementedError, it's because - # * follow_symlinks=False, - # * lchown() is unavailable, and - # * either - # * fchownat() is unavailable or - # * fchownat() doesn't implement AT_SYMLINK_NOFOLLOW. - # (it returned ENOSUP.) - # therefore we're out of options--we simply cannot chown the - # symlink. give up, suppress the error. - # (which is what shutil always did in this circumstance.) - pass - - copy_bsd_flags = ( - hasattr(info, '_bsd_flags') and - hasattr(os, 'chflags') and - (follow_symlinks or os.chflags in os.supports_follow_symlinks)) - if copy_bsd_flags: - bsd_flags = info._bsd_flags(follow_symlinks=follow_symlinks) - try: - os.chflags(target, bsd_flags, follow_symlinks=follow_symlinks) - except OSError as why: - if why.errno not in (EOPNOTSUPP, ENOTSUP): - raise - - -class _PathInfoBase: - __slots__ = ('_path', '_stat_result', '_lstat_result') - - def __init__(self, path): - self._path = str(path) - - def __repr__(self): - path_type = "WindowsPath" if os.name == "nt" else "PosixPath" - return f"<{path_type}.info>" - - def _stat(self, *, follow_symlinks=True, ignore_errors=False): - """Return the status as an os.stat_result, or None if stat() fails and - ignore_errors is true.""" - if follow_symlinks: - try: - result = self._stat_result - except AttributeError: - pass - else: - if ignore_errors or result is not None: - return result - try: - self._stat_result = os.stat(self._path) - except (OSError, ValueError): - self._stat_result = None - if not ignore_errors: - raise - return self._stat_result - else: - try: - result = self._lstat_result - except AttributeError: - pass - else: - if ignore_errors or result is not None: - return result - try: - self._lstat_result = os.lstat(self._path) - except (OSError, ValueError): - self._lstat_result = None - if not ignore_errors: - raise - return self._lstat_result - - def _posix_permissions(self, *, follow_symlinks=True): - """Return the POSIX file permissions.""" - return S_IMODE(self._stat(follow_symlinks=follow_symlinks).st_mode) - - def _file_id(self, *, follow_symlinks=True): - """Returns the identifier of the file.""" - st = self._stat(follow_symlinks=follow_symlinks) - return st.st_dev, st.st_ino - - def _access_time_ns(self, *, follow_symlinks=True): - """Return the access time in nanoseconds.""" - return self._stat(follow_symlinks=follow_symlinks).st_atime_ns - - def _mod_time_ns(self, *, follow_symlinks=True): - """Return the modify time in nanoseconds.""" - return self._stat(follow_symlinks=follow_symlinks).st_mtime_ns - - if hasattr(os.stat_result, 'st_flags'): - def _bsd_flags(self, *, follow_symlinks=True): - """Return the flags.""" - return self._stat(follow_symlinks=follow_symlinks).st_flags - - if hasattr(os, 'listxattr'): - def _xattrs(self, *, follow_symlinks=True): - """Return the xattrs as a list of (attr, value) pairs, or an empty - list if extended attributes aren't supported.""" - try: - return [ - (attr, os.getxattr(self._path, attr, follow_symlinks=follow_symlinks)) - for attr in os.listxattr(self._path, follow_symlinks=follow_symlinks)] - except OSError as err: - if err.errno not in (EPERM, ENOTSUP, ENODATA, EINVAL, EACCES): - raise - return [] - - -class _WindowsPathInfo(_PathInfoBase): - """Implementation of pathlib.types.PathInfo that provides status - information for Windows paths. Don't try to construct it yourself.""" - __slots__ = ('_exists', '_is_dir', '_is_file', '_is_symlink') - - def exists(self, *, follow_symlinks=True): - """Whether this path exists.""" - if not follow_symlinks and self.is_symlink(): - return True - try: - return self._exists - except AttributeError: - if os.path.exists(self._path): - self._exists = True - return True - else: - self._exists = self._is_dir = self._is_file = False - return False - - def is_dir(self, *, follow_symlinks=True): - """Whether this path is a directory.""" - if not follow_symlinks and self.is_symlink(): - return False - try: - return self._is_dir - except AttributeError: - if os.path.isdir(self._path): - self._is_dir = self._exists = True - return True - else: - self._is_dir = False - return False - - def is_file(self, *, follow_symlinks=True): - """Whether this path is a regular file.""" - if not follow_symlinks and self.is_symlink(): - return False - try: - return self._is_file - except AttributeError: - if os.path.isfile(self._path): - self._is_file = self._exists = True - return True - else: - self._is_file = False - return False - - def is_symlink(self): - """Whether this path is a symbolic link.""" - try: - return self._is_symlink - except AttributeError: - self._is_symlink = os.path.islink(self._path) - return self._is_symlink - - -class _PosixPathInfo(_PathInfoBase): - """Implementation of pathlib.types.PathInfo that provides status - information for POSIX paths. Don't try to construct it yourself.""" - __slots__ = () - - def exists(self, *, follow_symlinks=True): - """Whether this path exists.""" - st = self._stat(follow_symlinks=follow_symlinks, ignore_errors=True) - if st is None: - return False - return True - - def is_dir(self, *, follow_symlinks=True): - """Whether this path is a directory.""" - st = self._stat(follow_symlinks=follow_symlinks, ignore_errors=True) - if st is None: - return False - return S_ISDIR(st.st_mode) - - def is_file(self, *, follow_symlinks=True): - """Whether this path is a regular file.""" - st = self._stat(follow_symlinks=follow_symlinks, ignore_errors=True) - if st is None: - return False - return S_ISREG(st.st_mode) - - def is_symlink(self): - """Whether this path is a symbolic link.""" - st = self._stat(follow_symlinks=False, ignore_errors=True) - if st is None: - return False - return S_ISLNK(st.st_mode) - - -PathInfo = _WindowsPathInfo if os.name == 'nt' else _PosixPathInfo - - -class DirEntryInfo(_PathInfoBase): - """Implementation of pathlib.types.PathInfo that provides status - information by querying a wrapped os.DirEntry object. Don't try to - construct it yourself.""" - __slots__ = ('_entry',) - - def __init__(self, entry): - super().__init__(entry.path) - self._entry = entry - - def _stat(self, *, follow_symlinks=True, ignore_errors=False): - try: - return self._entry.stat(follow_symlinks=follow_symlinks) - except OSError: - if not ignore_errors: - raise - return None - - def exists(self, *, follow_symlinks=True): - """Whether this path exists.""" - if not follow_symlinks: - return True - return self._stat(ignore_errors=True) is not None - - def is_dir(self, *, follow_symlinks=True): - """Whether this path is a directory.""" - try: - return self._entry.is_dir(follow_symlinks=follow_symlinks) - except OSError: - return False - - def is_file(self, *, follow_symlinks=True): - """Whether this path is a regular file.""" - try: - return self._entry.is_file(follow_symlinks=follow_symlinks) - except OSError: - return False - - def is_symlink(self): - """Whether this path is a symbolic link.""" - try: - return self._entry.is_symlink() - except OSError: - return False