|
1 | 1 | import contextlib |
2 | 2 | import datetime |
| 3 | +import fnmatch |
3 | 4 | import logging |
4 | 5 | import tempfile |
5 | 6 | import urllib.parse |
6 | | -from collections.abc import Coroutine |
| 7 | +from collections.abc import AsyncGenerator, Coroutine |
7 | 8 | from contextlib import suppress |
8 | 9 | from dataclasses import dataclass |
9 | 10 | from pathlib import Path |
@@ -959,6 +960,136 @@ async def search_owned_files( |
959 | 960 | resolved_fmds.append(convert_db_to_model(updated_fmd)) |
960 | 961 | return resolved_fmds |
961 | 962 |
|
| 963 | + def _create_file_metadata_from_s3_object( |
| 964 | + self, s3_obj: S3MetaData, user_id: UserID |
| 965 | + ) -> FileMetaData | None: |
| 966 | + """Create FileMetaData from S3 object, return None if invalid.""" |
| 967 | + try: |
| 968 | + return FileMetaData.from_simcore_node( |
| 969 | + user_id=user_id, |
| 970 | + file_id=TypeAdapter(SimcoreS3FileID).validate_python(s3_obj.object_key), |
| 971 | + bucket=self.simcore_bucket_name, |
| 972 | + location_id=self.get_location_id(), |
| 973 | + location_name=self.get_location_name(), |
| 974 | + sha256_checksum=None, |
| 975 | + file_size=s3_obj.size, |
| 976 | + last_modified=s3_obj.last_modified, |
| 977 | + entity_tag=s3_obj.e_tag, |
| 978 | + ) |
| 979 | + except (ValidationError, ValueError): |
| 980 | + return None |
| 981 | + |
| 982 | + async def _process_s3_page_results( |
| 983 | + self, |
| 984 | + current_page_results: list[FileMetaData], |
| 985 | + ) -> list[FileMetaData]: |
| 986 | + current_page_results.sort( |
| 987 | + key=lambda x: x.last_modified |
| 988 | + or datetime.datetime.min.replace(tzinfo=datetime.UTC), |
| 989 | + reverse=True, |
| 990 | + ) |
| 991 | + |
| 992 | + result_project_ids = list( |
| 993 | + { |
| 994 | + result.project_id |
| 995 | + for result in current_page_results |
| 996 | + if result.project_id is not None |
| 997 | + } |
| 998 | + ) |
| 999 | + |
| 1000 | + if result_project_ids: |
| 1001 | + current_page_results = await _add_frontend_needed_data( |
| 1002 | + get_db_engine(self.app), |
| 1003 | + project_ids=result_project_ids, |
| 1004 | + data=current_page_results, |
| 1005 | + ) |
| 1006 | + |
| 1007 | + return current_page_results |
| 1008 | + |
| 1009 | + async def _search_project_s3_files( |
| 1010 | + self, |
| 1011 | + proj_id: ProjectID, |
| 1012 | + filename_pattern: str, |
| 1013 | + user_id: UserID, |
| 1014 | + items_per_page: NonNegativeInt, |
| 1015 | + ) -> AsyncGenerator[list[FileMetaData], None]: |
| 1016 | + """Search S3 files in a specific project and yield results page by page.""" |
| 1017 | + s3_client = get_s3_client(self.app) |
| 1018 | + min_parts_for_valid_s3_object = 2 |
| 1019 | + current_page_results: list[FileMetaData] = [] |
| 1020 | + |
| 1021 | + try: |
| 1022 | + async for s3_objects in s3_client.list_objects_paginated( |
| 1023 | + bucket=self.simcore_bucket_name, |
| 1024 | + prefix=f"{proj_id}/", |
| 1025 | + items_per_page=items_per_page * 2, |
| 1026 | + ): |
| 1027 | + for s3_obj in s3_objects: |
| 1028 | + filename = Path(s3_obj.object_key).name |
| 1029 | + |
| 1030 | + if ( |
| 1031 | + fnmatch.fnmatch(filename, filename_pattern) |
| 1032 | + and len(s3_obj.object_key.split("/")) |
| 1033 | + >= min_parts_for_valid_s3_object |
| 1034 | + ): |
| 1035 | + file_meta = self._create_file_metadata_from_s3_object( |
| 1036 | + s3_obj, user_id |
| 1037 | + ) |
| 1038 | + if file_meta: |
| 1039 | + current_page_results.append(file_meta) |
| 1040 | + |
| 1041 | + if len(current_page_results) >= items_per_page: |
| 1042 | + processed_results = await self._process_s3_page_results( |
| 1043 | + current_page_results |
| 1044 | + ) |
| 1045 | + yield processed_results |
| 1046 | + current_page_results = [] |
| 1047 | + |
| 1048 | + if current_page_results: |
| 1049 | + processed_results = await self._process_s3_page_results( |
| 1050 | + current_page_results |
| 1051 | + ) |
| 1052 | + yield processed_results |
| 1053 | + |
| 1054 | + except S3KeyNotFoundError: |
| 1055 | + with log_context( |
| 1056 | + _logger, logging.DEBUG, f"Failed to search S3 for project {proj_id}" |
| 1057 | + ): |
| 1058 | + return |
| 1059 | + |
| 1060 | + async def search_files( |
| 1061 | + self, |
| 1062 | + user_id: UserID, |
| 1063 | + *, |
| 1064 | + filename_pattern: str, |
| 1065 | + project_id: ProjectID | None = None, |
| 1066 | + items_per_page: NonNegativeInt = 100, |
| 1067 | + ) -> AsyncGenerator[list[FileMetaData], None]: |
| 1068 | + """ |
| 1069 | + Search for files in S3 using a wildcard pattern for filenames. |
| 1070 | + Returns results as an async generator that yields pages of results. |
| 1071 | +
|
| 1072 | + Args: |
| 1073 | + user_id: The user requesting the search |
| 1074 | + filename_pattern: Wildcard pattern for filename matching (e.g., "*.txt", "test_*.json") |
| 1075 | + project_id: Optional project ID to limit search to specific project |
| 1076 | + items_per_page: Number of items to return per page |
| 1077 | +
|
| 1078 | + Yields: |
| 1079 | + List of FileMetaData objects for each page |
| 1080 | + """ |
| 1081 | + # Validate access rights |
| 1082 | + accessible_projects_ids = await get_accessible_project_ids( |
| 1083 | + get_db_engine(self.app), user_id=user_id, project_id=project_id |
| 1084 | + ) |
| 1085 | + |
| 1086 | + # Search each accessible project |
| 1087 | + for proj_id in accessible_projects_ids: |
| 1088 | + async for page_results in self._search_project_s3_files( |
| 1089 | + proj_id, filename_pattern, user_id, items_per_page |
| 1090 | + ): |
| 1091 | + yield page_results |
| 1092 | + |
962 | 1093 | async def create_soft_link( |
963 | 1094 | self, user_id: int, target_file_id: StorageFileID, link_file_id: StorageFileID |
964 | 1095 | ) -> FileMetaData: |
|
0 commit comments