Skip to content

Commit deaf620

Browse files
authored
WorkspacePath to implement pathlib.Path API (#1509)
1 parent 5bbd870 commit deaf620

File tree

3 files changed

+392
-0
lines changed

3 files changed

+392
-0
lines changed
Lines changed: 297 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,297 @@
1+
import abc
2+
import locale
3+
import logging
4+
import os
5+
import pathlib
6+
from functools import cached_property
7+
8+
# pylint: disable-next=import-private-name
9+
from pathlib import Path, _PosixFlavour, _Accessor # type: ignore
10+
from urllib.parse import quote_from_bytes as urlquote_from_bytes
11+
from io import BytesIO, StringIO
12+
13+
from databricks.sdk import WorkspaceClient
14+
from databricks.sdk.errors import NotFound
15+
from databricks.sdk.service.workspace import ObjectInfo, ObjectType, ExportFormat, ImportFormat
16+
17+
18+
logger = logging.getLogger(__name__)
19+
20+
21+
class _DatabricksFlavour(_PosixFlavour):
22+
def __init__(self, ws: WorkspaceClient):
23+
super().__init__()
24+
self._ws = ws
25+
26+
def make_uri(self, path):
27+
return self._ws.config.host + '#workspace' + urlquote_from_bytes(bytes(path))
28+
29+
def __repr__(self):
30+
return f"<{self.__class__.__name__} for {self._ws}>"
31+
32+
33+
def _na(fn: str):
34+
def _inner(*_, **__):
35+
__tracebackhide__ = True # pylint: disable=unused-variable
36+
raise NotImplementedError(f"{fn}() is not available for Databricks Workspace")
37+
38+
return _inner
39+
40+
41+
class _ScandirItem:
42+
def __init__(self, object_info):
43+
self._object_info = object_info
44+
45+
def __fspath__(self):
46+
return self._object_info.path
47+
48+
def is_dir(self):
49+
return self._object_info.object_type == ObjectType.DIRECTORY
50+
51+
def is_file(self):
52+
# TODO: check if we want to show notebooks as files
53+
return self._object_info.object_type == ObjectType.FILE
54+
55+
def is_symlink(self):
56+
return False
57+
58+
@property
59+
def name(self):
60+
return os.path.basename(self._object_info.path)
61+
62+
63+
class _ScandirIterator:
64+
def __init__(self, objects):
65+
self._it = objects
66+
67+
def __iter__(self):
68+
for object_info in self._it:
69+
yield _ScandirItem(object_info)
70+
71+
def __enter__(self):
72+
return self
73+
74+
def __exit__(self, *args):
75+
pass
76+
77+
78+
class _DatabricksAccessor(_Accessor):
79+
chmod = _na('accessor.chmod')
80+
getcwd = _na('accessor.getcwd')
81+
group = _na('accessor.group')
82+
link = _na('accessor.link')
83+
mkdir = _na('accessor.mkdir')
84+
owner = _na('accessor.owner')
85+
readlink = _na('accessor.readlink')
86+
realpath = _na('accessor.realpath')
87+
rename = _na('accessor.rename')
88+
replace = _na('accessor.replace')
89+
rmdir = _na('accessor.rmdir')
90+
stat = _na('accessor.stat')
91+
symlink = _na('accessor.symlink')
92+
unlink = _na('accessor.unlink')
93+
94+
def __init__(self, ws: WorkspaceClient):
95+
self._ws = ws
96+
97+
def expanduser(self, path):
98+
home = f"/Users/{self._ws.current_user.me().user_name}"
99+
return path.replace("~", home)
100+
101+
def __repr__(self):
102+
return f"<{self.__class__.__name__} for {self._ws}>"
103+
104+
def scandir(self, path):
105+
return _ScandirIterator(self._ws.workspace.list(path))
106+
107+
def listdir(self, path):
108+
return [item.name for item in self.scandir(path)]
109+
110+
111+
class _UploadIO(abc.ABC):
112+
def __init__(self, ws: WorkspaceClient, path: str):
113+
self._ws = ws
114+
self._path = path
115+
116+
def close(self):
117+
# pylint: disable-next=no-member
118+
io_stream = self.getvalue() # noqa
119+
self._ws.workspace.upload(self._path, io_stream, format=ImportFormat.AUTO)
120+
121+
def __repr__(self):
122+
return f"<{self.__class__.__name__} for {self._path} on {self._ws}>"
123+
124+
125+
class _BinaryUploadIO(_UploadIO, BytesIO): # type: ignore
126+
def __init__(self, ws: WorkspaceClient, path: str):
127+
_UploadIO.__init__(self, ws, path)
128+
BytesIO.__init__(self)
129+
130+
131+
class _TextUploadIO(_UploadIO, StringIO): # type: ignore
132+
def __init__(self, ws: WorkspaceClient, path: str):
133+
_UploadIO.__init__(self, ws, path)
134+
StringIO.__init__(self)
135+
136+
137+
class WorkspacePath(Path):
138+
"""Experimental implementation of pathlib.Path for Databricks Workspace."""
139+
140+
_ws: WorkspaceClient
141+
_flavour: _DatabricksFlavour
142+
_accessor: _DatabricksAccessor
143+
144+
cwd = _na('cwd')
145+
resolve = _na('resolve')
146+
stat = _na('stat')
147+
chmod = _na('chmod')
148+
lchmod = _na('lchmod')
149+
lstat = _na('lstat')
150+
owner = _na('owner')
151+
group = _na('group')
152+
readlink = _na('readlink')
153+
symlink_to = _na('symlink_to')
154+
hardlink_to = _na('hardlink_to')
155+
touch = _na('touch')
156+
link_to = _na('link_to')
157+
samefile = _na('samefile')
158+
159+
def __new__(cls, ws: WorkspaceClient, path: str | Path):
160+
this = object.__new__(cls)
161+
# pathlib does a lot of clever performance tricks, and it's not designed to be subclassed,
162+
# so we need to set the attributes directly, bypassing the most of a common sense.
163+
this._flavour = _DatabricksFlavour(ws)
164+
drv, root, parts = this._parse_args([path])
165+
return this.__from_raw_parts(this, ws, this._flavour, drv, root, parts)
166+
167+
@staticmethod
168+
def __from_raw_parts(this, ws: WorkspaceClient, flavour: _DatabricksFlavour, drv, root, parts) -> 'WorkspacePath':
169+
# pylint: disable=protected-access
170+
this._accessor = _DatabricksAccessor(ws)
171+
this._flavour = flavour
172+
this._drv = drv
173+
this._root = root
174+
this._parts = parts
175+
this._ws = ws
176+
return this
177+
178+
def _make_child_relpath(self, part):
179+
# used in dir walking
180+
path = self._flavour.join(self._parts + [part])
181+
return WorkspacePath(self._ws, path)
182+
183+
def _parse_args(self, args): # pylint: disable=arguments-differ
184+
# instance method adapted from pathlib.Path
185+
parts = []
186+
for a in args:
187+
if isinstance(a, pathlib.PurePath):
188+
parts += a._parts # pylint: disable=protected-access
189+
continue
190+
parts.append(str(a))
191+
return self._flavour.parse_parts(parts)
192+
193+
def _format_parsed_parts(self, drv, root, parts): # pylint: disable=arguments-differ
194+
# instance method adapted from pathlib.Path
195+
if drv or root:
196+
return drv + root + self._flavour.join(parts[1:])
197+
return self._flavour.join(parts)
198+
199+
def _from_parsed_parts(self, drv, root, parts): # pylint: disable=arguments-differ
200+
# instance method adapted from pathlib.Path
201+
this = object.__new__(self.__class__)
202+
return self.__from_raw_parts(this, self._ws, self._flavour, drv, root, parts)
203+
204+
def _from_parts(self, args): # pylint: disable=arguments-differ
205+
# instance method adapted from pathlib.Path
206+
drv, root, parts = self._parse_args(args)
207+
return self._from_parsed_parts(drv, root, parts)
208+
209+
def relative_to(self, *other) -> pathlib.PurePath: # type: ignore
210+
"""Databricks Workspace works only with absolute paths, so we make sure to
211+
return pathlib.Path instead of WorkspacePath to avoid confusion."""
212+
return pathlib.PurePath(super().relative_to(*other))
213+
214+
def as_fuse(self):
215+
"""Return FUSE-mounted path in Databricks Runtime."""
216+
if 'DATABRICKS_RUNTIME_VERSION' not in os.environ:
217+
logger.warning("This method is only available in Databricks Runtime")
218+
return Path('/Workspace', self.as_posix())
219+
220+
def home(self): # pylint: disable=arguments-differ
221+
# instance method adapted from pathlib.Path
222+
return WorkspacePath(self._ws, "~").expanduser()
223+
224+
def exists(self, *, follow_symlinks=True):
225+
if not follow_symlinks:
226+
raise NotImplementedError("follow_symlinks=False is not supported for Databricks Workspace")
227+
try:
228+
self._ws.workspace.get_status(self.as_posix())
229+
return True
230+
except NotFound:
231+
return False
232+
233+
def mkdir(self, mode=0o600, parents=True, exist_ok=True):
234+
if not exist_ok:
235+
raise ValueError("exist_ok must be True for Databricks Workspace")
236+
if not parents:
237+
raise ValueError("parents must be True for Databricks Workspace")
238+
if mode != 0o600:
239+
raise ValueError("other modes than 0o600 are not yet supported")
240+
self._ws.workspace.mkdirs(self.as_posix())
241+
242+
def rmdir(self, recursive=False):
243+
self._ws.workspace.delete(self.as_posix(), recursive=recursive)
244+
245+
def rename(self, target, overwrite=False):
246+
dst = WorkspacePath(self._ws, target)
247+
with self._ws.workspace.download(self.as_posix(), format=ExportFormat.AUTO) as f:
248+
self._ws.workspace.upload(dst.as_posix(), f.read(), format=ImportFormat.AUTO, overwrite=overwrite)
249+
self.unlink()
250+
251+
def replace(self, target):
252+
return self.rename(target, overwrite=True)
253+
254+
def unlink(self, missing_ok=False):
255+
if not missing_ok and not self.exists():
256+
raise FileNotFoundError(f"{self.as_posix()} does not exist")
257+
self._ws.workspace.delete(self.as_posix())
258+
259+
def open(self, mode="r", buffering=-1, encoding=None, errors=None, newline=None):
260+
if encoding is None or encoding == "locale":
261+
encoding = locale.getpreferredencoding(False)
262+
if "b" in mode and "r" in mode:
263+
return self._ws.workspace.download(self.as_posix(), format=ExportFormat.AUTO)
264+
if "b" in mode and "w" in mode:
265+
return _BinaryUploadIO(self._ws, self.as_posix())
266+
if "r" in mode:
267+
with self._ws.workspace.download(self.as_posix(), format=ExportFormat.AUTO) as f:
268+
return StringIO(f.read().decode(encoding))
269+
if "w" in mode:
270+
return _TextUploadIO(self._ws, self.as_posix())
271+
raise ValueError(f"invalid mode: {mode}")
272+
273+
@cached_property
274+
def _object_info(self) -> ObjectInfo:
275+
# this method is cached because it is used in multiple is_* methods.
276+
# DO NOT use this method in methods, where fresh result is required.
277+
return self._ws.workspace.get_status(self.as_posix())
278+
279+
def _return_false(self) -> bool:
280+
return False
281+
282+
is_symlink = _return_false
283+
is_block_device = _return_false
284+
is_char_device = _return_false
285+
is_fifo = _return_false
286+
is_socket = _return_false
287+
is_mount = _return_false
288+
is_junction = _return_false
289+
290+
def is_dir(self):
291+
return self._object_info.object_type == ObjectType.DIRECTORY
292+
293+
def is_file(self):
294+
return self._object_info.object_type == ObjectType.FILE
295+
296+
def is_notebook(self):
297+
return self._object_info.object_type == ObjectType.NOTEBOOK

tests/integration/mixins/__init__.py

Whitespace-only changes.
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
from pathlib import Path
2+
3+
import pytest
4+
from databricks.sdk.errors import BadRequest
5+
6+
from databricks.labs.ucx.mixins.wspath import WorkspacePath
7+
8+
9+
def test_exists(ws):
10+
wsp = WorkspacePath(ws, "/Users")
11+
assert wsp.exists()
12+
13+
14+
def test_mkdirs(ws, make_random):
15+
name = make_random()
16+
wsp = WorkspacePath(ws, f"~/{name}/foo/bar/baz")
17+
assert not wsp.is_absolute()
18+
19+
with pytest.raises(NotImplementedError):
20+
wsp.absolute()
21+
22+
with_user = wsp.expanduser()
23+
with_user.mkdir()
24+
25+
home = WorkspacePath(ws, "~").expanduser()
26+
relative_name = with_user.relative_to(home)
27+
assert relative_name.as_posix() == f'{name}/foo/bar/baz'
28+
29+
assert with_user.is_absolute()
30+
assert with_user.absolute() == with_user
31+
assert with_user.as_fuse() == Path('/Workspace') / with_user.as_posix()
32+
33+
user_name = ws.current_user.me().user_name
34+
browser_uri = f'{ws.config.host}#workspace/Users/{user_name.replace("@", "%40")}/{name}/foo/bar/baz'
35+
assert with_user.as_uri() == browser_uri
36+
37+
wsp_check = WorkspacePath(ws, f"/Users/{user_name}/{name}/foo/bar/baz")
38+
assert wsp_check.is_dir()
39+
40+
with pytest.raises(BadRequest):
41+
wsp_check.parent.rmdir()
42+
wsp_check.parent.rmdir(recursive=True)
43+
44+
assert not wsp_check.exists()
45+
46+
47+
def test_open_text_io(ws, make_random):
48+
name = make_random()
49+
wsp = WorkspacePath(ws, f"~/{name}/a/b/c")
50+
with_user = wsp.expanduser()
51+
with_user.mkdir(parents=True)
52+
53+
hello_txt = with_user / "hello.txt"
54+
hello_txt.write_text("Hello, World!")
55+
assert hello_txt.read_text() == 'Hello, World!'
56+
57+
files = list(with_user.glob("**/*.txt"))
58+
assert len(files) == 1
59+
assert hello_txt == files[0]
60+
assert files[0].name == 'hello.txt'
61+
62+
with_user.joinpath("hello.txt").unlink()
63+
64+
assert not hello_txt.exists()
65+
66+
67+
def test_open_binary_io(ws, make_random):
68+
name = make_random()
69+
wsp = WorkspacePath(ws, f"~/{name}")
70+
with_user = wsp.expanduser()
71+
with_user.mkdir(parents=True)
72+
73+
hello_bin = with_user.joinpath("hello.bin")
74+
hello_bin.write_bytes(b"Hello, World!")
75+
76+
assert hello_bin.read_bytes() == b'Hello, World!'
77+
78+
with_user.joinpath("hello.bin").unlink()
79+
80+
assert not hello_bin.exists()
81+
82+
83+
def test_replace(ws, make_random):
84+
name = make_random()
85+
wsp = WorkspacePath(ws, f"~/{name}")
86+
with_user = wsp.expanduser()
87+
with_user.mkdir(parents=True)
88+
89+
hello_txt = with_user / "hello.txt"
90+
hello_txt.write_text("Hello, World!")
91+
92+
hello_txt.replace(with_user / "hello2.txt")
93+
94+
assert not hello_txt.exists()
95+
assert (with_user / "hello2.txt").read_text() == 'Hello, World!'

0 commit comments

Comments
 (0)