@@ -2,6 +2,7 @@
 import json
 import logging
 from typing import Dict, List, Optional, Tuple, Union, Any
+from datetime import datetime
 
 import os
 import tornado
@@ -274,7 +275,6 @@ async def new_file(self, drive_name, path, is_dir):
         try:
             # eliminate leading and trailing backslashes
             path = path.strip('/')
-            print('isDir: ', is_dir)
 
             if is_dir == False or self._config.provider != 's3':
                 # TO DO: switch to mode "created", which is not implemented yet
@@ -367,18 +367,39 @@ async def rename_file(self, drive_name, path, new_path):
             new_path: path of new file name
         """
         data = {}
+        finished = False
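+        # finished tracks whether the prefix listing below matched any objects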
         try:
             # eliminate leading and trailing backslashes
             path = path.strip('/')
-
-            await obs.rename_async(self._content_managers[drive_name]["store"], path, new_path)
-            metadata = await obs.head_async(self._content_managers[drive_name]["store"], new_path)
 
-            data = {
-                "path": new_path,
-                "last_modified": metadata["last_modified"].isoformat(),
-                "size": metadata["size"]
-            }
+            # get the list of contents under the given prefix (path)
+            stream = obs.list(self._content_managers[drive_name]["store"], path, chunk_size=100, return_arrow=True)
+            async for batch in stream:
+                contents_list = pyarrow.record_batch(batch).to_pylist()
+                # rename each object within the directory
+                for obj in contents_list:
+                    finished = True
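+                    # keep each object's path relative to the renamed prefix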
+                    remaining_path = obj["path"][len(path) + 1:]
+                    old_path = path if remaining_path == '' else os.path.join(path, remaining_path)
+                    formatted_new_path = new_path if remaining_path == '' else os.path.join(new_path, remaining_path)
+                    try:
+                        await obs.rename_async(self._content_managers[drive_name]["store"], old_path, formatted_new_path)
+                    except Exception as e:
+                        # we are dealing with a directory rename in S3, where obstore doesn't find the object
+                        if self._config.provider == 's3':
+                            self._rename_directory(drive_name, old_path, formatted_new_path)
+                        else:
+                            raise tornado.web.HTTPError(
+                                status_code=httpx.codes.BAD_REQUEST,
+                                reason=f"The following error occurred when renaming the object: {e}",
+                            )
+
+            # nothing matched the prefix, so there are no extra S3 directories to rename
+            if not data and not finished:
+                # rename a single file at the root (it won't be listed above)
+                await obs.rename_async(self._content_managers[drive_name]["store"], path, new_path)
+
+            data = await self._get_metadata(drive_name, new_path)
         except Exception as e:
             raise tornado.web.HTTPError(
                 status_code= httpx.codes.BAD_REQUEST,
@@ -573,13 +594,69 @@ async def _delete_directories(self, drive_name, path):
                 self._s3_clients[location].delete_object(Bucket=drive_name, Key=path+'/')
 
         except Exception as e:
-            raise tornado.web.HTTPError(
+            raise tornado.web.HTTPError(
                 status_code= httpx.codes.BAD_REQUEST,
                 reason=f"The following error occured when deleting the directory: {e}",
             )
 
         return
 
+    def _rename_directory(self, drive_name, path, new_path):
+        """Helper function to rename a directory when dealing with S3 buckets.
+
+        Args:
+            drive_name: name of drive containing the object
+            path: current path of the object
+            new_path: new path of the object
+        """
+        try:
+            location = self._content_managers[drive_name]["location"]
+            if location not in self._s3_clients:
+                self._s3_clients[location] = self._s3_session.client('s3', location)
+
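+            # S3 has no real directories: a "folder" is just a key ending in '/',
+            # so renaming one means copying to the new key and deleting the old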
+            self._s3_clients[location].copy_object(Bucket=drive_name, CopySource=os.path.join(drive_name, path) + '/', Key=new_path + '/')
+            self._s3_clients[location].delete_object(Bucket=drive_name, Key=path + '/')
+        except Exception:
+            # the object is not found if we are not dealing with a directory
+            pass
+
+        return
+
+    async def _get_metadata(self, drive_name, path):
+        """Helper function to get the metadata of an object.
+
+        Args:
+            drive_name: name of drive containing the object
+            path: path of the object
+        """
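+        # a minimal fallback chain: try obstore first, then the S3 client for
+        # directory objects, and finally return placeholder metadata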
+        try:
+            metadata = await obs.head_async(self._content_managers[drive_name]["store"], path)
+            data = {
+                "path": path,
+                "last_modified": metadata["last_modified"].isoformat(),
+                "size": metadata["size"]
+            }
+        except Exception:
+            try:
+                location = self._content_managers[drive_name]["location"]
+                if location not in self._s3_clients:
+                    self._s3_clients[location] = self._s3_session.client('s3', location)
+
+                metadata = self._s3_clients[location].head_object(Bucket=drive_name, Key=path + '/')
+                data = {
+                    "path": path,
+                    "last_modified": metadata["LastModified"].isoformat(),
+                    "size": metadata["ContentLength"]
+                }
+            except Exception:
+                data = {
+                    "path": path,
+                    "last_modified": datetime.now().isoformat(),
+                    "size": 0
+                }
+
+        return data
+
     async def _call_provider(
         self,
         url: str,