Skip to content

Commit adb330d

Browse files
Merge pull request #40 from andrewfulton9/pr36
Adds GCSPath
2 parents 0b44a7b + 4ca8353 commit adb330d

File tree

9 files changed

+218
-17
lines changed

9 files changed

+218
-17
lines changed

environment.yml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,19 +4,21 @@ channels:
44
- conda-forge
55
dependencies:
66
- python==3.8
7-
- fsspec==0.8.4
7+
- fsspec==2021.11.1
88
# optional
99
- requests
1010
- s3fs
1111
- jupyter
1212
- ipython
1313
- pytest
14+
- vcrpy
1415
- pylint
1516
- flake8
1617
- pyarrow
1718
- moto
1819
- pip
1920
- pip:
2021
- hadoop-test-cluster
22+
- gcsfs
2123
- nox
2224

noxfile.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ def install(session):
2727

2828
@nox.session(python=False)
2929
def smoke(session):
30-
session.install(*"pytest aiohttp requests".split())
30+
session.install(*"pytest aiohttp requests gcsfs".split())
3131
session.run(*"pytest --skiphdfs upath".split())
3232

3333

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ test = [
3636
"pylint",
3737
"pytest",
3838
"requests",
39-
"s3fs",
39+
"s3fs"
4040
]
4141

4242
[tool.flit.scripts]

upath/core.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ def wrapper(*args, **kwargs):
4040
return wrapper
4141

4242
def _transform_arg_paths(self, args, kwargs):
43-
"""formats the path properly for the filesystem backend."""
43+
"""Formats the path properly for the filesystem backend."""
4444
args = list(args)
4545
first_arg = args.pop(0)
4646
if not kwargs.get("path"):
@@ -53,7 +53,7 @@ def _transform_arg_paths(self, args, kwargs):
5353
return args, kwargs
5454

5555
def _format_path(self, s):
56-
"""placeholder method for subclassed filesystems"""
56+
"""Placeholder method for subclassed filesystems"""
5757
return s
5858

5959
def __getattribute__(self, item):
@@ -273,6 +273,9 @@ def is_file(self):
273273
return True
274274
return False
275275

276+
def chmod(self, mod):
277+
raise NotImplementedError
278+
276279
def rename(self, target):
277280
# can be implemented, but may be tricky
278281
raise NotImplementedError

upath/implementations/gcs.py

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
import upath.core
2+
import os
3+
import re
4+
5+
6+
class _GCSAccessor(upath.core._FSSpecAccessor):
7+
def __init__(self, parsed_url, *args, **kwargs):
8+
super().__init__(parsed_url, *args, **kwargs)
9+
10+
def _format_path(self, s):
11+
"""
12+
netloc has already been set to project via `GCSPath._init`
13+
"""
14+
s = os.path.join(self._url.netloc, s.lstrip("/"))
15+
return s
16+
17+
18+
# project is not part of the path, but is part of the credentials
19+
class GCSPath(upath.core.UPath):
20+
_default_accessor = _GCSAccessor
21+
22+
def _init(self, *args, template=None, **kwargs):
23+
# ensure that the bucket is part of the netloc path
24+
if kwargs.get("bucket") and kwargs.get("_url"):
25+
bucket = kwargs.pop("bucket")
26+
kwargs["_url"] = kwargs["_url"]._replace(netloc=bucket)
27+
super()._init(*args, template=template, **kwargs)
28+
29+
def _sub_path(self, name):
30+
"""gcs returns path as `{bucket}/<path>` with listdir
31+
and glob, so here we can add the netloc to the sub string
32+
so it gets subbed out as well
33+
"""
34+
sp = self.path
35+
subed = re.sub(f"^({self._url.netloc})?/?({sp}|{sp[1:]})/?", "", name)
36+
return subed
37+
38+
def joinpath(self, *args):
39+
if self._url.netloc:
40+
return super().joinpath(*args)
41+
# handles a bucket in the path
42+
else:
43+
path = args[0]
44+
if isinstance(path, list):
45+
args_list = list(*args)
46+
else:
47+
args_list = path.split(self._flavour.sep)
48+
bucket = args_list.pop(0)
49+
self._kwargs["bucket"] = bucket
50+
return super().joinpath(*tuple(args_list))

upath/implementations/s3.py

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,28 +9,33 @@ def __init__(self, parsed_url, *args, **kwargs):
99
super().__init__(parsed_url, *args, **kwargs)
1010

1111
def _format_path(self, s):
12+
"""If the filesystem backend doesn't have a root_marker, strip the
13+
leading slash of a path and add the bucket
14+
"""
1215
s = os.path.join(self._url.netloc, s.lstrip("/"))
1316
return s
1417

1518

1619
class S3Path(upath.core.UPath):
1720
_default_accessor = _S3Accessor
1821

22+
def _init(self, *args, template=None, **kwargs):
23+
# ensure that the bucket is part of the netloc path
24+
if kwargs.get("bucket") and kwargs.get("_url"):
25+
bucket = kwargs.pop("bucket")
26+
kwargs["_url"] = kwargs["_url"]._replace(netloc=bucket)
27+
28+
super()._init(*args, template=template, **kwargs)
29+
1930
def _sub_path(self, name):
2031
"""s3fs returns path as `{bucket}/<path>` with listdir
2132
and glob, so here we can add the netloc to the sub string
2233
so it gets subbed out as well
2334
"""
2435
sp = self.path
25-
subed = re.sub(f"^{self._url.netloc}/({sp}|{sp[1:]})/?", "", name)
36+
subed = re.sub(f"^({self._url.netloc})?/?({sp}|{sp[1:]})/?", "", name)
2637
return subed
2738

28-
def _init(self, *args, template=None, **kwargs):
29-
if kwargs.get("bucket") and kwargs.get("_url"):
30-
bucket = kwargs.pop("bucket")
31-
kwargs["_url"] = kwargs["_url"]._replace(netloc=bucket)
32-
super()._init(*args, template=template, **kwargs)
33-
3439
def joinpath(self, *args):
3540
if self._url.netloc:
3641
return super().joinpath(*args)

upath/registry.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,15 @@
44

55

66
class _Registry:
7-
from upath.implementations import hdfs, http, memory, s3
7+
from upath.implementations import hdfs, http, memory, s3, gcs
88

99
http = http.HTTPPath
1010
hdfs = hdfs.HDFSPath
11+
s3a = s3.S3Path
1112
s3 = s3.S3Path
1213
memory = memory.MemoryPath
14+
gs = gcs.GCSPath
15+
gcs = gcs.GCSPath
1316

1417
def __getitem__(self, item):
1518
implemented_path = getattr(self, item, None)

upath/tests/conftest.py

Lines changed: 78 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,15 @@
55
import shlex
66
import time
77
import sys
8-
8+
from gcsfs.core import GCSFileSystem
99

1010
import pytest
1111
from fsspec.implementations.local import LocalFileSystem
1212
from fsspec.registry import register_implementation, _registry
1313

14+
import fsspec
15+
import requests
16+
1417

1518
def pytest_addoption(parser):
1619
parser.addoption(
@@ -45,13 +48,13 @@ def clear_registry():
4548
_registry.clear()
4649

4750

48-
@pytest.fixture()
51+
@pytest.fixture(scope="function")
4952
def tempdir(clear_registry):
5053
with tempfile.TemporaryDirectory() as tempdir:
5154
yield tempdir
5255

5356

54-
@pytest.fixture()
57+
@pytest.fixture(scope="function")
5558
def local_testdir(tempdir, clear_registry):
5659
tmp = Path(tempdir)
5760
tmp.mkdir(exist_ok=True)
@@ -153,7 +156,7 @@ def s3_server():
153156
time.sleep(0.1) # pragma: no cover
154157
anon = False
155158
s3so = dict(
156-
client_kwargs={"endpoint_url": endpoint_uri}, use_listings_cache=False
159+
client_kwargs={"endpoint_url": endpoint_uri}, use_listings_cache=True
157160
)
158161
yield anon, s3so
159162
proc.terminate()
@@ -176,3 +179,74 @@ def s3(s3_server, tempdir, local_testdir):
176179
else:
177180
s3.mkdir(str(x))
178181
yield anon, s3so
182+
183+
184+
def stop_docker(container):
185+
cmd = shlex.split('docker ps -a -q --filter "name=%s"' % container)
186+
cid = subprocess.check_output(cmd).strip().decode()
187+
if cid:
188+
subprocess.call(["docker", "rm", "-f", "-v", cid])
189+
190+
191+
TEST_PROJECT = os.environ.get("GCSFS_TEST_PROJECT", "test_project")
192+
193+
194+
@pytest.fixture(scope="module")
195+
def docker_gcs():
196+
if "STORAGE_EMULATOR_HOST" in os.environ:
197+
# assume using real API or otherwise have a server already set up
198+
yield os.environ["STORAGE_EMULATOR_HOST"]
199+
return
200+
container = "gcsfs_test"
201+
cmd = (
202+
"docker run -d -p 4443:4443 --name gcsfs_test fsouza/fake-gcs-server:latest -scheme " # noqa: E501
203+
"http -public-host http://localhost:4443 -external-url http://localhost:4443" # noqa: E501
204+
)
205+
stop_docker(container)
206+
subprocess.check_output(shlex.split(cmd))
207+
url = "http://0.0.0.0:4443"
208+
timeout = 10
209+
while True:
210+
try:
211+
r = requests.get(url + "/storage/v1/b")
212+
if r.ok:
213+
print("url: ", url)
214+
yield url
215+
break
216+
except Exception as e: # noqa: E722
217+
timeout -= 1
218+
if timeout < 0:
219+
raise SystemError from e
220+
time.sleep(1)
221+
stop_docker(container)
222+
223+
224+
@pytest.fixture
225+
def gcs(docker_gcs, tempdir, local_testdir, populate=True):
226+
# from gcsfs.credentials import GoogleCredentials
227+
GCSFileSystem.clear_instance_cache()
228+
gcs = fsspec.filesystem("gcs", endpoint_url=docker_gcs)
229+
try:
230+
# ensure we're empty.
231+
try:
232+
gcs.rm("tmp", recursive=True)
233+
except FileNotFoundError:
234+
pass
235+
try:
236+
gcs.mkdir("tmp")
237+
print("made tmp dir")
238+
except Exception:
239+
pass
240+
if populate:
241+
for x in Path(local_testdir).glob("**/*"):
242+
if x.is_file():
243+
gcs.upload(str(x), str(x))
244+
else:
245+
gcs.mkdir(str(x))
246+
gcs.invalidate_cache()
247+
yield docker_gcs
248+
finally:
249+
try:
250+
gcs.rm(gcs.find("tmp"))
251+
except: # noqa: E722
252+
pass
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
import pytest
2+
import sys
3+
4+
from upath import UPath
5+
from upath.implementations.gcs import GCSPath
6+
from upath.errors import NotDirectoryError
7+
from upath.tests.cases import BaseTests
8+
9+
10+
@pytest.mark.skipif(sys.platform.startswith("win"), reason="Windows bad")
11+
@pytest.mark.usefixtures("path")
12+
class TestGCSPath(BaseTests):
13+
@pytest.fixture(autouse=True, scope="function")
14+
def path(self, local_testdir, gcs):
15+
scheme = "gs:/"
16+
self.path = UPath(f"{scheme}{local_testdir}", endpoint_url=gcs)
17+
self.endpoint_url = gcs
18+
19+
def test_is_GCSPath(self):
20+
assert isinstance(self.path, GCSPath)
21+
22+
def test_mkdir(self):
23+
new_dir = self.path.joinpath("new_dir")
24+
new_dir.joinpath("test.txt").touch()
25+
assert new_dir.exists()
26+
27+
def test_glob(self, pathlib_base):
28+
mock_glob = list(self.path.glob("**.txt"))
29+
path_glob = list(pathlib_base.glob("**/*.txt"))
30+
31+
assert len(mock_glob) == len(path_glob)
32+
assert all(
33+
map(lambda m: m.path in [str(p)[4:] for p in path_glob], mock_glob)
34+
)
35+
36+
def test_rmdir(self, local_testdir):
37+
dirname = "rmdir_test"
38+
mock_dir = self.path.joinpath(dirname)
39+
mock_dir.joinpath("test.txt").write_text("hello")
40+
mock_dir.fs.invalidate_cache()
41+
mock_dir.rmdir()
42+
assert not mock_dir.exists()
43+
with pytest.raises(NotDirectoryError):
44+
self.path.joinpath("file1.txt").rmdir()
45+
46+
def test_fsspec_compat(self):
47+
fs = self.path.fs
48+
scheme = self.path._url.scheme
49+
content = b"a,b,c\n1,2,3\n4,5,6"
50+
51+
p1 = f"{scheme}:///tmp/output1.csv"
52+
upath1 = UPath(p1, endpoint_url=self.endpoint_url)
53+
upath1.write_bytes(content)
54+
with fs.open(p1) as f:
55+
assert f.read() == content
56+
upath1.unlink()
57+
58+
# write with fsspec, read with upath
59+
p2 = f"{scheme}:///tmp/output2.csv"
60+
with fs.open(p2, "wb") as f:
61+
f.write(content)
62+
upath2 = UPath(p2, endpoint_url=self.endpoint_url)
63+
assert upath2.read_bytes() == content
64+
upath2.unlink()

0 commit comments

Comments
 (0)