Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ci/environment-downstream.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ dependencies:
- python=3.11
- pip:
- git+https://github.com/dask/dask
- git+https://github.com/dask/dask-expr
4 changes: 2 additions & 2 deletions fsspec/asyn.py
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,7 @@ async def _copy(
continue
raise ex

async def _pipe_file(self, path, value, **kwargs):
async def _pipe_file(self, path, value, mode="overwrite", **kwargs):
raise NotImplementedError

async def _pipe(self, path, value=None, batch_size=None, **kwargs):
Expand Down Expand Up @@ -517,7 +517,7 @@ async def _cat_ranges(
coros, batch_size=batch_size, nofiles=True, return_exceptions=True
)

async def _put_file(self, lpath, rpath, **kwargs):
async def _put_file(self, lpath, rpath, mode="overwrite", **kwargs):
raise NotImplementedError

async def _put(
Expand Down
4 changes: 4 additions & 0 deletions fsspec/implementations/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,8 +273,12 @@ async def _put_file(
chunk_size=5 * 2**20,
callback=DEFAULT_CALLBACK,
method="post",
mode="overwrite",
**kwargs,
):
if mode != "overwrite":
raise NotImplementedError("Exclusive write")

async def gen_chunks():
# Support passing arbitrary file-like objects
# and use them instead of streams.
Expand Down
11 changes: 8 additions & 3 deletions fsspec/implementations/memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,12 +126,13 @@ def makedirs(self, path, exist_ok=False):
if not exist_ok:
raise

def pipe_file(self, path, value, **kwargs):
def pipe_file(self, path, value, mode="overwrite", **kwargs):
"""Set the bytes of given file

Avoids copies of the data if possible
"""
self.open(path, "wb", data=value)
mode = "xb" if mode == "create" else "wb"
self.open(path, mode=mode, data=value)

def rmdir(self, path):
path = self._strip_protocol(path)
Expand Down Expand Up @@ -178,6 +179,8 @@ def _open(
**kwargs,
):
path = self._strip_protocol(path)
if "x" in mode and self.exists(path):
raise FileExistsError
if path in self.pseudo_dirs:
raise IsADirectoryError(path)
parent = path
Expand All @@ -197,7 +200,9 @@ def _open(
return f
else:
raise FileNotFoundError(path)
elif mode == "wb":
elif mode in {"wb", "xb"}:
if mode == "xb" and self.exists(path):
raise FileExistsError
m = MemoryFile(self, path, kwargs.get("data"))
if not self._intrans:
m.commit()
Expand Down
8 changes: 6 additions & 2 deletions fsspec/implementations/reference.py
Original file line number Diff line number Diff line change
Expand Up @@ -1181,13 +1181,17 @@ async def _rm_file(self, path, **kwargs):
) # ignores FileNotFound, just as well for directories
self.dircache.clear() # this is a bit heavy handed

async def _pipe_file(self, path, data):
async def _pipe_file(self, path, data, mode="overwrite", **kwargs):
if mode == "create" and self.exists(path):
raise FileExistsError
# can be str or bytes
self.references[path] = data
self.dircache.clear() # this is a bit heavy handed

async def _put_file(self, lpath, rpath, **kwargs):
async def _put_file(self, lpath, rpath, mode="overwrite", **kwargs):
# puts binary
if mode == "create" and self.exists(rpath):
raise FileExistsError
with open(lpath, "rb") as f:
self.references[rpath] = f.read()
self.dircache.clear() # this is a bit heavy handed
Expand Down
8 changes: 8 additions & 0 deletions fsspec/implementations/tests/memory/memory_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,11 @@ class TestMemoryGet(abstract.AbstractGetTests, MemoryFixtures):

class TestMemoryPut(abstract.AbstractPutTests, MemoryFixtures):
    """Combine the shared ``AbstractPutTests`` suite with ``MemoryFixtures``."""


class TestMemoryPipe(abstract.AbstractPipeTests, MemoryFixtures):
    """Combine the shared ``AbstractPipeTests`` suite with ``MemoryFixtures``."""


class TestMemoryOpen(abstract.AbstractOpenTests, MemoryFixtures):
    """Combine the shared ``AbstractOpenTests`` suite with ``MemoryFixtures``."""
18 changes: 12 additions & 6 deletions fsspec/implementations/tests/test_smb.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@

pytest.importorskip("smbprotocol")


def delay_rerun(*args):
    """Pause briefly, then approve a rerun.

    Passed as ``rerun_filter`` to ``pytest.mark.flaky`` by the tests in this
    module. Any positional arguments supplied by the caller are accepted and
    ignored.

    Returns
    -------
    bool
        Always ``True``.
    """
    pause_seconds = 0.1
    time.sleep(pause_seconds)
    return True


# ruff: noqa: F821

if os.environ.get("WSL_INTEROP"):
Expand Down Expand Up @@ -72,7 +78,7 @@ def smb_params(request):
stop_docker(container)


@pytest.mark.flaky(reruns=2, reruns_delay=2)
@pytest.mark.flaky(max_runs=3, rerun_filter=delay_rerun)
def test_simple(smb_params):
adir = "/home/adir"
adir2 = "/home/adir/otherdir/"
Expand All @@ -89,7 +95,7 @@ def test_simple(smb_params):
assert not fsmb.exists(adir)


@pytest.mark.flaky(reruns=2, reruns_delay=2)
@pytest.mark.flaky(max_runs=3, rerun_filter=delay_rerun)
def test_auto_mkdir(smb_params):
adir = "/home/adir"
adir2 = "/home/adir/otherdir/"
Expand All @@ -116,7 +122,7 @@ def test_auto_mkdir(smb_params):
assert not fsmb.exists(another_dir)


@pytest.mark.flaky(reruns=2, reruns_delay=2)
@pytest.mark.flaky(max_runs=3, rerun_filter=delay_rerun)
def test_with_url(smb_params):
if smb_params["port"] is None:
smb_url = "smb://{username}:{password}@{host}/home/someuser.txt"
Expand All @@ -131,7 +137,7 @@ def test_with_url(smb_params):
assert read_result == b"hello"


@pytest.mark.flaky(reruns=2, reruns_delay=2)
@pytest.mark.flaky(max_runs=3, rerun_filter=delay_rerun)
def test_transaction(smb_params):
afile = "/home/afolder/otherdir/afile"
afile2 = "/home/afolder/otherdir/afile2"
Expand All @@ -152,14 +158,14 @@ def test_transaction(smb_params):
assert fsmb.find(adir) == [afile, afile2]


@pytest.mark.flaky(reruns=2, reruns_delay=2)
@pytest.mark.flaky(max_runs=3, rerun_filter=delay_rerun)
def test_makedirs_exist_ok(smb_params):
fsmb = fsspec.get_filesystem_class("smb")(**smb_params)
fsmb.makedirs("/home/a/b/c")
fsmb.makedirs("/home/a/b/c", exist_ok=True)


@pytest.mark.flaky(reruns=2, reruns_delay=2)
@pytest.mark.flaky(max_runs=3, rerun_filter=delay_rerun)
def test_rename_from_upath(smb_params):
fsmb = fsspec.get_filesystem_class("smb")(**smb_params)
fsmb.makedirs("/home/a/b/c", exist_ok=True)
Expand Down
46 changes: 29 additions & 17 deletions fsspec/spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -780,8 +780,12 @@ def cat_file(self, path, start=None, end=None, **kwargs):
return f.read(end - f.tell())
return f.read()

def pipe_file(self, path, value, **kwargs):
def pipe_file(self, path, value, mode="overwrite", **kwargs):
"""Set the bytes of given file"""
if mode == "create" and self.exists(path):
# non-atomic but simple way; or could use "xb" in open(), which is likely
# not as well supported
raise FileExistsError
with self.open(path, "wb", **kwargs) as f:
f.write(value)

Expand Down Expand Up @@ -973,8 +977,12 @@ def get(
with callback.branched(rpath, lpath) as child:
self.get_file(rpath, lpath, callback=child, **kwargs)

def put_file(self, lpath, rpath, callback=DEFAULT_CALLBACK, **kwargs):
def put_file(
self, lpath, rpath, callback=DEFAULT_CALLBACK, mode="overwrite", **kwargs
):
"""Copy single file to remote"""
if mode == "create" and self.exists(rpath):
raise FileExistsError
if os.path.isdir(lpath):
self.makedirs(rpath, exist_ok=True)
return None
Expand Down Expand Up @@ -1264,6 +1272,9 @@ def open(
Target file
mode: str like 'rb', 'w'
See builtin ``open()``
Mode "x" (exclusive write) may be implemented by the backend. Even if
it is, whether it is checked up front or on commit, and whether it is
atomic is implementation-dependent.
block_size: int
Some indication of buffering - this is a value in bytes
cache_options : dict, optional
Expand Down Expand Up @@ -1797,7 +1808,7 @@ def discard(self):

def info(self):
"""File information about this path"""
if "r" in self.mode:
if self.readable():
return self.details
else:
raise ValueError("Info not available while writing")
Expand Down Expand Up @@ -1844,7 +1855,7 @@ def write(self, data):
data: bytes
Set of bytes to be written.
"""
if self.mode not in {"wb", "ab"}:
if not self.writable():
raise ValueError("File not in write mode")
if self.closed:
raise ValueError("I/O operation on closed file.")
Expand Down Expand Up @@ -1877,7 +1888,7 @@ def flush(self, force=False):
if force:
self.forced = True

if self.mode not in {"wb", "ab"}:
if self.readable():
# no-op to flush on read-mode
return

Expand Down Expand Up @@ -2026,29 +2037,30 @@ def close(self):
return
if self.closed:
return
if self.mode == "rb":
self.cache = None
else:
if not self.forced:
self.flush(force=True)

if self.fs is not None:
self.fs.invalidate_cache(self.path)
self.fs.invalidate_cache(self.fs._parent(self.path))
try:
if self.mode == "rb":
self.cache = None
else:
if not self.forced:
self.flush(force=True)

self.closed = True
if self.fs is not None:
self.fs.invalidate_cache(self.path)
self.fs.invalidate_cache(self.fs._parent(self.path))
finally:
self.closed = True

def readable(self):
"""Whether opened for reading"""
return self.mode == "rb" and not self.closed
return "r" in self.mode and not self.closed

def seekable(self):
"""Whether is seekable (only in read mode)"""
return self.readable()

def writable(self):
"""Whether opened for writing"""
return self.mode in {"wb", "ab"} and not self.closed
return self.mode in {"wb", "ab", "xb"} and not self.closed

def __del__(self):
if not self.closed:
Expand Down
2 changes: 2 additions & 0 deletions fsspec/tests/abstract/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
from fsspec.implementations.local import LocalFileSystem
from fsspec.tests.abstract.copy import AbstractCopyTests # noqa: F401
from fsspec.tests.abstract.get import AbstractGetTests # noqa: F401
from fsspec.tests.abstract.open import AbstractOpenTests # noqa: F401
from fsspec.tests.abstract.pipe import AbstractPipeTests # noqa: F401
from fsspec.tests.abstract.put import AbstractPutTests # noqa: F401


Expand Down
11 changes: 11 additions & 0 deletions fsspec/tests/abstract/open.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import pytest


class AbstractOpenTests:
    """Backend-independent checks for ``open()``.

    Concrete test classes are expected to supply the ``fs`` and ``fs_target``
    fixtures used by the test methods.
    """

    def test_open_exclusive(self, fs, fs_target):
        """Opening an existing path with mode ``"xb"`` must fail.

        Parameters
        ----------
        fs:
            Filesystem instance under test.
        fs_target:
            Path on ``fs`` that the test is free to create.
        """
        with fs.open(fs_target, "wb") as f:
            f.write(b"data")
        with fs.open(fs_target, "rb") as f:
            assert f.read() == b"data"

        # Exclusive mode may be enforced when open() is called or only when
        # the file is committed on close, so the whole open/write/close cycle
        # is kept inside the expectation. Using a context manager also means
        # no handle is left open if the backend wrongly accepts the call.
        with pytest.raises(FileExistsError):
            with fs.open(fs_target, "xb") as f:
                f.write(b"other")

        # The rejected exclusive write must not have replaced the content.
        with fs.open(fs_target, "rb") as f:
            assert f.read() == b"data"
11 changes: 11 additions & 0 deletions fsspec/tests/abstract/pipe.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import pytest


class AbstractPipeTests:
    """Backend-independent checks for ``pipe_file()``.

    Concrete test classes are expected to supply the ``fs`` and ``fs_target``
    fixtures used by the test methods.
    """

    def test_pipe_exclusive(self, fs, fs_target):
        """``mode="create"`` must refuse to replace an existing file.

        Parameters
        ----------
        fs:
            Filesystem instance under test.
        fs_target:
            Path on ``fs`` that the test is free to create.
        """
        fs.pipe_file(fs_target, b"data")
        assert fs.cat_file(fs_target) == b"data"

        # Use a payload that differs from the stored bytes so a backend that
        # overwrites first and raises afterwards is caught by the check below.
        with pytest.raises(FileExistsError):
            fs.pipe_file(fs_target, b"rejected", mode="create")
        assert fs.cat_file(fs_target) == b"data"

        fs.pipe_file(fs_target, b"new data", mode="overwrite")
        assert fs.cat_file(fs_target) == b"new data"
Loading