diff --git a/README.rst b/README.rst index df8b21b..e25d6e6 100644 --- a/README.rst +++ b/README.rst @@ -56,7 +56,7 @@ Use fs.open_fs to open a filesystem with a FAT `FS URL None: + mode = DefaultModeReadOnly()) -> None: """Wrap basic I/O operations for PyFat. **Currently read-only**. :param fs: `PyFat`: Instance of opened filesystem diff --git a/pyfatfs/PyFat.py b/pyfatfs/PyFat.py index 87da38a..03f5dde 100755 --- a/pyfatfs/PyFat.py +++ b/pyfatfs/PyFat.py @@ -845,13 +845,6 @@ def close(self): self.__fp.close() self.initialized = False - def __del__(self): - """Try to close open handles.""" - try: - self.close() - except PyFATException: - pass - def __determine_fat_type(self) -> Union["PyFat.FAT_TYPE_FAT12", "PyFat.FAT_TYPE_FAT16", "PyFat.FAT_TYPE_FAT32"]: diff --git a/pyfatfs/PyFatFS.py b/pyfatfs/PyFatFS.py index 39c5381..eaf8ecc 100644 --- a/pyfatfs/PyFatFS.py +++ b/pyfatfs/PyFatFS.py @@ -5,30 +5,29 @@ import posixpath import errno from copy import copy -from io import BytesIO, IOBase +from io import BytesIO, IOBase, TextIOWrapper from typing import Union - -from fs.base import FS -from fs.mode import Mode -from fs.path import split, normpath -from fs.permissions import Permissions -from fs.info import Info -from fs.errors import DirectoryExpected, DirectoryExists, \ - ResourceNotFound, FileExpected, DirectoryNotEmpty, RemoveRootError, \ +from shutil import copyfileobj + +from fsspec import AbstractFileSystem +from pyfatfs.mode import Mode +from pyfatfs.path import split, normpath +from pyfatfs.permissions import Permissions +from pyfatfs.info import Info +from pyfatfs.errors import DirectoryExpected, \ + FileExpected, DirectoryNotEmpty, RemoveRootError, \ FileExists -from fs import ResourceType -from fs.subfs import SubFS +from pyfatfs._exceptions import PyFATException from pyfatfs import FAT_OEM_ENCODING from pyfatfs.DosDateTime import DosDateTime from pyfatfs.PyFat import PyFat from pyfatfs.FATDirectoryEntry import FATDirectoryEntry, make_lfn_entry -from pyfatfs._exceptions import PyFATException from pyfatfs.FatIO import FatIO from pyfatfs.EightDotThree import EightDotThree -class PyFatFS(FS): +class PyFatFS(AbstractFileSystem): """PyFilesystem2 extension.""" def __init__(self, filename: str, encoding: str = FAT_OEM_ENCODING, @@ -65,16 +64,14 @@ def __init__(self, filename: str, encoding: str = FAT_OEM_ENCODING, self.tz = datetime.datetime.now(datetime.timezone.utc) self.tz = self.tz.astimezone().tzinfo - def close(self): + def __del__(self): """Clean up open handles.""" try: self.fs.close() - except PyFATException: + except OSError: # Ignore if filesystem is already closed pass - super(PyFatFS, self).close() - def exists(self, path: str): """Verify if given path exists on filesystem. @@ -83,25 +80,26 @@ def exists(self, path: str): """ try: self.fs.root_dir.get_entry(path) - except PyFATException as e: + except OSError as e: if e.errno == errno.ENOENT: return False raise e return True - def getinfo(self, path: str, namespaces=None): - """Generate PyFilesystem2's `Info` struct. - - :param path: Path to file or directory on filesystem - :param namespaces: Info namespaces to query, `NotImplemented` - :returns: `Info` - """ + def info(self, path, **kwargs): + _type = self._gettype(path) + if _type[0] == "d": + # dont call self._getsize + return {"name": path, "size": 0, "type": _type} + return {"name": path, "size": self._getsize(path), "type": _type} + # TODO maybe use some of this code + # TODO remove dead code try: entry = self.fs.root_dir.get_entry(path) - except PyFATException as e: + except OSError as e: if e.errno in [errno.ENOTDIR, errno.ENOENT]: - raise ResourceNotFound(path) + raise FileNotFoundError(path) raise e info = {"basic": {"name": repr(entry), @@ -111,7 +109,7 @@ def getinfo(self, path: str, namespaces=None): "metadata_changed": None, "modified": entry.get_mtime().timestamp(), "size": entry.filesize, - "type": self.gettype(path)}} + "type": self._gettype(path)}} return Info(info) def getmeta(self, namespace=u'standard'): @@ -132,7 +130,7 @@ def getmeta(self, namespace=u'standard'): "unicode_paths": self.fs.encoding.lower().startswith('utf'), "supports_rename": True} - def getsize(self, path: str): + def _getsize(self, path: str): """Get size of file in bytes. :param path: Path to file or directory on filesystem @@ -140,25 +138,25 @@ def getsize(self, path: str): """ try: entry = self.fs.root_dir.get_entry(path) - except PyFATException as e: + except OSError as e: if e.errno == errno.ENOENT: - raise ResourceNotFound(path) + raise FileNotFoundError(path) raise e return entry.filesize - def gettype(self, path: str): - """Get type of file as `ResourceType`. + def _gettype(self, path: str) -> str: + """Get type of file as `str`. :param path: Path to file or directory on filesystem - :returns: `ResourceType.directory` or `ResourceType.file` + :returns: `"directory"` or `"file"` """ entry = self.fs.root_dir.get_entry(path) if entry.is_directory(): - return ResourceType.directory + return "directory" - return ResourceType.file + return "file" - def listdir(self, path: str): + def ls(self, path, detail=True, **kwargs): """List contents of given directory entry. :param path: Path to directory on filesystem @@ -166,13 +164,33 @@ def listdir(self, path: str): dir_entry = self._get_dir_entry(path) try: dirs, files, _ = dir_entry.get_entries() - except PyFATException as e: + except (OSError, PyFATException) as e: + # PyFATException: Cannot get entries of this entry, as it is not a directory. if e.errno == errno.ENOTDIR: - raise DirectoryExpected(path) + raise NotADirectoryError(path) raise e - return [str(e) for e in dirs+files] - - def create(self, path: str, wipe: bool = False) -> bool: + if not detail: + return [str(e) for e in dirs+files] + # no. we already have all infos + # return list(map(self.info, dirs+files)) + path = normpath(path) + path_prefix = "" if (path == "/") else (path + "/") + infos = [] + for entry in dirs: + infos.append({ + "name": path_prefix + str(entry), + "type": "directory", + "size": 0, + }) + for entry in files: + infos.append({ + "name": path_prefix + str(entry), + "type": "file", + "size": entry.filesize, + }) + return infos + + def _create(self, path: str, wipe: bool = False) -> bool: """Create a new file. :param path: Path of new file on filesystem @@ -182,19 +200,20 @@ def create(self, path: str, wipe: bool = False) -> bool: dirname = path.split("/")[-1] # Plausibility checks - try: - self.opendir(basename) - except DirectoryExpected: - raise ResourceNotFound(path) + # FIXME AttributeError: 'PyFatBytesIOFS' object has no attribute 'opendir' + # try: + # self.opendir(basename) + # except DirectoryExpected: + # raise FileNotFoundError(path) base = self._get_dir_entry(basename) try: dentry = self._get_dir_entry(path) - except ResourceNotFound: + except FileNotFoundError: pass else: if dentry.is_directory(): - raise FileExpected(path) + raise FileExistsError(path) if not wipe: return False else: @@ -231,34 +250,42 @@ def create(self, path: str, wipe: bool = False) -> bool: self.fs.flush_fat() return True - def makedir(self, path: str, permissions: Permissions = None, - recreate: bool = False): - """Create directory on filesystem. - - :param path: Path of new directory on filesystem - :param permissions: Currently not implemented - :param recreate: Ignore if directory already exists + def mkdir(self, path, create_parents=True, **kwargs): + """ + Create directory entry at path + + For systems that don't have true directories, may create an for + this instance only and not touch the real filesystem + + Parameters + ---------- + path: str + location + create_parents: bool + if True, this is equivalent to ``makedirs`` + kwargs: + may be permissions, etc. """ + # TODO handle create_parents + # TODO handle kwargs["permissions"] path = normpath(path) base = split(path)[0] dirname = split(path)[1] # Plausibility checks - try: - self.opendir(base) - except DirectoryExpected: - raise ResourceNotFound(path) + # FIXME AttributeError: 'PyFatBytesIOFS' object has no attribute 'opendir' + # try: + # self.opendir(base) + # except DirectoryExpected: + # raise FileNotFoundError(base) base = self._get_dir_entry(base) try: dentry = self._get_dir_entry(path) - except ResourceNotFound: + except FileNotFoundError: pass else: - if not recreate or not dentry.is_directory(): - raise DirectoryExists(path) - else: - return SubFS(self, path) + raise FileExistsError(path) parent_is_root = base == self.fs.root_dir @@ -310,7 +337,34 @@ def makedir(self, path: str, permissions: Permissions = None, # Flush FAT(s) to disk self.fs.flush_fat() - return SubFS(self, path) + def makedirs(self, path, exist_ok=False): + """Recursively make directories + + Creates directory at path and any intervening required directories. + Raises exception if, for instance, the path already exists but is a + file. + + Parameters + ---------- + path: str + leaf directory name + exist_ok: bool (False) + If False, will error if the target already exists + """ + path = normpath(path) + parts = path.split("/") + last_part_exists = False + for num_parts in range(1, len(parts) + 1): + _path = "/".join(parts[:num_parts]) + if _path == "": continue + try: + self.mkdir(_path) + last_part_exists = False + except (FileExistsError, PyFATException): + # PyFATException: Directory entry is already 8.3 conform + last_part_exists = True + if not exist_ok and last_part_exists: + raise FileExistsError(path) def removedir(self, path: str): """Remove empty directories from the filesystem. @@ -320,7 +374,7 @@ def removedir(self, path: str): dir_entry = self._get_dir_entry(path) try: base = dir_entry.get_parent_dir() - except PyFATException as e: + except OSError as e: if e.errno == errno.ENOENT: # Don't remove root directory raise RemoveRootError(path) @@ -330,7 +384,7 @@ def removedir(self, path: str): try: if not dir_entry.is_empty(): raise DirectoryNotEmpty(path) - except PyFATException as e: + except OSError as e: if e.errno == errno.ENOTDIR: raise DirectoryExpected(path) @@ -359,7 +413,7 @@ def removetree(self, dir_path: str): except RemoveRootError: pass - def remove(self, path: str): + def _rm_file_check(self, path: str): """Remove a file from the filesystem. :param path: `str`: Path of file to remove @@ -373,6 +427,19 @@ def remove(self, path: str): base = dir_entry.get_parent_dir() self._remove(base, dir_entry) + def rm_file(self, path): + """Remove a file from the filesystem. + + :param path: `str`: Path of file to remove + """ + dir_entry = self._get_dir_entry(path) + try: + base = dir_entry.get_parent_dir() + except PyFATException: + # Cannot query parent directory of root directory + return + self._remove(base, dir_entry) + def _remove(self, parent_dir: FATDirectoryEntry, dir_entry: FATDirectoryEntry): """Remove directory entry regardless of type (dir or file). @@ -385,7 +452,7 @@ def _remove(self, parent_dir: FATDirectoryEntry, :param parent_dir: ``FATDirectoryEntry``: Parent directory :param dir_entry: ``FATDirectoryEntry``: Directory entry to remove - :raises PyFATException: ``ENOENT`` if given dir entry does not exist + :raises OSError: ``ENOENT`` if given dir entry does not exist in ``parent_dir`` """ # Remove entry from parent directory @@ -403,7 +470,8 @@ def _remove(self, parent_dir: FATDirectoryEntry, self.fs.free_cluster_chain(dir_entry.get_cluster()) del dir_entry - def openbin(self, path: str, mode: str = "r", + # def openbin(self, path: str, mode: str = "r", + def open(self, path: str, mode: str = "r", buffering: int = -1, **options): """Open file from filesystem. @@ -412,29 +480,54 @@ def openbin(self, path: str, mode: str = "r", :param buffering: TBD :returns: `BinaryIO` stream """ - path = self.validatepath(path) - mode = Mode(mode + 'b') + # FIXME restore validatepath? + if "\0" in path: + raise ValueError("embedded null byte") + # path = self.validatepath(path) + # FIXME handle text mode + # mode = Mode(mode + 'b') + mode = Mode(mode) if mode.create: if mode.exclusive: try: - self.getinfo(path) - except ResourceNotFound: + self.info(path) + except FileNotFoundError: pass else: raise FileExists(path) - self.create(path) - if "t" in mode: - raise ValueError('Text-mode not allowed in openbin') + self._create(path) + # if "t" in mode: + # raise ValueError('Text-mode not allowed in openbin') try: - info = self.getinfo(path) - except ResourceNotFound: - raise ResourceNotFound(path) + info = self.info(path) + except FileNotFoundError: + raise FileNotFoundError(path) else: - if info.is_dir: + if info["type"][0] == "d": raise FileExpected(path) - return FatIO(self.fs, path, mode) + # return FatIO(self.fs, path, mode) + _io = FatIO(self.fs, path, mode) + if not "b" in mode: + return TextIOWrapper(_io) + return _io + + def cp_file(self, path1, path2, **kwargs): + # FIXME handle path2 exists + # https://github.com/fsspec/filesystem_spec/issues/909#issuecomment-1204212507 + # copy from one filesystem to the other + # fs.copy expects dir2 to exist + dir2 = split(path2)[0] + if dir2: + self.makedirs(dir2, exist_ok=True) + if self.isdir(path1): + return self.mkdir(path2) + with ( + self.open(path1, "rb") as f1, + self.open(path2, "wb") as f2 + ): + copyfileobj(f1, f2) def _get_dir_entry(self, path: str) -> FATDirectoryEntry: """Get a filesystem object for a path. @@ -442,30 +535,17 @@ def _get_dir_entry(self, path: str) -> FATDirectoryEntry: :param path: `str`: Path on the filesystem :returns: `FATDirectoryEntry` """ - _path = normpath(self.validatepath(path)) + # _path = normpath(self.validatepath(path)) + _path = normpath(path) try: dir_entry = self.fs.root_dir.get_entry(_path) - except PyFATException as e: + except OSError as e: if e.errno == errno.ENOENT: - raise ResourceNotFound(path) + raise FileNotFoundError(path) raise e return dir_entry - def opendir(self, path: str, factory=None) -> SubFS: - """Get a filesystem object for a sub-directory. - - :param path: str: Path to a directory on the filesystem. - """ - factory = factory or self.subfs_class or SubFS - - dir_entry = self._get_dir_entry(path) - - if not dir_entry.is_directory(): - raise DirectoryExpected(path) - - return factory(self, path) - def setinfo(self, path: str, info): """Set file meta information such as timestamps.""" details = info.get('details', {}) diff --git a/pyfatfs/PyFatFSOpener.py b/pyfatfs/PyFatFSOpener.py index c1cc92a..b47dcc1 100644 --- a/pyfatfs/PyFatFSOpener.py +++ b/pyfatfs/PyFatFSOpener.py @@ -4,11 +4,11 @@ import warnings from typing import get_type_hints -from fs.opener.parse import ParseResult +from fsspec.opener.parse import ParseResult __all__ = ['PyFatFSOpener'] -from fs.opener import Opener +from fsspec.opener import Opener from pyfatfs.PyFatFS import PyFatFS diff --git a/pyfatfs/_typing.py b/pyfatfs/_typing.py new file mode 100644 index 0000000..0c80b8e --- /dev/null +++ b/pyfatfs/_typing.py @@ -0,0 +1,22 @@ +""" +Typing objects missing from Python3.5.1 + +""" +import sys + +import six + +_PY = sys.version_info + +from typing import overload # type: ignore + +if _PY.major == 3 and _PY.minor == 5 and _PY.micro in (0, 1): + + def overload(func): # pragma: no cover # noqa: F811 + return func + + +try: + from typing import Text +except ImportError: # pragma: no cover + Text = six.text_type # type: ignore diff --git a/pyfatfs/enums.py b/pyfatfs/enums.py new file mode 100644 index 0000000..adc288d --- /dev/null +++ b/pyfatfs/enums.py @@ -0,0 +1,56 @@ +"""Enums used by PyFilesystem. +""" + +from __future__ import absolute_import, unicode_literals + +import os +from enum import IntEnum, unique + + +@unique +class ResourceType(IntEnum): + """Resource Types. + + Positive values are reserved, negative values are implementation + dependent. + + Most filesystems will support only directory(1) and file(2). Other + types exist to identify more exotic resource types supported + by Linux filesystems. + + """ + + #: Unknown resource type, used if the filesystem is unable to + #: tell what the resource is. + unknown = 0 + #: A directory. + directory = 1 + #: A simple file. + file = 2 + #: A character file. + character = 3 + #: A block special file. + block_special_file = 4 + #: A first in first out file. + fifo = 5 + #: A socket. + socket = 6 + #: A symlink. + symlink = 7 + + +@unique +class Seek(IntEnum): + """Constants used by `io.IOBase.seek`. + + These match `os.SEEK_CUR`, `os.SEEK_END`, and `os.SEEK_SET` + from the standard library. + + """ + + #: Seek from the current file position. + current = os.SEEK_CUR + #: Seek from the end of the file. + end = os.SEEK_END + #: Seek from the start of the file. + set = os.SEEK_SET diff --git a/pyfatfs/errors.py b/pyfatfs/errors.py new file mode 100644 index 0000000..adc9afa --- /dev/null +++ b/pyfatfs/errors.py @@ -0,0 +1,376 @@ +"""Exception classes thrown by filesystem operations. + +Errors relating to the underlying filesystem are translated in +to one of the following exceptions. + +All Exception classes are derived from `~fs.errors.FSError` +which may be used as a catch-all filesystem exception. + +""" + +from __future__ import print_function, unicode_literals + +import typing + +import functools +import six +from six import text_type + +if typing.TYPE_CHECKING: + from typing import Optional, Text + + +__all__ = [ + "BulkCopyFailed", + "CreateFailed", + "DestinationExists", + "DirectoryExists", + "DirectoryExpected", + "DirectoryNotEmpty", + "FileExists", + "FileExpected", + "FilesystemClosed", + "FSError", + "IllegalBackReference", + "IllegalDestination", + "InsufficientStorage", + "InvalidCharsInPath", + "InvalidPath", + "MissingInfoNamespace", + "NoSysPath", + "NoURL", + "OperationFailed", + "OperationTimeout", + "PathError", + "PatternError", + "PermissionDenied", + "RemoteConnectionError", + "RemoveRootError", + "ResourceError", + "ResourceInvalid", + "ResourceLocked", + "ResourceNotFound", + "ResourceReadOnly", + "Unsupported", + "UnsupportedHash", +] + + +class MissingInfoNamespace(AttributeError): + """An expected namespace is missing.""" + + def __init__(self, namespace): # noqa: D107 + # type: (Text) -> None + self.namespace = namespace + msg = "namespace '{}' is required for this attribute" + super(MissingInfoNamespace, self).__init__(msg.format(namespace)) + + def __reduce__(self): + return type(self), (self.namespace,) + + +@six.python_2_unicode_compatible +class FSError(Exception): + """Base exception for the `fs` module.""" + + default_message = "Unspecified error" + + def __init__(self, msg=None): # noqa: D107 + # type: (Optional[Text]) -> None + self._msg = msg or self.default_message + super(FSError, self).__init__() + + def __str__(self): + # type: () -> Text + """Return the error message.""" + msg = self._msg.format(**self.__dict__) + return msg + + def __repr__(self): + # type: () -> Text + msg = self._msg.format(**self.__dict__) + return "{}({!r})".format(self.__class__.__name__, msg) + + +class FilesystemClosed(FSError): + """Attempt to use a closed filesystem.""" + + default_message = "attempt to use closed filesystem" + + +class BulkCopyFailed(FSError): + """A copy operation failed in worker threads.""" + + default_message = "One or more copy operations failed (see errors attribute)" + + def __init__(self, errors): # noqa: D107 + self.errors = errors + super(BulkCopyFailed, self).__init__() + + +class CreateFailed(FSError): + """Filesystem could not be created.""" + + default_message = "unable to create filesystem, {details}" + + def __init__(self, msg=None, exc=None): # noqa: D107 + # type: (Optional[Text], Optional[Exception]) -> None + self._msg = msg or self.default_message + self.details = "" if exc is None else text_type(exc) + self.exc = exc + + @classmethod + def catch_all(cls, func): + @functools.wraps(func) + def new_func(*args, **kwargs): + try: + return func(*args, **kwargs) + except cls: + raise + except Exception as e: + raise cls(exc=e) + + return new_func # type: ignore + + def __reduce__(self): + return type(self), (self._msg, self.exc) + + +class PathError(FSError): + """Base exception for errors to do with a path string.""" + + default_message = "path '{path}' is invalid" + + def __init__(self, path, msg=None, exc=None): # noqa: D107 + # type: (Text, Optional[Text], Optional[Exception]) -> None + self.path = path + self.exc = exc + super(PathError, self).__init__(msg=msg) + + def __reduce__(self): + return type(self), (self.path, self._msg, self.exc) + + +class NoSysPath(PathError): + """The filesystem does not provide *sys paths* to the resource.""" + + default_message = "path '{path}' does not map to the local filesystem" + + +class NoURL(PathError): + """The filesystem does not provide an URL for the resource.""" + + default_message = "path '{path}' has no '{purpose}' URL" + + def __init__(self, path, purpose, msg=None): # noqa: D107 + # type: (Text, Text, Optional[Text]) -> None + self.purpose = purpose + super(NoURL, self).__init__(path, msg=msg) + + def __reduce__(self): + return type(self), (self.path, self.purpose, self._msg) + + +class InvalidPath(PathError): + """Path can't be mapped on to the underlaying filesystem.""" + + default_message = "path '{path}' is invalid on this filesystem " + + +class InvalidCharsInPath(InvalidPath): + """Path contains characters that are invalid on this filesystem.""" + + default_message = "path '{path}' contains invalid characters" + + +class OperationFailed(FSError): + """A specific operation failed.""" + + default_message = "operation failed, {details}" + + def __init__( + self, + path=None, # type: Optional[Text] + exc=None, # type: Optional[Exception] + msg=None, # type: Optional[Text] + ): # noqa: D107 + # type: (...) -> None + self.path = path + self.exc = exc + self.details = "" if exc is None else text_type(exc) + self.errno = getattr(exc, "errno", None) + super(OperationFailed, self).__init__(msg=msg) + + def __reduce__(self): + return type(self), (self.path, self.exc, self._msg) + + +class Unsupported(OperationFailed): + """Operation not supported by the filesystem.""" + + default_message = "not supported" + + +class RemoteConnectionError(OperationFailed): + """Operations encountered remote connection trouble.""" + + default_message = "remote connection error" + + +class InsufficientStorage(OperationFailed): + """Storage is insufficient for requested operation.""" + + default_message = "insufficient storage space" + + +class PermissionDenied(OperationFailed): + """Not enough permissions.""" + + default_message = "permission denied" + + +class OperationTimeout(OperationFailed): + """Filesystem took too long.""" + + default_message = "operation timed out" + + +class RemoveRootError(OperationFailed): + """Attempt to remove the root directory.""" + + default_message = "root directory may not be removed" + + +class IllegalDestination(OperationFailed): + """The given destination cannot be used for the operation. + + This error will occur when attempting to move / copy a folder into itself or copying + a file onto itself. + """ + + default_message = "'{path}' is not a legal destination" + + +class ResourceError(FSError): + """Base exception class for error associated with a specific resource.""" + + default_message = "failed on path {path}" + + def __init__(self, path, exc=None, msg=None): # noqa: D107 + # type: (Text, Optional[Exception], Optional[Text]) -> None + self.path = path + self.exc = exc + super(ResourceError, self).__init__(msg=msg) + + def __reduce__(self): + return type(self), (self.path, self.exc, self._msg) + + +class ResourceNotFound(ResourceError): + """Required resource not found.""" + + default_message = "resource '{path}' not found" + + +class ResourceInvalid(ResourceError): + """Resource has the wrong type.""" + + default_message = "resource '{path}' is invalid for this operation" + + +class FileExists(ResourceError): + """File already exists.""" + + default_message = "resource '{path}' exists" + + +class FileExpected(ResourceInvalid): + """Operation only works on files.""" + + default_message = "path '{path}' should be a file" + + +class DirectoryExpected(ResourceInvalid): + """Operation only works on directories.""" + + default_message = "path '{path}' should be a directory" + + +class DestinationExists(ResourceError): + """Target destination already exists.""" + + default_message = "destination '{path}' exists" + + +class DirectoryExists(ResourceError): + """Directory already exists.""" + + default_message = "directory '{path}' exists" + + +class DirectoryNotEmpty(ResourceError): + """Attempt to remove a non-empty directory.""" + + default_message = "directory '{path}' is not empty" + + +class ResourceLocked(ResourceError): + """Attempt to use a locked resource.""" + + default_message = "resource '{path}' is locked" + + +class ResourceReadOnly(ResourceError): + """Attempting to modify a read-only resource.""" + + default_message = "resource '{path}' is read only" + + +class IllegalBackReference(ValueError): + """Too many backrefs exist in a path. + + This error will occur if the back references in a path would be + outside of the root. For example, ``"/foo/../../"``, contains two back + references which would reference a directory above the root. + + Note: + This exception is a subclass of `ValueError` as it is not + strictly speaking an issue with a filesystem or resource. + + """ + + def __init__(self, path): # noqa: D107 + # type: (Text) -> None + self.path = path + msg = ("path '{path}' contains back-references outside of filesystem").format( + path=path + ) + super(IllegalBackReference, self).__init__(msg) + + def __reduce__(self): + return type(self), (self.path,) + + +class UnsupportedHash(ValueError): + """The requested hash algorithm is not supported. + + This exception will be thrown if a hash algorithm is requested that is + not supported by hashlib. + + """ + + +class PatternError(ValueError): + """A string pattern with invalid syntax was given.""" + + default_message = "pattern '{pattern}' is invalid at position {position}" + + def __init__(self, pattern, position, exc=None, msg=None): # noqa: D107 + # type: (Text, int, Optional[Exception], Optional[Text]) -> None + self.pattern = pattern + self.position = position + self.exc = exc + super(ValueError, self).__init__() + + def __reduce__(self): + return type(self), (self.path, self.position, self.exc, self._msg) diff --git a/pyfatfs/info.py b/pyfatfs/info.py new file mode 100644 index 0000000..21bb149 --- /dev/null +++ b/pyfatfs/info.py @@ -0,0 +1,465 @@ +"""Container for filesystem resource informations. +""" + +from __future__ import absolute_import, print_function, unicode_literals + +import typing +from typing import cast + +import six +from copy import deepcopy + +from ._typing import Text, overload +from .enums import ResourceType +from .errors import MissingInfoNamespace +from .path import join +from .permissions import Permissions +from .time import epoch_to_datetime + +if typing.TYPE_CHECKING: + from typing import Any, Callable, List, Mapping, Optional, Union + + from datetime import datetime + + RawInfo = Mapping[Text, Mapping[Text, object]] + ToDatetime = Callable[[int], datetime] + T = typing.TypeVar("T") + + +@six.python_2_unicode_compatible +class Info(object): + """Container for :ref:`info`. + + Resource information is returned by the following methods: + + * `~fs.base.FS.getinfo` + * `~fs.base.FS.scandir` + * `~fs.base.FS.filterdir` + + Arguments: + raw_info (dict): A dict containing resource info. + to_datetime (callable): A callable that converts an + epoch time to a datetime object. The default uses + `~fs.time.epoch_to_datetime`. + + """ + + __slots__ = ["raw", "_to_datetime", "namespaces"] + + def __init__(self, raw_info, to_datetime=epoch_to_datetime): + # type: (RawInfo, ToDatetime) -> None + """Create a resource info object from a raw info dict.""" + self.raw = raw_info + self._to_datetime = to_datetime + self.namespaces = frozenset(self.raw.keys()) + + def __str__(self): + # type: () -> str + if self.is_dir: + return "".format(self.name) + else: + return "".format(self.name) + + __repr__ = __str__ + + def __eq__(self, other): + # type: (object) -> bool + return self.raw == getattr(other, "raw", None) + + @overload + def _make_datetime(self, t): + # type: (None) -> None + pass + + @overload + def _make_datetime(self, t): # noqa: F811 + # type: (int) -> datetime + pass + + def _make_datetime(self, t): # noqa: F811 + # type: (Optional[int]) -> Optional[datetime] + if t is not None: + return self._to_datetime(t) + else: + return None + + @overload + def get(self, namespace, key): + # type: (Text, Text) -> Any + pass + + @overload # noqa: F811 + def get(self, namespace, key, default): # noqa: F811 + # type: (Text, Text, T) -> Union[Any, T] + pass + + def get(self, namespace, key, default=None): # noqa: F811 + # type: (Text, Text, Optional[Any]) -> Optional[Any] + """Get a raw info value. + + Arguments: + namespace (str): A namespace identifier. + key (str): A key within the namespace. + default (object, optional): A default value to return + if either the namespace or the key within the namespace + is not found. + + Example: + >>> info = my_fs.getinfo("foo.py", namespaces=["details"]) + >>> info.get('details', 'type') + 2 + + """ + try: + return self.raw[namespace].get(key, default) # type: ignore + except KeyError: + return default + + def _require_namespace(self, namespace): + # type: (Text) -> None + """Check if the given namespace is present in the info. + + Raises: + ~fs.errors.MissingInfoNamespace: if the given namespace is not + present in the info. + + """ + if namespace not in self.raw: + raise MissingInfoNamespace(namespace) + + def is_writeable(self, namespace, key): + # type: (Text, Text) -> bool + """Check if a given key in a namespace is writable. + + When creating an `Info` object, you can add a ``_write`` key to + each raw namespace that lists which keys are writable or not. + + In general, this means they are compatible with the `setinfo` + function of filesystem objects. + + Arguments: + namespace (str): A namespace identifier. + key (str): A key within the namespace. + + Returns: + bool: `True` if the key can be modified, `False` otherwise. + + Example: + Create an `Info` object that marks only the ``modified`` key + as writable in the ``details`` namespace:: + + >>> now = time.time() + >>> info = Info({ + ... "basic": {"name": "foo", "is_dir": False}, + ... "details": { + ... "modified": now, + ... "created": now, + ... "_write": ["modified"], + ... } + ... }) + >>> info.is_writeable("details", "created") + False + >>> info.is_writeable("details", "modified") + True + + """ + _writeable = self.get(namespace, "_write", ()) + return key in _writeable + + def has_namespace(self, namespace): + # type: (Text) -> bool + """Check if the resource info contains a given namespace. + + Arguments: + namespace (str): A namespace identifier. + + Returns: + bool: `True` if the namespace was found, `False` otherwise. + + """ + return namespace in self.raw + + def copy(self, to_datetime=None): + # type: (Optional[ToDatetime]) -> Info + """Create a copy of this resource info object.""" + return Info(deepcopy(self.raw), to_datetime=to_datetime or self._to_datetime) + + def make_path(self, dir_path): + # type: (Text) -> Text + """Make a path by joining ``dir_path`` with the resource name. + + Arguments: + dir_path (str): A path to a directory. + + Returns: + str: A path to the resource. + + """ + return join(dir_path, self.name) + + @property + def name(self): + # type: () -> Text + """`str`: the resource name.""" + return cast(Text, self.get("basic", "name")) + + @property + def suffix(self): + # type: () -> Text + """`str`: the last component of the name (with dot). + + In case there is no suffix, an empty string is returned. + + Example: + >>> info = my_fs.getinfo("foo.py") + >>> info.suffix + '.py' + >>> info2 = my_fs.getinfo("bar") + >>> info2.suffix + '' + + """ + name = self.get("basic", "name") + if name.startswith(".") and name.count(".") == 1: + return "" + basename, dot, ext = name.rpartition(".") + return "." + ext if dot else "" + + @property + def suffixes(self): + # type: () -> List[Text] + """`List`: a list of any suffixes in the name. + + Example: + >>> info = my_fs.getinfo("foo.tar.gz") + >>> info.suffixes + ['.tar', '.gz'] + + """ + name = self.get("basic", "name") + if name.startswith(".") and name.count(".") == 1: + return [] + return ["." + suffix for suffix in name.split(".")[1:]] + + @property + def stem(self): + # type: () -> Text + """`str`: the name minus any suffixes. + + Example: + >>> info = my_fs.getinfo("foo.tar.gz") + >>> info.stem + 'foo' + + """ + name = self.get("basic", "name") + if name.startswith("."): + return name + return name.split(".")[0] + + @property + def is_dir(self): + # type: () -> bool + """`bool`: `True` if the resource references a directory.""" + return cast(bool, self.get("basic", "is_dir")) + + @property + def is_file(self): + # type: () -> bool + """`bool`: `True` if the resource references a file.""" + return not cast(bool, self.get("basic", "is_dir")) + + @property + def is_link(self): + # type: () -> bool + """`bool`: `True` if the resource is a symlink.""" + self._require_namespace("link") + return self.get("link", "target", None) is not None + + @property + def type(self): + # type: () -> ResourceType + """`~fs.enums.ResourceType`: the type of the resource. + + Requires the ``"details"`` namespace. + + Raises: + ~fs.errors.MissingInfoNamespace: if the 'details' + namespace is not in the Info. + + """ + self._require_namespace("details") + return ResourceType(self.get("details", "type", 0)) + + @property + def accessed(self): + # type: () -> Optional[datetime] + """`~datetime.datetime`: the resource last access time, or `None`. + + Requires the ``"details"`` namespace. + + Raises: + ~fs.errors.MissingInfoNamespace: if the ``"details"`` + namespace is not in the Info. + + """ + self._require_namespace("details") + _time = self._make_datetime(self.get("details", "accessed")) + return _time + + @property + def modified(self): + # type: () -> Optional[datetime] + """`~datetime.datetime`: the resource last modification time, or `None`. + + Requires the ``"details"`` namespace. + + Raises: + ~fs.errors.MissingInfoNamespace: if the ``"details"`` + namespace is not in the Info. + + """ + self._require_namespace("details") + _time = self._make_datetime(self.get("details", "modified")) + return _time + + @property + def created(self): + # type: () -> Optional[datetime] + """`~datetime.datetime`: the resource creation time, or `None`. + + Requires the ``"details"`` namespace. + + Raises: + ~fs.errors.MissingInfoNamespace: if the ``"details"`` + namespace is not in the Info. + + """ + self._require_namespace("details") + _time = self._make_datetime(self.get("details", "created")) + return _time + + @property + def metadata_changed(self): + # type: () -> Optional[datetime] + """`~datetime.datetime`: the resource metadata change time, or `None`. + + Requires the ``"details"`` namespace. + + Raises: + ~fs.errors.MissingInfoNamespace: if the ``"details"`` + namespace is not in the Info. + + """ + self._require_namespace("details") + _time = self._make_datetime(self.get("details", "metadata_changed")) + return _time + + @property + def permissions(self): + # type: () -> Optional[Permissions] + """`Permissions`: the permissions of the resource, or `None`. + + Requires the ``"access"`` namespace. + + Raises: + ~fs.errors.MissingInfoNamespace: if the ``"access"`` + namespace is not in the Info. + + """ + self._require_namespace("access") + _perm_names = self.get("access", "permissions") + if _perm_names is None: + return None + permissions = Permissions(_perm_names) + return permissions + + @property + def size(self): + # type: () -> int + """`int`: the size of the resource, in bytes. + + Requires the ``"details"`` namespace. + + Raises: + ~fs.errors.MissingInfoNamespace: if the ``"details"`` + namespace is not in the Info. + + """ + self._require_namespace("details") + return cast(int, self.get("details", "size")) + + @property + def user(self): + # type: () -> Optional[Text] + """`str`: the owner of the resource, or `None`. + + Requires the ``"access"`` namespace. + + Raises: + ~fs.errors.MissingInfoNamespace: if the ``"access"`` + namespace is not in the Info. + + """ + self._require_namespace("access") + return self.get("access", "user") + + @property + def uid(self): + # type: () -> Optional[int] + """`int`: the user id of the resource, or `None`. + + Requires the ``"access"`` namespace. + + Raises: + ~fs.errors.MissingInfoNamespace: if the ``"access"`` + namespace is not in the Info. + + """ + self._require_namespace("access") + return self.get("access", "uid") + + @property + def group(self): + # type: () -> Optional[Text] + """`str`: the group of the resource owner, or `None`. + + Requires the ``"access"`` namespace. + + Raises: + ~fs.errors.MissingInfoNamespace: if the ``"access"`` + namespace is not in the Info. + + """ + self._require_namespace("access") + return self.get("access", "group") + + @property + def gid(self): + # type: () -> Optional[int] + """`int`: the group id of the resource, or `None`. + + Requires the ``"access"`` namespace. + + Raises: + ~fs.errors.MissingInfoNamespace: if the ``"access"`` + namespace is not in the Info. + + """ + self._require_namespace("access") + return self.get("access", "gid") + + @property + def target(self): # noqa: D402 + # type: () -> Optional[Text] + """`str`: the link target (if resource is a symlink), or `None`. + + Requires the ``"link"`` namespace. + + Raises: + ~fs.errors.MissingInfoNamespace: if the ``"link"`` + namespace is not in the Info. + + """ + self._require_namespace("link") + return self.get("link", "target") diff --git a/pyfatfs/mode.py b/pyfatfs/mode.py new file mode 100644 index 0000000..c719340 --- /dev/null +++ b/pyfatfs/mode.py @@ -0,0 +1,238 @@ +"""Abstract I/O mode container. + +Mode strings are used in in `~fs.base.FS.open` and +`~fs.base.FS.openbin`. + +""" + +from __future__ import print_function, unicode_literals + +import typing + +import six + +from ._typing import Text + +if typing.TYPE_CHECKING: + from typing import FrozenSet, Set, Union + + +__all__ = ["Mode", "check_readable", "check_writable", "validate_openbin_mode"] + + +# https://docs.python.org/3/library/functions.html#open +@six.python_2_unicode_compatible +class Mode(typing.Container[Text]): + """An abstraction for I/O modes. + + A mode object provides properties that can be used to interrogate the + `mode strings `_ + used when opening files. + + Example: + >>> mode = Mode('rb') + >>> mode.reading + True + >>> mode.writing + False + >>> mode.binary + True + >>> mode.text + False + + """ + + def __init__(self, mode): + # type: (Text) -> None + """Create a new `Mode` instance. + + Arguments: + mode (str): A *mode* string, as used by `io.open`. + + Raises: + ValueError: If the mode string is invalid. + + """ + self._mode = mode + self.validate() + + def __repr__(self): + # type: () -> Text + return "Mode({!r})".format(self._mode) + + def __str__(self): + # type: () -> Text + return self._mode + + def __contains__(self, character): + # type: (object) -> bool + """Check if a mode contains a given character.""" + assert isinstance(character, Text) + return character in self._mode + + def to_platform(self): + # type: () -> Text + """Get a mode string for the current platform. + + Currently, this just removes the 'x' on PY2 because PY2 doesn't + support exclusive mode. + + """ + return self._mode.replace("x", "w") if six.PY2 else self._mode + + def to_platform_bin(self): + # type: () -> Text + """Get a *binary* mode string for the current platform. + + This removes the 't' and adds a 'b' if needed. + + """ + _mode = self.to_platform().replace("t", "") + return _mode if "b" in _mode else _mode + "b" + + def validate(self, _valid_chars=frozenset("rwxtab+")): + # type: (Union[Set[Text], FrozenSet[Text]]) -> None + """Validate the mode string. + + Raises: + ValueError: if the mode contains invalid chars. + + """ + mode = self._mode + if not mode: + raise ValueError("mode must not be empty") + if not _valid_chars.issuperset(mode): + raise ValueError("mode '{}' contains invalid characters".format(mode)) + if mode[0] not in "rwxa": + raise ValueError("mode must start with 'r', 'w', 'x', or 'a'") + if "t" in mode and "b" in mode: + raise ValueError("mode can't be binary ('b') and text ('t')") + + def validate_bin(self): + # type: () -> None + """Validate a mode for opening a binary file. + + Raises: + ValueError: if the mode contains invalid chars. + + """ + self.validate() + if "t" in self: + raise ValueError("mode must be binary") + + @property + def create(self): + # type: () -> bool + """`bool`: `True` if the mode would create a file.""" + return "a" in self or "w" in self or "x" in self + + @property + def reading(self): + # type: () -> bool + """`bool`: `True` if the mode permits reading.""" + return "r" in self or "+" in self + + @property + def writing(self): + # type: () -> bool + """`bool`: `True` if the mode permits writing.""" + return "w" in self or "a" in self or "+" in self or "x" in self + + @property + def appending(self): + # type: () -> bool + """`bool`: `True` if the mode permits appending.""" + return "a" in self + + @property + def updating(self): + # type: () -> bool + """`bool`: `True` if the mode permits both reading and writing.""" + return "+" in self + + @property + def truncate(self): + # type: () -> bool + """`bool`: `True` if the mode would truncate an existing file.""" + return "w" in self or "x" in self + + @property + def exclusive(self): + # type: () -> bool + """`bool`: `True` if the mode require exclusive creation.""" + return "x" in self + + @property + def binary(self): + # type: () -> bool + """`bool`: `True` if a mode specifies binary.""" + return "b" in self + + @property + def text(self): + # type: () -> bool + """`bool`: `True` if a mode specifies text.""" + return "t" in self or "b" not in self + + +def check_readable(mode): + # type: (Text) -> bool + """Check a mode string allows reading. + + Arguments: + mode (str): A mode string, e.g. ``"rt"`` + + Returns: + bool: `True` if the mode allows reading. + + """ + return Mode(mode).reading + + +def check_writable(mode): + # type: (Text) -> bool + """Check a mode string allows writing. + + Arguments: + mode (str): A mode string, e.g. ``"wt"`` + + Returns: + bool: `True` if the mode allows writing. + + """ + return Mode(mode).writing + + +def validate_open_mode(mode): + # type: (Text) -> None + """Check ``mode`` parameter of `~fs.base.FS.open` is valid. + + Arguments: + mode (str): Mode parameter. + + Raises: + `ValueError` if mode is not valid. + + """ + Mode(mode) + + +def validate_openbin_mode(mode, _valid_chars=frozenset("rwxab+")): + # type: (Text, Union[Set[Text], FrozenSet[Text]]) -> None + """Check ``mode`` parameter of `~fs.base.FS.openbin` is valid. + + Arguments: + mode (str): Mode parameter. + + Raises: + `ValueError` if mode is not valid. + + """ + if "t" in mode: + raise ValueError("text mode not valid in openbin") + if not mode: + raise ValueError("mode must not be empty") + if mode[0] not in "rwxa": + raise ValueError("mode must start with 'r', 'w', 'a' or 'x'") + if not _valid_chars.issuperset(mode): + raise ValueError("mode '{}' contains invalid characters".format(mode)) diff --git a/pyfatfs/path.py b/pyfatfs/path.py new file mode 100644 index 0000000..0bfa514 --- /dev/null +++ b/pyfatfs/path.py @@ -0,0 +1,592 @@ +"""Useful functions for working with PyFilesystem paths. + +This is broadly similar to the standard `os.path` module but works +with paths in the canonical format expected by all FS objects (that is, +separated by forward slashes and with an optional leading slash). + +See :ref:`paths` for an explanation of PyFilesystem paths. + +""" + +from __future__ import print_function, unicode_literals + +import typing + +import re + +from .errors import IllegalBackReference + +if typing.TYPE_CHECKING: + from typing import List, Text, Tuple + + +__all__ = [ + "abspath", + "basename", + "combine", + "dirname", + "forcedir", + "frombase", + "isabs", + "isbase", + "isdotfile", + "isparent", + "issamedir", + "iswildcard", + "iteratepath", + "join", + "normpath", + "parts", + "recursepath", + "relativefrom", + "relpath", + "split", + "splitext", +] + +_requires_normalization = re.compile(r"(^|/)\.\.?($|/)|//", re.UNICODE).search + + +def normpath(path): + # type: (Text) -> Text + """Normalize a path. + + This function simplifies a path by collapsing back-references + and removing duplicated separators. + + Arguments: + path (str): Path to normalize. + + Returns: + str: A valid FS path. + + Example: + >>> normpath("/foo//bar/frob/../baz") + '/foo/bar/baz' + >>> normpath("foo/../../bar") + Traceback (most recent call last): + ... + fs.errors.IllegalBackReference: path 'foo/../../bar' contains back-references outside of filesystem + + """ # noqa: E501 + if path in "/": + return path + + # An early out if there is no need to normalize this path + if not _requires_normalization(path): + return path.rstrip("/") + + prefix = "/" if path.startswith("/") else "" + components = [] # type: List[Text] + try: + for component in path.split("/"): + if component in "..": # True for '..', '.', and '' + if component == "..": + components.pop() + else: + components.append(component) + except IndexError: + # FIXME (@althonos): should be raised from the IndexError + raise IllegalBackReference(path) + return prefix + "/".join(components) + + +def iteratepath(path): + # type: (Text) -> List[Text] + """Iterate over the individual components of a path. + + Arguments: + path (str): Path to iterate over. + + Returns: + list: A list of path components. + + Example: + >>> iteratepath('/foo/bar/baz') + ['foo', 'bar', 'baz'] + + """ + path = relpath(normpath(path)) + if not path: + return [] + return path.split("/") + + +def recursepath(path, reverse=False): + # type: (Text, bool) -> List[Text] + """Get intermediate paths from the root to the given path. + + Arguments: + path (str): A PyFilesystem path + reverse (bool): Reverses the order of the paths + (default `False`). + + Returns: + list: A list of paths. + + Example: + >>> recursepath('a/b/c') + ['/', '/a', '/a/b', '/a/b/c'] + + """ + if path in "/": + return ["/"] + + path = abspath(normpath(path)) + "/" + + paths = ["/"] + find = path.find + append = paths.append + pos = 1 + len_path = len(path) + + while pos < len_path: + pos = find("/", pos) + append(path[:pos]) + pos += 1 + + if reverse: + return paths[::-1] + return paths + + +def isabs(path): + # type: (Text) -> bool + """Check if a path is an absolute path. + + Arguments: + path (str): A PyFilesytem path. + + Returns: + bool: `True` if the path is absolute (starts with a ``'/'``). + + """ + # Somewhat trivial, but helps to make code self-documenting + return path.startswith("/") + + +def abspath(path): + # type: (Text) -> Text + """Convert the given path to an absolute path. + + Since FS objects have no concept of a *current directory*, this + simply adds a leading ``/`` character if the path doesn't already + have one. + + Arguments: + path (str): A PyFilesytem path. + + Returns: + str: An absolute path. + + """ + if not path.startswith("/"): + return "/" + path + return path + + +def relpath(path): + # type: (Text) -> Text + """Convert the given path to a relative path. + + This is the inverse of `abspath`, stripping a leading ``'/'`` from + the path if it is present. + + Arguments: + path (str): A path to adjust. + + Returns: + str: A relative path. + + Example: + >>> relpath('/a/b') + 'a/b' + + """ + return path.lstrip("/") + + +def join(*paths): + # type: (*Text) -> Text + """Join any number of paths together. + + Arguments: + *paths (str): Paths to join, given as positional arguments. + + Returns: + str: The joined path. + + Example: + >>> join('foo', 'bar', 'baz') + 'foo/bar/baz' + >>> join('foo/bar', '../baz') + 'foo/baz' + >>> join('foo/bar', '/baz') + '/baz' + + """ + absolute = False + relpaths = [] # type: List[Text] + for p in paths: + if p: + if p[0] == "/": + del relpaths[:] + absolute = True + relpaths.append(p) + + path = normpath("/".join(relpaths)) + if absolute: + path = abspath(path) + return path + + +def combine(path1, path2): + # type: (Text, Text) -> Text + """Join two paths together. + + This is faster than :func:`~fs.path.join`, but only works when the + second path is relative, and there are no back references in either + path. + + Arguments: + path1 (str): A PyFilesytem path. + path2 (str): A PyFilesytem path. + + Returns: + str: The joint path. + + Example: + >>> combine("foo/bar", "baz") + 'foo/bar/baz' + + """ + if not path1: + return path2.lstrip() + return "{}/{}".format(path1.rstrip("/"), path2.lstrip("/")) + + +def parts(path): + # type: (Text) -> List[Text] + """Split a path in to its component parts. + + Arguments: + path (str): Path to split in to parts. + + Returns: + list: List of components + + Example: + >>> parts('/foo/bar/baz') + ['/', 'foo', 'bar', 'baz'] + + """ + _path = normpath(path) + components = _path.strip("/") + + _parts = ["/" if _path.startswith("/") else "./"] + if components: + _parts += components.split("/") + return _parts + + +def split(path): + # type: (Text) -> Tuple[Text, Text] + """Split a path into (head, tail) pair. + + This function splits a path into a pair (head, tail) where 'tail' is + the last pathname component and 'head' is all preceding components. + + Arguments: + path (str): Path to split + + Returns: + (str, str): a tuple containing the head and the tail of the path. + + Example: + >>> split("foo/bar") + ('foo', 'bar') + >>> split("foo/bar/baz") + ('foo/bar', 'baz') + >>> split("/foo/bar/baz") + ('/foo/bar', 'baz') + + """ + if "/" not in path: + return ("", path) + split = path.rsplit("/", 1) + return (split[0] or "/", split[1]) + + +def splitext(path): + # type: (Text) -> Tuple[Text, Text] + """Split the extension from the path. + + Arguments: + path (str): A path to split. + + Returns: + (str, str): A tuple containing the path and the extension. + + Example: + >>> splitext('baz.txt') + ('baz', '.txt') + >>> splitext('foo/bar/baz.txt') + ('foo/bar/baz', '.txt') + >>> splitext('foo/bar/.foo') + ('foo/bar/.foo', '') + + """ + parent_path, pathname = split(path) + if pathname.startswith(".") and pathname.count(".") == 1: + return path, "" + if "." not in pathname: + return path, "" + pathname, ext = pathname.rsplit(".", 1) + path = join(parent_path, pathname) + return path, "." + ext + + +def isdotfile(path): + # type: (Text) -> bool + """Detect if a path references a dot file. + + Arguments: + path (str): Path to check. + + Returns: + bool: `True` if the resource name starts with a ``'.'``. + + Example: + >>> isdotfile('.baz') + True + >>> isdotfile('foo/bar/.baz') + True + >>> isdotfile('foo/bar.baz') + False + + """ + return basename(path).startswith(".") + + +def dirname(path): + # type: (Text) -> Text + """Return the parent directory of a path. + + This is always equivalent to the 'head' component of the value + returned by ``split(path)``. + + Arguments: + path (str): A PyFilesytem path. + + Returns: + str: the parent directory of the given path. + + Example: + >>> dirname('foo/bar/baz') + 'foo/bar' + >>> dirname('/foo/bar') + '/foo' + >>> dirname('/foo') + '/' + + """ + return split(path)[0] + + +def basename(path): + # type: (Text) -> Text + """Return the basename of the resource referenced by a path. + + This is always equivalent to the 'tail' component of the value + returned by split(path). + + Arguments: + path (str): A PyFilesytem path. + + Returns: + str: the name of the resource at the given path. + + Example: + >>> basename('foo/bar/baz') + 'baz' + >>> basename('foo/bar') + 'bar' + >>> basename('foo/bar/') + '' + + """ + return split(path)[1] + + +def issamedir(path1, path2): + # type: (Text, Text) -> bool + """Check if two paths reference a resource in the same directory. + + Arguments: + path1 (str): A PyFilesytem path. + path2 (str): A PyFilesytem path. + + Returns: + bool: `True` if the two resources are in the same directory. + + Example: + >>> issamedir("foo/bar/baz.txt", "foo/bar/spam.txt") + True + >>> issamedir("foo/bar/baz/txt", "spam/eggs/spam.txt") + False + + """ + return dirname(normpath(path1)) == dirname(normpath(path2)) + + +def isbase(path1, path2): + # type: (Text, Text) -> bool + """Check if ``path1`` is a base of ``path2``. + + Arguments: + path1 (str): A PyFilesytem path. + path2 (str): A PyFilesytem path. + + Returns: + bool: `True` if ``path2`` starts with ``path1`` + + Example: + >>> isbase('foo/bar', 'foo/bar/baz/egg.txt') + True + + """ + _path1 = forcedir(abspath(path1)) + _path2 = forcedir(abspath(path2)) + return _path2.startswith(_path1) # longer one is child + + +def isparent(path1, path2): + # type: (Text, Text) -> bool + """Check if ``path1`` is a parent directory of ``path2``. + + Arguments: + path1 (str): A PyFilesytem path. + path2 (str): A PyFilesytem path. + + Returns: + bool: `True` if ``path1`` is a parent directory of ``path2`` + + Example: + >>> isparent("foo/bar", "foo/bar/spam.txt") + True + >>> isparent("foo/bar/", "foo/bar") + True + >>> isparent("foo/barry", "foo/baz/bar") + False + >>> isparent("foo/bar/baz/", "foo/baz/bar") + False + + """ + bits1 = path1.split("/") + bits2 = path2.split("/") + while bits1 and bits1[-1] == "": + bits1.pop() + if len(bits1) > len(bits2): + return False + for (bit1, bit2) in zip(bits1, bits2): + if bit1 != bit2: + return False + return True + + +def forcedir(path): + # type: (Text) -> Text + """Ensure the path ends with a trailing forward slash. + + Arguments: + path (str): A PyFilesytem path. + + Returns: + str: The path, ending with a slash. + + Example: + >>> forcedir("foo/bar") + 'foo/bar/' + >>> forcedir("foo/bar/") + 'foo/bar/' + >>> forcedir("foo/spam.txt") + 'foo/spam.txt/' + + """ + if not path.endswith("/"): + return path + "/" + return path + + +def frombase(path1, path2): + # type: (Text, Text) -> Text + """Get the final path of ``path2`` that isn't in ``path1``. + + Arguments: + path1 (str): A PyFilesytem path. + path2 (str): A PyFilesytem path. + + Returns: + str: the final part of ``path2``. + + Example: + >>> frombase('foo/bar/', 'foo/bar/baz/egg') + 'baz/egg' + + """ + if not isparent(path1, path2): + raise ValueError("path1 must be a prefix of path2") + return path2[len(path1) :] + + +def relativefrom(base, path): + # type: (Text, Text) -> Text + """Return a path relative from a given base path. + + Insert backrefs as appropriate to reach the path from the base. + + Arguments: + base (str): Path to a directory. + path (str): Path to make relative. + + Returns: + str: the path to ``base`` from ``path``. + + >>> relativefrom("foo/bar", "baz/index.html") + '../../baz/index.html' + + """ + base_parts = list(iteratepath(base)) + path_parts = list(iteratepath(path)) + + common = 0 + for component_a, component_b in zip(base_parts, path_parts): + if component_a != component_b: + break + common += 1 + + return "/".join([".."] * (len(base_parts) - common) + path_parts[common:]) + + +_WILD_CHARS = frozenset("*?[]!{}") + + +def iswildcard(path): + # type: (Text) -> bool + """Check if a path ends with a wildcard. + + Arguments: + path (str): A PyFilesystem path. + + Returns: + bool: `True` if path ends with a wildcard. + + Example: + >>> iswildcard('foo/bar/baz.*') + True + >>> iswildcard('foo/bar') + False + + """ + assert path is not None + return not _WILD_CHARS.isdisjoint(path) diff --git a/pyfatfs/permissions.py b/pyfatfs/permissions.py new file mode 100644 index 0000000..3fee335 --- /dev/null +++ b/pyfatfs/permissions.py @@ -0,0 +1,317 @@ +"""Abstract permissions container. +""" + +from __future__ import print_function, unicode_literals + +import typing +from typing import Iterable + +import six + +from ._typing import Text + +if typing.TYPE_CHECKING: + from typing import Iterator, List, Optional, Tuple, Type, Union + + +def make_mode(init): + # type: (Union[int, Iterable[Text], None]) -> int + """Make a mode integer from an initial value.""" + return Permissions.get_mode(init) + + +class _PermProperty(object): + """Creates simple properties to get/set permissions.""" + + def __init__(self, name): + # type: (Text) -> None + self._name = name + self.__doc__ = "Boolean for '{}' permission.".format(name) + + def __get__(self, obj, obj_type=None): + # type: (Permissions, Optional[Type[Permissions]]) -> bool + return self._name in obj + + def __set__(self, obj, value): + # type: (Permissions, bool) -> None + if value: + obj.add(self._name) + else: + obj.remove(self._name) + + +@six.python_2_unicode_compatible +class Permissions(object): + """An abstraction for file system permissions. + + Permissions objects store information regarding the permissions + on a resource. It supports Linux permissions, but is generic enough + to manage permission information from almost any filesystem. + + Example: + >>> from fs.permissions import Permissions + >>> p = Permissions(user='rwx', group='rw-', other='r--') + >>> print(p) + rwxrw-r-- + >>> p.mode + 500 + >>> oct(p.mode) + '0o764' + + """ + + _LINUX_PERMS = [ + ("setuid", 2048), + ("setguid", 1024), + ("sticky", 512), + ("u_r", 256), + ("u_w", 128), + ("u_x", 64), + ("g_r", 32), + ("g_w", 16), + ("g_x", 8), + ("o_r", 4), + ("o_w", 2), + ("o_x", 1), + ] # type: List[Tuple[Text, int]] + _LINUX_PERMS_NAMES = [_name for _name, _mask in _LINUX_PERMS] # type: List[Text] + + def __init__( + self, + names=None, # type: Optional[Iterable[Text]] + mode=None, # type: Optional[int] + user=None, # type: Optional[Text] + group=None, # type: Optional[Text] + other=None, # type: Optional[Text] + sticky=None, # type: Optional[bool] + setuid=None, # type: Optional[bool] + setguid=None, # type: Optional[bool] + ): + # type: (...) -> None + """Create a new `Permissions` instance. + + Arguments: + names (list, optional): A list of permissions. + mode (int, optional): A mode integer. + user (str, optional): A triplet of *user* permissions, e.g. + ``"rwx"`` or ``"r--"`` + group (str, optional): A triplet of *group* permissions, e.g. + ``"rwx"`` or ``"r--"`` + other (str, optional): A triplet of *other* permissions, e.g. + ``"rwx"`` or ``"r--"`` + sticky (bool, optional): A boolean for the *sticky* bit. + setuid (bool, optional): A boolean for the *setuid* bit. + setguid (bool, optional): A boolean for the *setguid* bit. + + """ + if names is not None: + self._perms = set(names) + elif mode is not None: + self._perms = {name for name, mask in self._LINUX_PERMS if mode & mask} + else: + perms = self._perms = set() + perms.update("u_" + p for p in user or "" if p != "-") + perms.update("g_" + p for p in group or "" if p != "-") + perms.update("o_" + p for p in other or "" if p != "-") + + if sticky: + self._perms.add("sticky") + if setuid: + self._perms.add("setuid") + if setguid: + self._perms.add("setguid") + + def __repr__(self): + # type: () -> Text + if not self._perms.issubset(self._LINUX_PERMS_NAMES): + _perms_str = ", ".join("'{}'".format(p) for p in sorted(self._perms)) + return "Permissions(names=[{}])".format(_perms_str) + + def _check(perm, name): + # type: (Text, Text) -> Text + return name if perm in self._perms else "" + + user = "".join((_check("u_r", "r"), _check("u_w", "w"), _check("u_x", "x"))) + group = "".join((_check("g_r", "r"), _check("g_w", "w"), _check("g_x", "x"))) + other = "".join((_check("o_r", "r"), _check("o_w", "w"), _check("o_x", "x"))) + args = [] + _fmt = "user='{}', group='{}', other='{}'" + basic = _fmt.format(user, group, other) + args.append(basic) + if self.sticky: + args.append("sticky=True") + if self.setuid: + args.append("setuid=True") + if self.setuid: + args.append("setguid=True") + return "Permissions({})".format(", ".join(args)) + + def __str__(self): + # type: () -> Text + return self.as_str() + + def __iter__(self): + # type: () -> Iterator[Text] + return iter(self._perms) + + def __contains__(self, permission): + # type: (object) -> bool + return permission in self._perms + + def __eq__(self, other): + # type: (object) -> bool + if isinstance(other, Permissions): + names = other.dump() # type: object + else: + names = other + return self.dump() == names + + def __ne__(self, other): + # type: (object) -> bool + return not self.__eq__(other) + + @classmethod + def parse(cls, ls): + # type: (Text) -> Permissions + """Parse permissions in Linux notation.""" + user = ls[:3] + group = ls[3:6] + other = ls[6:9] + return cls(user=user, group=group, other=other) + + @classmethod + def load(cls, permissions): + # type: (List[Text]) -> Permissions + """Load a serialized permissions object.""" + return cls(names=permissions) + + @classmethod + def create(cls, init=None): + # type: (Union[int, Iterable[Text], None]) -> Permissions + """Create a permissions object from an initial value. + + Arguments: + init (int or list, optional): May be None to use `0o777` + permissions, a mode integer, or a list of permission names. + + Returns: + int: mode integer that may be used for instance by `os.makedir`. + + Example: + >>> Permissions.create(None) + Permissions(user='rwx', group='rwx', other='rwx') + >>> Permissions.create(0o700) + Permissions(user='rwx', group='', other='') + >>> Permissions.create(['u_r', 'u_w', 'u_x']) + Permissions(user='rwx', group='', other='') + + """ + if init is None: + return cls(mode=0o777) + if isinstance(init, cls): + return init + if isinstance(init, int): + return cls(mode=init) + if isinstance(init, list): + return cls(names=init) + raise ValueError("permissions is invalid") + + @classmethod + def get_mode(cls, init): + # type: (Union[int, Iterable[Text], None]) -> int + """Convert an initial value to a mode integer.""" + return cls.create(init).mode + + def copy(self): + # type: () -> Permissions + """Make a copy of this permissions object.""" + return Permissions(names=list(self._perms)) + + def dump(self): + # type: () -> List[Text] + """Get a list suitable for serialization.""" + return sorted(self._perms) + + def as_str(self): + # type: () -> Text + """Get a Linux-style string representation of permissions.""" + perms = [ + c if name in self._perms else "-" + for name, c in zip(self._LINUX_PERMS_NAMES[-9:], "rwxrwxrwx") + ] + if "setuid" in self._perms: + perms[2] = "s" if "u_x" in self._perms else "S" + if "setguid" in self._perms: + perms[5] = "s" if "g_x" in self._perms else "S" + if "sticky" in self._perms: + perms[8] = "t" if "o_x" in self._perms else "T" + + perm_str = "".join(perms) + return perm_str + + @property + def mode(self): + # type: () -> int + """`int`: mode integer.""" + mode = 0 + for name, mask in self._LINUX_PERMS: + if name in self._perms: + mode |= mask + return mode + + @mode.setter + def mode(self, mode): + # type: (int) -> None + self._perms = {name for name, mask in self._LINUX_PERMS if mode & mask} + + u_r = _PermProperty("u_r") + u_w = _PermProperty("u_w") + u_x = _PermProperty("u_x") + + g_r = _PermProperty("g_r") + g_w = _PermProperty("g_w") + g_x = _PermProperty("g_x") + + o_r = _PermProperty("o_r") + o_w = _PermProperty("o_w") + o_x = _PermProperty("o_x") + + sticky = _PermProperty("sticky") + setuid = _PermProperty("setuid") + setguid = _PermProperty("setguid") + + def add(self, *permissions): + # type: (*Text) -> None + """Add permission(s). + + Arguments: + *permissions (str): Permission name(s), such as ``'u_w'`` + or ``'u_x'``. + + """ + self._perms.update(permissions) + + def remove(self, *permissions): + # type: (*Text) -> None + """Remove permission(s). + + Arguments: + *permissions (str): Permission name(s), such as ``'u_w'`` + or ``'u_x'``.s + + """ + self._perms.difference_update(permissions) + + def check(self, *permissions): + # type: (*Text) -> bool + """Check if one or more permissions are enabled. + + Arguments: + *permissions (str): Permission name(s), such as ``'u_w'`` + or ``'u_x'``. + + Returns: + bool: `True` if all given permissions are set. + + """ + return self._perms.issuperset(permissions) diff --git a/pyfatfs/time.py b/pyfatfs/time.py new file mode 100644 index 0000000..b462c47 --- /dev/null +++ b/pyfatfs/time.py @@ -0,0 +1,43 @@ +"""Time related tools. +""" + +from __future__ import print_function, unicode_literals + +import typing + +from calendar import timegm +from datetime import datetime + +try: + from datetime import timezone +except ImportError: + from ._tzcompat import timezone # type: ignore + +if typing.TYPE_CHECKING: + from typing import Optional + + +def datetime_to_epoch(d): + # type: (datetime) -> int + """Convert datetime to epoch.""" + return timegm(d.utctimetuple()) + + +@typing.overload +def epoch_to_datetime(t): # noqa: D103 + # type: (None) -> None + pass + + +@typing.overload +def epoch_to_datetime(t): # noqa: D103 + # type: (int) -> datetime + pass + + +def epoch_to_datetime(t): + # type: (Optional[int]) -> Optional[datetime] + """Convert epoch time to a UTC datetime.""" + if t is None: + return None + return datetime.fromtimestamp(t, tz=timezone.utc) diff --git a/pyproject.toml b/pyproject.toml index 9649c10..1550ae9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -7,8 +7,8 @@ name = "pyfatfs" description = "FAT12/FAT16/FAT32 implementation with VFAT support" readme = "README.rst" requires-python = "~=3.8" -dependencies = ["fs~=2.4"] -keywords = ["filesystem", "PyFilesystem2", "FAT12", "FAT16", "FAT32", "VFAT", "LFN"] +dependencies = ["fsspec"] +keywords = ["filesystem", "fsspec", "FAT12", "FAT16", "FAT32", "VFAT", "LFN"] license = {file = "LICENSE"} classifiers = [ "Development Status :: 5 - Production/Stable", diff --git a/pytest.sh b/pytest.sh new file mode 100755 index 0000000..f6b0313 --- /dev/null +++ b/pytest.sh @@ -0,0 +1,5 @@ +#!/bin/sh + +# fix: ModuleNotFoundError: No module named 'pyfatfs' + +PYTHONPATH=$PYTHONPATH:$PWD pytest --maxfail=1 diff --git a/tests/fs_test_cases.py b/tests/fs_test_cases.py new file mode 100644 index 0000000..84e7d42 --- /dev/null +++ b/tests/fs_test_cases.py @@ -0,0 +1,1788 @@ +# coding: utf-8 +"""Base class for tests. + +All Filesystems should be able to pass these. + +""" + +from __future__ import absolute_import, unicode_literals + +import io +import itertools +import json +import os +import six +import time +import shutil +import unittest +import warnings +import tempfile +from datetime import datetime +from six import text_type + +from pyfatfs.path import split, normpath + +import fsspec +import fsspec.implementations.dirfs +import fsspec.implementations.local +import fsspec.implementations.memory + +if six.PY2: + import collections as collections_abc +else: + import collections.abc as collections_abc + +try: + from datetime import timezone +except ImportError: + from ._tzcompat import timezone # type: ignore + + + +# based on pyfilesystem2/fs/copy.py + +def copy_file(fs1, path1, fs2, path2): + # FIXME handle path2 exists + if fs1 == fs2: + assert path1 != path2 + # https://github.com/fsspec/filesystem_spec/issues/909#issuecomment-1204212507 + # copy from one filesystem to the other + with ( + fs1.open(path1, "rb") as f1, + fs2.open(path2, "wb") as f2 + ): + shutil.copyfileobj(f1, f2) + +def copy_dir(fs1, path1, fs2, path2): + # FIXME handle path2 exists + if fs1 == fs2: + assert path1 != path2 + path1 = fs1._strip_protocol(path1) + path2 = fs2._strip_protocol(path2) + # no. os.path.normpath would turn "" into "." + # which creates ugly paths starting with "/./" + # path1 = os.path.normpath(path1) + # path2 = os.path.normpath(path2) + for subpath1, dirs, files in fs1.walk(path1): + subpath2 = path2 + subpath1[len(path1):] + for _dir in dirs: + fs2.mkdir(subpath2 + "/" + _dir) + for file in files: + # https://github.com/fsspec/filesystem_spec/issues/909#issuecomment-1204212507 + # copy from one filesystem to the other + with ( + fs1.open(subpath1 + "/" + file, "rb") as f1, + fs2.open(subpath2 + "/" + file, "wb") as f2 + ): + shutil.copyfileobj(f1, f2) + +def copy_fs(fs1, fs2, workers=0): + # FIXME handle path2 exists + assert fs1 != fs2 + return copy_dir(fs1, "/", fs2, "/") + +# based on pyfilesystem2/fs/move.py + +def move_file(fs1, path1, fs2, path2): + if fs1 == fs2: + return fs1.mv(path1, path2) + copy_file(fs1, path1, fs2, path2) + fs2.rm(path2) + +def move_dir(fs1, path1, fs2, path2): + if fs1 == fs2: + return fs1.mv(path1, path2) + copy_dir(fs1, path1, fs2, path2) + fs2.rm(path2, recursive=True) + +def walk_files(fs, path="/"): + for subpath, dirs, files in fs.walk(path): + for file in files: + yield normpath(path + "/" + subpath + "/" + file) + +def walk_dirs(fs, path="/"): + for subpath, dirs, files in fs.walk(path): + for _dir in dirs: + yield normpath(path + "/" + subpath + "/" + _dir) + +class TempdirFileSystem(fsspec.implementations.dirfs.DirFileSystem): + def __init__(self, **kwargs): + self.tempdir = tempfile.mkdtemp(prefix="pyfatfs-test-") + self.local_fs = fsspec.implementations.local.LocalFileSystem( + **kwargs, + ) + super().__init__(path=self.tempdir, fs=self.local_fs, **kwargs) + def __enter__(self): + return self + def __exit__(self, *exc): + if os.path.exists(self.tempdir): + shutil.rmtree(self.tempdir) + def __del__(self): + if os.path.exists(self.tempdir): + shutil.rmtree(self.tempdir) + +# # test +# with TempdirFileSystem() as d: +# pass + +def get_filesystem(protocol): + kwargs = { + # debug: disable caching + "use_listings_cache": False, + "skip_instance_cache": True, + } + if protocol == "tempdir": + fs = TempdirFileSystem(**kwargs) + elif protocol == "memory": + fs = fsspec.implementations.memory.MemoryFileSystem(**kwargs) + else: + fs = fsspec.filesystem(protocol, **kwargs) + if protocol == "memory": + # the memory filesystem is global + # so we have to remove previous files + # https://github.com/fsspec/filesystem_spec/issues/1904 + fs.rm("/", recursive=True) + return fs + +UNICODE_TEXT = """ + +UTF-8 encoded sample plain-text file +‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾ + +Markus Kuhn [ˈmaʳkʊs kuːn] <mkuhn@acm.org> — 1999-08-20 + + +The ASCII compatible UTF-8 encoding of ISO 10646 and Unicode +plain-text files is defined in RFC 2279 and in ISO 10646-1 Annex R. + + +Using Unicode/UTF-8, you can write in emails and source code things such as + +Mathematics and Sciences: + + ∮ E⋅da = Q, n → ∞, ∑ f(i) = ∏ g(i), ∀x∈ℝ: ⌈x⌉ = −⌊−x⌋, α ∧ ¬β = ¬(¬α ∨ β), + + ℕ ⊆ ℕ₀ ⊂ ℤ ⊂ ℚ ⊂ ℝ ⊂ ℂ, ⊥ < a ≠ b ≡ c ≤ d ≪ ⊤ ⇒ (A ⇔ B), + + 2H₂ + O₂ ⇌ 2H₂O, R = 4.7 kΩ, ⌀ 200 mm + +Linguistics and dictionaries: + + ði ıntəˈnæʃənəl fəˈnɛtık əsoʊsiˈeıʃn + Y [ˈʏpsilɔn], Yen [jɛn], Yoga [ˈjoːgɑ] + +APL: + + ((V⍳V)=⍳⍴V)/V←,V ⌷←⍳→⍴∆∇⊃‾⍎⍕⌈ + +Nicer typography in plain text files: + + ╔══════════════════════════════════════════╗ + ║ ║ + ║ • ‘single’ and “double” quotes ║ + ║ ║ + ║ • Curly apostrophes: “We’ve been here” ║ + ║ ║ + ║ • Latin-1 apostrophe and accents: '´` ║ + ║ ║ + ║ • ‚deutsche‘ „Anführungszeichen“ ║ + ║ ║ + ║ • †, ‡, ‰, •, 3–4, —, −5/+5, ™, … ║ + ║ ║ + ║ • ASCII safety test: 1lI|, 0OD, 8B ║ + ║ ╭─────────╮ ║ + ║ • the euro symbol: │ 14.95 € │ ║ + ║ ╰─────────╯ ║ + ╚══════════════════════════════════════════╝ + +Greek (in Polytonic): + + The Greek anthem: + + Σὲ γνωρίζω ἀπὸ τὴν κόψη + τοῦ σπαθιοῦ τὴν τρομερή, + σὲ γνωρίζω ἀπὸ τὴν ὄψη + ποὺ μὲ βία μετράει τὴ γῆ. + + ᾿Απ᾿ τὰ κόκκαλα βγαλμένη + τῶν ῾Ελλήνων τὰ ἱερά + καὶ σὰν πρῶτα ἀνδρειωμένη + χαῖρε, ὦ χαῖρε, ᾿Ελευθεριά! + + From a speech of Demosthenes in the 4th century BC: + + Οὐχὶ ταὐτὰ παρίσταταί μοι γιγνώσκειν, ὦ ἄνδρες ᾿Αθηναῖοι, + ὅταν τ᾿ εἰς τὰ πράγματα ἀποβλέψω καὶ ὅταν πρὸς τοὺς + λόγους οὓς ἀκούω· τοὺς μὲν γὰρ λόγους περὶ τοῦ + τιμωρήσασθαι Φίλιππον ὁρῶ γιγνομένους, τὰ δὲ πράγματ᾿ + εἰς τοῦτο προήκοντα, ὥσθ᾿ ὅπως μὴ πεισόμεθ᾿ αὐτοὶ + πρότερον κακῶς σκέψασθαι δέον. οὐδέν οὖν ἄλλο μοι δοκοῦσιν + οἱ τὰ τοιαῦτα λέγοντες ἢ τὴν ὑπόθεσιν, περὶ ἧς βουλεύεσθαι, + οὐχὶ τὴν οὖσαν παριστάντες ὑμῖν ἁμαρτάνειν. ἐγὼ δέ, ὅτι μέν + ποτ᾿ ἐξῆν τῇ πόλει καὶ τὰ αὑτῆς ἔχειν ἀσφαλῶς καὶ Φίλιππον + τιμωρήσασθαι, καὶ μάλ᾿ ἀκριβῶς οἶδα· ἐπ᾿ ἐμοῦ γάρ, οὐ πάλαι + γέγονεν ταῦτ᾿ ἀμφότερα· νῦν μέντοι πέπεισμαι τοῦθ᾿ ἱκανὸν + προλαβεῖν ἡμῖν εἶναι τὴν πρώτην, ὅπως τοὺς συμμάχους + σώσομεν. ἐὰν γὰρ τοῦτο βεβαίως ὑπάρξῃ, τότε καὶ περὶ τοῦ + τίνα τιμωρήσεταί τις καὶ ὃν τρόπον ἐξέσται σκοπεῖν· πρὶν δὲ + τὴν ἀρχὴν ὀρθῶς ὑποθέσθαι, μάταιον ἡγοῦμαι περὶ τῆς + τελευτῆς ὁντινοῦν ποιεῖσθαι λόγον. + + Δημοσθένους, Γ´ ᾿Ολυνθιακὸς + +Georgian: + + From a Unicode conference invitation: + + გთხოვთ ახლავე გაიაროთ რეგისტრაცია Unicode-ის მეათე საერთაშორისო + კონფერენციაზე დასასწრებად, რომელიც გაიმართება 10-12 მარტს, + ქ. მაინცში, გერმანიაში. კონფერენცია შეჰკრებს ერთად მსოფლიოს + ექსპერტებს ისეთ დარგებში როგორიცაა ინტერნეტი და Unicode-ი, + ინტერნაციონალიზაცია და ლოკალიზაცია, Unicode-ის გამოყენება + ოპერაციულ სისტემებსა, და გამოყენებით პროგრამებში, შრიფტებში, + ტექსტების დამუშავებასა და მრავალენოვან კომპიუტერულ სისტემებში. + +Russian: + + From a Unicode conference invitation: + + Зарегистрируйтесь сейчас на Десятую Международную Конференцию по + Unicode, которая состоится 10-12 марта 1997 года в Майнце в Германии. + Конференция соберет широкий круг экспертов по вопросам глобального + Интернета и Unicode, локализации и интернационализации, воплощению и + применению Unicode в различных операционных системах и программных + приложениях, шрифтах, верстке и многоязычных компьютерных системах. + +Thai (UCS Level 2): + + Excerpt from a poetry on The Romance of The Three Kingdoms (a Chinese + classic 'San Gua'): + + [----------------------------|------------------------] + ๏ แผ่นดินฮั่นเสื่อมโทรมแสนสังเวช พระปกเกศกองบู๊กู้ขึ้นใหม่ + สิบสองกษัตริย์ก่อนหน้าแลถัดไป สององค์ไซร้โง่เขลาเบาปัญญา + ทรงนับถือขันทีเป็นที่พึ่ง บ้านเมืองจึงวิปริตเป็นนักหนา + โฮจิ๋นเรียกทัพทั่วหัวเมืองมา หมายจะฆ่ามดชั่วตัวสำคัญ + เหมือนขับไสไล่เสือจากเคหา รับหมาป่าเข้ามาเลยอาสัญ + ฝ่ายอ้องอุ้นยุแยกให้แตกกัน ใช้สาวนั้นเป็นชนวนชื่นชวนใจ + พลันลิฉุยกุยกีกลับก่อเหตุ ช่างอาเพศจริงหนาฟ้าร้องไห้ + ต้องรบราฆ่าฟันจนบรรลัย ฤๅหาใครค้ำชูกู้บรรลังก์ ฯ + + (The above is a two-column text. If combining characters are handled + correctly, the lines of the second column should be aligned with the + | character above.) + +Ethiopian: + + Proverbs in the Amharic language: + + ሰማይ አይታረስ ንጉሥ አይከሰስ። + ብላ ካለኝ እንደአባቴ በቆመጠኝ። + ጌጥ ያለቤቱ ቁምጥና ነው። + ደሀ በሕልሙ ቅቤ ባይጠጣ ንጣት በገደለው። + የአፍ ወለምታ በቅቤ አይታሽም። + አይጥ በበላ ዳዋ ተመታ። + ሲተረጉሙ ይደረግሙ። + ቀስ በቀስ፥ ዕንቁላል በእግሩ ይሄዳል። + ድር ቢያብር አንበሳ ያስር። + ሰው እንደቤቱ እንጅ እንደ ጉረቤቱ አይተዳደርም። + እግዜር የከፈተውን ጉሮሮ ሳይዘጋው አይድርም። + የጎረቤት ሌባ፥ ቢያዩት ይስቅ ባያዩት ያጠልቅ። + ሥራ ከመፍታት ልጄን ላፋታት። + ዓባይ ማደሪያ የለው፥ ግንድ ይዞ ይዞራል። + የእስላም አገሩ መካ የአሞራ አገሩ ዋርካ። + ተንጋሎ ቢተፉ ተመልሶ ባፉ። + ወዳጅህ ማር ቢሆን ጨርስህ አትላሰው። + እግርህን በፍራሽህ ልክ ዘርጋ። + +Runes: + + ᚻᛖ ᚳᚹᚫᚦ ᚦᚫᛏ ᚻᛖ ᛒᚢᛞᛖ ᚩᚾ ᚦᚫᛗ ᛚᚪᚾᛞᛖ ᚾᚩᚱᚦᚹᛖᚪᚱᛞᚢᛗ ᚹᛁᚦ ᚦᚪ ᚹᛖᛥᚫ + + (Old English, which transcribed into Latin reads 'He cwaeth that he + bude thaem lande northweardum with tha Westsae.' and means 'He said + that he lived in the northern land near the Western Sea.') + +Braille: + + ⡌⠁⠧⠑ ⠼⠁⠒ ⡍⠜⠇⠑⠹⠰⠎ ⡣⠕⠌ + + ⡍⠜⠇⠑⠹ ⠺⠁⠎ ⠙⠑⠁⠙⠒ ⠞⠕ ⠃⠑⠛⠔ ⠺⠊⠹⠲ ⡹⠻⠑ ⠊⠎ ⠝⠕ ⠙⠳⠃⠞ + ⠱⠁⠞⠑⠧⠻ ⠁⠃⠳⠞ ⠹⠁⠞⠲ ⡹⠑ ⠗⠑⠛⠊⠌⠻ ⠕⠋ ⠙⠊⠎ ⠃⠥⠗⠊⠁⠇ ⠺⠁⠎ + ⠎⠊⠛⠝⠫ ⠃⠹ ⠹⠑ ⠊⠇⠻⠛⠹⠍⠁⠝⠂ ⠹⠑ ⠊⠇⠻⠅⠂ ⠹⠑ ⠥⠝⠙⠻⠞⠁⠅⠻⠂ + ⠁⠝⠙ ⠹⠑ ⠡⠊⠑⠋ ⠍⠳⠗⠝⠻⠲ ⡎⠊⠗⠕⠕⠛⠑ ⠎⠊⠛⠝⠫ ⠊⠞⠲ ⡁⠝⠙ + ⡎⠊⠗⠕⠕⠛⠑⠰⠎ ⠝⠁⠍⠑ ⠺⠁⠎ ⠛⠕⠕⠙ ⠥⠏⠕⠝ ⠰⡡⠁⠝⠛⠑⠂ ⠋⠕⠗ ⠁⠝⠹⠹⠔⠛ ⠙⠑ + ⠡⠕⠎⠑ ⠞⠕ ⠏⠥⠞ ⠙⠊⠎ ⠙⠁⠝⠙ ⠞⠕⠲ + + ⡕⠇⠙ ⡍⠜⠇⠑⠹ ⠺⠁⠎ ⠁⠎ ⠙⠑⠁⠙ ⠁⠎ ⠁ ⠙⠕⠕⠗⠤⠝⠁⠊⠇⠲ + + ⡍⠔⠙⠖ ⡊ ⠙⠕⠝⠰⠞ ⠍⠑⠁⠝ ⠞⠕ ⠎⠁⠹ ⠹⠁⠞ ⡊ ⠅⠝⠪⠂ ⠕⠋ ⠍⠹ + ⠪⠝ ⠅⠝⠪⠇⠫⠛⠑⠂ ⠱⠁⠞ ⠹⠻⠑ ⠊⠎ ⠏⠜⠞⠊⠊⠥⠇⠜⠇⠹ ⠙⠑⠁⠙ ⠁⠃⠳⠞ + ⠁ ⠙⠕⠕⠗⠤⠝⠁⠊⠇⠲ ⡊ ⠍⠊⠣⠞ ⠙⠁⠧⠑ ⠃⠑⠲ ⠔⠊⠇⠔⠫⠂ ⠍⠹⠎⠑⠇⠋⠂ ⠞⠕ + ⠗⠑⠛⠜⠙ ⠁ ⠊⠕⠋⠋⠔⠤⠝⠁⠊⠇ ⠁⠎ ⠹⠑ ⠙⠑⠁⠙⠑⠌ ⠏⠊⠑⠊⠑ ⠕⠋ ⠊⠗⠕⠝⠍⠕⠝⠛⠻⠹ + ⠔ ⠹⠑ ⠞⠗⠁⠙⠑⠲ ⡃⠥⠞ ⠹⠑ ⠺⠊⠎⠙⠕⠍ ⠕⠋ ⠳⠗ ⠁⠝⠊⠑⠌⠕⠗⠎ + ⠊⠎ ⠔ ⠹⠑ ⠎⠊⠍⠊⠇⠑⠆ ⠁⠝⠙ ⠍⠹ ⠥⠝⠙⠁⠇⠇⠪⠫ ⠙⠁⠝⠙⠎ + ⠩⠁⠇⠇ ⠝⠕⠞ ⠙⠊⠌⠥⠗⠃ ⠊⠞⠂ ⠕⠗ ⠹⠑ ⡊⠳⠝⠞⠗⠹⠰⠎ ⠙⠕⠝⠑ ⠋⠕⠗⠲ ⡹⠳ + ⠺⠊⠇⠇ ⠹⠻⠑⠋⠕⠗⠑ ⠏⠻⠍⠊⠞ ⠍⠑ ⠞⠕ ⠗⠑⠏⠑⠁⠞⠂ ⠑⠍⠏⠙⠁⠞⠊⠊⠁⠇⠇⠹⠂ ⠹⠁⠞ + ⡍⠜⠇⠑⠹ ⠺⠁⠎ ⠁⠎ ⠙⠑⠁⠙ ⠁⠎ ⠁ ⠙⠕⠕⠗⠤⠝⠁⠊⠇⠲ + + (The first couple of paragraphs of "A Christmas Carol" by Dickens) + +Compact font selection example text: + + ABCDEFGHIJKLMNOPQRSTUVWXYZ /0123456789 + abcdefghijklmnopqrstuvwxyz £©µÀÆÖÞßéöÿ + –—‘“”„†•…‰™œŠŸž€ ΑΒΓΔΩαβγδω АБВГДабвгд + ∀∂∈ℝ∧∪≡∞ ↑↗↨↻⇣ ┐┼╔╘░►☺♀ fi�⑀₂ἠḂӥẄɐː⍎אԱა + +Greetings in various languages: + + Hello world, Καλημέρα κόσμε, コンニチハ + +Box drawing alignment tests: █ + ▉ + ╔══╦══╗ ┌──┬──┐ ╭──┬──╮ ╭──┬──╮ ┏━━┳━━┓ ┎┒┏┑ ╷ ╻ ┏┯┓ ┌┰┐ ▊ ╱╲╱╲╳╳╳ + ║┌─╨─┐║ │╔═╧═╗│ │╒═╪═╕│ │╓─╁─╖│ ┃┌─╂─┐┃ ┗╃╄┙ ╶┼╴╺╋╸┠┼┨ ┝╋┥ ▋ ╲╱╲╱╳╳╳ + ║│╲ ╱│║ │║ ║│ ││ │ ││ │║ ┃ ║│ ┃│ ╿ │┃ ┍╅╆┓ ╵ ╹ ┗┷┛ └┸┘ ▌ ╱╲╱╲╳╳╳ + ╠╡ ╳ ╞╣ ├╢ ╟┤ ├┼─┼─┼┤ ├╫─╂─╫┤ ┣┿╾┼╼┿┫ ┕┛┖┚ ┌┄┄┐ ╎ ┏┅┅┓ ┋ ▍ ╲╱╲╱╳╳╳ + ║│╱ ╲│║ │║ ║│ ││ │ ││ │║ ┃ ║│ ┃│ ╽ │┃ ░░▒▒▓▓██ ┊ ┆ ╎ ╏ ┇ ┋ ▎ + ║└─╥─┘║ │╚═╤═╝│ │╘═╪═╛│ │╙─╀─╜│ ┃└─╂─┘┃ ░░▒▒▓▓██ ┊ ┆ ╎ ╏ ┇ ┋ ▏ + ╚══╩══╝ └──┴──┘ ╰──┴──╯ ╰──┴──╯ ┗━━┻━━┛ └╌╌┘ ╎ ┗╍╍┛ ┋ ▁▂▃▄▅▆▇█ + +""" + + +class FSTestCases(object): + """Basic FS tests.""" + + data1 = b"foo" * 256 * 1024 + data2 = b"bar" * 2 * 256 * 1024 + data3 = b"baz" * 3 * 256 * 1024 + data4 = b"egg" * 7 * 256 * 1024 + + def make_fs(self): + """Return an FS instance.""" + raise NotImplementedError("implement me") + + def destroy_fs(self, fs): + """Destroy a FS instance. + + Arguments: + fs (FS): A filesystem instance previously opened + by `~fs.test.FSTestCases.make_fs`. + + """ + pass + # fs.close() + + def setUp(self): + self.fs = self.make_fs() + + def tearDown(self): + self.destroy_fs(self.fs) + del self.fs + + def assert_exists(self, path): + """Assert a path exists. + + Arguments: + path (str): A path on the filesystem. + + """ + self.assertTrue(self.fs.exists(path)) + + def assert_not_exists(self, path): + """Assert a path does not exist. + + Arguments: + path (str): A path on the filesystem. + + """ + self.assertFalse(self.fs.exists(path)) + + def assert_isempty(self, path): + """Assert a path is an empty directory. + + Arguments: + path (str): A path on the filesystem. + + """ + self.assertTrue(self.fs.isempty(path)) + + def assert_isfile(self, path): + """Assert a path is a file. + + Arguments: + path (str): A path on the filesystem. + + """ + self.assertTrue(self.fs.isfile(path)) + + def assert_isdir(self, path): + """Assert a path is a directory. + + Arguments: + path (str): A path on the filesystem. + + """ + self.assertTrue(self.fs.isdir(path)) + + def assert_bytes(self, path, contents): + """Assert a file contains the given bytes. + + Arguments: + path (str): A path on the filesystem. + contents (bytes): Bytes to compare. + + """ + assert isinstance(contents, bytes) + data = self.fs.read_bytes(path) + self.assertEqual(data, contents) + self.assertIsInstance(data, bytes) + + def assert_text(self, path, contents): + """Assert a file contains the given text. + + Arguments: + path (str): A path on the filesystem. + contents (str): Text to compare. + + """ + assert isinstance(contents, text_type) + assert self.fs.exists(path) + with self.fs.open(path, "rt") as f: + data = f.read() + self.assertEqual(data, contents) + self.assertIsInstance(data, text_type) + + def test_root_dir(self): + with self.assertRaises(IsADirectoryError): + self.fs.open("/") + with self.assertRaises(IsADirectoryError): + self.fs.open("/") + + def test_basic(self): + #  Check str and repr don't break + repr(self.fs) + self.assertIsInstance(six.text_type(self.fs), six.text_type) + + def test_getmeta(self): + # Get the meta dict + meta = self.fs.getmeta() + + # Check default namespace + self.assertEqual(meta, self.fs.getmeta(namespace="standard")) + + # Must be a dict + self.assertTrue(isinstance(meta, dict)) + + no_meta = self.fs.getmeta("__nosuchnamespace__") + self.assertIsInstance(no_meta, dict) + self.assertFalse(no_meta) + + def test_isfile(self): + self.assertFalse(self.fs.isfile("foo.txt")) + self.fs._create("foo.txt") + self.assertTrue(self.fs.isfile("foo.txt")) + self.fs.makedir("bar") + self.assertFalse(self.fs.isfile("bar")) + + def test_isdir(self): + self.assertFalse(self.fs.isdir("foo")) + self.fs._create("bar") + self.fs.makedir("foo") + self.assertTrue(self.fs.isdir("foo")) + self.assertFalse(self.fs.isdir("bar")) + + def test_getsize(self): + self.fs.write_bytes("empty", b"") + self.fs.write_bytes("one", b"a") + self.fs.write_bytes("onethousand", ("b" * 1000).encode("ascii")) + self.assertEqual(self.fs._getsize("empty"), 0) + self.assertEqual(self.fs._getsize("one"), 1) + self.assertEqual(self.fs._getsize("onethousand"), 1000) + with self.assertRaises(FileNotFoundError): + self.fs._getsize("doesnotexist") + + def test_invalid_chars(self): + # Test invalid path method. + with self.assertRaises(ValueError): + # ValueError: embedded null byte + self.fs.open("invalid\0file", "wb") + + # with self.assertRaises(ValueError): + # # ValueError: embedded null byte + # self.fs.validatepath("invalid\0file") + + def test_exists(self): + # Test exists method. + # Check root directory always exists + self.assertTrue(self.fs.exists("/")) + self.assertTrue(self.fs.exists("")) + + # Check files don't exist + self.assertFalse(self.fs.exists("foo")) + self.assertFalse(self.fs.exists("foo/bar")) + self.assertFalse(self.fs.exists("foo/bar/baz")) + self.assertFalse(self.fs.exists("egg")) + + # make some files and directories + self.fs.makedirs("foo/bar") + self.fs.write_bytes("foo/bar/baz", b"test") + + # Check files exists + self.assertTrue(self.fs.exists("foo")) + self.assertTrue(self.fs.exists("foo/bar")) + self.assertTrue(self.fs.exists("foo/bar/baz")) + self.assertFalse(self.fs.exists("egg")) + + self.assert_exists("foo") + self.assert_exists("foo/bar") + self.assert_exists("foo/bar/baz") + self.assert_not_exists("egg") + + # Delete a file + self.fs.rm("foo/bar/baz") + # Check it no longer exists + self.assert_not_exists("foo/bar/baz") + self.assertFalse(self.fs.exists("foo/bar/baz")) + self.assert_not_exists("foo/bar/baz") + + # Check root directory always exists + self.assertTrue(self.fs.exists("/")) + self.assertTrue(self.fs.exists("")) + + def test_listdir(self): + # Check listing directory that doesn't exist + with self.assertRaises(FileNotFoundError): + self.fs.listdir("foobar", detail=False) + + # Check aliases for root + self.assertEqual(self.fs.listdir("/", detail=False), []) + self.assertEqual(self.fs.listdir(".", detail=False), []) + self.assertEqual(self.fs.listdir("./", detail=False), []) + + # Make a few objects + self.fs.write_bytes("foo", b"egg") + self.fs.write_bytes("bar", b"egg") + self.fs.makedir("baz") + + # This should not be listed + self.fs.write_bytes("baz/egg", b"egg") + + # Check list works + six.assertCountEqual(self, self.fs.listdir("/", detail=False), ["foo", "bar", "baz"]) + six.assertCountEqual(self, self.fs.listdir(".", detail=False), ["foo", "bar", "baz"]) + six.assertCountEqual(self, self.fs.listdir("./", detail=False), ["foo", "bar", "baz"]) + + # Check paths are unicode strings + for name in self.fs.listdir("/", detail=False): + self.assertIsInstance(name, text_type) + + # Create a subdirectory + self.fs.makedir("dir") + + # Should start empty + self.assertEqual(self.fs.listdir("/dir", detail=False), []) + + # Write some files + self.fs.write_bytes("dir/foofoo", b"egg") + self.fs.write_bytes("dir/barbar", b"egg") + + # Check listing subdirectory + six.assertCountEqual(self, self.fs.listdir("dir", detail=False), ["foofoo", "barbar"]) + # Make sure they are unicode stringd + for name in self.fs.listdir("dir", detail=False): + self.assertIsInstance(name, text_type) + + self.fs._create("notadir") + with self.assertRaises(NotADirectoryError): + self.fs.listdir("notadir", detail=False) + + def test_move(self): + # Make a file + self.fs.write_bytes("foo", b"egg") + self.assert_isfile("foo") + + # Move it + self.fs.move("foo", "bar") + + # Check it has gone from original location + self.assert_not_exists("foo") + + # Check it exists in the new location, and contents match + self.assert_exists("bar") + self.assert_bytes("bar", b"egg") + + # # Check moving to existing file fails + # self.fs.write_bytes("foo2", b"eggegg") + # with self.assertRaises(errors.DestinationExists): + # self.fs.move("foo2", "bar") + + # Check move with overwrite=True + self.fs.move("foo2", "bar", overwrite=True) + self.assert_not_exists("foo2") + + # # Check moving to a non-existant directory + # with self.assertRaises(FileNotFoundError): + # self.fs.move("bar", "egg/bar") + + # Check moving an unexisting source + with self.assertRaises(FileNotFoundError): + self.fs.move("egg", "spam") + + # Check moving between different directories + self.fs.makedir("baz") + self.fs.write_bytes("baz/bazbaz", b"bazbaz") + self.fs.makedir("baz2") + self.fs.move("baz/bazbaz", "baz2/bazbaz") + self.assert_not_exists("baz/bazbaz") + self.assert_bytes("baz2/bazbaz", b"bazbaz") + + # Check moving a directory raises an error + self.assert_isdir("baz2") + self.assert_not_exists("yolk") + with self.assertRaises(IsADirectoryError): + self.fs.move("baz2", "yolk") + + def test_makedir(self): + # Check edge case of root + with self.assertRaises(FileExistsError): + self.fs.makedir("/") + + # Making root is a null op with exist_ok + self.fs.makedirs("/", exist_ok=True) + self.assertEqual(self.fs.listdir("/"), []) + + self.assert_not_exists("foo") + self.fs.makedir("foo") + self.assert_isdir("foo") + self.assertEqual(self.fs._gettype("foo"), "directory") + self.fs.write_bytes("foo/bar.txt", b"egg") + self.assert_bytes("foo/bar.txt", b"egg") + + # Directory exists + with self.assertRaises(FileExistsError): + self.fs.makedir("foo") + + # Parent directory doesn't exist + with self.assertRaises(FileNotFoundError): + self.fs.makedir("/foo/bar/baz") + + self.fs.makedir("/foo/bar") + self.fs.makedir("/foo/bar/baz") + + with self.assertRaises(FileExistsError): + self.fs.makedir("foo/bar/baz") + + with self.assertRaises(FileExistsError): + self.fs.makedir("foo/bar.txt") + + def test_makedirs(self): + self.assertFalse(self.fs.exists("foo")) + self.fs.makedirs("foo") + self.assertEqual(self.fs._gettype("foo"), "directory") + + self.fs.makedirs("foo/bar/baz") + self.assertTrue(self.fs.isdir("foo/bar")) + self.assertTrue(self.fs.isdir("foo/bar/baz")) + + with self.assertRaises(FileExistsError): + self.fs.makedirs("foo/bar/baz") + + self.fs.makedirs("foo/bar/baz", exist_ok=True) + + self.fs.write_bytes("foo.bin", b"test") + with self.assertRaises(NotADirectoryError): + self.fs.makedirs("foo.bin/bar") + + with self.assertRaises(NotADirectoryError): + self.fs.makedirs("foo.bin/bar/baz/egg") + + def test_repeat_dir(self): + # Catches bug with directories contain repeated names, + # discovered in s3fs + self.fs.makedirs("foo/foo/foo") + self.assertEqual(self.fs.listdir(""), ["foo"]) + self.assertEqual(self.fs.listdir("foo"), ["foo"]) + self.assertEqual(self.fs.listdir("foo/foo"), ["foo"]) + self.assertEqual(self.fs.listdir("foo/foo/foo"), []) + scan = list(self.fs.scandir("foo")) + self.assertEqual(len(scan), 1) + self.assertEqual(scan[0].name, "foo") + + def test_open(self): + # Open a file that doesn't exist + with self.assertRaises(FileNotFoundError): + self.fs.open("doesnotexist", "r") + + self.fs.makedir("foo") + + # Create a new text file + text = "Hello, World" + + with self.fs.open("foo/hello", "wt") as f: + repr(f) + self.assertIsInstance(f, io.IOBase) + self.assertTrue(f.writable()) + self.assertFalse(f.readable()) + self.assertFalse(f.closed) + f.write(text) + self.assertTrue(f.closed) + + # Read it back + with self.fs.open("foo/hello", "rt") as f: + self.assertIsInstance(f, io.IOBase) + self.assertTrue(f.readable()) + self.assertFalse(f.writable()) + self.assertFalse(f.closed) + hello = f.read() + self.assertTrue(f.closed) + self.assertEqual(hello, text) + self.assert_text("foo/hello", text) + + # Test overwrite + text = "Goodbye, World" + with self.fs.open("foo/hello", "wt") as f: + f.write(text) + self.assert_text("foo/hello", text) + + # Open from missing dir + with self.assertRaises(FileNotFoundError): + self.fs.open("/foo/bar/test.txt") + + # Test fileno returns a file number, if supported by the file. + with self.fs.open("foo/hello") as f: + try: + fn = f.fileno() + except io.UnsupportedOperation: + pass + else: + self.assertEqual(os.read(fn, 7), b"Goodbye") + + # Test text files are proper iterators over themselves + lines = os.linesep.join(["Line 1", "Line 2", "Line 3"]) + self.fs.write_text("iter.txt", lines) + with self.fs.open("iter.txt") as f: + for actual, expected in zip(f, lines.splitlines(1)): + self.assertEqual(actual, expected) + + def test_openbin_rw(self): + # Open a file that doesn't exist + with self.assertRaises(FileNotFoundError): + self.fs.open("doesnotexist", "r") + + self.fs.makedir("foo") + + # Create a new text file + text = b"Hello, World\n" + + with self.fs.open("foo/hello", "w") as f: + repr(f) + self.assertIn("b", f.mode) + self.assertIsInstance(f, io.IOBase) + self.assertTrue(f.writable()) + self.assertFalse(f.readable()) + self.assertEqual(len(text), f.write(text)) + self.assertFalse(f.closed) + self.assertTrue(f.closed) + + with self.assertRaises(FileExistsError): + with self.fs.open("foo/hello", "x") as f: + pass + + # Read it back + with self.fs.open("foo/hello", "r") as f: + self.assertIn("b", f.mode) + self.assertIsInstance(f, io.IOBase) + self.assertTrue(f.readable()) + self.assertFalse(f.writable()) + hello = f.read() + self.assertFalse(f.closed) + self.assertTrue(f.closed) + self.assertEqual(hello, text) + self.assert_bytes("foo/hello", text) + + # Test overwrite + text = b"Goodbye, World" + with self.fs.open("foo/hello", "w") as f: + self.assertEqual(len(text), f.write(text)) + self.assert_bytes("foo/hello", text) + + # Test FileExpected raised + with self.assertRaises(IsADirectoryError): + self.fs.open("foo") # directory + + # Open from missing dir + with self.assertRaises(FileNotFoundError): + self.fs.open("/foo/bar/test.txt") + + # Test fileno returns a file number, if supported by the file. + with self.fs.open("foo/hello") as f: + try: + fn = f.fileno() + except io.UnsupportedOperation: + pass + else: + self.assertEqual(os.read(fn, 7), b"Goodbye") + + # Test binary files are proper iterators over themselves + lines = b"\n".join([b"Line 1", b"Line 2", b"Line 3"]) + self.fs.write_bytes("iter.bin", lines) + with self.fs.open("iter.bin") as f: + for actual, expected in zip(f, lines.splitlines(1)): + self.assertEqual(actual, expected) + + def test_open_files(self): + # Test file-like objects work as expected. + + with self.fs.open("text", "w") as f: + repr(f) + text_type(f) + self.assertIsInstance(f, io.IOBase) + self.assertTrue(f.writable()) + self.assertFalse(f.readable()) + self.assertFalse(f.closed) + self.assertEqual(f.tell(), 0) + f.write("Hello\nWorld\n") + self.assertEqual(f.tell(), 12) + f.writelines(["foo\n", "bar\n", "baz\n"]) + with self.assertRaises(IOError): + f.read(1) + self.assertTrue(f.closed) + + with self.fs.open("bin", "wb") as f: + with self.assertRaises(IOError): + f.read(1) + + with self.fs.open("text", "r") as f: + repr(f) + text_type(f) + self.assertIsInstance(f, io.IOBase) + self.assertFalse(f.writable()) + self.assertTrue(f.readable()) + self.assertFalse(f.closed) + self.assertEqual( + f.readlines(), ["Hello\n", "World\n", "foo\n", "bar\n", "baz\n"] + ) + with self.assertRaises(IOError): + f.write("no") + self.assertTrue(f.closed) + + with self.fs.open("text", "rb") as f: + self.assertIsInstance(f, io.IOBase) + self.assertFalse(f.writable()) + self.assertTrue(f.readable()) + self.assertFalse(f.closed) + self.assertEqual(f.readlines(8), [b"Hello\n", b"World\n"]) + self.assertEqual(f.tell(), 12) + buffer = bytearray(4) + self.assertEqual(f.readinto(buffer), 4) + self.assertEqual(f.tell(), 16) + self.assertEqual(buffer, b"foo\n") + with self.assertRaises(IOError): + f.write(b"no") + self.assertTrue(f.closed) + + with self.fs.open("text", "r") as f: + self.assertEqual(list(f), ["Hello\n", "World\n", "foo\n", "bar\n", "baz\n"]) + self.assertFalse(f.closed) + self.assertTrue(f.closed) + + with self.fs.open("text") as f: + iter_lines = iter(f) + self.assertEqual(next(iter_lines), "Hello\n") + + with self.fs.open("unicode", "w") as f: + self.assertEqual(12, f.write("Héllo\nWörld\n")) + + with self.fs.open("text", "rb") as f: + self.assertIsInstance(f, io.IOBase) + self.assertFalse(f.writable()) + self.assertTrue(f.readable()) + self.assertTrue(f.seekable()) + self.assertFalse(f.closed) + self.assertEqual(f.read(1), b"H") + # FIXME from fs import Seek + # self.assertEqual(3, f.seek(3, Seek.set)) + self.assertEqual(f.read(1), b"l") + # self.assertEqual(6, f.seek(2, Seek.current)) + self.assertEqual(f.read(1), b"W") + # self.assertEqual(22, f.seek(-2, Seek.end)) + self.assertEqual(f.read(1), b"z") + with self.assertRaises(ValueError): + f.seek(10, 77) + self.assertTrue(f.closed) + + with self.fs.open("text", "r+b") as f: + self.assertIsInstance(f, io.IOBase) + self.assertTrue(f.readable()) + self.assertTrue(f.writable()) + self.assertTrue(f.seekable()) + self.assertFalse(f.closed) + self.assertEqual(5, f.seek(5)) + self.assertEqual(5, f.truncate()) + self.assertEqual(0, f.seek(0)) + self.assertEqual(f.read(), b"Hello") + self.assertEqual(10, f.truncate(10)) + self.assertEqual(5, f.tell()) + self.assertEqual(0, f.seek(0)) + print(repr(self.fs)) + print(repr(f)) + self.assertEqual(f.read(), b"Hello\0\0\0\0\0") + self.assertEqual(4, f.seek(4)) + f.write(b"O") + self.assertEqual(4, f.seek(4)) + self.assertEqual(f.read(1), b"O") + self.assertTrue(f.closed) + + def test_openbin(self): + # Write a binary file + with self.fs.open("file.bin", "wb") as write_file: + repr(write_file) + text_type(write_file) + self.assertIn("b", write_file.mode) + self.assertIsInstance(write_file, io.IOBase) + self.assertTrue(write_file.writable()) + self.assertFalse(write_file.readable()) + self.assertFalse(write_file.closed) + self.assertEqual(3, write_file.write(b"\0\1\2")) + self.assertTrue(write_file.closed) + + # Read a binary file + with self.fs.open("file.bin", "rb") as read_file: + repr(write_file) + text_type(write_file) + self.assertIn("b", read_file.mode) + self.assertIsInstance(read_file, io.IOBase) + self.assertTrue(read_file.readable()) + self.assertFalse(read_file.writable()) + self.assertFalse(read_file.closed) + data = read_file.read() + self.assertEqual(data, b"\0\1\2") + self.assertTrue(read_file.closed) + + # Check disallow text mode + with self.assertRaises(ValueError): + with self.fs.open("file.bin", "rt") as read_file: + pass + + # Check errors + with self.assertRaises(FileNotFoundError): + self.fs.open("foo.bin") + + # Open from missing dir + with self.assertRaises(FileNotFoundError): + self.fs.open("/foo/bar/test.txt") + + self.fs.makedir("foo") + # Attempt to open a directory + with self.assertRaises(IsADirectoryError): + self.fs.open("/foo") + + # Attempt to write to a directory + with self.assertRaises(IsADirectoryError): + self.fs.open("/foo", "w") + + # Opening a file in a directory which doesn't exist + with self.assertRaises(FileNotFoundError): + self.fs.open("/egg/bar") + + # Opening a file in a directory which doesn't exist + with self.assertRaises(FileNotFoundError): + self.fs.open("/egg/bar", "w") + + # Opening with a invalid mode + with self.assertRaises(ValueError): + self.fs.open("foo.bin", "h") + + def test_open_exclusive(self): + with self.fs.open("test_open_exclusive", "x") as f: + f.write("bananas") + + with self.assertRaises(FileExistsError): + self.fs.open("test_open_exclusive", "x") + + def test_openbin_exclusive(self): + with self.fs.open("test_openbin_exclusive", "x") as f: + f.write(b"bananas") + + with self.assertRaises(FileExistsError): + self.fs.open("test_openbin_exclusive", "x") + + def test_opendir(self): + # Make a simple directory structure + self.fs.makedir("foo") + self.fs.write_bytes("foo/bar", b"barbar") + self.fs.write_bytes("foo/egg", b"eggegg") + + # Open a sub directory + with self.fs.opendir("foo") as foo_fs: + repr(foo_fs) + text_type(foo_fs) + six.assertCountEqual(self, foo_fs.listdir("/"), ["bar", "egg"]) + self.assertTrue(foo_fs.isfile("bar")) + self.assertTrue(foo_fs.isfile("egg")) + self.assertEqual(foo_fs.read_bytes("bar"), b"barbar") + self.assertEqual(foo_fs.read_bytes("egg"), b"eggegg") + + # Attempt to open a non-existent directory + with self.assertRaises(FileNotFoundError): + self.fs.opendir("egg") + + # Check error when doing opendir on a non dir + with self.assertRaises(NotADirectoryError): + self.fs.opendir("foo/egg") + + # These should work, and will essentially return a 'clone' of sorts + self.fs.opendir("") + self.fs.opendir("/") + + def test_remove(self): + + self.fs.write_bytes("foo1", b"test1") + self.fs.write_bytes("foo2", b"test2") + self.fs.write_bytes("foo3", b"test3") + + self.assert_isfile("foo1") + self.assert_isfile("foo2") + self.assert_isfile("foo3") + + self.fs.rm("foo2") + + self.assert_isfile("foo1") + self.assert_not_exists("foo2") + self.assert_isfile("foo3") + + with self.assertRaises(FileNotFoundError): + self.fs.rm("bar") + + self.fs.makedir("dir") + with self.assertRaises(IsADirectoryError): + self.fs.rm("dir") + + self.fs.makedirs("foo/bar/baz/") + + error_msg = "resource 'foo/bar/egg/test.txt' not found" + assertRaisesRegex = getattr(self, "assertRaisesRegex", self.assertRaisesRegexp) + with assertRaisesRegex(FileNotFoundError, error_msg): + self.fs.rm("foo/bar/egg/test.txt") + + def test_removedir(self): + + # Test removing root + with self.assertRaises(OSError): + # OSError: [Errno 39] Directory not empty: '/' + self.fs.removedir("/") + + self.fs.makedirs("foo/bar/baz") + self.assertTrue(self.fs.exists("foo/bar/baz")) + self.fs.removedir("foo/bar/baz") + self.assertFalse(self.fs.exists("foo/bar/baz")) + self.assertTrue(self.fs.isdir("foo/bar")) + + with self.assertRaises(FileNotFoundError): + self.fs.removedir("nodir") + + # Test force removal + self.fs.makedirs("foo/bar/baz") + self.fs.write_bytes("foo/egg", b"test") + + with self.assertRaises(NotADirectoryError): + self.fs.removedir("foo/egg") + + with self.assertRaises(OSError): + # OSError: [Errno 39] Directory not empty: 'foo/bar' + self.fs.removedir("foo/bar") + + def test_removetree(self): + self.fs.makedirs("spam") + self.fs.makedirs("foo/bar/baz") + self.fs.makedirs("foo/egg") + self.fs.makedirs("foo/a/b/c/d/e") + self.fs._create("foo/egg.txt") + self.fs._create("foo/bar/egg.bin") + self.fs._create("foo/bar/baz/egg.txt") + self.fs._create("foo/a/b/c/1.txt") + self.fs._create("foo/a/b/c/2.txt") + self.fs._create("foo/a/b/c/3.txt") + + self.assert_exists("foo/egg.txt") + self.assert_exists("foo/bar/egg.bin") + + self.fs.removetree("foo") + self.assert_not_exists("foo") + self.assert_exists("spam") + + # Errors on files + self.fs._create("bar") + with self.assertRaises(NotADirectoryError): + self.fs.removetree("bar") + + # Errors on non-existing path + with self.assertRaises(FileNotFoundError): + self.fs.removetree("foofoo") + + def test_removetree_root(self): + self.fs.makedirs("foo/bar/baz") + self.fs.makedirs("foo/egg") + self.fs.makedirs("foo/a/b/c/d/e") + self.fs._create("foo/egg.txt") + self.fs._create("foo/bar/egg.bin") + self.fs._create("foo/a/b/c/1.txt") + self.fs._create("foo/a/b/c/2.txt") + self.fs._create("foo/a/b/c/3.txt") + + self.assert_exists("foo/egg.txt") + self.assert_exists("foo/bar/egg.bin") + + # removetree("/") removes the contents, + # but not the root folder itself + self.fs.removetree("/") + self.assert_exists("/") + self.assert_isempty("/") + + # we check we can create a file after + # to catch potential issues with the + # root folder being deleted on faulty + # implementations + self.fs._create("egg") + self.fs.makedir("yolk") + self.assert_exists("egg") + self.assert_exists("yolk") + + def test_setinfo(self): + self.fs._create("birthday.txt") + now = time.time() + + change_info = {"details": {"accessed": now + 60, "modified": now + 60 * 60}} + self.fs.setinfo("birthday.txt", change_info) + new_info = self.fs.getinfo("birthday.txt", namespaces=["details"]) + can_write_acccess = new_info.is_writeable("details", "accessed") + can_write_modified = new_info.is_writeable("details", "modified") + if can_write_acccess: + self.assertAlmostEqual( + new_info.get("details", "accessed"), now + 60, places=4 + ) + if can_write_modified: + self.assertAlmostEqual( + new_info.get("details", "modified"), now + 60 * 60, places=4 + ) + + with self.assertRaises(FileNotFoundError): + self.fs.setinfo("nothing", {}) + + def test_copy(self): + # Test copy to new path + self.fs.write_bytes("foo", b"test") + self.fs.copy("foo", "bar") + self.assert_bytes("bar", b"test") + + # Test copy over existing path + self.fs.write_bytes("baz", b"truncateme") + self.fs.copy("foo", "baz", overwrite=True) + self.assert_bytes("foo", b"test") + + # # Test copying a file to a destination that exists + # with self.assertRaises(errors.DestinationExists): + # self.fs.copy("baz", "foo") + + # # Test copying to a directory that doesn't exist + # with self.assertRaises(FileNotFoundError): + # self.fs.copy("baz", "a/b/c/baz") + + # Test copying a source that doesn't exist + with self.assertRaises(FileNotFoundError): + self.fs.copy("egg", "spam") + + def _test_upload(self, workers): + """Test copy_fs with varying number of worker threads.""" + with get_filesystem("tempdir") as src_fs: + src_fs.write_bytes("foo", self.data1) + src_fs.write_bytes("bar", self.data2) + src_fs.makedir("dir1") + src_fs.write_bytes("dir1/baz", self.data3) + src_fs.makedirs("dir2/dir3") + src_fs.write_bytes("dir2/dir3/egg", self.data4) + dst_fs = self.fs + copy_fs(src_fs, dst_fs, workers=workers) + self.assertEqual(dst_fs.read_bytes("foo"), self.data1) + self.assertEqual(dst_fs.read_bytes("bar"), self.data2) + self.assertEqual(dst_fs.read_bytes("dir1/baz"), self.data3) + self.assertEqual(dst_fs.read_bytes("dir2/dir3/egg"), self.data4) + + def test_upload_0(self): + self._test_upload(0) + + # def test_upload_1(self): + # self._test_upload(1) + + # def test_upload_2(self): + # self._test_upload(2) + + # def test_upload_4(self): + # self._test_upload(4) + + def _test_download(self, workers): + """Test copy_fs with varying number of worker threads.""" + src_fs = self.fs + with get_filesystem("tempdir") as dst_fs: + src_fs.write_bytes("foo", self.data1) + src_fs.write_bytes("bar", self.data2) + src_fs.makedir("dir1") + src_fs.write_bytes("dir1/baz", self.data3) + src_fs.makedirs("dir2/dir3") + src_fs.write_bytes("dir2/dir3/egg", self.data4) + copy_fs(src_fs, dst_fs, workers=workers) + self.assertEqual(dst_fs.read_bytes("foo"), self.data1) + self.assertEqual(dst_fs.read_bytes("bar"), self.data2) + self.assertEqual(dst_fs.read_bytes("dir1/baz"), self.data3) + self.assertEqual(dst_fs.read_bytes("dir2/dir3/egg"), self.data4) + + def test_download_0(self): + self._test_download(0) + + # def test_download_1(self): + # self._test_download(1) + + # def test_download_2(self): + # self._test_download(2) + + # def test_download_4(self): + # self._test_download(4) + + def test_create(self): + # Test create new file + self.assertFalse(self.fs.exists("foo")) + self.fs._create("foo") + self.assertTrue(self.fs.exists("foo")) + self.assertEqual(self.fs._gettype("foo"), "file") + self.assertEqual(self.fs._getsize("foo"), 0) + + # Test wipe existing file + self.fs.write_bytes("foo", b"bar") + self.assertEqual(self.fs._getsize("foo"), 3) + self.fs._create("foo", wipe=True) + self.assertEqual(self.fs._getsize("foo"), 0) + + # Test create with existing file, and not wipe + self.fs.write_bytes("foo", b"bar") + self.assertEqual(self.fs._getsize("foo"), 3) + self.fs._create("foo", wipe=False) + self.assertEqual(self.fs._getsize("foo"), 3) + + def test_scandir(self): + # Check exception for scanning dir that doesn't exist + with self.assertRaises(FileNotFoundError): + for _info in self.fs.scandir("/foobar"): + pass + + # Check scandir returns an iterable + iter_scandir = self.fs.scandir("/") + self.assertTrue(isinstance(iter_scandir, collections_abc.Iterable)) + self.assertEqual(list(iter_scandir), []) + + # Check scanning + self.fs._create("foo") + + # Can't scandir on a file + with self.assertRaises(NotADirectoryError): + list(self.fs.scandir("foo")) + + self.fs._create("bar") + self.fs.makedir("dir") + iter_scandir = self.fs.scandir("/") + self.assertTrue(isinstance(iter_scandir, collections_abc.Iterable)) + + scandir = sorted( + (r.raw for r in iter_scandir), key=lambda info: info["basic"]["name"] + ) + + # Filesystems may send us more than we ask for + # We just want to test the 'basic' namespace + scandir = [{"basic": i["basic"]} for i in scandir] + + self.assertEqual( + scandir, + [ + {"basic": {"name": "bar", "is_dir": False}}, + {"basic": {"name": "dir", "is_dir": True}}, + {"basic": {"name": "foo", "is_dir": False}}, + ], + ) + + # Hard to test optional namespaces, but at least run the code + list( + self.fs.scandir( + "/", namespaces=["details", "link", "stat", "lstat", "access"] + ) + ) + + # Test paging + page1 = list(self.fs.scandir("/", page=(None, 2))) + self.assertEqual(len(page1), 2) + page2 = list(self.fs.scandir("/", page=(2, 4))) + self.assertEqual(len(page2), 1) + page3 = list(self.fs.scandir("/", page=(4, 6))) + self.assertEqual(len(page3), 0) + paged = {r.name for r in itertools.chain(page1, page2)} + self.assertEqual(paged, {"foo", "bar", "dir"}) + + def test_readbytes(self): + # Test readbytes method. + all_bytes = b"".join(six.int2byte(n) for n in range(256)) + with self.fs.open("foo", "wb") as f: + f.write(all_bytes) + self.assertEqual(self.fs.read_bytes("foo"), all_bytes) + _all_bytes = self.fs.read_bytes("foo") + self.assertIsInstance(_all_bytes, bytes) + self.assertEqual(_all_bytes, all_bytes) + + with self.assertRaises(FileNotFoundError): + self.fs.read_bytes("foo/bar") + + self.fs.makedir("baz") + with self.assertRaises(IsADirectoryError): + self.fs.read_bytes("baz") + + def test_download(self): + test_bytes = b"Hello, World" + self.fs.write_bytes("hello.bin", test_bytes) + write_file = tempfile.mktemp(prefix="pyfatfs.test_download.write_file", suffix=".bin") + self.fs.download("hello.bin", write_file) + with open(write_file, "rb") as f: + actual_bytes = f.read() + os.unlink(write_file) + self.assertEqual(actual_bytes, test_bytes) + + with self.assertRaises(FileNotFoundError): + self.fs.download("foo.bin", write_file) + + # no. fs.get_file uses fs.blocksize as chunk size + # def test_download_chunk_size(self): + # test_bytes = b"Hello, World" * 100 + # self.fs.write_bytes("hello.bin", test_bytes) + # write_file = io.BytesIO() + # self.fs.download("hello.bin", write_file, chunk_size=8) + # self.assertEqual(write_file.getvalue(), test_bytes) + + def test_write_bytes(self): + all_bytes = b"".join(six.int2byte(n) for n in range(256)) + self.fs.write_bytes("foo", all_bytes) + with self.fs.open("foo", "rb") as f: + _bytes = f.read() + self.assertIsInstance(_bytes, bytes) + self.assertEqual(_bytes, all_bytes) + self.assert_bytes("foo", all_bytes) + with self.assertRaises(TypeError): + self.fs.write_bytes("notbytes", "unicode") + + def test_readtext(self): + self.fs.makedir("foo") + with self.fs.open("foo/unicode.txt", "wt") as f: + f.write(UNICODE_TEXT) + text = self.fs.readtext("foo/unicode.txt") + self.assertIsInstance(text, text_type) + self.assertEqual(text, UNICODE_TEXT) + self.assert_text("foo/unicode.txt", UNICODE_TEXT) + + def test_writetext(self): + # Test write_text method. + self.fs.write_text("foo", "bar") + with self.fs.open("foo", "rt") as f: + foo = f.read() + self.assertEqual(foo, "bar") + self.assertIsInstance(foo, text_type) + with self.assertRaises(TypeError): + self.fs.write_text("nottext", b"bytes") + + def test_writefile(self): + bytes_file = io.BytesIO(b"bar") + self.fs.writefile("foo", bytes_file) + with self.fs.open("foo", "rb") as f: + data = f.read() + self.assertEqual(data, b"bar") + + def test_upload(self): + bytes_file = io.BytesIO(b"bar") + self.fs.upload("foo", bytes_file) + with self.fs.open("foo", "rb") as f: + data = f.read() + self.assertEqual(data, b"bar") + + # upload to non-existing path (/spam/eggs) + with self.assertRaises(FileNotFoundError): + self.fs.upload("/spam/eggs", bytes_file) + + def test_upload_chunk_size(self): + test_data = b"bar" * 128 + bytes_file = io.BytesIO(test_data) + self.fs.upload("foo", bytes_file, chunk_size=8) + with self.fs.open("foo", "rb") as f: + data = f.read() + self.assertEqual(data, test_data) + + def test_bin_files(self): + # Check binary files. + with self.fs.open("foo1", "wb") as f: + text_type(f) + repr(f) + f.write(b"a") + f.write(b"b") + f.write(b"c") + self.assert_bytes("foo1", b"abc") + + # Test writelines + with self.fs.open("foo2", "wb") as f: + f.writelines([b"hello\n", b"world"]) + self.assert_bytes("foo2", b"hello\nworld") + + # Test readline + with self.fs.open("foo2") as f: + self.assertEqual(f.readline(), "hello\n") + self.assertEqual(f.readline(), "world") + + # Test readlines + with self.fs.open("foo2") as f: + lines = f.readlines() + self.assertEqual(lines, ["hello\n", "world"]) + with self.fs.open("foo2") as f: + lines = list(f) + self.assertEqual(lines, ["hello\n", "world"]) + with self.fs.open("foo2") as f: + lines = [] + for line in f: + lines.append(line) + self.assertEqual(lines, ["hello\n", "world"]) + with self.fs.open("foo2") as f: + print(repr(f)) + self.assertEqual(next(f), "hello\n") + + # Test truncate + with self.fs.open("foo2", "r+b") as f: + f.truncate(3) + self.assertEqual(self.fs._getsize("foo2"), 3) + self.assert_bytes("foo2", b"hel") + + def test_files(self): + # Test multiple writes + + with self.fs.open("foo1", "wt") as f: + text_type(f) + repr(f) + f.write("a") + f.write("b") + f.write("c") + self.assert_text("foo1", "abc") + + # Test writelines + with self.fs.open("foo2", "wt") as f: + f.writelines(["hello\n", "world"]) + self.assert_text("foo2", "hello\nworld") + + # Test readline + with self.fs.open("foo2") as f: + self.assertEqual(f.readline(), "hello\n") + self.assertEqual(f.readline(), "world") + + # Test readlines + with self.fs.open("foo2") as f: + lines = f.readlines() + self.assertEqual(lines, ["hello\n", "world"]) + with self.fs.open("foo2") as f: + lines = list(f) + self.assertEqual(lines, ["hello\n", "world"]) + with self.fs.open("foo2") as f: + lines = [] + for line in f: + lines.append(line) + self.assertEqual(lines, ["hello\n", "world"]) + + # Test truncate + with self.fs.open("foo2", "r+") as f: + f.truncate(3) + self.assertEqual(self.fs._getsize("foo2"), 3) + self.assert_text("foo2", "hel") + + with self.fs.open("foo2", "ab") as f: + f.write(b"p") + self.assert_bytes("foo2", b"help") + + # Test __del__ doesn't throw traceback + with warnings.catch_warnings(): + warnings.simplefilter("ignore") + f = self.fs.open("foo2", "r") + del f + + with self.assertRaises(IOError): + with self.fs.open("foo2", "r") as f: + f.write("no!") + + with self.assertRaises(IOError): + with self.fs.open("newfoo", "w") as f: + f.read(2) + + def test_copy_file(self): + # Test fs.copy + bytes_test = b"Hello, World" + self.fs.write_bytes("foo.txt", bytes_test) + self.fs.copy("foo.txt", "bar.txt") + self.assert_bytes("bar.txt", bytes_test) + + def _test_copy_dir(self, protocol="memory"): + # Test copy_dir. + + self.fs.makedirs("foo/bar/baz") + self.fs.makedir("egg") + self.fs.write_text("top.txt", "Hello, World") + self.fs.write_text("/foo/bar/baz/test.txt", "Goodbye, World") + + self.assertEqual(self.fs.ls("/"), [ + {"name": "foo", "type": "directory", "size": 0}, + {"name": "egg", "type": "directory", "size": 0}, + {"name": "top.txt", "type": "file", "size": 12}, + ]) + self.assertEqual(self.fs.ls("/foo/bar/baz"), [ + {"name": "/foo/bar/baz/test.txt", "type": "file", "size": 14}, + ]) + self.assertEqual(self.fs.ls("/", detail=False), ['foo', 'egg', 'top.txt']) + self.assertEqual(self.fs.ls("/foo", detail=False), ['bar']) + self.assertEqual(self.fs.ls("/foo/bar", detail=False), ['baz']) + self.assertEqual(self.fs.ls("/foo/bar/baz", detail=False), ['test.txt']) + + self.assertEqual(self.fs.find("/", withdirs=True), [ + # '/', + '/egg', + '/foo', + '/foo/bar', + '/foo/bar/baz', + '/foo/bar/baz/test.txt', + '/top.txt', + ]) + + expected = self.fs.find("/foo", withdirs=True) + expected = map(lambda p: p[4:], expected) # remove "/foo" prefix + expected = set(expected) + + self.fs.copy("/foo", "/foo2", recursive=True) + + actual = self.fs.find("/foo2", withdirs=True) + actual = map(lambda p: p[5:], actual) # remove "/foo2" prefix + actual = set(actual) + self.assertEqual(actual, expected) + + self.assert_text("top.txt", "Hello, World") + self.assert_text("/foo/bar/baz/test.txt", "Goodbye, World") + self.assert_text("/foo2/bar/baz/test.txt", "Goodbye, World") + + # Test copying a sub dir + other_fs = get_filesystem(protocol) + copy_dir(self.fs, "/foo", other_fs, "/") + self.assertEqual(list(walk_files(other_fs)), ["/bar/baz/test.txt"]) + + print("BEFORE") + self.fs.tree() + other_fs.tree() + copy_dir(self.fs, "/foo", other_fs, "/egg") + + print("FS") + self.fs.tree() + print("OTHER") + other_fs.tree() + self.assertEqual( + list(walk_files(other_fs)), + ["/bar/baz/test.txt", "/egg/bar/baz/test.txt"], + ) + + # cleanup for following tests + self.fs.rm("/foo2", recursive=True) + + def _test_copy_dir_write(self, protocol): + # Test copying to this filesystem from another. + other_fs = get_filesystem(protocol) + self.assertEqual(other_fs.ls("/", detail=False), []) # other_fs should be empty + other_fs.makedirs("foo/bar/baz") + other_fs.makedir("egg") + other_fs.write_text("top.txt", "Hello, World") + other_fs.write_text("/foo/bar/baz/test.txt", "Goodbye, World") + # fix: FileExistsError: /foo + self.fs.rm("/", recursive=True) + copy_dir(other_fs, "/", self.fs, "/") + expected = {"/egg", "/foo", "/foo/bar", "/foo/bar/baz"} + self.assertEqual(set(walk_dirs(self.fs)), expected) + self.assert_text("top.txt", "Hello, World") + self.assert_text("/foo/bar/baz/test.txt", "Goodbye, World") + + def test_copy_dir_mem(self): + # Test copy_dir with a mem fs. + self._test_copy_dir("memory") + self._test_copy_dir_write("memory") + + def test_copy_dir_temp(self): + # Test copy_dir with a temp fs. + self._test_copy_dir("tempdir") + self._test_copy_dir_write("tempdir") + + def test_move_dir_same_fs(self): + self.fs.makedirs("foo/bar/baz") + self.fs.makedir("egg") + self.fs.write_text("top.txt", "Hello, World") + self.fs.write_text("/foo/bar/baz/test.txt", "Goodbye, World") + + fs.move.move_dir(self.fs, "foo", self.fs, "foo2") + + expected = {"/egg", "/foo2", "/foo2/bar", "/foo2/bar/baz"} + self.assertEqual(set(walk_dirs(self.fs)), expected) + self.assert_text("top.txt", "Hello, World") + self.assert_text("/foo2/bar/baz/test.txt", "Goodbye, World") + + self.assertEqual(sorted(self.fs.listdir("/")), ["egg", "foo2", "top.txt"]) + self.assertEqual( + sorted(x.name for x in self.fs.scandir("/")), ["egg", "foo2", "top.txt"] + ) + + def _test_move_dir_write(self, protocol): + # Test moving to this filesystem from another. + other_fs = get_filesystem(protocol) + other_fs.makedirs("foo/bar/baz") + other_fs.makedir("egg") + other_fs.write_text("top.txt", "Hello, World") + other_fs.write_text("/foo/bar/baz/test.txt", "Goodbye, World") + + fs.move.move_dir(other_fs, "/", self.fs, "/") + + expected = {"/egg", "/foo", "/foo/bar", "/foo/bar/baz"} + self.assertEqual(other_fs.listdir("/"), []) + self.assertEqual(set(walk_dirs(self.fs)), expected) + self.assert_text("top.txt", "Hello, World") + self.assert_text("/foo/bar/baz/test.txt", "Goodbye, World") + + def test_move_dir_mem(self): + self._test_move_dir_write("memory") + + def test_move_dir_temp(self): + self._test_move_dir_write("tempdir") + + def test_move_file_same_fs(self): + text = "Hello, World" + self.fs.makedir("foo").write_text("test.txt", text) + self.assert_text("foo/test.txt", text) + + fs.move.move_file(self.fs, "foo/test.txt", self.fs, "foo/test2.txt") + self.assert_not_exists("foo/test.txt") + self.assert_text("foo/test2.txt", text) + + self.assertEqual(self.fs.listdir("foo"), ["test2.txt"]) + self.assertEqual(next(self.fs.scandir("foo")).name, "test2.txt") + + def _test_move_file(self, protocol): + other_fs = get_filesystem(protocol) + + text = "Hello, World" + self.fs.makedir("foo").write_text("test.txt", text) + self.assert_text("foo/test.txt", text) + + with self.assertRaises(FileNotFoundError): + fs.move.move_file(self.fs, "foo/test.txt", other_fs, "foo/test2.txt") + + other_fs.makedir("foo") + + fs.move.move_file(self.fs, "foo/test.txt", other_fs, "foo/test2.txt") + + self.assertEqual(other_fs.readtext("foo/test2.txt"), text) + + def test_move_file_mem(self): + self._test_move_file("memory") + + def test_move_file_temp(self): + self._test_move_file("tempdir") + + # def test_move_file_onto_itself(self): + # self.fs.write_text("file.txt", "Hello") + # self.fs.move("file.txt", "file.txt", overwrite=True) + # self.assert_text("file.txt", "Hello") + + # with self.assertRaises(errors.DestinationExists): + # self.fs.move("file.txt", "file.txt", overwrite=False) + + # def test_move_file_onto_itself_relpath(self): + # subdir = self.fs.makedir("sub") + # subdir.write_text("file.txt", "Hello") + # self.fs.move("sub/file.txt", "sub/../sub/file.txt", overwrite=True) + # self.assert_text("sub/file.txt", "Hello") + + # with self.assertRaises(errors.DestinationExists): + # self.fs.move("sub/file.txt", "sub/../sub/file.txt", overwrite=False) + + # def test_copy_file_onto_itself(self): + # self.fs.write_text("file.txt", "Hello") + # with self.assertRaises(errors.IllegalDestination): + # self.fs.copy("file.txt", "file.txt", overwrite=True) + # with self.assertRaises(errors.DestinationExists): + # self.fs.copy("file.txt", "file.txt", overwrite=False) + # self.assert_text("file.txt", "Hello") + + # def test_copy_file_onto_itself_relpath(self): + # subdir = self.fs.makedir("sub") + # subdir.write_text("file.txt", "Hello") + # with self.assertRaises(errors.IllegalDestination): + # self.fs.copy("sub/file.txt", "sub/../sub/file.txt", overwrite=True) + # with self.assertRaises(errors.DestinationExists): + # self.fs.copy("sub/file.txt", "sub/../sub/file.txt", overwrite=False) + # self.assert_text("sub/file.txt", "Hello") + + def test_tree(self): + self.fs.makedirs("foo/bar") + self.fs._create("test.txt") + write_tree = io.StringIO() + self.fs.tree(file=write_tree) + written = write_tree.getvalue() + expected = "|-- foo\n| `-- bar\n`-- test.txt\n" + self.assertEqual(expected, written) + + def test_unicode_path(self): + if not self.fs.getmeta().get("unicode_paths", False): + raise unittest.SkipTest("the filesystem does not support unicode paths.") + + self.fs.makedir("földér") + self.fs.write_text("☭.txt", "Smells like communism.") + self.fs.write_bytes("földér/☣.txt", b"Smells like an old syringe.") + + self.assert_isdir("földér") + self.assertEqual(["☣.txt"], self.fs.listdir("földér")) + self.assertEqual("☣.txt", self.fs.getinfo("földér/☣.txt").name) + self.assert_text("☭.txt", "Smells like communism.") + self.assert_bytes("földér/☣.txt", b"Smells like an old syringe.") + + if self.fs.hassyspath("földér/☣.txt"): + self.assertTrue(os.path.exists(self.fs.getsyspath("földér/☣.txt"))) + + self.fs.rm("földér/☣.txt") + self.assert_not_exists("földér/☣.txt") + self.fs.removedir("földér") + self.assert_not_exists("földér") + + def test_case_sensitive(self): + meta = self.fs.getmeta() + if "case_insensitive" not in meta: + raise unittest.SkipTest("case sensitivity not known") + + if meta.get("case_insensitive", False): + raise unittest.SkipTest("the filesystem is not case sensitive.") + + self.fs.makedir("foo") + self.fs.makedir("Foo") + self.fs.touch("fOO") + + self.assert_exists("foo") + self.assert_exists("Foo") + self.assert_exists("fOO") + self.assert_not_exists("FoO") + + self.assert_isdir("foo") + self.assert_isdir("Foo") + self.assert_isfile("fOO") + + # FIXME from fs import glob + # def test_glob(self): + # self.assertIsInstance(self.fs.glob, glob.BoundGlobber) diff --git a/tests/test_PyFatFS.py b/tests/test_PyFatFS.py index e863a77..b645f6a 100644 --- a/tests/test_PyFatFS.py +++ b/tests/test_PyFatFS.py @@ -7,13 +7,12 @@ from unittest import TestCase, mock from io import BytesIO -import fs.errors -from fs.test import FSTestCases from pyfatfs import PyFATException from pyfatfs.FATDirectoryEntry import FATDirectoryEntry from pyfatfs.PyFat import PyFat from pyfatfs.PyFatFS import PyFatBytesIOFS +from fs_test_cases import FSTestCases def _make_fs(fat_type: int, **kwargs) -> (PyFatBytesIOFS, BytesIO): @@ -32,6 +31,8 @@ def _make_fs(fat_type: int, **kwargs) -> (PyFatBytesIOFS, BytesIO): size=part_sz) pf.flush_fat() + # fix: UserWarning: Filesystem was not cleanly unmounted on last access. + pf._mark_clean() in_memory_fs.seek(0) in_memory_fs = BytesIO(in_memory_fs.read()) return (PyFatBytesIOFS(in_memory_fs, @@ -62,7 +63,7 @@ def test_write_lock(self): """Verify concurrent writes to files are processed sequentially.""" from threading import Thread threads = [] - self.fs.create("/WRITE.TXT") + self.fs.touch("/WRITE.TXT") def write_to_file(_f, _i): _f.write(str(_i) * 10 + "\n") @@ -75,9 +76,8 @@ def write_to_file(_f, _i): for t in threads: t.join() - f.close() - read_text = self.fs.readtext("/WRITE.TXT") + read_text = self.fs.read_text("/WRITE.TXT") for i in range(0, 10): self.assertIn(str(i) * 10 + "\n", read_text) @@ -85,10 +85,11 @@ def test_append_lock(self): """Verify concurrent appends to files are processed sequentially.""" from threading import Thread threads = [] - self.fs.create("/APPEND.TXT") + self.fs.touch("/APPEND.TXT") def append_to_file(_fs, _i): - _fs.appendtext("/APPEND.TXT", str(_i) * 10 + "\n") + with _fs.open("/APPEND.TXT", "a") as f: + f.write(str(_i) * 10 + "\n") for i in range(0, 10): t = Thread(target=append_to_file, args=(self.fs, i)) @@ -98,7 +99,7 @@ def append_to_file(_fs, _i): for t in threads: t.join() - read_text = self.fs.readtext("/APPEND.TXT") + read_text = self.fs.read_text("/APPEND.TXT") for i in range(0, 10): self.assertIn(str(i) * 10 + "\n", read_text) @@ -109,7 +110,7 @@ def test_fs_lock(self): def create_dentries(fs, i): for n in range(0, 50): - fs.makedirs(f"/root/{n}DIR", recreate=True) + fs.makedirs(f"/root/{n}DIR", exist_ok=True) fs.touch(f"/root/{n}.dat") fs.touch(f"/root/{n}DIR/{n}.dat") fs.touch(f"/root/{i}.txt") @@ -122,6 +123,9 @@ def create_dentries(fs, i): for t in threads: t.join() + + # fix: UserWarning: Filesystem was not cleanly unmounted on last access. + fs.fs._mark_clean() in_memory_fs.seek(0) fs = PyFatBytesIOFS(BytesIO(in_memory_fs.read()), encoding='UTF-8', lazy_load=True) @@ -133,9 +137,10 @@ def create_dentries(fs, i): expected_dentries_sub.append(f"{i}DIR/{i}.dat") for i in range(0, 10): expected_dentries_root.append(f"{i}.txt") - assert fs.listdir("/root").sort() == expected_dentries_root.sort() + self.assertEqual(fs.ls("/", detail=False), ["root"]) + assert fs.listdir("/root", detail=False).sort() == expected_dentries_root.sort() for i in range(0, 10): - self.assertEqual(fs.listdir(f"/root/{i}DIR").sort(), + self.assertEqual(fs.listdir(f"/root/{i}DIR", detail=False).sort(), expected_dentries_sub.sort()) def test_lazy_load_dentry_parent_update(self): @@ -150,6 +155,8 @@ def test_lazy_load_dentry_parent_update(self): assert foo_dentry.get_full_path() == "foo" assert foobar_dentry.get_full_path() == "foo/bar" + # fix: UserWarning: Filesystem was not cleanly unmounted on last access. + fs.fs._mark_clean() in_memory_fs.seek(0) fs = PyFatBytesIOFS(BytesIO(in_memory_fs.read()), encoding='UTF-8', lazy_load=True) @@ -171,14 +178,16 @@ def test_update_dentry_no_repopulate(self): fs, in_memory_fs = _make_fs(self.FAT_TYPE, lazy_load=True) fs.makedirs("/foo") fs.touch("/foo/bar") - assert fs.listdir("/foo") == ['bar'] + assert fs.listdir("/foo", detail=False) == ['bar'] + # fix: UserWarning: Filesystem was not cleanly unmounted on last access. + fs.fs._mark_clean() in_memory_fs.seek(0) fs = PyFatBytesIOFS(BytesIO(in_memory_fs.read()), encoding='UTF-8', lazy_load=True) fs.touch("/foo/baz") fs.remove("/foo/bar") - assert fs.listdir("/foo") == ['baz'] + assert fs.listdir("/foo", detail=False) == ['baz'] def test_lazy_vs_nonlazy_tree(self): """Compare directory tree between lazy and non-lazy loading.""" @@ -189,31 +198,33 @@ def test_lazy_vs_nonlazy_tree(self): "/0/1/2/3/4/5/6/8/9", "/0/1/2/3/4/5/6/8/9/10/11/12/13/14/15/16"] for d in dirs: - fs1.makedirs(d, recreate=True) + fs1.makedirs(d, exist_ok=True) fs1.touch(os.path.join(d, "FILE1.TXT")) fs1.touch(os.path.join(d, "This requires an LFN entry.TxT")) fs1.touch(os.path.join(d, "FILE2.TXT")) dentries_fs1_initial = list(fs1.walk("/")) - fs1.fs.flush_fat() + # fs1.fs.flush_fat() # called in fs1.fs._mark_clean + # fix: UserWarning: Filesystem was not cleanly unmounted on last access. + fs1.fs._mark_clean() in_memory_fs.seek(0) in_memory_fs = BytesIO(in_memory_fs.read()) fs1 = PyFatBytesIOFS(in_memory_fs, encoding='UTF-8', lazy_load=False) dentries_fs1_reopen = list(fs1.walk("/")) assert dentries_fs1_initial == dentries_fs1_reopen + # fix: UserWarning: Filesystem was not cleanly unmounted on last access. + fs1.fs._mark_clean() in_memory_fs.seek(0) fs2 = PyFatBytesIOFS(BytesIO(in_memory_fs.read()), encoding='UTF-8', lazy_load=True) assert dentries_fs1_reopen == list(fs2.walk("/")) - fs1.close() - fs2.close() def test_write_file_e2big(self): """Verify that files that are too big cannot be written.""" - self.fs.create("/BIGBOI.TXT") + self.fs.touch("/BIGBOI.TXT") old_fat = self.fs.fs.fat.copy() - f = self.fs.openbin("/BIGBOI.TXT", "wb") + f = self.fs.open("/BIGBOI.TXT", "wb") mock_bytes = mock.MagicMock() mock_bytes.__len__.return_value = FATDirectoryEntry.MAX_FILE_SIZE+1 with self.assertRaises(PyFATException) as e: @@ -223,9 +234,9 @@ def test_write_file_e2big(self): def test_write_file_enospc(self): """Verify that files larger than free space cannot be written.""" - self.fs.create("/BIGBOI.TXT") + self.fs.touch("/BIGBOI.TXT") old_fat = self.fs.fs.fat.copy() - f = self.fs.openbin("/BIGBOI.TXT", "wb") + f = self.fs.open("/BIGBOI.TXT", "wb") mock_bytes = mock.MagicMock() mock_bytes.__len__.return_value = FATDirectoryEntry.MAX_FILE_SIZE with self.assertRaises(PyFATException) as e: @@ -235,9 +246,9 @@ def test_write_file_enospc(self): def test_truncate_file_e2big(self): """Verify that truncating past MAX_FILE_SIZE is not possible.""" - self.fs.create("/BIGBOI.TXT") + self.fs.touch("/BIGBOI.TXT") old_fat = self.fs.fs.fat.copy() - f = self.fs.openbin("/BIGBOI.TXT", "wb") + f = self.fs.open("/BIGBOI.TXT", "wb") with self.assertRaises(PyFATException) as e: f.truncate(FATDirectoryEntry.MAX_FILE_SIZE+1) self.assertEqual(errno.E2BIG, e.exception.errno) @@ -245,9 +256,9 @@ def test_truncate_file_e2big(self): def test_truncate_file_enospc(self): """Verify that truncating past available disk space is not possible.""" - self.fs.create("/BIGBOI.TXT") + self.fs.touch("/BIGBOI.TXT") old_fat = self.fs.fs.fat.copy() - f = self.fs.openbin("/BIGBOI.TXT", "wb") + f = self.fs.open("/BIGBOI.TXT", "wb") with self.assertRaises(PyFATException) as e: f.truncate(FATDirectoryEntry.MAX_FILE_SIZE) self.assertEqual(errno.ENOSPC, e.exception.errno) @@ -256,34 +267,24 @@ def test_truncate_file_enospc(self): def test_create_file_folder_dupe(self): """Verify that file creation with duplicate name to a folder fails.""" self.fs.makedir("/test") - with self.assertRaises(fs.errors.FileExpected): - self.fs.create("/test") + with self.assertRaises(FileExistsError): + self.fs._create("/test") def test_create_folder_file_dupe(self): """Verify that folder creation with duplicate name to a file fails.""" - self.fs.create("/test") - with self.assertRaises(fs.errors.DirectoryExists): - self.fs.makedir("/test", recreate=True) - - def test_create_wipe_update_mtime(self): - """Verify that file creation updates mtime on wipe.""" - self.fs.create("/test") - self.fs.settimes("/test", datetime(1999, 12, 31, 23, 59, 59, 9999), - datetime(2000, 1, 1, 0, 0, 0, 0)) - orig_info = self.fs.getinfo("/test") - self.fs.create("/test", wipe=True) - new_info = self.fs.getinfo("/test") - assert orig_info != new_info + self.fs.touch("/test") + with self.assertRaises(FileExistsError): + self.fs.makedir("/test") def test_writetest_truncates(self): """Verify that writetest() properly truncates file contents.""" fname = "/truncatetest.txt" - self.fs.create(fname) + self.fs.touch(fname) self.fs.writetext(fname, '0' * 64) - assert self.fs.readtext(fname) == '0' * 64 + assert self.fs.read_text(fname) == '0' * 64 self.fs.writetext(fname, '1' * 16) - assert len(self.fs.readtext(fname)) == 16 - assert self.fs.readtext(fname) == '1' * 16 + assert len(self.fs.read_text(fname)) == 16 + assert self.fs.read_text(fname) == '1' * 16 class TestPyFatFS32(TestPyFatFS16, FSTestCases, TestCase, PyFsCompatLayer): diff --git a/tests/test_PyFatFSOpener.py b/tests/test_PyFatFSOpener.py index b08991d..05ff01f 100644 --- a/tests/test_PyFatFSOpener.py +++ b/tests/test_PyFatFSOpener.py @@ -2,7 +2,7 @@ """Make sure the fat PyFilesystem2 protocol gets registered properly.""" -import fs +import fsspec as fs import pytest