diff --git a/singlestoredb/__init__.py b/singlestoredb/__init__.py
index 5700d9729..155ac24b3 100644
--- a/singlestoredb/__init__.py
+++ b/singlestoredb/__init__.py
@@ -25,7 +25,7 @@
     DataError, ManagementError,
 )
 from .management import (
-    manage_cluster, manage_workspaces,
+    manage_cluster, manage_workspaces, manage_files,
 )
 from .types import (
     Date, Time, Timestamp, DateFromTicks, TimeFromTicks, TimestampFromTicks,
diff --git a/singlestoredb/management/__init__.py b/singlestoredb/management/__init__.py
index 3a4deeb68..0f4887fcb 100644
--- a/singlestoredb/management/__init__.py
+++ b/singlestoredb/management/__init__.py
@@ -1,5 +1,6 @@
 #!/usr/bin/env python
 from .cluster import manage_cluster
+from .files import manage_files
 from .manager import get_token
 from .workspace import get_organization
 from .workspace import get_secret
diff --git a/singlestoredb/management/files.py b/singlestoredb/management/files.py
index 1f60592a3..48ea7e5a2 100644
--- a/singlestoredb/management/files.py
+++ b/singlestoredb/management/files.py
@@ -16,12 +16,18 @@
 from typing import TextIO
 from typing import Union
 
+from .. import config
 from ..exceptions import ManagementError
+from .manager import Manager
 from .utils import PathLike
 from .utils import to_datetime
 from .utils import vars_to_str
 
 
+PERSONAL_SPACE = 'personal'
+SHARED_SPACE = 'shared'
+
+
 class FilesObject(object):
     """
     File / folder object.
@@ -29,7 +35,8 @@ class FilesObject(object):
     It can belong to either a workspace stage or personal/shared space.
 
     This object is not instantiated directly. It is used in the results
-    of various operations in ``WorkspaceGroup.stage`` methods.
+    of various operations in ``WorkspaceGroup.stage``, ``FilesManager.personal_space``
+    and ``FilesManager.shared_space`` methods.
 
     """
 
@@ -463,3 +470,577 @@ def __str__(self) -> str:
     @abstractmethod
     def __repr__(self) -> str:
         pass
+
+
+class FilesManager(Manager):
+    """
+    SingleStoreDB files manager.
+
+    This class should be instantiated using :func:`singlestoredb.manage_files`.
+
+    Parameters
+    ----------
+    access_token : str, optional
+        The API key or other access token for the files management API
+    version : str, optional
+        Version of the API to use
+    base_url : str, optional
+        Base URL of the files management API
+
+    See Also
+    --------
+    :func:`singlestoredb.manage_files`
+
+    """
+
+    #: Management API version if none is specified.
+    default_version = config.get_option('management.version')
+
+    #: Base URL if none is specified.
+    default_base_url = config.get_option('management.base_url')
+
+    #: Object type
+    obj_type = 'file'
+
+    @property
+    def personal_space(self) -> FileSpace:
+        """Return the personal file space."""
+        return FileSpace(PERSONAL_SPACE, self)
+
+    @property
+    def shared_space(self) -> FileSpace:
+        """Return the shared file space."""
+        return FileSpace(SHARED_SPACE, self)
+
+
+def manage_files(
+    access_token: Optional[str] = None,
+    version: Optional[str] = None,
+    base_url: Optional[str] = None,
+    *,
+    organization_id: Optional[str] = None,
+) -> FilesManager:
+    """
+    Retrieve a SingleStoreDB files manager.
+
+    Parameters
+    ----------
+    access_token : str, optional
+        The API key or other access token for the files management API
+    version : str, optional
+        Version of the API to use
+    base_url : str, optional
+        Base URL of the files management API
+    organization_id : str, optional
+        ID of organization, if using a JWT for authentication
+
+    Returns
+    -------
+    :class:`FilesManager`
+
+    """
+    return FilesManager(
+        access_token=access_token, base_url=base_url,
+        version=version, organization_id=organization_id,
+    )
+
+
+class FileSpace(FileLocation):
+    """
+    FileSpace manager.
+
+    This object is not instantiated directly.
+    It is returned by ``FilesManager.personal_space`` or ``FilesManager.shared_space``.
+
+    """
+
+    def __init__(self, location: str, manager: FilesManager):
+        self._location = location
+        self._manager = manager
+
+    def open(
+        self,
+        path: PathLike,
+        mode: str = 'r',
+        encoding: Optional[str] = None,
+    ) -> Union[io.StringIO, io.BytesIO]:
+        """
+        Open a file path for reading or writing.
+
+        Parameters
+        ----------
+        path : Path or str
+            The file path to read / write
+        mode : str, optional
+            The read / write mode. The following modes are supported:
+                * 'r' open for reading (default)
+                * 'w' open for writing, truncating the file first
+                * 'x' create a new file and open it for writing
+            The data type can be specified by adding one of the following:
+                * 'b' binary mode
+                * 't' text mode (default)
+        encoding : str, optional
+            The string encoding to use for text
+
+        Returns
+        -------
+        FilesObjectBytesReader - 'rb' mode
+        FilesObjectBytesWriter - 'wb' or 'xb' mode
+        FilesObjectTextReader - 'r' or 'rt' mode
+        FilesObjectTextWriter - 'w', 'x', 'wt' or 'xt' mode
+
+        """
+        if '+' in mode or 'a' in mode:
+            raise ManagementError(msg='modifying an existing file is not supported')
+
+        if 'w' in mode or 'x' in mode:
+            exists = self.exists(path)
+            if exists:
+                if 'x' in mode:
+                    raise FileExistsError(f'file path already exists: {path}')
+                self.remove(path)
+            if 'b' in mode:
+                return FilesObjectBytesWriter(b'', self, path)
+            return FilesObjectTextWriter('', self, path)
+
+        if 'r' in mode:
+            content = self.download_file(path)
+            if isinstance(content, bytes):
+                if 'b' in mode:
+                    return FilesObjectBytesReader(content)
+                encoding = 'utf-8' if encoding is None else encoding
+                return FilesObjectTextReader(content.decode(encoding))
+
+            if isinstance(content, str):
+                return FilesObjectTextReader(content)
+
+            raise ValueError(f'unrecognized file content type: {type(content)}')
+
+        raise ValueError(f'must have one of create/read/write mode specified: {mode}')
+
+    def upload_file(
+        self,
+        local_path: Union[PathLike, TextIO, BinaryIO],
+        path: PathLike,
+        *,
+        overwrite: bool = False,
+    ) -> FilesObject:
+        """
+        Upload a local file.
+
+        Parameters
+        ----------
+        local_path : Path or str or file-like
+            Path to the local file or an open file object
+        path : Path or str
+            Path to the file
+        overwrite : bool, optional
+            Should the ``path`` be overwritten if it exists already?
+
+        """
+        # typing.TextIO / BinaryIO never match real file objects; duck-type instead
+        is_file_like = hasattr(local_path, 'read')
+        if not is_file_like and not os.path.isfile(local_path):
+            raise IsADirectoryError(f'local path is not a file: {local_path}')
+
+        if self.exists(path):
+            if not overwrite:
+                raise OSError(f'file path already exists: {path}')
+            self.remove(path)
+
+        if is_file_like:
+            return self._upload(local_path, path, overwrite=overwrite)
+        with open(local_path, 'rb') as infile:
+            return self._upload(infile, path, overwrite=overwrite)
+
+    # TODO: remove from FileLocation?
+    def upload_folder(
+        self,
+        local_path: PathLike,
+        path: PathLike,
+        *,
+        overwrite: bool = False,
+        recursive: bool = True,
+        include_root: bool = False,
+        ignore: Optional[Union[PathLike, List[PathLike]]] = None,
+    ) -> FilesObject:
+        """
+        Upload a folder recursively.
+
+        Only the contents of the folder are uploaded. To include the
+        folder name itself in the target path use ``include_root=True``.
+
+        Parameters
+        ----------
+        local_path : Path or str
+            Local directory to upload
+        path : Path or str
+            Path of folder to upload to
+        overwrite : bool, optional
+            If a file already exists, should it be overwritten?
+        recursive : bool, optional
+            Should nested folders be uploaded?
+        include_root : bool, optional
+            Should the local root folder itself be uploaded as the top folder?
+        ignore : Path or str or List[Path] or List[str], optional
+            Glob patterns of files to ignore, for example, '**/*.pyc' will
+            ignore all '*.pyc' files in the directory tree
+
+        """
+        raise ManagementError(
+            msg='Operation not supported: directories are currently not allowed '
+                'in Files API',
+        )
+
+    def _upload(
+        self,
+        content: Union[str, bytes, TextIO, BinaryIO],
+        path: PathLike,
+        *,
+        overwrite: bool = False,
+    ) -> FilesObject:
+        """
+        Upload content to a file.
+
+        Parameters
+        ----------
+        content : str or bytes or file-like
+            Content to upload
+        path : Path or str
+            Path to the file
+        overwrite : bool, optional
+            Should the ``path`` be overwritten if it exists already?
+
+        """
+        if self.exists(path):
+            if not overwrite:
+                raise OSError(f'file path already exists: {path}')
+            self.remove(path)
+
+        self._manager._put(
+            f'files/fs/{self._location}/{path}',
+            files={'file': content},
+            headers={'Content-Type': None},
+        )
+
+        return self.info(path)
+
+    # TODO: remove from FileLocation?
+    def mkdir(self, path: PathLike, overwrite: bool = False) -> FilesObject:
+        """
+        Make a directory in the file space.
+
+        Parameters
+        ----------
+        path : Path or str
+            Path of the folder to create
+        overwrite : bool, optional
+            Should the file path be overwritten if it exists already?
+
+        Returns
+        -------
+        FilesObject
+
+        """
+        raise ManagementError(
+            msg='Operation not supported: directories are currently not allowed '
+                'in Files API',
+        )
+
+    mkdirs = mkdir
+
+    def rename(
+        self,
+        old_path: PathLike,
+        new_path: PathLike,
+        *,
+        overwrite: bool = False,
+    ) -> FilesObject:
+        """
+        Move the file to a new location.
+
+        Parameters
+        ----------
+        old_path : Path or str
+            Original location of the path
+        new_path : Path or str
+            New location of the path
+        overwrite : bool, optional
+            Should the ``new_path`` be overwritten if it exists already?
+
+        """
+        if not self.exists(old_path):
+            raise OSError(f'file path does not exist: {old_path}')
+
+        if str(old_path).endswith('/') or str(new_path).endswith('/'):
+            raise ManagementError(
+                msg='Operation not supported: directories are currently not allowed '
+                    'in Files API',
+            )
+
+        if self.exists(new_path):
+            if not overwrite:
+                raise OSError(f'file path already exists: {new_path}')
+
+            self.remove(new_path)
+
+        self._manager._patch(
+            f'files/fs/{self._location}/{old_path}',
+            json=dict(newPath=str(new_path)),
+        )
+
+        return self.info(new_path)
+
+    def info(self, path: PathLike) -> FilesObject:
+        """
+        Return information about a file location.
+
+        Parameters
+        ----------
+        path : Path or str
+            Path to the file
+
+        Returns
+        -------
+        FilesObject
+
+        """
+        res = self._manager._get(
+            re.sub(r'/+$', r'/', f'files/fs/{self._location}/{path}'),
+            params=dict(metadata=1),
+        ).json()
+
+        return FilesObject.from_dict(res, self)
+
+    def exists(self, path: PathLike) -> bool:
+        """
+        Does the given file path exist?
+
+        Parameters
+        ----------
+        path : Path or str
+            Path to file object
+
+        Returns
+        -------
+        bool
+
+        """
+        try:
+            self.info(path)
+            return True
+        except ManagementError as exc:
+            if exc.errno == 404:
+                return False
+            raise
+
+    # TODO: remove from FileLocation?
+    def is_dir(self, path: PathLike) -> bool:
+        """
+        Is the given file path a directory?
+
+        Parameters
+        ----------
+        path : Path or str
+            Path to file object
+
+        Returns
+        -------
+        bool
+
+        """
+        try:
+            return self.info(path).type == 'directory'
+        except ManagementError as exc:
+            if exc.errno == 404:
+                return False
+            raise
+
+    # TODO: remove from FileLocation?
+    def is_file(self, path: PathLike) -> bool:
+        """
+        Is the given file path a file?
+
+        Parameters
+        ----------
+        path : Path or str
+            Path to file object
+
+        Returns
+        -------
+        bool
+
+        """
+        try:
+            return self.info(path).type != 'directory'
+        except ManagementError as exc:
+            if exc.errno == 404:
+                return False
+            raise
+
+    def _list_root_dir(self) -> List[str]:
+        """
+        Return the names of files in the root directory.
+
+        Takes no parameters; returns the ``path`` of each entry as a list.
+        """
+        res = self._manager._get(
+            f'files/fs/{self._location}',
+        ).json()
+        return [x['path'] for x in res['content'] or []]
+
+    # TODO: remove from FileLocation?
+    def listdir(
+        self,
+        path: PathLike = '/',
+        *,
+        recursive: bool = False,
+    ) -> List[str]:
+        """
+        List the files / folders at the given path.
+
+        Parameters
+        ----------
+        path : Path or str, optional
+            Path to the file location
+
+        Returns
+        -------
+        List[str]
+
+        """
+        if str(path) in ('', '/'):
+            return self._list_root_dir()
+
+        raise ManagementError(
+            msg='Operation not supported: directories are currently not allowed '
+                'in Files API',
+        )
+
+    def download_file(
+        self,
+        path: PathLike,
+        local_path: Optional[PathLike] = None,
+        *,
+        overwrite: bool = False,
+        encoding: Optional[str] = None,
+    ) -> Optional[Union[bytes, str]]:
+        """
+        Download the content of a file path.
+
+        Parameters
+        ----------
+        path : Path or str
+            Path to the file
+        local_path : Path or str
+            Path to local file target location
+        overwrite : bool, optional
+            Should an existing file be overwritten if it exists?
+        encoding : str, optional
+            Encoding used to convert the resulting data
+
+        Returns
+        -------
+        bytes or str - ``local_path`` is None
+        None - ``local_path`` is a Path or str
+
+        """
+        if local_path is not None and not overwrite and os.path.exists(local_path):
+            raise OSError('target file already exists; use overwrite=True to replace')
+        if self.is_dir(path):
+            raise IsADirectoryError(f'file path is a directory: {path}')
+
+        out = self._manager._get(
+            f'files/fs/{self._location}/{path}',
+        ).content
+
+        if local_path is not None:
+            with open(local_path, 'wb') as outfile:
+                outfile.write(out)
+            return None
+
+        if encoding:
+            return out.decode(encoding)
+
+        return out
+
+    # TODO: remove from FileLocation?
+    def download_folder(
+        self,
+        path: PathLike,
+        local_path: PathLike = '.',
+        *,
+        overwrite: bool = False,
+    ) -> None:
+        """
+        Download a FileSpace folder to a local directory.
+
+        Parameters
+        ----------
+        path : Path or str
+            Path to the file
+        local_path : Path or str
+            Path to local directory target location
+        overwrite : bool, optional
+            Should an existing directory / files be overwritten if they exist?
+
+        """
+        raise ManagementError(
+            msg='Operation not supported: directories are currently not allowed '
+                'in Files API',
+        )
+
+    def remove(self, path: PathLike) -> None:
+        """
+        Delete a file location.
+
+        Parameters
+        ----------
+        path : Path or str
+            Path to the location
+
+        """
+        if self.is_dir(path):
+            raise IsADirectoryError('file path is a directory')
+
+        self._manager._delete(f'files/fs/{self._location}/{path}')
+
+    # TODO: remove from FileLocation?
+    def removedirs(self, path: PathLike) -> None:
+        """
+        Delete a folder recursively.
+
+        Parameters
+        ----------
+        path : Path or str
+            Path to the file location
+
+        """
+        raise ManagementError(
+            msg='Operation not supported: directories are currently not allowed '
+                'in Files API',
+        )
+
+    # TODO: remove from FileLocation?
+    def rmdir(self, path: PathLike) -> None:
+        """
+        Delete a folder.
+
+        Parameters
+        ----------
+        path : Path or str
+            Path to the file location
+
+        """
+        raise ManagementError(
+            msg='Operation not supported: directories are currently not allowed '
+                'in Files API',
+        )
+
+    def __str__(self) -> str:
+        """Return string representation."""
+        return vars_to_str(self)
+
+    def __repr__(self) -> str:
+        """Return string representation."""
+        return str(self)
diff --git a/singlestoredb/tests/test.ipynb b/singlestoredb/tests/test.ipynb
new file mode 100644
index 000000000..5f664f402
--- /dev/null
+++ b/singlestoredb/tests/test.ipynb
@@ -0,0 +1,18 @@
+{
+ "cells": [
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "Test Notebook"
+   ]
+  }
+ ],
+ "metadata": {
+  "language_info": {
+   "name": "python"
+  }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 2
+}
diff --git a/singlestoredb/tests/test2.ipynb b/singlestoredb/tests/test2.ipynb
new file mode 100644
index 000000000..4991bc6bc
--- /dev/null
+++ b/singlestoredb/tests/test2.ipynb
@@ -0,0 +1,18 @@
+{
+ "cells": [
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "Test Notebook 2"
+   ]
+  }
+ ],
+ "metadata": {
+  "language_info": {
+   "name": "python"
+  }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 2
+}
diff --git a/singlestoredb/tests/test_management.py b/singlestoredb/tests/test_management.py
index 6d6a9b0e9..fabbb1c10 100755
--- a/singlestoredb/tests/test_management.py
+++ b/singlestoredb/tests/test_management.py
@@ -740,7 +740,7 @@ def test_os_rename(self):
             'rename_test_2/nest_1/nested_rename_test_3.sql', overwrite=True,
         )
 
-    def test_stage_object(self):
+    def test_file_object(self):
         st = self.wg.stage
 
         st.mkdir('obj_test')
@@ -1028,3 +1028,290 @@ def test_job_with_database_target(self):
         assert deleted
         job = job_manager.get(job.job_id)
         assert job.terminated_at is not None
+
+
+@pytest.mark.management
+class TestFileSpaces(unittest.TestCase):
+
+    manager = None
+    personal_space = None
+    shared_space = None
+
+    @classmethod
+    def setUpClass(cls):
+        cls.manager = s2.manage_files()
+        cls.personal_space = cls.manager.personal_space
+        cls.shared_space = cls.manager.shared_space
+
+    @classmethod
+    def tearDownClass(cls):
+        cls.manager = None
+        cls.personal_space = None
+        cls.shared_space = None
+
+    def test_upload_file(self):
+        for space in [self.personal_space, self.shared_space]:
+            root = space.info('/')
+            assert str(root.path) == '/'
+            assert root.type == 'directory'
+
+            # Upload files
+            f = space.upload_file(
+                TEST_DIR / 'test.ipynb',
+                'upload_test.ipynb',
+            )
+            assert str(f.path) == 'upload_test.ipynb'
+            assert f.type == 'notebook'
+
+            if space is self.personal_space:
+                space.upload_file(
+                    TEST_DIR / 'test.sql',
+                    'upload_test.sql',
+                )
+
+                # Cleanup
+                space.remove('upload_test.sql')
+            elif space is self.shared_space:
+                with self.assertRaises(s2.ManagementError):
+                    space.upload_file(
+                        TEST_DIR / 'test.sql',
+                        'upload_test.sql',
+                    )
+
+            # Download and compare to original
+            txt = f.download(encoding='utf-8')
+            assert txt == open(TEST_DIR / 'test.ipynb').read()
+
+            # Make sure we can't overwrite
+            with self.assertRaises(OSError):
+                space.upload_file(
+                    TEST_DIR / 'test.ipynb',
+                    'upload_test.ipynb',
+                )
+
+            # Force overwrite with new content
+            f = space.upload_file(
+                TEST_DIR / 'test2.ipynb',
+                'upload_test.ipynb', overwrite=True,
+            )
+            assert str(f.path) == 'upload_test.ipynb'
+            assert f.type == 'notebook'
+
+            # Verify new content
+            txt = f.download(encoding='utf-8')
+            assert txt == open(TEST_DIR / 'test2.ipynb').read()
+
+            # Make sure we can't upload a folder
+            with self.assertRaises(s2.ManagementError):
+                space.upload_folder(TEST_DIR, 'test')
+
+            # Cleanup
+            space.remove('upload_test.ipynb')
+
+    def test_open(self):
+        for space in [self.personal_space, self.shared_space]:
+            # See if error is raised for non-existent file
+            with self.assertRaises(s2.ManagementError):
+                space.open('open_test.ipynb', 'r')
+
+            # Load test file
+            space.upload_file(TEST_DIR / 'test.ipynb', 'open_test.ipynb')
+
+            # Read file using `open`
+            with space.open('open_test.ipynb', 'r') as rfile:
+                assert rfile.read() == open(TEST_DIR / 'test.ipynb').read()
+
+            # Read file using `open` with 'rt' mode
+            with space.open('open_test.ipynb', 'rt') as rfile:
+                assert rfile.read() == open(TEST_DIR / 'test.ipynb').read()
+
+            # Read file using `open` with 'rb' mode
+            with space.open('open_test.ipynb', 'rb') as rfile:
+                assert rfile.read() == open(TEST_DIR / 'test.ipynb', 'rb').read()
+
+            # Mode 'b' alone (no 'r' / 'w' / 'x') must be rejected
+            with self.assertRaises(ValueError):
+                with space.open('open_test.ipynb', 'b') as rfile:
+                    pass
+
+            # Attempt overwrite file using `open` with mode 'x'
+            with self.assertRaises(OSError):
+                with space.open('open_test.ipynb', 'x') as wfile:
+                    pass
+
+            # Attempt overwrite file using `open` with mode 'w'
+            with space.open('open_test.ipynb', 'w') as wfile:
+                wfile.write(open(TEST_DIR / 'test2.ipynb').read())
+
+            txt = space.download_file('open_test.ipynb', encoding='utf-8')
+
+            assert txt == open(TEST_DIR / 'test2.ipynb').read()
+
+            # Test writer without context manager
+            wfile = space.open('open_raw_test.ipynb', 'w')
+            for line in open(TEST_DIR / 'test.ipynb'):
+                wfile.write(line)
+            wfile.close()
+
+            txt = space.download_file(
+                'open_raw_test.ipynb',
+                encoding='utf-8',
+            )
+
+            assert txt == open(TEST_DIR / 'test.ipynb').read()
+
+            # Test reader without context manager
+            rfile = space.open('open_raw_test.ipynb', 'r')
+            txt = ''
+            for line in rfile:
+                txt += line
+            rfile.close()
+
+            assert txt == open(TEST_DIR / 'test.ipynb').read()
+
+            # Cleanup
+            space.remove('open_test.ipynb')
+            space.remove('open_raw_test.ipynb')
+
+    def test_obj_open(self):
+        for space in [self.personal_space, self.shared_space]:
+            # Load test file
+            f = space.upload_file(
+                TEST_DIR / 'test.ipynb',
+                'obj_open_test.ipynb',
+            )
+
+            # Read file using `open`
+            with f.open() as rfile:
+                assert rfile.read() == open(TEST_DIR / 'test.ipynb').read()
+
+            # Make sure directories error out
+            with self.assertRaises(s2.ManagementError):
+                space.mkdir('obj_open_dir')
+
+            # Write file using `open`
+            with f.open('w', encoding='utf-8') as wfile:
+                wfile.write(open(TEST_DIR / 'test2.ipynb').read())
+
+            assert f.download(encoding='utf-8') == open(TEST_DIR / 'test2.ipynb').read()
+
+            # Test writer without context manager
+            wfile = f.open('w')
+            for line in open(TEST_DIR / 'test.ipynb'):
+                wfile.write(line)
+            wfile.close()
+
+            txt = space.download_file(f.path, encoding='utf-8')
+
+            assert txt == open(TEST_DIR / 'test.ipynb').read()
+
+            # Test reader without context manager
+            rfile = f.open('r')
+            txt = ''
+            for line in rfile:
+                txt += line
+            rfile.close()
+
+            assert txt == open(TEST_DIR / 'test.ipynb').read()
+
+            # Cleanup
+            space.remove('obj_open_test.ipynb')
+
+    def test_os_directories(self):
+        for space in [self.personal_space, self.shared_space]:
+            # Make sure directories error out
+            with self.assertRaises(s2.ManagementError):
+                space.mkdir('mkdir_test_1')
+
+            with self.assertRaises(s2.ManagementError):
+                space.exists('mkdir_test_1/')
+
+            out = space.listdir('/')
+            assert 'mkdir_test_1/' not in out
+
+            with self.assertRaises(s2.ManagementError):
+                space.rmdir('mkdir_test_1/')
+
+    def test_os_rename(self):
+        for space in [self.personal_space, self.shared_space]:
+            space.upload_file(
+                TEST_DIR / 'test.ipynb',
+                'rename_test.ipynb',
+            )
+            assert 'rename_test.ipynb' in space.listdir('/')
+            assert 'rename_test_2.ipynb' not in space.listdir('/')
+
+            space.rename(
+                'rename_test.ipynb',
+                'rename_test_2.ipynb',
+            )
+            assert 'rename_test.ipynb' not in space.listdir('/')
+            assert 'rename_test_2.ipynb' in space.listdir('/')
+
+            # non-existent file
+            with self.assertRaises(OSError):
+                space.rename('rename_foo.ipynb', 'rename_foo_2.ipynb')
+
+            space.upload_file(
+                TEST_DIR / 'test.ipynb',
+                'rename_test_3.ipynb',
+            )
+
+            # overwrite
+            with self.assertRaises(OSError):
+                space.rename(
+                    'rename_test_2.ipynb',
+                    'rename_test_3.ipynb',
+                )
+
+            space.rename(
+                'rename_test_2.ipynb',
+                'rename_test_3.ipynb',
+                overwrite=True,
+            )
+
+            # Cleanup
+            space.remove('rename_test_3.ipynb')
+
+    def test_file_object(self):
+        for space in [self.personal_space, self.shared_space]:
+            f = space.upload_file(
+                TEST_DIR / 'test.ipynb',
+                'obj_test.ipynb',
+            )
+
+            assert not f.is_dir()
+            assert f.is_file()
+
+            # abspath / basename / dirname / exists
+            assert f.abspath() == 'obj_test.ipynb'
+            assert f.basename() == 'obj_test.ipynb'
+            assert f.dirname() == '/'
+            assert f.exists()
+
+            # download
+            assert f.download(encoding='utf-8') == \
+                open(TEST_DIR / 'test.ipynb', 'r').read()
+            assert f.download() == open(TEST_DIR / 'test.ipynb', 'rb').read()
+
+            assert space.is_file('obj_test.ipynb')
+            f.remove()
+            assert not space.is_file('obj_test.ipynb')
+
+            # mtime / ctime
+            assert f.getmtime() > 0
+            assert f.getctime() > 0
+
+            # rename
+            f = space.upload_file(
+                TEST_DIR / 'test.ipynb',
+                'obj_test.ipynb',
+            )
+            assert space.exists('obj_test.ipynb')
+            assert not space.exists('obj_test_2.ipynb')
+            f.rename('obj_test_2.ipynb')
+            assert not space.exists('obj_test.ipynb')
+            assert space.exists('obj_test_2.ipynb')
+            assert f.abspath() == 'obj_test_2.ipynb'
+
+            # Cleanup
+            space.remove('obj_test_2.ipynb')