|
3 | 3 | import logging |
4 | 4 | from typing import Dict, List, Optional, Tuple, Union, Any |
5 | 5 |
|
| 6 | +import os |
6 | 7 | import tornado |
7 | 8 | import httpx |
8 | 9 | import traitlets |
|
11 | 12 | import obstore as obs |
12 | 13 | from libcloud.storage.types import Provider |
13 | 14 | from libcloud.storage.providers import get_driver |
| 15 | +import pyarrow |
14 | 16 |
|
15 | 17 | from .log import get_logger |
16 | 18 | from .base import DrivesConfig |
@@ -153,14 +155,63 @@ async def unmount_drive(self, drive_name: str): |
153 | 155 |
|
154 | 156 | return |
155 | 157 |
|
156 | | - async def get_contents(self, drive_name, path, **kwargs): |
| 158 | + async def get_contents(self, drive_name, path): |
157 | 159 | """Get contents of a file or directory. |
158 | 160 |
|
159 | 161 | Args: |
160 | 162 | drive_name: name of drive to get the contents of |
161 | | - path: path to file or directory |
| 163 | + path: path to file or directory (empty string for root listing) |
162 | 164 | """ |
163 | | - print('Get contents function called.') |
| 165 | + print('!!!!!!!!!!!!!!!!!!!', drive_name, 'path: ', path) |
| 166 | + if path == '/': |
| 167 | + path = '' |
| 168 | + drive_name = 'jupyter-drives-test-bucket-1' |
| 169 | + try : |
| 170 | + currentObject = os.path.basename(path) if os.path.basename(path) is not None else '' |
| 171 | + print('currentObject: ', currentObject) |
| 172 | + # check if we are listing contents of a directory |
| 173 | + if currentObject.find('.') == -1: |
| 174 | + print('in if') |
| 175 | + print('store: ', self._content_managers) |
| 176 | + data = [] |
| 177 | + # using Arrow lists as they are recommended for large results |
| 178 | + # sream will be an async iterable of RecordBatch |
| 179 | + stream = obs.list(self._content_managers[drive_name], path, chunk_size=100, return_arrow=True) |
| 180 | + async for batch in stream: |
| 181 | + contents_list = pyarrow.record_batch(batch).to_pylist() |
| 182 | + for object in contents_list: |
| 183 | + data.append({ |
| 184 | + "path": object["path"], |
| 185 | + "last_modified": object["last_modified"].isoformat(), |
| 186 | + "size": object["size"], |
| 187 | + }) |
| 188 | + else: |
| 189 | + content = b"" |
| 190 | + # retrieve contents of object |
| 191 | + obj = await obs.get_async(self._content_managers[drive_name], path) |
| 192 | + stream = obj.stream(min_chunk_size=5 * 1024 * 1024) # 5MB sized chunks |
| 193 | + async for buf in stream: |
| 194 | + content += buf |
| 195 | + |
| 196 | + # retrieve metadata of object |
| 197 | + metadata = await obs.head_async(self._content_managers[drive_name], path) |
| 198 | + data = { |
| 199 | + "path": path, |
| 200 | + "content": content, |
| 201 | + "last_modified": metadata["last_modified"].isoformat(), |
| 202 | + "size": metadata["size"] |
| 203 | + } |
| 204 | + print(data) |
| 205 | + response = { |
| 206 | + "data": data |
| 207 | + } |
| 208 | + except Exception as e: |
| 209 | + raise tornado.web.HTTPError( |
| 210 | + status_code= httpx.codes.BAD_REQUEST, |
| 211 | + reason=f"The following error occured when retrieving the contents: {e}", |
| 212 | + ) |
| 213 | + |
| 214 | + return response |
164 | 215 |
|
165 | 216 | async def new_file(self, drive_name, path, **kwargs): |
166 | 217 | """Create a new file or directory at the given path. |
|
0 commit comments