|
15 | 15 | import pprint |
16 | 16 | import shutil |
17 | 17 | import tempfile |
| 18 | +import typing |
18 | 19 |
|
19 | 20 | import concurrent.futures |
20 | 21 |
|
@@ -621,77 +622,14 @@ def download_file_async(mc, project_dir, file_path, output_file, version): |
621 | 622 | Starts background download project file at specified version. |
622 | 623 | Returns handle to the pending download. |
623 | 624 | """ |
624 | | - mp = MerginProject(project_dir) |
625 | | - project_path = mp.project_full_name() |
626 | | - ver_info = f"at version {version}" if version is not None else "at latest version" |
627 | | - mp.log.info(f"Getting {file_path} {ver_info}") |
628 | | - latest_proj_info = mc.project_info(project_path) |
629 | | - if version: |
630 | | - project_info = mc.project_info(project_path, version=version) |
631 | | - else: |
632 | | - project_info = latest_proj_info |
633 | | - mp.log.info(f"Got project info. version {project_info['version']}") |
634 | | - |
635 | | - # set temporary directory for download |
636 | | - temp_dir = tempfile.mkdtemp(prefix="mergin-py-client-") |
637 | | - |
638 | | - download_list = [] |
639 | | - update_tasks = [] |
640 | | - total_size = 0 |
641 | | - # None can not be used to indicate latest version of the file, so |
642 | | - # it is necessary to pass actual version. |
643 | | - if version is None: |
644 | | - version = latest_proj_info["version"] |
645 | | - for file in project_info["files"]: |
646 | | - if file["path"] == file_path: |
647 | | - file["version"] = version |
648 | | - items = _download_items(file, temp_dir) |
649 | | - is_latest_version = version == latest_proj_info["version"] |
650 | | - task = UpdateTask(file["path"], items, output_file, latest_version=is_latest_version) |
651 | | - download_list.extend(task.download_queue_items) |
652 | | - for item in task.download_queue_items: |
653 | | - total_size += item.size |
654 | | - update_tasks.append(task) |
655 | | - break |
656 | | - if not download_list: |
657 | | - warn = f"No {file_path} exists at version {version}" |
658 | | - mp.log.warning(warn) |
659 | | - shutil.rmtree(temp_dir) |
660 | | - raise ClientError(warn) |
661 | | - |
662 | | - mp.log.info(f"will download file {file_path} in {len(download_list)} chunks, total size {total_size}") |
663 | | - job = DownloadJob(project_path, total_size, version, update_tasks, download_list, temp_dir, mp, project_info) |
664 | | - job.executor = concurrent.futures.ThreadPoolExecutor(max_workers=4) |
665 | | - job.futures = [] |
666 | | - for item in download_list: |
667 | | - future = job.executor.submit(_do_download, item, mc, mp, project_path, job) |
668 | | - job.futures.append(future) |
669 | | - |
670 | | - return job |
| 625 | + return download_files_async(mc, project_dir, [file_path], [output_file], version) |
671 | 626 |
|
672 | 627 |
|
673 | 628 | def download_file_finalize(job): |
674 | 629 | """ |
675 | 630 | To be called when download_file_async is finished |
676 | 631 | """ |
677 | | - job.executor.shutdown(wait=True) |
678 | | - |
679 | | - # make sure any exceptions from threads are not lost |
680 | | - for future in job.futures: |
681 | | - if future.exception() is not None: |
682 | | - raise future.exception() |
683 | | - |
684 | | - job.mp.log.info("--- download finished") |
685 | | - |
686 | | - temp_dir = None |
687 | | - for task in job.update_tasks: |
688 | | - task.apply(job.directory, job.mp) |
689 | | - if task.download_queue_items: |
690 | | - temp_dir = os.path.dirname(task.download_queue_items[0].download_file_path) |
691 | | - |
692 | | - # Remove temporary download directory |
693 | | - if temp_dir is not None: |
694 | | - shutil.rmtree(temp_dir) |
| 632 | + download_files_finalize(job) |
695 | 633 |
|
696 | 634 |
|
697 | 635 | def download_diffs_async(mc, project_directory, file_path, versions): |
@@ -804,3 +742,103 @@ def download_diffs_finalize(job): |
804 | 742 |
|
805 | 743 | job.mp.log.info("--- diffs pull finished") |
806 | 744 | return diffs |
| 745 | + |
| 746 | + |
| 747 | +def download_files_async( |
| 748 | + mc, project_dir: str, file_paths: typing.List[str], output_paths: typing.List[str], version: str |
| 749 | +): |
| 750 | + """ |
| 751 | + Starts background download project files at specified version. |
| 752 | + Returns handle to the pending download. |
| 753 | + """ |
| 754 | + mp = MerginProject(project_dir) |
| 755 | + project_path = mp.project_full_name() |
| 756 | + ver_info = f"at version {version}" if version is not None else "at latest version" |
| 757 | + mp.log.info(f"Getting [{', '.join(file_paths)}] {ver_info}") |
| 758 | + latest_proj_info = mc.project_info(project_path) |
| 759 | + if version: |
| 760 | + project_info = mc.project_info(project_path, version=version) |
| 761 | + else: |
| 762 | + project_info = latest_proj_info |
| 763 | + mp.log.info(f"Got project info. version {project_info['version']}") |
| 764 | + |
| 765 | + # set temporary directory for download |
| 766 | + temp_dir = tempfile.mkdtemp(prefix="mergin-py-client-") |
| 767 | + |
| 768 | + if output_paths is None: |
| 769 | + output_paths = [] |
| 770 | + for file in file_paths: |
| 771 | + output_paths.append(mp.fpath(file)) |
| 772 | + |
| 773 | + if len(output_paths) != len(file_paths): |
| 774 | + warn = "Output file paths are not of the same length as file paths. Cannot store required files." |
| 775 | + mp.log.warning(warn) |
| 776 | + shutil.rmtree(temp_dir) |
| 777 | + raise ClientError(warn) |
| 778 | + |
| 779 | + download_list = [] |
| 780 | + update_tasks = [] |
| 781 | + total_size = 0 |
| 782 | + # None can not be used to indicate latest version of the file, so |
| 783 | + # it is necessary to pass actual version. |
| 784 | + if version is None: |
| 785 | + version = latest_proj_info["version"] |
| 786 | + for file in project_info["files"]: |
| 787 | + if file["path"] in file_paths: |
| 788 | + index = file_paths.index(file["path"]) |
| 789 | + file["version"] = version |
| 790 | + items = _download_items(file, temp_dir) |
| 791 | + is_latest_version = version == latest_proj_info["version"] |
| 792 | + task = UpdateTask(file["path"], items, output_paths[index], latest_version=is_latest_version) |
| 793 | + download_list.extend(task.download_queue_items) |
| 794 | + for item in task.download_queue_items: |
| 795 | + total_size += item.size |
| 796 | + update_tasks.append(task) |
| 797 | + |
| 798 | + missing_files = [] |
| 799 | + files_to_download = [] |
| 800 | + project_file_paths = [file["path"] for file in project_info["files"]] |
| 801 | + for file in file_paths: |
| 802 | + if file not in project_file_paths: |
| 803 | + missing_files.append(file) |
| 804 | + else: |
| 805 | + files_to_download.append(file) |
| 806 | + |
| 807 | + if not download_list or missing_files: |
| 808 | + warn = f"No [{', '.join(missing_files)}] exists at version {version}" |
| 809 | + mp.log.warning(warn) |
| 810 | + shutil.rmtree(temp_dir) |
| 811 | + raise ClientError(warn) |
| 812 | + |
| 813 | + mp.log.info( |
| 814 | + f"will download files [{', '.join(files_to_download)}] in {len(download_list)} chunks, total size {total_size}" |
| 815 | + ) |
| 816 | + job = DownloadJob(project_path, total_size, version, update_tasks, download_list, temp_dir, mp, project_info) |
| 817 | + job.executor = concurrent.futures.ThreadPoolExecutor(max_workers=4) |
| 818 | + job.futures = [] |
| 819 | + for item in download_list: |
| 820 | + future = job.executor.submit(_do_download, item, mc, mp, project_path, job) |
| 821 | + job.futures.append(future) |
| 822 | + |
| 823 | + return job |
| 824 | + |
| 825 | + |
| 826 | +def download_files_finalize(job): |
| 827 | + """ |
| 828 | + To be called when download_file_async is finished |
| 829 | + """ |
| 830 | + job.executor.shutdown(wait=True) |
| 831 | + |
| 832 | + # make sure any exceptions from threads are not lost |
| 833 | + for future in job.futures: |
| 834 | + if future.exception() is not None: |
| 835 | + raise future.exception() |
| 836 | + |
| 837 | + job.mp.log.info("--- download finished") |
| 838 | + |
| 839 | + for task in job.update_tasks: |
| 840 | + task.apply(job.directory, job.mp) |
| 841 | + |
| 842 | + # Remove temporary download directory |
| 843 | + if job.directory is not None and os.path.exists(job.directory): |
| 844 | + shutil.rmtree(job.directory) |
0 commit comments