|
| 1 | +from __future__ import annotations |
| 2 | + |
| 3 | +from fsspec import filesystem |
| 4 | +from fsspec.implementations.chained import ChainedFileSystem |
| 5 | + |
| 6 | +__all__ = ("UnionFileSystem",) |
| 7 | + |
| 8 | + |
| 9 | +class UnionFileSystem(ChainedFileSystem): |
| 10 | + """Union filesystem""" |
| 11 | + |
| 12 | + def __init__(self, target_protocol=None, target_options=None, fs=None, **kwargs): |
| 13 | + """ |
| 14 | + Args: |
| 15 | + target_protocol: str (optional) Target filesystem protocol. Provide either this or ``fs``. |
| 16 | + target_options: dict or None Passed to the instantiation of the FS, if fs is None. |
| 17 | + fs: filesystem instance The target filesystem to run against. Provide this or ``protocol``. |
| 18 | + """ |
| 19 | + super().__init__(**kwargs) |
| 20 | + if fs is None and target_protocol is None: |
| 21 | + raise ValueError("Please provide filesystem instance(fs) or target_protocol") |
| 22 | + if not (fs is None) ^ (target_protocol is None): |
| 23 | + raise ValueError("Both filesystems (fs) and target_protocol may not be both given.") |
| 24 | + |
| 25 | + if target_protocol: |
| 26 | + # unpack the targets and then instantiate in reverse order |
| 27 | + fs_options = [{"target_protocol": target_protocol, "target_options": kwargs}] |
| 28 | + fss = [] |
| 29 | + |
| 30 | + while "target_options" in target_options: |
| 31 | + target_protocol = target_options.pop("target_protocol") |
| 32 | + new_target_options = target_options.pop("target_options") |
| 33 | + kwargs = target_options |
| 34 | + fs_options.append({"target_protocol": target_protocol, "target_options": kwargs}) |
| 35 | + target_options = new_target_options |
| 36 | + |
| 37 | + # instantiate in reverse order |
| 38 | + for fspec in reversed(fs_options): |
| 39 | + target_protocol = fspec["target_protocol"] |
| 40 | + target_options = fspec["target_options"] |
| 41 | + fss.append(filesystem(target_protocol, fs=fss[-1] if fss else None, **target_options)) |
| 42 | + fss.reverse() |
| 43 | + self.fss = fss |
| 44 | + self.fs = fss[0] |
| 45 | + else: |
| 46 | + self.fss = [fs] |
| 47 | + self.fs = fs |
| 48 | + |
| 49 | + def exit(self): |
| 50 | + for fs in self.fss: |
| 51 | + if hasattr(fs, "exit"): |
| 52 | + fs.exit() |
0 commit comments