Skip to content

Commit e3c893f

Browse files
committed
tests passing for hdfs and s3
1 parent cabe727 commit e3c893f

File tree

3 files changed

+128
-73
lines changed

3 files changed

+128
-73
lines changed

upath/core.py

Lines changed: 65 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,10 @@
33
from pathlib import *
44
import urllib
55
import re
6+
import asyncio
67

8+
import fsspec
9+
from fsspec.asyn import AsyncFileSystem
710
from fsspec.core import url_to_fs
811
from fsspec.registry import filesystem, get_filesystem_class
912

@@ -47,8 +50,6 @@ def wrapper(*args, **kwargs):
4750
if func.__name__ == 'mkdir':
4851
args = args[:1]
4952

50-
print('args:', args)
51-
print('kwargs:', kwargs)
5253
return func(*args, **kwargs)
5354
return wrapper
5455

@@ -80,8 +81,10 @@ class PureUniversalPath(PurePath):
8081
class UPath(pathlib.Path):
8182

8283
def __new__(cls, *args, **kwargs):
84+
print('new')
8385
if cls is UPath:
8486
new_args = list(args)
87+
print(new_args)
8588
first_arg = new_args.pop(0)
8689
parsed_url = urllib.parse.urlparse(first_arg)
8790
for key in ['scheme', 'netloc']:
@@ -92,13 +95,12 @@ def __new__(cls, *args, **kwargs):
9295
cls = WindowsPath if os.name == 'nt' else PosixPath
9396
else:
9497
cls = UniversalPath
95-
cls._url = parsed_url
96-
#kwargs['_url'] = parsed_url
97-
cls._kwargs = kwargs
98+
# cls._url = parsed_url
99+
kwargs['_url'] = parsed_url
98100
new_args.insert(0, parsed_url.path)
99101
args = tuple(new_args)
100102

101-
self = cls._from_parts(args, init=False)
103+
self = cls._from_parts_init(args, init=False)
102104
if not self._flavour.is_supported:
103105
raise NotImplementedError("cannot instantiate %r on your system"
104106
% (cls.__name__,))
@@ -109,68 +111,50 @@ def __new__(cls, *args, **kwargs):
109111
return self
110112

111113

112-
class UniversalPath(Path, PureUniversalPath):
114+
class UniversalPath(UPath, PureUniversalPath):
113115

114-
__slots__ = ('_url', '_kwargs', '_closed')
116+
__slots__ = ('_url', '_kwargs', '_closed', 'fs')
115117

116118
not_implemented = ['cwd', 'home', 'expanduser', 'group', 'is_mount',
117119
'is_symlink', 'is_socket', 'is_fifo', 'is_block_device',
118-
'is_char_device', 'lchmod', 'lstat', 'owner', 'readlink',
119-
]
120-
120+
'is_char_device', 'lchmod', 'lstat', 'owner', 'readlink']
121121

122122
def _init(self, *args, template=None, **kwargs):
123123
self._closed = False
124+
if not kwargs:
125+
kwargs = dict(**self._kwargs)
126+
else:
127+
self._kwargs = dict(**kwargs)
128+
self._url = kwargs.pop('_url') if kwargs.get('_url') else None
129+
124130
if not self._root:
125131
if not self._parts:
126132
self._root = '/'
127133
elif self._parts[0] == '/':
128134
self._root = self._parts.pop(0)
129135
if getattr(self, '_str', None):
130136
delattr(self, '_str')
131-
132137
if template is not None:
133138
self._accessor = template._accessor
134139
else:
135-
self._accessor = _FSSpecAccessor(self._url, *args, **self._kwargs)
136-
137-
@classmethod
138-
def _parse_args(cls, args):
139-
# This is useful when you don't want to create an instance, just
140-
# canonicalize some constructor arguments.
141-
parts = []
142-
for a in args:
143-
if isinstance(a, PurePath):
144-
parts += a._parts
145-
else:
146-
a = os.fspath(a)
147-
if isinstance(a, str):
148-
# Force-cast str subclasses to str (issue #21127)
149-
parts.append(str(a))
150-
else:
151-
raise TypeError(
152-
"argument should be a str object or an os.PathLike "
153-
"object returning str, not %r"
154-
% type(a))
155-
return cls._flavour.parse_parts(parts)
140+
self._accessor = _FSSpecAccessor(self._url, *args, **kwargs)
141+
self.fs = self._accessor._fs
156142

157143
def __getattribute__(self, item):
158144
if item == '__class__':
159145
return UniversalPath
160-
not_implemented = getattr(UniversalPath, 'not_implemented')
161-
if item in not_implemented:
146+
if item in getattr(UniversalPath, 'not_implemented'):
162147
raise NotImplementedError(f'UniversalPath has no attribute {item}')
163148
else:
164149
return super().__getattribute__(item)
165150

166-
@classmethod
167-
def _format_parsed_parts(cls, drv, root, parts):
151+
def _format_parsed_parts(self, drv, root, parts):
168152
join_parts = parts[1:] if parts[0] == '/' else parts
169153
if (drv or root):
170-
path = drv + root + cls._flavour.join(join_parts)
154+
path = drv + root + self._flavour.join(join_parts)
171155
else:
172-
path = cls._flavour.join(join_parts)
173-
scheme, netloc = cls._url.scheme, cls._url.netloc
156+
path = self._flavour.join(join_parts)
157+
scheme, netloc = self._url.scheme, self._url.netloc
174158
scheme = scheme + ':'
175159
netloc = '//' + netloc if netloc else ''
176160
formatted = scheme + netloc + path
@@ -230,7 +214,6 @@ def is_dir(self):
230214

231215
def is_file(self):
232216
info = self._accessor.info(self)
233-
print(info)
234217
if info['type'] == 'file':
235218
return True
236219
return False
@@ -251,15 +234,26 @@ def touch(self, trunicate=True, **kwargs):
251234
self._accessor.touch(self, trunicate=trunicate, **kwargs)
252235

253236
def unlink(self, missing_ok=False):
237+
async def async_unlink():
238+
async def rm():
239+
try:
240+
await self._accessor.rm_file(self)
241+
except:
242+
await self._accessor.rm(self, recursive=False)
243+
244+
245+
coro = rm()
246+
done, pending = await asyncio.wait({coro})
247+
if coro is done:
248+
return
249+
250+
# print('exists:', self.exists())
254251
if not self.exists():
255252
if not missing_ok:
256253
raise FileNotFoundError
257254
else:
258255
return
259-
try:
260-
self._accessor.rm_file(self)
261-
except:
262-
self._accessor.rm(self, recursive=False)
256+
asyncio.run(async_unlink())
263257

264258
def rmdir(self, recursive=True):
265259
'''Add warning if directory not empty
@@ -271,9 +265,30 @@ def rmdir(self, recursive=True):
271265
raise NotDirectoryError
272266
self._accessor.rm(self, recursive=recursive)
273267

274-
# def mkdir(self, mode=0o777, parents=False, exist_ok=False):
275-
# super().mkdir(mode=mode, parents=parents, exist_ok=exist_ok)
276-
277-
278-
268+
@classmethod
269+
def _from_parts_init(cls, args, init=False):
270+
print(args)
271+
print(init)
272+
return super()._from_parts(args, init=init)
273+
274+
def _from_parts(self, args, init=True):
275+
# We need to call _parse_args on the instance, so as to get the
276+
# right flavour.
277+
obj = object.__new__(UniversalPath)
278+
drv, root, parts = self._parse_args(args)
279+
obj._drv = drv
280+
obj._root = root
281+
obj._parts = parts
282+
if init:
283+
obj._init(**self._kwargs)
284+
return obj
285+
286+
def _from_parsed_parts(self, drv, root, parts, init=True):
287+
obj = object.__new__(UniversalPath)
288+
obj._drv = drv
289+
obj._root = root
290+
obj._parts = parts
291+
if init:
292+
obj._init(**self._kwargs)
293+
return obj
279294

upath/tests/conftest.py

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -87,16 +87,12 @@ def hdfs(htcluster, tempdir, local_testdir):
8787
host, user, port = '0.0.0.0', 'hdfs', 9000
8888
#hdfs = filesystem('hdfs', host=host, user=user, port=port, driver='libhdfs3')
8989
hdfs = pyarrow.hdfs.connect(host='0.0.0.0', port=9000, user=user)
90-
print(tempdir)
9190
hdfs.mkdir(tempdir, create_parents=True)
9291
for x in Path(local_testdir).glob('**/*'):
93-
print(x)
9492
if x.is_file():
9593
text = x.read_text().encode('utf8')
96-
print(text)
9794
if not hdfs.exists(str(x.parent)):
9895
hdfs.mkdir(str(x.parent), create_parents=True)
99-
print(hdfs.exists(str(x.parent)))
10096
with hdfs.open(str(x), 'wb') as f:
10197
f.write(text)
10298
else:
@@ -105,8 +101,8 @@ def hdfs(htcluster, tempdir, local_testdir):
105101
yield host, user, port
106102

107103

108-
@pytest.fixture()
109-
def s3(tempdir, local_testdir):
104+
@pytest.fixture(scope='session')
105+
def s3_server():
110106
# writable local S3 system
111107
if "BOTO_CONFIG" not in os.environ: # pragma: no cover
112108
os.environ["BOTO_CONFIG"] = "/dev/null"
@@ -115,7 +111,7 @@ def s3(tempdir, local_testdir):
115111
if "AWS_SECRET_ACCESS_KEY" not in os.environ: # pragma: no cover
116112
os.environ["AWS_SECRET_ACCESS_KEY"] = "bar"
117113
requests = pytest.importorskip("requests")
118-
s3fs = pytest.importorskip("s3fs")
114+
119115
pytest.importorskip("moto")
120116

121117
port = 5555
@@ -136,20 +132,24 @@ def s3(tempdir, local_testdir):
136132
anon = False
137133
s3so = dict(client_kwargs={'endpoint_url': endpoint_uri},
138134
use_listings_cache=False)
135+
136+
yield anon, s3so
137+
proc.terminate()
138+
proc.wait()
139+
140+
@pytest.fixture
141+
def s3(s3_server, tempdir, local_testdir):
142+
s3fs = pytest.importorskip("s3fs")
143+
anon, s3so = s3_server
139144
s3 = s3fs.S3FileSystem(anon=False, **s3so)
140145
s3.mkdir(tempdir, create_parents=True)
141146
for x in Path(local_testdir).glob('**/*'):
142-
print(x)
143147
if x.is_file():
144148
text = x.read_text().encode('utf8')
145-
print(text)
146149
if not s3.exists(str(x.parent)):
147150
s3.mkdir(str(x.parent), create_parents=True)
148-
print(s3.exists(str(x.parent)))
149151
with s3.open(str(x), 'wb') as f:
150152
f.write(text)
151153
else:
152-
s3.mkdir(str(x))
154+
s3.mkdir(str(x))
153155
yield anon, s3so
154-
proc.terminate()
155-
proc.wait()

upath/tests/test_core.py

Lines changed: 50 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@ def test_lstat(self):
116116
def test_mkdir(self):
117117
new_dir = self.path.joinpath('new_dir')
118118
new_dir.mkdir()
119+
print(new_dir._accessor.info(new_dir))
119120
assert new_dir.exists()
120121

121122
def test_open(self):
@@ -152,15 +153,7 @@ def test_resolve(self):
152153
def test_rglob(self):
153154
pass
154155

155-
def test_rmdir(self, local_testdir):
156-
dirname = 'rmdir_test'
157-
mock_dir = self.path.joinpath(dirname)
158-
mock_dir.mkdir()
159-
mock_dir.rmdir()
160-
assert not mock_dir.exists()
161156

162-
with pytest.raises(NotDirectoryError):
163-
self.path.joinpath('file1.txt').rmdir()
164157

165158
def test_samefile(self):
166159
pass
@@ -205,9 +198,7 @@ class TestUPathHDFS(TestUpath):
205198
@pytest.fixture(autouse=True)
206199
def path(self, local_testdir, hdfs):
207200
host, user, port = hdfs
208-
print(local_testdir)
209201
path = f'hdfs:{local_testdir}'
210-
print(path)
211202
self.path = UPath(path,
212203
host=host,
213204
user=user,
@@ -225,3 +216,52 @@ def path(self, local_testdir, s3):
225216
anon, s3so = s3
226217
path = f's3:{local_testdir}'
227218
self.path = UPath(path, anon=anon, **s3so)
219+
220+
def test_chmod(self):
221+
# todo
222+
pass
223+
224+
def test_mkdir(self):
225+
new_dir = self.path.joinpath('new_dir')
226+
#new_dir.mkdir()
227+
# mkdir doesn't really do anything. A directory only exists in S3
228+
# if a file or other object is written to it
229+
f = new_dir.joinpath('test.txt').touch()
230+
assert new_dir.exists()
231+
232+
def test_rmdir(self, local_testdir):
233+
dirname = 'rmdir_test'
234+
mock_dir = self.path.joinpath(dirname)
235+
f = mock_dir.joinpath('test.txt').touch()
236+
mock_dir.rmdir()
237+
assert not mock_dir.exists()
238+
with pytest.raises(NotDirectoryError):
239+
self.path.joinpath('file1.txt').rmdir()
240+
241+
def test_touch_unlink(self):
242+
path = self.path.joinpath('test_touch.txt')
243+
path.touch()
244+
assert path.exists()
245+
path.unlink()
246+
assert not path.exists()
247+
248+
# should raise FileNotFoundError since file is missing
249+
with pytest.raises(FileNotFoundError):
250+
path.unlink()
251+
252+
# file doesn't exist, but missing_ok is True
253+
path.unlink(missing_ok=True)
254+
255+
def test_multiple_backend_paths(local_testdir, s3, hdfs):
256+
anon, s3so = s3
257+
path = f's3:{local_testdir}'
258+
s3_path = UPath(path, anon=anon, **s3so)
259+
assert s3_path.joinpath('text.txt')._url.scheme == 's3'
260+
host, user, port = hdfs
261+
path = f'hdfs:{local_testdir}'
262+
hdfs_path = UPath(path,
263+
host=host,
264+
user=user,
265+
port=port)
266+
assert s3_path.joinpath('text1.txt')._url.scheme == 's3'
267+

0 commit comments

Comments
 (0)