Commit f8b5729

fix: no-stream
1 parent: f48694a

3 files changed: +15 -53 lines


packages/celery-library/src/celery_library/backends/redis.py

Lines changed: 0 additions & 32 deletions
@@ -1,7 +1,5 @@
 import contextlib
-import json
 import logging
-from collections.abc import AsyncIterable
 from datetime import timedelta
 from typing import TYPE_CHECKING, Final
 
@@ -127,36 +125,6 @@ async def list_tasks(self, task_filter: TaskFilter) -> list[Task]:
     async def remove_task(self, task_id: TaskID) -> None:
         await self._redis_client_sdk.redis.delete(_build_key(task_id))
 
-    async def append_task_result(self, task_id: TaskID, data: dict) -> None:
-        stream_key = f"task:{task_id}"
-        await self._redis_client_sdk.redis.xadd(
-            stream_key,
-            data,
-        )
-
-    async def stream_task_result(
-        self, task_id: str, last_id: str = "0-0"
-    ) -> AsyncIterable[dict]:
-        stream_key = f"task:{task_id}"
-        while True:
-            result = await self._redis_client_sdk.redis.xread(
-                {stream_key: last_id}, block=5000
-            )
-            if not result:
-                continue
-
-            for _, entries in result:
-                for entry_id, fields in entries:
-                    last_id = entry_id
-                    data = {
-                        k: json.loads(v) if k == "data" else v
-                        for k, v in fields.items()
-                    }
-                    yield data
-
-                    if data.get("type") == "done":
-                        return
-
     async def set_task_progress(self, task_id: TaskID, report: ProgressReport) -> None:
         await handle_redis_returns_union_types(
             self._redis_client_sdk.redis.hset(
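
Note: the deleted append_task_result / stream_task_result methods implemented per-task result streaming over Redis Streams: XADD appended entries to a task:<task_id> stream, and a blocking XREAD loop consumed them until an entry with type == "done". For readers unfamiliar with that pattern, below is a minimal, self-contained consumer sketch using redis.asyncio against a local Redis. The stream key and the JSON-in-"data" convention mirror the removed code; the helper names and the demo producer are illustrative only, not part of the project.

# Minimal sketch of the Redis Streams pattern the removed methods relied on.
# Assumes a local Redis and redis-py >= 5 ("pip install redis"); the names
# below (consume_task_stream, the demo in main) are hypothetical, not project API.
import asyncio
import json

from redis.asyncio import Redis


async def consume_task_stream(r: Redis, task_id: str, last_id: str = "0-0") -> None:
    stream_key = f"task:{task_id}"
    while True:
        # Block up to 5 s for entries newer than last_id (as the removed loop did).
        result = await r.xread({stream_key: last_id}, block=5000)
        if not result:
            continue
        for _stream, entries in result:
            for entry_id, fields in entries:
                last_id = entry_id
                # The removed code JSON-decoded only the "data" field.
                payload = {
                    k: json.loads(v) if k == "data" else v for k, v in fields.items()
                }
                print(payload)
                if payload.get("type") == "done":  # terminal marker ends the stream
                    return


async def main() -> None:
    r = Redis.from_url("redis://localhost:6379", decode_responses=True)
    try:
        # Produce two entries the way append_task_result did (XADD), then consume.
        await r.xadd("task:demo", {"type": "chunk", "data": json.dumps({"n": 1})})
        await r.xadd("task:demo", {"type": "done", "data": json.dumps({})})
        await consume_task_stream(r, "demo")
    finally:
        await r.aclose()


if __name__ == "__main__":
    asyncio.run(main())

With this commit the backend no longer streams partial results, which is consistent with the commit title "fix: no-stream"; the storage-side change below returns the full result set in one call instead.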

services/storage/src/simcore_service_storage/api/_worker_tasks/_simcore_s3.py

Lines changed: 14 additions & 20 deletions
@@ -151,24 +151,18 @@ async def search(
 
     assert isinstance(dsm, SimcoreS3DataManager)  # nosec
 
-    pages: list[SearchResult] = []
-    async for page in dsm.search(
-        user_id=user_id,
-        project_id=project_id,
-        name_pattern=name_pattern,
-    ):
-        # TODO: publish temporary result
-        pages.extend(
-            [
-                SearchResult(
-                    name=item.file_name,
-                    project_id=item.project_id,
-                    created_at=item.created_at,
-                    modified_at=item.last_modified,
-                    is_directory=item.is_directory,
-                )
-                for item in page
-            ]
+    return [
+        SearchResult(
+            name=item.file_name,
+            project_id=item.project_id,
+            created_at=item.created_at,
+            modified_at=item.last_modified,
+            is_directory=item.is_directory,
         )
-
-    return pages
+        async for page in dsm.search(
+            user_id=user_id,
+            project_id=project_id,
+            name_pattern=name_pattern,
+        )
+        for item in page
+    ]
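
Note: the rewrite drops the intermediate pages accumulator (and the TODO about publishing temporary results) in favour of a single async list comprehension that walks every page yielded by dsm.search(...) and flattens the items into one list[SearchResult] returned at once. A minimal, self-contained sketch of the same flattening pattern, with stand-in types rather than the project's models:

# Stand-alone illustration of flattening an async generator of pages with an
# async list comprehension; Item and fake_search are stand-ins, not project code.
import asyncio
from collections.abc import AsyncIterator
from dataclasses import dataclass


@dataclass
class Item:
    file_name: str


async def fake_search(name_pattern: str) -> AsyncIterator[list[Item]]:
    """Yields matching items one page at a time, like dsm.search()."""
    yield [Item("a.txt"), Item("b.txt")]
    yield [Item("c.txt")]


async def collect(name_pattern: str) -> list[str]:
    # Same shape as the new code: iterate pages first, then items per page,
    # building one flat list in a single expression.
    return [
        item.file_name
        async for page in fake_search(name_pattern)
        for item in page
    ]


if __name__ == "__main__":
    print(asyncio.run(collect("*.txt")))  # ['a.txt', 'b.txt', 'c.txt']

The trade-off is that nothing is returned until all pages have been collected, matching the commit's "no-stream" intent.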

services/storage/src/simcore_service_storage/simcore_s3_dsm.py

Lines changed: 1 addition & 1 deletion
@@ -1061,7 +1061,7 @@ async def search(
 
         Args:
            user_id: The user requesting the search
-           filename_pattern: Wildcard pattern for filename matching (e.g., "*.txt", "test_*.json")
+           name_pattern: Wildcard pattern for filename matching (e.g., "*.txt", "test_*.json")
            project_id: Optional project ID to limit search to specific project
            items_per_page: Number of items to return per page
