Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 45 additions & 61 deletions fsspec/generic.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,13 @@


def set_generic_fs(protocol, **storage_options):
    """Register the filesystem used for ``method == "generic"`` lookups.

    A filesystem for ``protocol`` is built from ``storage_options`` and
    stored in this module's ``_generic_fs`` dict under that protocol,
    replacing whatever was registered for it before.
    """
    instance = filesystem(protocol, **storage_options)
    _generic_fs[protocol] = instance


default_method = "default"


def _resolve_fs(url, method=None, protocol=None, storage_options=None):
def _resolve_fs(url, method, protocol=None, storage_options=None):
"""Pick instance of backend FS"""
method = method or default_method
url = url[0] if isinstance(url, (list, tuple)) else url
protocol = protocol or split_protocol(url)[0]
storage_options = storage_options or {}
if method == "default":
Expand Down Expand Up @@ -159,7 +157,7 @@ class GenericFileSystem(AsyncFileSystem):

protocol = "generic" # there is no real reason to ever use a protocol with this FS

def __init__(self, default_method="default", **kwargs):
def __init__(self, default_method="default", storage_options=None, **kwargs):
"""

Parameters
Expand All @@ -171,22 +169,25 @@ def __init__(self, default_method="default", **kwargs):
configured via the config system
- "generic": takes instances from the `_generic_fs` dict in this module,
which you must populate before use. Keys are by protocol
- "options": expects storage_options, a dict mapping protocol to
kwargs to use when constructing the filesystem
- "current": takes the most recently instantiated version of each FS
"""
self.method = default_method
self.st_opts = storage_options
super().__init__(**kwargs)

def _parent(self, path):
    """Return the parent of ``path`` as a full URL.

    The backend filesystem is resolved once, using this instance's lookup
    method and storage options; the backend computes the parent and the
    protocol is then re-attached to the result.
    """
    # Resolve a single time, forwarding the per-protocol storage options;
    # the earlier option-less lookup was redundant and its result unused.
    fs = _resolve_fs(path, self.method, storage_options=self.st_opts)
    return fs.unstrip_protocol(fs._parent(path))

def _strip_protocol(self, path):
    """Normalize ``path`` via its backend, keeping the protocol prefix.

    Despite the name this does not remove the protocol: the backend's own
    ``_strip_protocol`` is used for normalization only, and the protocol is
    re-attached afterwards.
    """
    # Resolve a single time, forwarding the per-protocol storage options;
    # the earlier option-less lookup was redundant and its result unused.
    fs = _resolve_fs(path, self.method, storage_options=self.st_opts)
    return fs.unstrip_protocol(fs._strip_protocol(path))

async def _find(self, path, maxdepth=None, withdirs=False, detail=False, **kwargs):
fs = _resolve_fs(path, self.method)
fs = _resolve_fs(path, self.method, storage_options=self.st_opts)
if fs.async_impl:
out = await fs._find(
path, maxdepth=maxdepth, withdirs=withdirs, detail=True, **kwargs
Expand Down Expand Up @@ -251,7 +252,7 @@ async def _pipe_file(
value,
**kwargs,
):
fs = _resolve_fs(path, self.method)
fs = _resolve_fs(path, self.method, storage_options=self.st_opts)
if fs.async_impl:
return await fs._pipe_file(path, value, **kwargs)
else:
Expand All @@ -269,7 +270,7 @@ async def _rm(self, url, **kwargs):

async def _makedirs(self, path, exist_ok=False):
logger.debug("Make dir %s", path)
fs = _resolve_fs(path, self.method)
fs = _resolve_fs(path, self.method, storage_options=self.st_opts)
if fs.async_impl:
await fs._makedirs(path, exist_ok=exist_ok)
else:
Expand All @@ -288,42 +289,18 @@ async def _cp_file(
url2,
blocksize=2**20,
callback=DEFAULT_CALLBACK,
tempdir: Optional[str] = None,
**kwargs,
):
fs = _resolve_fs(url, self.method)
fs2 = _resolve_fs(url2, self.method)
if fs is fs2:
# pure remote
if fs.async_impl:
return await fs._cp_file(url, url2, **kwargs)
return await fs._copy(url, url2, **kwargs)
else:
return fs.cp_file(url, url2, **kwargs)
kw = {"blocksize": 0, "cache_type": "none"}
try:
f1 = (
await fs.open_async(url, "rb")
if hasattr(fs, "open_async")
else fs.open(url, "rb", **kw)
)
callback.set_size(await maybe_await(f1.size))
f2 = (
await fs2.open_async(url2, "wb")
if hasattr(fs2, "open_async")
else fs2.open(url2, "wb", **kw)
)
while f1.size is None or f2.tell() < f1.size:
data = await maybe_await(f1.read(blocksize))
if f1.size is None and not data:
break
await maybe_await(f2.write(data))
callback.absolute_update(f2.tell())
finally:
try:
await maybe_await(f2.close())
await maybe_await(f1.close())
except NameError:
# fail while opening f1 or f2
pass
return fs.copy(url, url2, **kwargs)
await copy_file_op(fs, [url], fs2, [url2], tempdir, 1, on_error="raise")

async def _make_many_dirs(self, urls, exist_ok=True):
fs = _resolve_fs(urls[0], self.method)
Expand All @@ -347,17 +324,22 @@ async def _copy(
tempdir: Optional[str] = None,
**kwargs,
):
# TODO: special case for one FS being local, which can use get/put
# TODO: special case for one being memFS, which can use cat/pipe
if recursive:
raise NotImplementedError
fs = _resolve_fs(path1[0], self.method)
fs2 = _resolve_fs(path2[0], self.method)
# not expanding paths atm., assume call is from rsync()
raise NotImplementedError("Please use fsspec.generic.rsync")
path1 = [path1] if isinstance(path1, str) else path1
path2 = [path2] if isinstance(path2, str) else path2

fs = _resolve_fs(path1, self.method)
fs2 = _resolve_fs(path2, self.method)

if fs is fs2:
# pure remote
if fs.async_impl:
return await fs._copy(path1, path2, **kwargs)
else:
return fs.copy(path1, path2, **kwargs)

await copy_file_op(
fs, path1, fs2, path2, tempdir, batch_size, on_error=on_error
)
Expand All @@ -377,31 +359,33 @@ async def copy_file_op(
fs2,
u2,
os.path.join(tempdir, uuid.uuid4().hex),
on_error=on_error,
)
for u1, u2 in zip(url1, url2)
]
await _run_coros_in_chunks(coros, batch_size=batch_size)
out = await _run_coros_in_chunks(
coros, batch_size=batch_size, return_exceptions=True
)
finally:
shutil.rmtree(tempdir)
if on_error == "return":
return out
elif on_error == "raise":
for o in out:
if isinstance(o, Exception):
raise o


async def _copy_file_op(fs1, url1, fs2, url2, local, on_error="ignore"):
    """Copy one file between two filesystems through a local temporary file.

    Parameters
    ----------
    fs1, fs2:
        Source and destination filesystems. For each one, the coroutine
        method is awaited when ``async_impl`` is true, otherwise the
        blocking method is called.
    url1, url2:
        Source and destination paths.
    local:
        Path of the temporary local file used as the intermediate copy.
    on_error:
        Kept for signature compatibility and not consulted here. Any
        exception propagates to the caller, ``copy_file_op``, which gathers
        results with ``return_exceptions=True`` and applies the policy.

    The temporary file is removed only after a successful upload; on
    failure it is left for the caller's removal of the temporary directory.
    """
    logger.debug("Copy %s -> %s", url1, url2)
    # Download to the local staging path.
    if fs1.async_impl:
        await fs1._get_file(url1, local)
    else:
        fs1.get_file(url1, local)
    # Upload from the staging path to the destination.
    if fs2.async_impl:
        await fs2._put_file(local, url2)
    else:
        fs2.put_file(local, url2)
    os.unlink(local)
    logger.debug("Copy %s -> %s; done", url1, url2)


async def maybe_await(cor):
Expand Down
28 changes: 28 additions & 0 deletions fsspec/tests/test_generic.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,34 @@ def test_cat_async(server):
assert fs.cat(server.realfile) == data


def test_cp_one(server, tmpdir):
    """Copy a single HTTP file to local disk through each copy entry point.

    The generic filesystem is created with ``default_method="current"``, so
    the HTTP filesystem is instantiated first with the headers it needs.
    """
    # Instantiated only so it becomes the "current" HTTP filesystem.
    fsspec.filesystem("http", headers={"give_length": "true", "head_ok": "true"})
    local = fsspec.filesystem("file")
    fn = f"file://{tmpdir}/afile"

    fs = fsspec.filesystem("generic", default_method="current")

    # Each entry performs the same transfer via a different API or
    # argument form (list vs. plain string paths).
    copy_variants = (
        lambda: fs.copy([server.realfile], [fn]),
        lambda: fs.copy(server.realfile, fn),
        lambda: fs.cp([server.realfile], [fn]),
        lambda: fs.cp_file(server.realfile, fn),
    )
    for do_copy in copy_variants:
        do_copy()
        assert local.cat(fn) == data
        # Remove the target so the next variant starts from a clean state.
        fs.rm(fn)
        assert not fs.exists(fn)


def test_rsync(tmpdir, m):
from fsspec.generic import GenericFileSystem, rsync

Expand Down
Loading