Skip to content

Commit cabe727

Browse files
committed
adds hdfs and s3 tests
1 parent 169b11e commit cabe727

File tree

4 files changed

+234
-129
lines changed

4 files changed

+234
-129
lines changed

environment.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ dependencies:
1414
- pylint
1515
- flake8
1616
- pyarrow
17-
- libhdfs3
17+
- moto
1818
- pip
1919
- pip:
2020
- hadoop-test-cluster

upath/core.py

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,6 @@
1010
from upath.errors import NotDirectoryError
1111

1212

13-
14-
15-
1613
class _FSSpecAccessor:
1714

1815
def __init__(self, parsed_url, *args, **kwargs):
@@ -44,6 +41,14 @@ def wrapper(*args, **kwargs):
4441
else:
4542
if not self._fs.root_marker and kwargs['path'].startswith('/'):
4643
kwargs['path'] = kwargs['path'][1:]
44+
if self._url.scheme == 'hdfs':
45+
if 'trunicate' in kwargs:
46+
kwargs.pop('trunicate')
47+
if func.__name__ == 'mkdir':
48+
args = args[:1]
49+
50+
print('args:', args)
51+
print('kwargs:', kwargs)
4752
return func(*args, **kwargs)
4853
return wrapper
4954

@@ -225,6 +230,7 @@ def is_dir(self):
225230

226231
def is_file(self):
227232
info = self._accessor.info(self)
233+
print(info)
228234
if info['type'] == 'file':
229235
return True
230236
return False
@@ -264,6 +270,10 @@ def rmdir(self, recursive=True):
264270
except:
265271
raise NotDirectoryError
266272
self._accessor.rm(self, recursive=recursive)
273+
274+
# def mkdir(self, mode=0o777, parents=False, exist_ok=False):
275+
# super().mkdir(mode=mode, parents=parents, exist_ok=exist_ok)
276+
267277

268278

269279

upath/tests/conftest.py

Lines changed: 103 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,14 @@
1+
import os
12
import tempfile
23
import shutil
34
from pathlib import Path
5+
import subprocess
6+
import shlex
7+
import time
48

59

610
import pytest
11+
from fsspec import filesystem
712
from fsspec.implementations.local import LocalFileSystem
813
from fsspec.registry import (
914
register_implementation,
@@ -23,11 +28,25 @@ def clear_registry():
2328
finally:
2429
_registry.clear()
2530

31+
32+
# folder_structure = {
33+
# 'folders': {'folder1': {'folders': {},
34+
# 'files': {'file1.txt': 'file1.txt',
35+
# 'file2.txt': 'file2.txt'}}
36+
# 'files': {'file1.txt': 'hello_world',
37+
# 'file2.txt': 'hello_world'}
38+
39+
40+
# }
2641

2742
@pytest.fixture()
28-
def testingdir(clear_registry):
43+
def tempdir(clear_registry):
2944
tempdir = tempfile.TemporaryDirectory()
3045
tempdir = tempdir.name
46+
return tempdir
47+
48+
@pytest.fixture()
49+
def local_testdir(tempdir, clear_registry):
3150
tmp = Path(tempdir)
3251
tmp.mkdir()
3352
folder1 = tmp.joinpath('folder1')
@@ -47,19 +66,90 @@ def testingdir(clear_registry):
4766
yield tempdir
4867
shutil.rmtree(tempdir)
4968

69+
@pytest.fixture(scope='session')
70+
def htcluster():
71+
proc = subprocess.Popen(shlex.split("htcluster startup"),
72+
stderr=subprocess.DEVNULL, stdout=subprocess.DEVNULL)
73+
time.sleep(30)
74+
yield
75+
proc.terminate()
76+
proc.wait()
77+
proc1 = subprocess.Popen(shlex.split("htcluster shutdown"),
78+
stderr=subprocess.DEVNULL, stdout=subprocess.DEVNULL)
79+
proc1.terminate()
80+
proc1.wait()
81+
time.sleep(10)
82+
5083
@pytest.fixture()
51-
def hdfs_test_client():
52-
import os
84+
def hdfs(htcluster, tempdir, local_testdir):
85+
5386
pyarrow = pytest.importorskip('pyarrow')
54-
host = os.environ.get('ARROW_HDFS_TEST_HOST', '0.0.0.0')
55-
user = os.environ.get('ARROW_HDFS_TEST_USER', 'hdfs')
56-
try:
57-
port = int(os.environ.get('ARROW_HDFS_TEST_PORT', 9000))
58-
except ValueError:
59-
raise ValueError('Env variable ARROW_HDFS_TEST_PORT was not '
60-
'an integer')
87+
host, user, port = '0.0.0.0', 'hdfs', 9000
88+
#hdfs = filesystem('hdfs', host=host, user=user, port=port, driver='libhdfs3')
89+
hdfs = pyarrow.hdfs.connect(host='0.0.0.0', port=9000, user=user)
90+
print(tempdir)
91+
hdfs.mkdir(tempdir, create_parents=True)
92+
for x in Path(local_testdir).glob('**/*'):
93+
print(x)
94+
if x.is_file():
95+
text = x.read_text().encode('utf8')
96+
print(text)
97+
if not hdfs.exists(str(x.parent)):
98+
hdfs.mkdir(str(x.parent), create_parents=True)
99+
print(hdfs.exists(str(x.parent)))
100+
with hdfs.open(str(x), 'wb') as f:
101+
f.write(text)
102+
else:
103+
hdfs.mkdir(str(x))
104+
hdfs.close()
105+
yield host, user, port
106+
107+
108+
@pytest.fixture()
109+
def s3(tempdir, local_testdir):
110+
# writable local S3 system
111+
if "BOTO_CONFIG" not in os.environ: # pragma: no cover
112+
os.environ["BOTO_CONFIG"] = "/dev/null"
113+
if "AWS_ACCESS_KEY_ID" not in os.environ: # pragma: no cover
114+
os.environ["AWS_ACCESS_KEY_ID"] = "foo"
115+
if "AWS_SECRET_ACCESS_KEY" not in os.environ: # pragma: no cover
116+
os.environ["AWS_SECRET_ACCESS_KEY"] = "bar"
117+
requests = pytest.importorskip("requests")
118+
s3fs = pytest.importorskip("s3fs")
119+
pytest.importorskip("moto")
61120

62-
hdfs = pyarrow.hdfs.connect(host='0.0.0.0', port=9000, user=user, driver='libhdfs3')
63-
hdfs.mkdir('/folder1')
64-
return host, user, port
121+
port = 5555
122+
endpoint_uri = 'http://127.0.0.1:%s/' % port
123+
proc = subprocess.Popen(shlex.split("moto_server s3 -p %s" % port),
124+
stderr=subprocess.DEVNULL, stdout=subprocess.DEVNULL)
65125

126+
timeout = 5
127+
while timeout > 0:
128+
try:
129+
r = requests.get(endpoint_uri)
130+
if r.ok:
131+
break
132+
except Exception: # pragma: no cover
133+
pass
134+
timeout -= 0.1 # pragma: no cover
135+
time.sleep(0.1) # pragma: no cover
136+
anon = False
137+
s3so = dict(client_kwargs={'endpoint_url': endpoint_uri},
138+
use_listings_cache=False)
139+
s3 = s3fs.S3FileSystem(anon=False, **s3so)
140+
s3.mkdir(tempdir, create_parents=True)
141+
for x in Path(local_testdir).glob('**/*'):
142+
print(x)
143+
if x.is_file():
144+
text = x.read_text().encode('utf8')
145+
print(text)
146+
if not s3.exists(str(x.parent)):
147+
s3.mkdir(str(x.parent), create_parents=True)
148+
print(s3.exists(str(x.parent)))
149+
with s3.open(str(x), 'wb') as f:
150+
f.write(text)
151+
else:
152+
s3.mkdir(str(x))
153+
yield anon, s3so
154+
proc.terminate()
155+
proc.wait()

0 commit comments

Comments
 (0)