55 changes: 55 additions & 0 deletions packages/aws-library/src/aws_library/s3/_client.py
@@ -3,6 +3,7 @@
import functools
import logging
import urllib.parse
from collections import deque
from collections.abc import AsyncGenerator, Sequence
from dataclasses import dataclass, field
from pathlib import Path
@@ -317,6 +318,60 @@ async def _list_all_objects(
for obj in s3_objects:
yield obj

@s3_exception_handler_async_gen(_logger)
async def list_entries_paginated(
self,
bucket: S3BucketName,
prefix: str,
*,
items_per_page: int = _MAX_ITEMS_PER_PAGE,
) -> AsyncGenerator[list[S3MetaData | S3DirectoryMetaData], None]:
"""Breadth-first recursive listing of S3 entries (files + directories).

Yields:
A list of S3MetaData and S3DirectoryMetaData per page, exploring
directories level by level.
"""
if items_per_page > _AWS_MAX_ITEMS_PER_PAGE:
msg = f"items_per_page must be <= {_AWS_MAX_ITEMS_PER_PAGE}"
raise ValueError(msg)

paginator = self._client.get_paginator("list_objects_v2")
queue = deque([prefix]) # Breadth-first traversal queue

while queue:
current_prefix = queue.popleft()

async for page in paginator.paginate(
Bucket=bucket,
Prefix=current_prefix,
Delimiter=S3_OBJECT_DELIMITER,
PaginationConfig={"PageSize": items_per_page},
):
entries: list[S3MetaData | S3DirectoryMetaData] = []

# Add subdirectories
for subfolder in page.get("CommonPrefixes", []):
if "Prefix" in subfolder:
sub_prefix = subfolder["Prefix"]
entries.append(
S3DirectoryMetaData.model_construct(
prefix=S3ObjectPrefix(sub_prefix), size=None
)
)
queue.append(sub_prefix) # BFS traversal

# Add files in the current prefix
entries.extend(
[
S3MetaData.from_botocore_list_objects(obj)
for obj in page.get("Contents", [])
]
)

if entries:
yield entries

@s3_exception_handler(_logger)
async def delete_objects_recursively(
self, *, bucket: S3BucketName, prefix: str
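A minimal usage sketch of the new generator, assuming the package re-exports and the client class named below; the helper, bucket, and prefix are illustrative only and not part of this diff.

# Minimal usage sketch (assumed caller context; the import path and the
# re-exported names are assumptions, not part of this PR).
from aws_library.s3 import S3DirectoryMetaData, SimcoreS3API  # assumed re-exports


async def print_entries(s3_api: SimcoreS3API, bucket: str, prefix: str) -> None:
    # Pages arrive breadth-first: files and immediate sub-directories of
    # `prefix` come first, deeper levels follow in later pages.
    async for page in s3_api.list_entries_paginated(
        bucket=bucket, prefix=prefix, items_per_page=500
    ):
        for entry in page:
            if isinstance(entry, S3DirectoryMetaData):
                print(f"DIR  {entry.prefix}")
            else:
                print(f"FILE {entry.object_key} ({entry.size} bytes)")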
@@ -178,6 +178,13 @@ async def search(
for item in items
]

_logger.info(
"'%s' pushing %d search results items: %s",
task_key,
len(data),
data,
)

await app_server.task_manager.push_task_stream_items(
task_key,
*data,
40 changes: 28 additions & 12 deletions services/storage/src/simcore_service_storage/simcore_s3_dsm.py
@@ -1002,17 +1002,32 @@ async def _search_project_s3_files(

try:
name_pattern_lower = name_pattern.lower()
- async for s3_objects in s3_client.list_objects_paginated(
+ async for s3_entries in s3_client.list_entries_paginated(
bucket=self.simcore_bucket_name,
prefix=f"{proj_id}/",
items_per_page=500, # fetch larger batches for efficiency
):
- for s3_obj in s3_objects:
- filename = Path(s3_obj.object_key).name
+ _logger.info(
+ "Searching S3 files in project %s with pattern '%s' in batch of %d items: %s",
+ proj_id,
+ name_pattern,
+ len(s3_entries),
+ s3_entries,
+ )
+ for s3_entry in s3_entries:
+ is_directory = isinstance(s3_entry, S3DirectoryMetaData)
+ if is_directory:
+ filename = Path(s3_entry.prefix).name
+ else:
+ filename = Path(s3_entry.object_key).name

if not (
fnmatch.fnmatch(filename.lower(), name_pattern_lower)
- and len(s3_obj.object_key.split("/"))
+ and (
+ len(s3_entry.object_key.split("/"))
+ if not is_directory
+ else len(f"{s3_entry.prefix}".split("/"))
+ )
>= min_parts_for_valid_s3_object
):
continue
@@ -1023,30 +1038,31 @@
)
if (
last_modified_from
- and s3_obj.last_modified
- and s3_obj.last_modified < last_modified_from
+ and s3_entry.last_modified
+ and s3_entry.last_modified < last_modified_from
):
continue

if (
last_modified_until
- and s3_obj.last_modified
- and s3_obj.last_modified > last_modified_until
+ and s3_entry.last_modified
+ and s3_entry.last_modified > last_modified_until
):
continue

file_meta = FileMetaData.from_simcore_node(
user_id=user_id,
file_id=TypeAdapter(SimcoreS3FileID).validate_python(
- s3_obj.object_key
+ s3_entry.object_key
),
bucket=self.simcore_bucket_name,
location_id=self.get_location_id(),
location_name=self.get_location_name(),
sha256_checksum=None,
- file_size=s3_obj.size,
- last_modified=s3_obj.last_modified,
- entity_tag=s3_obj.e_tag,
+ file_size=s3_entry.size,
+ last_modified=s3_entry.last_modified,
+ entity_tag=s3_entry.e_tag,
+ is_directory=is_directory,
)
yield file_meta

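A standalone sketch of the name/depth filter applied in _search_project_s3_files, assuming both file keys and directory prefixes reduce to their last path component; MIN_PARTS and the example keys are hypothetical, and the real code additionally applies the last-modified filters.

# Standalone sketch of the name/depth filter (hypothetical keys;
# MIN_PARTS stands in for min_parts_for_valid_s3_object).
import fnmatch
from pathlib import Path

MIN_PARTS = 3  # assumed minimum number of "/"-separated parts


def name_matches(entry_path: str, name_pattern: str) -> bool:
    # Both file keys and directory prefixes are reduced to their last path
    # component, which is matched case-insensitively against the pattern.
    filename = Path(entry_path).name
    deep_enough = len(entry_path.split("/")) >= MIN_PARTS
    return fnmatch.fnmatch(filename.lower(), name_pattern.lower()) and deep_enough


assert name_matches("proj-1/node-1/data_folder/", "data_*")
assert name_matches("proj-1/node-1/outputs/data_document.pdf", "data_*")
assert not name_matches("proj-1/node-1/readme.txt", "data_*")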
136 changes: 136 additions & 0 deletions services/storage/tests/unit/test_simcore_s3_dsm.py
@@ -524,3 +524,139 @@ async def test_create_s3_export_abort_upload_upon_error(
user_id, [], progress_bar=progress_bar
)
await _assert_meta_data_entries_count(sqlalchemy_async_engine, count=0)


@pytest.mark.parametrize(
"location_id",
[SimcoreS3DataManager.get_location_id()],
ids=[SimcoreS3DataManager.get_location_name()],
indirect=True,
)
async def test_search_directories(
simcore_s3_dsm: SimcoreS3DataManager,
create_directory_with_files: Callable[
[str, ByteSize, int, int, ProjectID, NodeID],
Awaitable[
tuple[SimcoreS3FileID, tuple[NodeID, dict[SimcoreS3FileID, FileIDDict]]]
],
],
upload_file: Callable[..., Awaitable[tuple[Path, SimcoreS3FileID]]],
file_size: ByteSize,
user_id: UserID,
project_id: ProjectID,
node_id: NodeID,
faker: Faker,
):
"""Test that search functionality can find directories."""

# Create directories with different naming patterns
test_directories = [
("test_dir_1", 3, 2), # directory name, subdir_count, file_count
("test_dir_2", 2, 3),
("data_folder", 1, 2),
("backup_directory", 2, 1),
("config_dir", 1, 1),
("temp_folder", 3, 2),
]

created_directories = []

# Create the directories
for dir_name, subdir_count, file_count in test_directories:
directory_file_id, _ = await create_directory_with_files(
dir_name,
file_size,
subdir_count,
file_count,
project_id,
node_id,
)
created_directories.append((dir_name, directory_file_id))

# Also upload some regular files with similar patterns for contrast
regular_files = [
("test_file.txt", "test_*"),
("data_document.pdf", "data_*"),
("backup_config.json", "backup_*"),
("temp_settings.xml", "temp_*"),
]

for file_name, _ in regular_files:
checksum: SHA256Str = TypeAdapter(SHA256Str).validate_python(faker.sha256())
await upload_file(file_size, file_name, sha256_checksum=checksum)

# Test 1: Search for directories with "test_dir" pattern
test_dir_results = await _search_files_by_pattern(
simcore_s3_dsm, user_id, "test_dir*", project_id
)
# Should find 2 directories: test_dir_1 and test_dir_2
directory_results = [
f for f in test_dir_results if f.file_name and f.file_name.endswith("/")
]
assert len(directory_results) == 2
dir_names = {f.file_name.rstrip("/") for f in directory_results}
assert dir_names == {"test_dir_1", "test_dir_2"}

# Test 2: Search for directories with "_dir" suffix
dir_suffix_results = await _search_files_by_pattern(
simcore_s3_dsm, user_id, "*_dir", project_id
)
directory_results = [
f for f in dir_suffix_results if f.file_name and f.file_name.endswith("/")
]
assert len(directory_results) == 3 # test_dir_1, test_dir_2, config_dir
dir_names = {f.file_name.rstrip("/") for f in directory_results}
assert dir_names == {"test_dir_1", "test_dir_2", "config_dir"}

# Test 3: Search for directories with "folder" in name
folder_results = await _search_files_by_pattern(
simcore_s3_dsm, user_id, "*folder*", project_id
)
directory_results = [
f for f in folder_results if f.file_name and f.file_name.endswith("/")
]
assert len(directory_results) == 2 # data_folder, temp_folder
dir_names = {f.file_name.rstrip("/") for f in directory_results}
assert dir_names == {"data_folder", "temp_folder"}

# Test 4: Search with pattern that matches both files and directories
data_results = await _search_files_by_pattern(
simcore_s3_dsm, user_id, "data_*", project_id
)
# Should find both data_folder (directory) and data_document.pdf (file)
assert len(data_results) >= 2
file_names = {f.file_name for f in data_results if f.file_name}
# Check that we have both directory and file
has_directory = any(name.endswith("/") for name in file_names)
has_file = any(not name.endswith("/") for name in file_names)
assert has_directory
assert has_file

# Test 5: Search for backup pattern (should find both directory and file)
backup_results = await _search_files_by_pattern(
simcore_s3_dsm, user_id, "backup_*", project_id
)
assert len(backup_results) >= 2
file_names = {f.file_name for f in backup_results if f.file_name}
# Should find backup_directory/ and backup_config.json
directory_names = {name.rstrip("/") for name in file_names if name.endswith("/")}
file_names_only = {name for name in file_names if not name.endswith("/")}
assert "backup_directory" in directory_names
assert "backup_config.json" in file_names_only

# Test 6: Verify directory metadata properties
for file_meta in test_dir_results:
if file_meta.file_name and file_meta.file_name.endswith("/"):
assert isinstance(file_meta, FileMetaData)
assert file_meta.user_id == user_id
assert file_meta.project_id == project_id
assert file_meta.file_id is not None
# Directory should have size information
assert file_meta.file_size is not None

# Test 7: Search without project restriction should still find directories
all_results = await _search_files_by_pattern(simcore_s3_dsm, user_id, "*")
all_directory_results = [
f for f in all_results if f.file_name and f.file_name.endswith("/")
]
assert len(all_directory_results) >= len(test_directories)