diff --git a/packages/aws-library/src/aws_library/s3/_client.py b/packages/aws-library/src/aws_library/s3/_client.py
index b51ba3a9632..389cf867599 100644
--- a/packages/aws-library/src/aws_library/s3/_client.py
+++ b/packages/aws-library/src/aws_library/s3/_client.py
@@ -3,6 +3,7 @@
 import functools
 import logging
 import urllib.parse
+from collections import deque
 from collections.abc import AsyncGenerator, Sequence
 from dataclasses import dataclass, field
 from pathlib import Path
@@ -317,6 +318,60 @@ async def _list_all_objects(
             for obj in s3_objects:
                 yield obj
 
+    @s3_exception_handler_async_gen(_logger)
+    async def list_entries_paginated(
+        self,
+        bucket: S3BucketName,
+        prefix: str,
+        *,
+        items_per_page: int = _MAX_ITEMS_PER_PAGE,
+    ) -> AsyncGenerator[list[S3MetaData | S3DirectoryMetaData], None]:
+        """Breadth-first recursive listing of S3 entries (files + directories).
+
+        Yields:
+            A list of S3MetaData and S3DirectoryMetaData per page, exploring
+            directories level by level.
+        """
+        if items_per_page > _AWS_MAX_ITEMS_PER_PAGE:
+            msg = f"items_per_page must be <= {_AWS_MAX_ITEMS_PER_PAGE}"
+            raise ValueError(msg)
+
+        paginator = self._client.get_paginator("list_objects_v2")
+        queue = deque([prefix])  # breadth-first traversal queue of prefixes
+
+        while queue:
+            current_prefix = queue.popleft()
+
+            async for page in paginator.paginate(
+                Bucket=bucket,
+                Prefix=current_prefix,
+                Delimiter=S3_OBJECT_DELIMITER,
+                PaginationConfig={"PageSize": items_per_page},
+            ):
+                entries: list[S3MetaData | S3DirectoryMetaData] = []
+
+                # Add subdirectories (CommonPrefixes produced by the delimiter)
+                for subfolder in page.get("CommonPrefixes", []):
+                    if "Prefix" in subfolder:
+                        sub_prefix = subfolder["Prefix"]
+                        entries.append(
+                            S3DirectoryMetaData.model_construct(
+                                prefix=S3ObjectPrefix(sub_prefix), size=None
+                            )
+                        )
+                        queue.append(sub_prefix)  # explore this subdirectory next level
+
+                # Add files in the current prefix
+                entries.extend(
+                    [
+                        S3MetaData.from_botocore_list_objects(obj)
+                        for obj in page.get("Contents", [])
+                    ]
+                )
+
+                if entries:
+                    yield entries
+
     @s3_exception_handler(_logger)
     async def delete_objects_recursively(
         self, *, bucket: S3BucketName, prefix: str
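For reviewers unfamiliar with delimiter-based listing: `list_objects_v2` called with a delimiter returns only the immediate children of a prefix — subdirectories under `CommonPrefixes`, files under `Contents` — so re-queuing every returned prefix produces the level-by-level walk described in the docstring. Below is a minimal standalone sketch of the same idea in plain synchronous boto3 (the method above uses the async client's paginator); the bucket name and root prefix are placeholders:

```python
from collections import deque

import boto3


def walk_breadth_first(bucket: str, root_prefix: str) -> None:
    # Each paginate() call lists a single directory level: CommonPrefixes are
    # the immediate subdirectories, Contents the files directly under the prefix
    client = boto3.client("s3")
    paginator = client.get_paginator("list_objects_v2")
    queue = deque([root_prefix])
    while queue:
        prefix = queue.popleft()
        for page in paginator.paginate(Bucket=bucket, Prefix=prefix, Delimiter="/"):
            for sub in page.get("CommonPrefixes", []):
                print("dir :", sub["Prefix"])
                queue.append(sub["Prefix"])  # visit this subdirectory on a later pass
            for obj in page.get("Contents", []):
                print("file:", obj["Key"])


if __name__ == "__main__":
    walk_breadth_first("some-bucket", "some-project-id/")  # placeholder values
```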
items: %s", + proj_id, + name_pattern, + len(s3_entries), + s3_entries, + ) + for s3_entry in s3_entries: + is_directory = isinstance(s3_entry, S3DirectoryMetaData) + if is_directory: + filename = Path(s3_entry.prefix).name + else: + filename = Path(s3_entry.object_key).name if not ( fnmatch.fnmatch(filename.lower(), name_pattern_lower) - and len(s3_obj.object_key.split("/")) + and ( + len(s3_entry.object_key.split("/")) + if not is_directory + else len(f"{s3_entry.prefix}".split("/")) + ) >= min_parts_for_valid_s3_object ): continue @@ -1023,30 +1038,31 @@ async def _search_project_s3_files( ) if ( last_modified_from - and s3_obj.last_modified - and s3_obj.last_modified < last_modified_from + and s3_entry.last_modified + and s3_entry.last_modified < last_modified_from ): continue if ( last_modified_until - and s3_obj.last_modified - and s3_obj.last_modified > last_modified_until + and s3_entry.last_modified + and s3_entry.last_modified > last_modified_until ): continue file_meta = FileMetaData.from_simcore_node( user_id=user_id, file_id=TypeAdapter(SimcoreS3FileID).validate_python( - s3_obj.object_key + s3_entry.object_key ), bucket=self.simcore_bucket_name, location_id=self.get_location_id(), location_name=self.get_location_name(), sha256_checksum=None, - file_size=s3_obj.size, - last_modified=s3_obj.last_modified, - entity_tag=s3_obj.e_tag, + file_size=s3_entry.size, + last_modified=s3_entry.last_modified, + entity_tag=s3_entry.e_tag, + is_directory=is_directory, ) yield file_meta diff --git a/services/storage/tests/unit/test_simcore_s3_dsm.py b/services/storage/tests/unit/test_simcore_s3_dsm.py index 3477edef6a8..694d14894d8 100644 --- a/services/storage/tests/unit/test_simcore_s3_dsm.py +++ b/services/storage/tests/unit/test_simcore_s3_dsm.py @@ -524,3 +524,139 @@ async def test_create_s3_export_abort_upload_upon_error( user_id, [], progress_bar=progress_bar ) await _assert_meta_data_entries_count(sqlalchemy_async_engine, count=0) + + +@pytest.mark.parametrize( + "location_id", + [SimcoreS3DataManager.get_location_id()], + ids=[SimcoreS3DataManager.get_location_name()], + indirect=True, +) +async def test_search_directories( + simcore_s3_dsm: SimcoreS3DataManager, + create_directory_with_files: Callable[ + [str, ByteSize, int, int, ProjectID, NodeID], + Awaitable[ + tuple[SimcoreS3FileID, tuple[NodeID, dict[SimcoreS3FileID, FileIDDict]]] + ], + ], + upload_file: Callable[..., Awaitable[tuple[Path, SimcoreS3FileID]]], + file_size: ByteSize, + user_id: UserID, + project_id: ProjectID, + node_id: NodeID, + faker: Faker, +): + """Test that search functionality can find directories.""" + + # Create directories with different naming patterns + test_directories = [ + ("test_dir_1", 3, 2), # directory name, subdir_count, file_count + ("test_dir_2", 2, 3), + ("data_folder", 1, 2), + ("backup_directory", 2, 1), + ("config_dir", 1, 1), + ("temp_folder", 3, 2), + ] + + created_directories = [] + + # Create the directories + for dir_name, subdir_count, file_count in test_directories: + directory_file_id, _ = await create_directory_with_files( + dir_name, + file_size, + subdir_count, + file_count, + project_id, + node_id, + ) + created_directories.append((dir_name, directory_file_id)) + + # Also upload some regular files with similar patterns for contrast + regular_files = [ + ("test_file.txt", "test_*"), + ("data_document.pdf", "data_*"), + ("backup_config.json", "backup_*"), + ("temp_settings.xml", "temp_*"), + ] + + for file_name, _ in regular_files: + checksum: SHA256Str = 
diff --git a/services/storage/tests/unit/test_simcore_s3_dsm.py b/services/storage/tests/unit/test_simcore_s3_dsm.py
index 3477edef6a8..694d14894d8 100644
--- a/services/storage/tests/unit/test_simcore_s3_dsm.py
+++ b/services/storage/tests/unit/test_simcore_s3_dsm.py
@@ -524,3 +524,136 @@ async def test_create_s3_export_abort_upload_upon_error(
         user_id, [], progress_bar=progress_bar
     )
     await _assert_meta_data_entries_count(sqlalchemy_async_engine, count=0)
+
+
+@pytest.mark.parametrize(
+    "location_id",
+    [SimcoreS3DataManager.get_location_id()],
+    ids=[SimcoreS3DataManager.get_location_name()],
+    indirect=True,
+)
+async def test_search_directories(
+    simcore_s3_dsm: SimcoreS3DataManager,
+    create_directory_with_files: Callable[
+        [str, ByteSize, int, int, ProjectID, NodeID],
+        Awaitable[
+            tuple[SimcoreS3FileID, tuple[NodeID, dict[SimcoreS3FileID, FileIDDict]]]
+        ],
+    ],
+    upload_file: Callable[..., Awaitable[tuple[Path, SimcoreS3FileID]]],
+    file_size: ByteSize,
+    user_id: UserID,
+    project_id: ProjectID,
+    node_id: NodeID,
+    faker: Faker,
+):
+    """Test that the search functionality can find directories."""
+
+    # Create directories with different naming patterns
+    test_directories = [
+        ("test_dir_1", 3, 2),  # directory name, subdir_count, file_count
+        ("test_dir_2", 2, 3),
+        ("data_folder", 1, 2),
+        ("backup_directory", 2, 1),
+        ("config_dir", 1, 1),
+        ("temp_folder", 3, 2),
+    ]
+
+    # Create the directories
+    for dir_name, subdir_count, file_count in test_directories:
+        await create_directory_with_files(
+            dir_name,
+            file_size,
+            subdir_count,
+            file_count,
+            project_id,
+            node_id,
+        )
+
+    # Also upload some regular files with similar name patterns for contrast
+    regular_files = [
+        "test_file.txt",
+        "data_document.pdf",
+        "backup_config.json",
+        "temp_settings.xml",
+    ]
+
+    for file_name in regular_files:
+        checksum: SHA256Str = TypeAdapter(SHA256Str).validate_python(faker.sha256())
+        await upload_file(file_size, file_name, sha256_checksum=checksum)
+
+    # Test 1: Search for directories with "test_dir" prefix
+    test_dir_results = await _search_files_by_pattern(
+        simcore_s3_dsm, user_id, "test_dir*", project_id
+    )
+    # Should find 2 directories: test_dir_1 and test_dir_2
+    directory_results = [
+        f for f in test_dir_results if f.file_name and f.file_name.endswith("/")
+    ]
+    assert len(directory_results) == 2
+    dir_names = {f.file_name.rstrip("/") for f in directory_results}
+    assert dir_names == {"test_dir_1", "test_dir_2"}
+
+    # Test 2: Search for directories with "_dir" suffix
+    dir_suffix_results = await _search_files_by_pattern(
+        simcore_s3_dsm, user_id, "*_dir", project_id
+    )
+    directory_results = [
+        f for f in dir_suffix_results if f.file_name and f.file_name.endswith("/")
+    ]
+    assert len(directory_results) == 1  # only config_dir ends with "_dir"
+    dir_names = {f.file_name.rstrip("/") for f in directory_results}
+    assert dir_names == {"config_dir"}
+
+    # Test 3: Search for directories with "folder" in the name
+    folder_results = await _search_files_by_pattern(
+        simcore_s3_dsm, user_id, "*folder*", project_id
+    )
+    directory_results = [
+        f for f in folder_results if f.file_name and f.file_name.endswith("/")
+    ]
+    assert len(directory_results) == 2  # data_folder, temp_folder
+    dir_names = {f.file_name.rstrip("/") for f in directory_results}
+    assert dir_names == {"data_folder", "temp_folder"}
+
+    # Test 4: Search with a pattern that matches both files and directories
+    data_results = await _search_files_by_pattern(
+        simcore_s3_dsm, user_id, "data_*", project_id
+    )
+    # Should find both data_folder (directory) and data_document.pdf (file)
+    assert len(data_results) >= 2
+    file_names = {f.file_name for f in data_results if f.file_name}
+    # Check that we have both a directory and a file
+    has_directory = any(name.endswith("/") for name in file_names)
+    has_file = any(not name.endswith("/") for name in file_names)
+    assert has_directory
+    assert has_file
+
+    # Test 5: Search for the backup pattern (should find both directory and file)
+    backup_results = await _search_files_by_pattern(
+        simcore_s3_dsm, user_id, "backup_*", project_id
+    )
+    assert len(backup_results) >= 2
+    file_names = {f.file_name for f in backup_results if f.file_name}
+    # Should find backup_directory/ and backup_config.json
+    directory_names = {name.rstrip("/") for name in file_names if name.endswith("/")}
+    file_names_only = {name for name in file_names if not name.endswith("/")}
+    assert "backup_directory" in directory_names
+    assert "backup_config.json" in file_names_only
+
+    # Test 6: Verify directory metadata properties
+    for file_meta in test_dir_results:
+        if file_meta.file_name and file_meta.file_name.endswith("/"):
+            assert isinstance(file_meta, FileMetaData)
+            assert file_meta.user_id == user_id
+            assert file_meta.project_id == project_id
+            assert file_meta.file_id is not None
+            # Directory sizes are not computed during listing
+            assert file_meta.file_size is None
+
+    # Test 7: Search without project restriction should still find directories
+    all_results = await _search_files_by_pattern(simcore_s3_dsm, user_id, "*")
+    all_directory_results = [
+        f for f in all_results if f.file_name and f.file_name.endswith("/")
+    ]
+    assert len(all_directory_results) >= len(test_directories)
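Note: the test relies on a `_search_files_by_pattern` helper that is not part of this diff (presumably defined elsewhere in the test module). For orientation, here is a plausible shape for it; the `search` entry point, its keyword names, and the exact signature are assumptions mirroring the call sites in this PR, not the repository's actual helper:

```python
from models_library.projects import ProjectID
from models_library.users import UserID
from simcore_service_storage.models import FileMetaData
from simcore_service_storage.simcore_s3_dsm import SimcoreS3DataManager


async def _search_files_by_pattern(
    dsm: SimcoreS3DataManager,
    user_id: UserID,
    name_pattern: str,
    project_id: ProjectID | None = None,
) -> list[FileMetaData]:
    # Drain the async search generator into a list so the test can assert on it.
    # `search` and its parameters are assumed; the real helper may differ.
    return [
        fmd
        async for fmd in dsm.search(
            user_id=user_id,
            name_pattern=name_pattern,
            project_id=project_id,
        )
    ]
```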