diff --git a/fsspec/generic.py b/fsspec/generic.py index 9bad0f048..af3677ec2 100644 --- a/fsspec/generic.py +++ b/fsspec/generic.py @@ -16,15 +16,13 @@ def set_generic_fs(protocol, **storage_options): + """Populate the dict used for method=="generic" lookups""" _generic_fs[protocol] = filesystem(protocol, **storage_options) -default_method = "default" - - -def _resolve_fs(url, method=None, protocol=None, storage_options=None): +def _resolve_fs(url, method, protocol=None, storage_options=None): """Pick instance of backend FS""" - method = method or default_method + url = url[0] if isinstance(url, (list, tuple)) else url protocol = protocol or split_protocol(url)[0] storage_options = storage_options or {} if method == "default": @@ -159,7 +157,7 @@ class GenericFileSystem(AsyncFileSystem): protocol = "generic" # there is no real reason to ever use a protocol with this FS - def __init__(self, default_method="default", **kwargs): + def __init__(self, default_method="default", storage_options=None, **kwargs): """ Parameters @@ -171,22 +169,25 @@ def __init__(self, default_method="default", **kwargs): configured via the config system - "generic": takes instances from the `_generic_fs` dict in this module, which you must populate before use. Keys are by protocol + - "options": expects storage_options, a dict mapping protocol to + kwargs to use when constructing the filesystem - "current": takes the most recently instantiated version of each FS """ self.method = default_method + self.st_opts = storage_options super().__init__(**kwargs) def _parent(self, path): - fs = _resolve_fs(path, self.method) + fs = _resolve_fs(path, self.method, storage_options=self.st_opts) return fs.unstrip_protocol(fs._parent(path)) def _strip_protocol(self, path): # normalization only - fs = _resolve_fs(path, self.method) + fs = _resolve_fs(path, self.method, storage_options=self.st_opts) return fs.unstrip_protocol(fs._strip_protocol(path)) async def _find(self, path, maxdepth=None, withdirs=False, detail=False, **kwargs): - fs = _resolve_fs(path, self.method) + fs = _resolve_fs(path, self.method, storage_options=self.st_opts) if fs.async_impl: out = await fs._find( path, maxdepth=maxdepth, withdirs=withdirs, detail=True, **kwargs @@ -251,7 +252,7 @@ async def _pipe_file( value, **kwargs, ): - fs = _resolve_fs(path, self.method) + fs = _resolve_fs(path, self.method, storage_options=self.st_opts) if fs.async_impl: return await fs._pipe_file(path, value, **kwargs) else: @@ -269,7 +270,7 @@ async def _rm(self, url, **kwargs): async def _makedirs(self, path, exist_ok=False): logger.debug("Make dir %s", path) - fs = _resolve_fs(path, self.method) + fs = _resolve_fs(path, self.method, storage_options=self.st_opts) if fs.async_impl: await fs._makedirs(path, exist_ok=exist_ok) else: @@ -288,6 +289,7 @@ async def _cp_file( url2, blocksize=2**20, callback=DEFAULT_CALLBACK, + tempdir: Optional[str] = None, **kwargs, ): fs = _resolve_fs(url, self.method) @@ -295,35 +297,10 @@ async def _cp_file( if fs is fs2: # pure remote if fs.async_impl: - return await fs._cp_file(url, url2, **kwargs) + return await fs._copy(url, url2, **kwargs) else: - return fs.cp_file(url, url2, **kwargs) - kw = {"blocksize": 0, "cache_type": "none"} - try: - f1 = ( - await fs.open_async(url, "rb") - if hasattr(fs, "open_async") - else fs.open(url, "rb", **kw) - ) - callback.set_size(await maybe_await(f1.size)) - f2 = ( - await fs2.open_async(url2, "wb") - if hasattr(fs2, "open_async") - else fs2.open(url2, "wb", **kw) - ) - while f1.size is None or f2.tell() < f1.size: - data = await maybe_await(f1.read(blocksize)) - if f1.size is None and not data: - break - await maybe_await(f2.write(data)) - callback.absolute_update(f2.tell()) - finally: - try: - await maybe_await(f2.close()) - await maybe_await(f1.close()) - except NameError: - # fail while opening f1 or f2 - pass + return fs.copy(url, url2, **kwargs) + await copy_file_op(fs, [url], fs2, [url2], tempdir, 1, on_error="raise") async def _make_many_dirs(self, urls, exist_ok=True): fs = _resolve_fs(urls[0], self.method) @@ -347,17 +324,22 @@ async def _copy( tempdir: Optional[str] = None, **kwargs, ): + # TODO: special case for one FS being local, which can use get/put + # TODO: special case for one being memFS, which can use cat/pipe if recursive: - raise NotImplementedError - fs = _resolve_fs(path1[0], self.method) - fs2 = _resolve_fs(path2[0], self.method) - # not expanding paths atm., assume call is from rsync() + raise NotImplementedError("Please use fsspec.generic.rsync") + path1 = [path1] if isinstance(path1, str) else path1 + path2 = [path2] if isinstance(path2, str) else path2 + + fs = _resolve_fs(path1, self.method) + fs2 = _resolve_fs(path2, self.method) + if fs is fs2: - # pure remote if fs.async_impl: return await fs._copy(path1, path2, **kwargs) else: return fs.copy(path1, path2, **kwargs) + await copy_file_op( fs, path1, fs2, path2, tempdir, batch_size, on_error=on_error ) @@ -377,31 +359,33 @@ async def copy_file_op( fs2, u2, os.path.join(tempdir, uuid.uuid4().hex), - on_error=on_error, ) for u1, u2 in zip(url1, url2) ] - await _run_coros_in_chunks(coros, batch_size=batch_size) + out = await _run_coros_in_chunks( + coros, batch_size=batch_size, return_exceptions=True + ) finally: shutil.rmtree(tempdir) + if on_error == "return": + return out + elif on_error == "raise": + for o in out: + if isinstance(o, Exception): + raise o async def _copy_file_op(fs1, url1, fs2, url2, local, on_error="ignore"): - ex = () if on_error == "raise" else Exception - logger.debug("Copy %s -> %s", url1, url2) - try: - if fs1.async_impl: - await fs1._get_file(url1, local) - else: - fs1.get_file(url1, local) - if fs2.async_impl: - await fs2._put_file(local, url2) - else: - fs2.put_file(local, url2) - os.unlink(local) - logger.debug("Copy %s -> %s; done", url1, url2) - except ex as e: - logger.debug("ignoring cp exception for %s: %s", url1, e) + if fs1.async_impl: + await fs1._get_file(url1, local) + else: + fs1.get_file(url1, local) + if fs2.async_impl: + await fs2._put_file(local, url2) + else: + fs2.put_file(local, url2) + os.unlink(local) + logger.debug("Copy %s -> %s; done", url1, url2) async def maybe_await(cor): diff --git a/fsspec/tests/test_generic.py b/fsspec/tests/test_generic.py index bd9745f0c..6e2ba0469 100644 --- a/fsspec/tests/test_generic.py +++ b/fsspec/tests/test_generic.py @@ -48,6 +48,34 @@ def test_cat_async(server): assert fs.cat(server.realfile) == data +def test_cp_one(server, tmpdir): + fsspec.filesystem("http", headers={"give_length": "true", "head_ok": "true"}) + local = fsspec.filesystem("file") + fn = f"file://{tmpdir}/afile" + + fs = fsspec.filesystem("generic", default_method="current") + + fs.copy([server.realfile], [fn]) + assert local.cat(fn) == data + fs.rm(fn) + assert not fs.exists(fn) + + fs.copy(server.realfile, fn) + assert local.cat(fn) == data + fs.rm(fn) + assert not fs.exists(fn) + + fs.cp([server.realfile], [fn]) + assert local.cat(fn) == data + fs.rm(fn) + assert not fs.exists(fn) + + fs.cp_file(server.realfile, fn) + assert local.cat(fn) == data + fs.rm(fn) + assert not fs.exists(fn) + + def test_rsync(tmpdir, m): from fsspec.generic import GenericFileSystem, rsync