Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
260 changes: 251 additions & 9 deletions Lib/pathlib/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@
from errno import *
from glob import _StringGlobber, _no_recurse_symlinks
from itertools import chain
from stat import S_ISDIR, S_ISREG, S_ISSOCK, S_ISBLK, S_ISCHR, S_ISFIFO
from stat import (
S_IMODE, S_ISDIR, S_ISREG, S_ISLNK, S_ISSOCK, S_ISBLK, S_ISCHR, S_ISFIFO,
)
from _collections_abc import Sequence

try:
Expand All @@ -27,10 +29,9 @@
grp = None

from pathlib._os import (
PathInfo, DirEntryInfo,
vfsopen, vfspath,
ensure_different_files, ensure_distinct_paths,
copyfile2, copyfileobj, copy_info,
copyfile2, copyfileobj,
)


Expand Down Expand Up @@ -612,6 +613,247 @@ class PureWindowsPath(PurePath):
__slots__ = ()


class _Info:
__slots__ = ('_path',)

def __init__(self, path):
self._path = path

def __repr__(self):
path_type = "WindowsPath" if os.name == "nt" else "PosixPath"
return f"<{path_type}.info>"

def _stat(self, *, follow_symlinks=True):
"""Return the status as an os.stat_result."""
raise NotImplementedError

def _posix_permissions(self, *, follow_symlinks=True):
"""Return the POSIX file permissions."""
return S_IMODE(self._stat(follow_symlinks=follow_symlinks).st_mode)

def _file_id(self, *, follow_symlinks=True):
"""Returns the identifier of the file."""
st = self._stat(follow_symlinks=follow_symlinks)
return st.st_dev, st.st_ino

def _access_time_ns(self, *, follow_symlinks=True):
"""Return the access time in nanoseconds."""
return self._stat(follow_symlinks=follow_symlinks).st_atime_ns

def _mod_time_ns(self, *, follow_symlinks=True):
"""Return the modify time in nanoseconds."""
return self._stat(follow_symlinks=follow_symlinks).st_mtime_ns

if hasattr(os.stat_result, 'st_flags'):
def _bsd_flags(self, *, follow_symlinks=True):
"""Return the flags."""
return self._stat(follow_symlinks=follow_symlinks).st_flags

if hasattr(os, 'listxattr'):
def _xattrs(self, *, follow_symlinks=True):
"""Return the xattrs as a list of (attr, value) pairs, or an empty
list if extended attributes aren't supported."""
try:
return [
(attr, os.getxattr(self._path, attr, follow_symlinks=follow_symlinks))
for attr in os.listxattr(self._path, follow_symlinks=follow_symlinks)]
except OSError as err:
if err.errno not in (EPERM, ENOTSUP, ENODATA, EINVAL, EACCES):
raise
return []


_STAT_RESULT_ERROR = [] # falsy sentinel indicating stat() failed.


class _StatResultInfo(_Info):
"""Implementation of pathlib.types.PathInfo that provides status
information by querying a wrapped os.stat_result object. Don't try to
construct it yourself."""
__slots__ = ('_stat_result', '_lstat_result')

def __init__(self, path):
super().__init__(path)
self._stat_result = None
self._lstat_result = None

def _stat(self, *, follow_symlinks=True):
"""Return the status as an os.stat_result."""
if follow_symlinks:
if not self._stat_result:
try:
self._stat_result = os.stat(self._path)
except (OSError, ValueError):
self._stat_result = _STAT_RESULT_ERROR
raise
return self._stat_result
else:
if not self._lstat_result:
try:
self._lstat_result = os.lstat(self._path)
except (OSError, ValueError):
self._lstat_result = _STAT_RESULT_ERROR
raise
return self._lstat_result

def exists(self, *, follow_symlinks=True):
"""Whether this path exists."""
if follow_symlinks:
if self._stat_result is _STAT_RESULT_ERROR:
return False
else:
if self._lstat_result is _STAT_RESULT_ERROR:
return False
try:
self._stat(follow_symlinks=follow_symlinks)
except (OSError, ValueError):
return False
return True

def is_dir(self, *, follow_symlinks=True):
"""Whether this path is a directory."""
if follow_symlinks:
if self._stat_result is _STAT_RESULT_ERROR:
return False
else:
if self._lstat_result is _STAT_RESULT_ERROR:
return False
try:
st = self._stat(follow_symlinks=follow_symlinks)
except (OSError, ValueError):
return False
return S_ISDIR(st.st_mode)

def is_file(self, *, follow_symlinks=True):
"""Whether this path is a regular file."""
if follow_symlinks:
if self._stat_result is _STAT_RESULT_ERROR:
return False
else:
if self._lstat_result is _STAT_RESULT_ERROR:
return False
try:
st = self._stat(follow_symlinks=follow_symlinks)
except (OSError, ValueError):
return False
return S_ISREG(st.st_mode)

def is_symlink(self):
"""Whether this path is a symbolic link."""
if self._lstat_result is _STAT_RESULT_ERROR:
return False
try:
st = self._stat(follow_symlinks=False)
except (OSError, ValueError):
return False
return S_ISLNK(st.st_mode)


class _DirEntryInfo(_Info):
"""Implementation of pathlib.types.PathInfo that provides status
information by querying a wrapped os.DirEntry object. Don't try to
construct it yourself."""
__slots__ = ('_entry',)

def __init__(self, entry):
super().__init__(entry.path)
self._entry = entry

def _stat(self, *, follow_symlinks=True):
"""Return the status as an os.stat_result."""
return self._entry.stat(follow_symlinks=follow_symlinks)

def exists(self, *, follow_symlinks=True):
"""Whether this path exists."""
if not follow_symlinks:
return True
try:
self._stat(follow_symlinks=follow_symlinks)
except OSError:
return False
return True

def is_dir(self, *, follow_symlinks=True):
"""Whether this path is a directory."""
try:
return self._entry.is_dir(follow_symlinks=follow_symlinks)
except OSError:
return False

def is_file(self, *, follow_symlinks=True):
"""Whether this path is a regular file."""
try:
return self._entry.is_file(follow_symlinks=follow_symlinks)
except OSError:
return False

def is_symlink(self):
"""Whether this path is a symbolic link."""
try:
return self._entry.is_symlink()
except OSError:
return False


def _copy_info(info, target, follow_symlinks=True):
"""Copy metadata from the given PathInfo to the given local path."""
copy_times_ns = (
hasattr(info, '_access_time_ns') and
hasattr(info, '_mod_time_ns') and
(follow_symlinks or os.utime in os.supports_follow_symlinks))
if copy_times_ns:
t0 = info._access_time_ns(follow_symlinks=follow_symlinks)
t1 = info._mod_time_ns(follow_symlinks=follow_symlinks)
os.utime(target, ns=(t0, t1), follow_symlinks=follow_symlinks)

# We must copy extended attributes before the file is (potentially)
# chmod()'ed read-only, otherwise setxattr() will error with -EACCES.
copy_xattrs = (
hasattr(info, '_xattrs') and
hasattr(os, 'setxattr') and
(follow_symlinks or os.setxattr in os.supports_follow_symlinks))
if copy_xattrs:
xattrs = info._xattrs(follow_symlinks=follow_symlinks)
for attr, value in xattrs:
try:
os.setxattr(target, attr, value, follow_symlinks=follow_symlinks)
except OSError as e:
if e.errno not in (EPERM, ENOTSUP, ENODATA, EINVAL, EACCES):
raise

copy_posix_permissions = (
hasattr(info, '_posix_permissions') and
(follow_symlinks or os.chmod in os.supports_follow_symlinks))
if copy_posix_permissions:
posix_permissions = info._posix_permissions(follow_symlinks=follow_symlinks)
try:
os.chmod(target, posix_permissions, follow_symlinks=follow_symlinks)
except NotImplementedError:
# if we got a NotImplementedError, it's because
# * follow_symlinks=False,
# * lchown() is unavailable, and
# * either
# * fchownat() is unavailable or
# * fchownat() doesn't implement AT_SYMLINK_NOFOLLOW.
# (it returned ENOSUP.)
# therefore we're out of options--we simply cannot chown the
# symlink. give up, suppress the error.
# (which is what shutil always did in this circumstance.)
pass

copy_bsd_flags = (
hasattr(info, '_bsd_flags') and
hasattr(os, 'chflags') and
(follow_symlinks or os.chflags in os.supports_follow_symlinks))
if copy_bsd_flags:
bsd_flags = info._bsd_flags(follow_symlinks=follow_symlinks)
try:
os.chflags(target, bsd_flags, follow_symlinks=follow_symlinks)
except OSError as why:
if why.errno not in (EOPNOTSUPP, ENOTSUP):
raise


class Path(PurePath):
"""PurePath subclass that can make system calls.

Expand All @@ -637,7 +879,7 @@ def info(self):
try:
return self._info
except AttributeError:
self._info = PathInfo(self)
self._info = _StatResultInfo(str(self))
return self._info

def stat(self, *, follow_symlinks=True):
Expand Down Expand Up @@ -817,7 +1059,7 @@ def _filter_trailing_slash(self, paths):
def _from_dir_entry(self, dir_entry, path_str):
path = self.with_segments(path_str)
path._str = path_str
path._info = DirEntryInfo(dir_entry)
path._info = _DirEntryInfo(dir_entry)
return path

def iterdir(self):
Expand Down Expand Up @@ -1123,7 +1365,7 @@ def _copy_from(self, source, follow_symlinks=True, preserve_metadata=False):
self.joinpath(child.name)._copy_from(
child, follow_symlinks, preserve_metadata)
if preserve_metadata:
copy_info(source.info, self)
_copy_info(source.info, self)
else:
self._copy_from_file(source, preserve_metadata)

Expand All @@ -1133,7 +1375,7 @@ def _copy_from_file(self, source, preserve_metadata=False):
with open(self, 'wb') as target_f:
copyfileobj(source_f, target_f)
if preserve_metadata:
copy_info(source.info, self)
_copy_info(source.info, self)

if copyfile2:
# Use fast OS routine for local file copying where available.
Expand All @@ -1155,12 +1397,12 @@ def _copy_from_file(self, source, preserve_metadata=False):
def _copy_from_symlink(self, source, preserve_metadata=False):
os.symlink(vfspath(source.readlink()), self, source.info.is_dir())
if preserve_metadata:
copy_info(source.info, self, follow_symlinks=False)
_copy_info(source.info, self, follow_symlinks=False)
else:
def _copy_from_symlink(self, source, preserve_metadata=False):
os.symlink(vfspath(source.readlink()), self)
if preserve_metadata:
copy_info(source.info, self, follow_symlinks=False)
_copy_info(source.info, self, follow_symlinks=False)

def move(self, target):
"""
Expand Down
Loading
Loading