|
3 | 3 | import os.path |
4 | 4 | import re |
5 | 5 | from pathlib import Path |
6 | | -from typing import Any, NamedTuple, Optional |
| 6 | +from typing import Any, Iterable, NamedTuple, Optional |
7 | 7 |
|
8 | 8 | from sqlalchemy import Column, Integer, cast, func, or_, select |
9 | 9 |
|
@@ -75,24 +75,50 @@ def _get_hid_fo(self, uid: str, root_uid: str | None = None) -> str | None: |
75 | 75 |
|
76 | 76 | # --- "nice list" --- |
77 | 77 |
|
78 | | - def get_data_for_nice_list(self, uid_list: list[str], root_uid: str | None) -> list[dict]: |
| 78 | + def _get_some_path_for_uid_list(self, uid_list: Iterable[str], root_uid: str | None = None) -> dict[str, str]: |
79 | 79 | with self.get_read_only_session() as session: |
80 | | - mime_dict = self._get_mime_types_for_uid_list(session, uid_list) |
81 | | - query = select(FileObjectEntry.uid, FileObjectEntry.size, FileObjectEntry.file_name).filter( |
82 | | - FileObjectEntry.uid.in_(uid_list) |
| 80 | + query = ( |
| 81 | + select(VirtualFilePath.file_uid, VirtualFilePath.file_path) |
| 82 | + .filter(VirtualFilePath.file_uid.in_(uid_list)) |
| 83 | + .distinct(VirtualFilePath.file_uid) # only one path per file |
| 84 | + ) |
| 85 | + if root_uid: # if root_uid is set, only return paths contained in that FW |
| 86 | + query = query.join(fw_files_table, VirtualFilePath.file_uid == fw_files_table.c.file_uid).filter( |
| 87 | + fw_files_table.c.root_uid == root_uid |
| 88 | + ) |
| 89 | + return {uid: path for uid, path in session.execute(query)} # noqa: C416 # dict() does not work here |
| 90 | + |
| 91 | + def get_data_for_nice_list( |
| 92 | + self, uid_list: Iterable[str], root_uid: str | None, include_vfp: bool = True |
| 93 | + ) -> list[dict]: |
| 94 | + with self.get_read_only_session() as session: |
| 95 | + query = ( |
| 96 | + select( |
| 97 | + FileObjectEntry.uid, |
| 98 | + FileObjectEntry.size, |
| 99 | + FileObjectEntry.file_name, |
| 100 | + AnalysisEntry.result.op('->>')('mime'), |
| 101 | + ) |
| 102 | + .filter(FileObjectEntry.uid.in_(uid_list)) |
| 103 | + .outerjoin(AnalysisEntry, AnalysisEntry.uid == FileObjectEntry.uid) |
| 104 | + .filter(AnalysisEntry.plugin == 'file_type') |
83 | 105 | ) |
84 | | - file_tree_data = self.get_file_tree_path_for_uid_list(uid_list, root_uid=root_uid) |
| 106 | + path_dict = self._get_some_path_for_uid_list(uid_list) |
| 107 | + |
85 | 108 | nice_list_data = [ |
86 | 109 | { |
87 | 110 | 'uid': uid, |
88 | 111 | 'size': size, |
89 | | - 'file_name': file_name, |
90 | | - 'mime-type': mime_dict.get(uid, 'file-type-plugin/not-run-yet'), |
91 | | - 'current_virtual_path': file_tree_data[uid], |
| 112 | + 'file_name': path_dict.get(uid, file_name), |
| 113 | + 'mime-type': mime or 'N/A', |
92 | 114 | } |
93 | | - for uid, size, file_name in session.execute(query) |
| 115 | + for uid, size, file_name, mime in session.execute(query) |
94 | 116 | ] |
95 | | - self._replace_uids_in_nice_list(nice_list_data, root_uid) |
| 117 | + if include_vfp: |
| 118 | + file_tree_data = self.get_file_tree_path_for_uid_list(uid_list, root_uid=root_uid) |
| 119 | + for dict_ in nice_list_data: |
| 120 | + dict_['current_virtual_path'] = file_tree_data.get(dict_['uid'], [[dict_['file_name']]]) |
| 121 | + self._replace_uids_in_nice_list(nice_list_data, root_uid) |
96 | 122 | return nice_list_data |
97 | 123 |
|
98 | 124 | def _replace_uids_in_nice_list(self, nice_list_data: list[dict], root_uid: str): |
|
0 commit comments