Skip to content

Commit 305f80a

Browse files
authored
Merge branch 'fsspec:master' into master
2 parents 6ac55a7 + bbe0591 commit 305f80a

File tree

12 files changed

+151
-52
lines changed

12 files changed

+151
-52
lines changed

ci/environment-downstream.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,3 +5,4 @@ dependencies:
55
- python=3.11
66
- pip:
77
- git+https://github.com/dask/dask
8+
- git+https://github.com/dask/dask-expr

fsspec/asyn.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -408,7 +408,7 @@ async def _copy(
408408
continue
409409
raise ex
410410

411-
async def _pipe_file(self, path, value, **kwargs):
411+
async def _pipe_file(self, path, value, mode="overwrite", **kwargs):
412412
raise NotImplementedError
413413

414414
async def _pipe(self, path, value=None, batch_size=None, **kwargs):
@@ -517,7 +517,7 @@ async def _cat_ranges(
517517
coros, batch_size=batch_size, nofiles=True, return_exceptions=True
518518
)
519519

520-
async def _put_file(self, lpath, rpath, **kwargs):
520+
async def _put_file(self, lpath, rpath, mode="overwrite", **kwargs):
521521
raise NotImplementedError
522522

523523
async def _put(

fsspec/implementations/http.py

Lines changed: 4 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -273,8 +273,12 @@ async def _put_file(
273273
chunk_size=5 * 2**20,
274274
callback=DEFAULT_CALLBACK,
275275
method="post",
276+
mode="overwrite",
276277
**kwargs,
277278
):
279+
if mode != "overwrite":
280+
raise NotImplementedError("Exclusive write")
281+
278282
async def gen_chunks():
279283
# Support passing arbitrary file-like objects
280284
# and use them instead of streams.
@@ -692,25 +696,6 @@ async def async_fetch_range(self, start, end):
692696

693697
_fetch_range = sync_wrapper(async_fetch_range)
694698

695-
def __reduce__(self):
696-
return (
697-
reopen,
698-
(
699-
self.fs,
700-
self.url,
701-
self.mode,
702-
self.blocksize,
703-
self.cache.name if self.cache else "none",
704-
self.size,
705-
),
706-
)
707-
708-
709-
def reopen(fs, url, mode, blocksize, cache_type, size=None):
710-
return fs.open(
711-
url, mode=mode, block_size=blocksize, cache_type=cache_type, size=size
712-
)
713-
714699

715700
magic_check = re.compile("([*[])")
716701

@@ -760,9 +745,6 @@ def close(self):
760745
asyncio.run_coroutine_threadsafe(self._close(), self.loop)
761746
super().close()
762747

763-
def __reduce__(self):
764-
return reopen, (self.fs, self.url, self.mode, self.blocksize, self.cache.name)
765-
766748

767749
class AsyncStreamFile(AbstractAsyncStreamedFile):
768750
def __init__(

fsspec/implementations/memory.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -126,12 +126,13 @@ def makedirs(self, path, exist_ok=False):
126126
if not exist_ok:
127127
raise
128128

129-
def pipe_file(self, path, value, **kwargs):
129+
def pipe_file(self, path, value, mode="overwrite", **kwargs):
130130
"""Set the bytes of given file
131131
132132
Avoids copies of the data if possible
133133
"""
134-
self.open(path, "wb", data=value)
134+
mode = "xb" if mode == "create" else "wb"
135+
self.open(path, mode=mode, data=value)
135136

136137
def rmdir(self, path):
137138
path = self._strip_protocol(path)
@@ -178,6 +179,8 @@ def _open(
178179
**kwargs,
179180
):
180181
path = self._strip_protocol(path)
182+
if "x" in mode and self.exists(path):
183+
raise FileExistsError
181184
if path in self.pseudo_dirs:
182185
raise IsADirectoryError(path)
183186
parent = path
@@ -197,7 +200,9 @@ def _open(
197200
return f
198201
else:
199202
raise FileNotFoundError(path)
200-
elif mode == "wb":
203+
elif mode in {"wb", "xb"}:
204+
if mode == "xb" and self.exists(path):
205+
raise FileExistsError
201206
m = MemoryFile(self, path, kwargs.get("data"))
202207
if not self._intrans:
203208
m.commit()

fsspec/implementations/reference.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1181,13 +1181,17 @@ async def _rm_file(self, path, **kwargs):
11811181
) # ignores FileNotFound, just as well for directories
11821182
self.dircache.clear() # this is a bit heavy handed
11831183

1184-
async def _pipe_file(self, path, data):
1184+
async def _pipe_file(self, path, data, mode="overwrite", **kwargs):
1185+
if mode == "create" and self.exists(path):
1186+
raise FileExistsError
11851187
# can be str or bytes
11861188
self.references[path] = data
11871189
self.dircache.clear() # this is a bit heavy handed
11881190

1189-
async def _put_file(self, lpath, rpath, **kwargs):
1191+
async def _put_file(self, lpath, rpath, mode="overwrite", **kwargs):
11901192
# puts binary
1193+
if mode == "create" and self.exists(rpath):
1194+
raise FileExistsError
11911195
with open(lpath, "rb") as f:
11921196
self.references[rpath] = f.read()
11931197
self.dircache.clear() # this is a bit heavy handed

fsspec/implementations/tests/memory/memory_test.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,3 +12,11 @@ class TestMemoryGet(abstract.AbstractGetTests, MemoryFixtures):
1212

1313
class TestMemoryPut(abstract.AbstractPutTests, MemoryFixtures):
1414
pass
15+
16+
17+
class TestMemoryPipe(abstract.AbstractPipeTests, MemoryFixtures):
18+
pass
19+
20+
21+
class TestMemoryOpen(abstract.AbstractOpenTests, MemoryFixtures):
22+
pass

fsspec/implementations/tests/test_smb.py

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,12 @@
1414

1515
pytest.importorskip("smbprotocol")
1616

17+
18+
def delay_rerun(*args):
    """Rerun filter for the ``flaky`` marker: pause briefly, then allow a rerun.

    Passed as ``rerun_filter`` to ``pytest.mark.flaky`` by the tests in this
    module. Whatever positional arguments the caller supplies are accepted
    and ignored.

    Returns
    -------
    bool
        Always ``True``.
    """
    # short pause so the next attempt does not start immediately
    time.sleep(0.1)
    return True
21+
22+
1723
# ruff: noqa: F821
1824

1925
if os.environ.get("WSL_INTEROP"):
@@ -72,7 +78,7 @@ def smb_params(request):
7278
stop_docker(container)
7379

7480

75-
@pytest.mark.flaky(reruns=2, reruns_delay=2)
81+
@pytest.mark.flaky(max_runs=3, rerun_filter=delay_rerun)
7682
def test_simple(smb_params):
7783
adir = "/home/adir"
7884
adir2 = "/home/adir/otherdir/"
@@ -89,7 +95,7 @@ def test_simple(smb_params):
8995
assert not fsmb.exists(adir)
9096

9197

92-
@pytest.mark.flaky(reruns=2, reruns_delay=2)
98+
@pytest.mark.flaky(max_runs=3, rerun_filter=delay_rerun)
9399
def test_auto_mkdir(smb_params):
94100
adir = "/home/adir"
95101
adir2 = "/home/adir/otherdir/"
@@ -116,7 +122,7 @@ def test_auto_mkdir(smb_params):
116122
assert not fsmb.exists(another_dir)
117123

118124

119-
@pytest.mark.flaky(reruns=2, reruns_delay=2)
125+
@pytest.mark.flaky(max_runs=3, rerun_filter=delay_rerun)
120126
def test_with_url(smb_params):
121127
if smb_params["port"] is None:
122128
smb_url = "smb://{username}:{password}@{host}/home/someuser.txt"
@@ -131,7 +137,7 @@ def test_with_url(smb_params):
131137
assert read_result == b"hello"
132138

133139

134-
@pytest.mark.flaky(reruns=2, reruns_delay=2)
140+
@pytest.mark.flaky(max_runs=3, rerun_filter=delay_rerun)
135141
def test_transaction(smb_params):
136142
afile = "/home/afolder/otherdir/afile"
137143
afile2 = "/home/afolder/otherdir/afile2"
@@ -152,14 +158,14 @@ def test_transaction(smb_params):
152158
assert fsmb.find(adir) == [afile, afile2]
153159

154160

155-
@pytest.mark.flaky(reruns=2, reruns_delay=2)
161+
@pytest.mark.flaky(max_runs=3, rerun_filter=delay_rerun)
156162
def test_makedirs_exist_ok(smb_params):
157163
fsmb = fsspec.get_filesystem_class("smb")(**smb_params)
158164
fsmb.makedirs("/home/a/b/c")
159165
fsmb.makedirs("/home/a/b/c", exist_ok=True)
160166

161167

162-
@pytest.mark.flaky(reruns=2, reruns_delay=2)
168+
@pytest.mark.flaky(max_runs=3, rerun_filter=delay_rerun)
163169
def test_rename_from_upath(smb_params):
164170
fsmb = fsspec.get_filesystem_class("smb")(**smb_params)
165171
fsmb.makedirs("/home/a/b/c", exist_ok=True)

fsspec/spec.py

Lines changed: 60 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -778,8 +778,12 @@ def cat_file(self, path, start=None, end=None, **kwargs):
778778
return f.read(end - f.tell())
779779
return f.read()
780780

781-
def pipe_file(self, path, value, **kwargs):
781+
def pipe_file(self, path, value, mode="overwrite", **kwargs):
782782
"""Set the bytes of given file"""
783+
if mode == "create" and self.exists(path):
784+
# non-atomic but simple way; or could use "xb" in open(), which is likely
785+
# not as well supported
786+
raise FileExistsError
783787
with self.open(path, "wb", **kwargs) as f:
784788
f.write(value)
785789

@@ -971,8 +975,12 @@ def get(
971975
with callback.branched(rpath, lpath) as child:
972976
self.get_file(rpath, lpath, callback=child, **kwargs)
973977

974-
def put_file(self, lpath, rpath, callback=DEFAULT_CALLBACK, **kwargs):
978+
def put_file(
979+
self, lpath, rpath, callback=DEFAULT_CALLBACK, mode="overwrite", **kwargs
980+
):
975981
"""Copy single file to remote"""
982+
if mode == "create" and self.exists(rpath):
983+
raise FileExistsError
976984
if os.path.isdir(lpath):
977985
self.makedirs(rpath, exist_ok=True)
978986
return None
@@ -1262,6 +1270,9 @@ def open(
12621270
Target file
12631271
mode: str like 'rb', 'w'
12641272
See builtin ``open()``
1273+
Mode "x" (exclusive write) may be implemented by the backend. Even if
1274+
it is, whether the check happens up front or on commit, and whether it
1275+
is atomic, are implementation-dependent.
12651276
block_size: int
12661277
Some indication of buffering - this is a value in bytes
12671278
cache_options : dict, optional
@@ -1911,7 +1922,7 @@ def discard(self):
19111922

19121923
def info(self):
19131924
"""File information about this path"""
1914-
if "r" in self.mode:
1925+
if self.readable():
19151926
return self.details
19161927
else:
19171928
raise ValueError("Info not available while writing")
@@ -1958,7 +1969,7 @@ def write(self, data):
19581969
data: bytes
19591970
Set of bytes to be written.
19601971
"""
1961-
if self.mode not in {"wb", "ab"}:
1972+
if not self.writable():
19621973
raise ValueError("File not in write mode")
19631974
if self.closed:
19641975
raise ValueError("I/O operation on closed file.")
@@ -1991,7 +2002,7 @@ def flush(self, force=False):
19912002
if force:
19922003
self.forced = True
19932004

1994-
if self.mode not in {"wb", "ab"}:
2005+
if self.readable():
19952006
# no-op to flush on read-mode
19962007
return
19972008

@@ -2140,29 +2151,46 @@ def close(self):
21402151
return
21412152
if self.closed:
21422153
return
2143-
if self.mode == "rb":
2144-
self.cache = None
2145-
else:
2146-
if not self.forced:
2147-
self.flush(force=True)
2148-
2149-
if self.fs is not None:
2150-
self.fs.invalidate_cache(self.path)
2151-
self.fs.invalidate_cache(self.fs._parent(self.path))
2154+
try:
2155+
if self.mode == "rb":
2156+
self.cache = None
2157+
else:
2158+
if not self.forced:
2159+
self.flush(force=True)
21522160

2153-
self.closed = True
2161+
if self.fs is not None:
2162+
self.fs.invalidate_cache(self.path)
2163+
self.fs.invalidate_cache(self.fs._parent(self.path))
2164+
finally:
2165+
self.closed = True
21542166

21552167
def readable(self):
21562168
"""Whether opened for reading"""
2157-
return self.mode == "rb" and not self.closed
2169+
return "r" in self.mode and not self.closed
21582170

21592171
def seekable(self):
21602172
"""Whether is seekable (only in read mode)"""
21612173
return self.readable()
21622174

21632175
def writable(self):
21642176
"""Whether opened for writing"""
2165-
return self.mode in {"wb", "ab"} and not self.closed
2177+
return self.mode in {"wb", "ab", "xb"} and not self.closed
2178+
2179+
def __reduce__(self):
    """Support pickling of a file that was opened with mode ``"rb"``.

    The returned pair is the ``reopen`` callable and the positional
    arguments it takes: filesystem, path, mode, block size, current
    position (``self.loc``), size, autocommit flag, cache type name and the
    stored keyword arguments.

    Raises
    ------
    RuntimeError
        If ``self.mode`` is anything other than ``"rb"``.
    """
    if self.mode != "rb":
        raise RuntimeError("Pickling a writeable file is not supported")

    return reopen, (
        self.fs,
        self.path,
        self.mode,
        self.blocksize,
        self.loc,
        self.size,
        self.autocommit,
        # the literal "none" is passed when there is no cache object
        self.cache.name if self.cache else "none",
        self.kwargs,
    )
21662194

21672195
def __del__(self):
21682196
if not self.closed:
@@ -2178,3 +2206,18 @@ def __enter__(self):
21782206

21792207
def __exit__(self, *args):
21802208
self.close()
2209+
2210+
2211+
def reopen(fs, path, mode, blocksize, loc, size, autocommit, cache_type, kwargs):
    """Open ``path`` on ``fs`` again and move to a saved position.

    Parameters
    ----------
    fs
        Object whose ``open`` method is called to create the file.
    path
        Target passed as the first argument to ``fs.open``.
    mode, blocksize, autocommit, cache_type, size
        Forwarded to ``fs.open`` as ``mode``, ``block_size``,
        ``autocommit``, ``cache_type`` and ``size`` respectively.
    loc : int
        Position to seek to after opening; no seek is issued unless it is
        greater than zero.
    kwargs : dict
        Extra keyword arguments expanded into the ``fs.open`` call.

    Returns
    -------
    The object returned by ``fs.open``.
    """
    file = fs.open(
        path,
        mode=mode,
        block_size=blocksize,
        autocommit=autocommit,
        cache_type=cache_type,
        size=size,
        **kwargs,
    )
    # only reposition when the saved offset is past the start
    if loc > 0:
        file.seek(loc)
    return file

fsspec/tests/abstract/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
from fsspec.implementations.local import LocalFileSystem
77
from fsspec.tests.abstract.copy import AbstractCopyTests # noqa: F401
88
from fsspec.tests.abstract.get import AbstractGetTests # noqa: F401
9+
from fsspec.tests.abstract.open import AbstractOpenTests # noqa: F401
10+
from fsspec.tests.abstract.pipe import AbstractPipeTests # noqa: F401
911
from fsspec.tests.abstract.put import AbstractPutTests # noqa: F401
1012

1113

fsspec/tests/abstract/open.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
import pytest
2+
3+
4+
class AbstractOpenTests:
    """Checks for ``open`` written against the ``fs`` / ``fs_target`` fixtures.

    NOTE(review): the fixtures are presumably provided by whatever class
    this one is combined with - confirm against the concrete test classes.
    """

    def test_open_exclusive(self, fs, fs_target):
        """Opening an existing path in ``"xb"`` mode raises ``FileExistsError``."""
        # create the target and check that its contents read back
        with fs.open(fs_target, "wb") as f:
            f.write(b"data")
        with fs.open(fs_target, "rb") as f:
            assert f.read() == b"data"
        # the path now exists, so an exclusive open must be refused
        with pytest.raises(FileExistsError):
            fs.open(fs_target, "xb")

0 commit comments

Comments
 (0)