Commit dc64e12

Merge branch 'enh/conversation-read' of github.com:odeimaiz/osparc-simcore into enh/conversation-read

2 parents: c91e674 + 645a601

File tree: 37 files changed, +1491 -124 lines


api/specs/web-server/_long_running_tasks.py

Lines changed: 15 additions & 0 deletions
@@ -15,6 +15,10 @@
 from simcore_service_webserver.tasks._controller._rest_exceptions import (
     _TO_HTTP_ERROR_MAP,
 )
+from simcore_service_webserver.tasks._controller._rest_schemas import (
+    TaskStreamQueryParams,
+    TaskStreamResponse,
+)
 
 router = APIRouter(
     prefix=f"/{API_VTAG}",
@@ -63,3 +67,14 @@ def get_async_job_result(
     _path_params: Annotated[_PathParam, Depends()],
 ):
     """Retrieves the result of a task"""
+
+
+@router.get(
+    "/tasks/{task_id}/stream",
+    response_model=Envelope[TaskStreamResponse],
+)
+def get_async_job_stream(
+    _path_params: Annotated[_PathParam, Depends()],
+    _query_params: Annotated[TaskStreamQueryParams, Depends()],
+):
+    """Retrieves the stream of a task"""

api/specs/web-server/_storage.py

Lines changed: 26 additions & 8 deletions
@@ -4,12 +4,10 @@
 # pylint: disable=too-many-arguments
 
 
-from typing import Annotated, TypeAlias
+from typing import Annotated, Any, Final, TypeAlias
 
 from fastapi import APIRouter, Depends, Query, status
-from models_library.api_schemas_long_running_tasks.tasks import (
-    TaskGet,
-)
+from models_library.api_schemas_long_running_tasks.tasks import TaskGet
 from models_library.api_schemas_storage.storage_schemas import (
     FileLocation,
     FileMetaDataGet,
@@ -25,6 +23,7 @@
     BatchDeletePathsBodyParams,
     DataExportPost,
     ListPathsQueryParams,
+    SearchBodyParams,
     StorageLocationPathParams,
     StoragePathComputeSizeParams,
 )
@@ -220,14 +219,33 @@ async def is_completed_upload_file(
     """Returns state of upload completion"""
 
 
+_RESPONSES: Final[dict[int | str, dict[str, Any]]] = {
+    i.status_code: {"model": EnvelopedError} for i in _TO_HTTP_ERROR_MAP.values()
+}
+
+
 @router.post(
-    "/storage/locations/{location_id}/export-data",
+    "/storage/locations/{location_id}:export-data",
+    status_code=status.HTTP_202_ACCEPTED,
     response_model=Envelope[TaskGet],
     name="export_data",
     description="Export data",
-    responses={
-        i.status_code: {"model": EnvelopedError} for i in _TO_HTTP_ERROR_MAP.values()
-    },
+    responses=_RESPONSES,
 )
 async def export_data(export_data: DataExportPost, location_id: LocationID):
     """Trigger data export. Returns async job id for getting status and results"""
+
+
+@router.post(
+    "/storage/locations/{location_id}:search",
+    status_code=status.HTTP_202_ACCEPTED,
+    response_model=Envelope[TaskGet],
+    name="search",
+    description="Starts a files/folders search",
+    responses=_RESPONSES,
+)
+async def search(
+    _path: Annotated[StorageLocationPathParams, Depends()],
+    _body: SearchBodyParams,
+):
+    """Trigger search. Returns async job id for getting status and results"""

packages/celery-library/src/celery_library/backends/redis.py

Lines changed: 81 additions & 8 deletions
@@ -1,7 +1,7 @@
 import contextlib
 import logging
 from dataclasses import dataclass
-from datetime import timedelta
+from datetime import UTC, datetime, timedelta
 from typing import TYPE_CHECKING, Final
 
 from models_library.progress_bar import ProgressReport
@@ -13,21 +13,38 @@
     Task,
     TaskKey,
     TaskStore,
+    TaskStreamItem,
 )
 from servicelib.redis import RedisClientSDK, handle_redis_returns_union_types
 
+_CELERY_TASK_DELIMTATOR: Final[str] = ":"
+
 _CELERY_TASK_PREFIX: Final[str] = "celery-task-"
 _CELERY_TASK_ID_KEY_ENCODING = "utf-8"
 _CELERY_TASK_SCAN_COUNT_PER_BATCH: Final[int] = 1000
-_CELERY_TASK_METADATA_KEY: Final[str] = "metadata"
+_CELERY_TASK_EXEC_METADATA_KEY: Final[str] = "exec-meta"
 _CELERY_TASK_PROGRESS_KEY: Final[str] = "progress"
 
+### Redis list to store streamed results
+_CELERY_TASK_STREAM_PREFIX: Final[str] = "celery-task-stream-"
+_CELERY_TASK_STREAM_EXPIRY: Final[timedelta] = timedelta(minutes=3)
+_CELERY_TASK_STREAM_METADATA: Final[str] = "meta"
+_CELERY_TASK_STREAM_DONE_KEY: Final[str] = "done"
+_CELERY_TASK_STREAM_LAST_UPDATE_KEY: Final[str] = "last_update"
 
 _logger = logging.getLogger(__name__)
 
 
 def _build_redis_task_key(task_key: TaskKey) -> str:
-    return _CELERY_TASK_PREFIX + task_key
+    return f"{_CELERY_TASK_PREFIX}{task_key}"
+
+
+def _build_redis_stream_key(task_key: TaskKey) -> str:
+    return f"{_CELERY_TASK_STREAM_PREFIX}{task_key}"
+
+
+def _build_redis_stream_meta_key(task_key: TaskKey) -> str:
+    return f"{_build_redis_stream_key(task_key)}{_CELERY_TASK_DELIMTATOR}{_CELERY_TASK_STREAM_METADATA}"
 
 
 @dataclass(frozen=True)
@@ -44,7 +61,7 @@ async def create_task(
         await handle_redis_returns_union_types(
             self._redis_client_sdk.redis.hset(
                 name=redis_key,
-                key=_CELERY_TASK_METADATA_KEY,
+                key=_CELERY_TASK_EXEC_METADATA_KEY,
                 value=execution_metadata.model_dump_json(),
             )
         )
@@ -57,7 +74,7 @@ async def get_task_metadata(self, task_key: TaskKey) -> ExecutionMetadata | None
         raw_result = await handle_redis_returns_union_types(
             self._redis_client_sdk.redis.hget(
                 _build_redis_task_key(task_key),
-                _CELERY_TASK_METADATA_KEY,
+                _CELERY_TASK_EXEC_METADATA_KEY,
             )
         )
         if not raw_result:
@@ -99,7 +116,7 @@ async def list_tasks(self, owner_metadata: OwnerMetadata) -> list[Task]:
         )
 
         keys: list[str] = []
-        pipeline = self._redis_client_sdk.redis.pipeline()
+        pipe = self._redis_client_sdk.redis.pipeline()
         async for key in self._redis_client_sdk.redis.scan_iter(
             match=search_key, count=_CELERY_TASK_SCAN_COUNT_PER_BATCH
         ):
@@ -110,9 +127,9 @@ async def list_tasks(self, owner_metadata: OwnerMetadata) -> list[Task]:
                 else key
             )
             keys.append(_key)
-            pipeline.hget(_key, _CELERY_TASK_METADATA_KEY)
+            pipe.hget(_key, _CELERY_TASK_EXEC_METADATA_KEY)
 
-        results = await pipeline.execute()
+        results = await pipe.execute()
 
         tasks = []
         for key, raw_metadata in zip(keys, results, strict=True):
@@ -153,6 +170,62 @@ async def task_exists(self, task_key: TaskKey) -> bool:
         assert isinstance(n, int)  # nosec
         return n > 0
 
+    async def push_task_stream_items(
+        self, task_key: TaskKey, *result: TaskStreamItem
+    ) -> None:
+        stream_key = _build_redis_stream_key(task_key)
+        stream_meta_key = _build_redis_stream_meta_key(task_key)
+
+        pipe = self._redis_client_sdk.redis.pipeline()
+        pipe.rpush(stream_key, *(r.model_dump_json(by_alias=True) for r in result))
+        pipe.hset(
+            stream_meta_key, mapping={"last_update": datetime.now(UTC).isoformat()}
+        )
+        pipe.expire(stream_key, _CELERY_TASK_STREAM_EXPIRY)
+        pipe.expire(stream_meta_key, _CELERY_TASK_STREAM_EXPIRY)
+        await pipe.execute()
+
+    async def set_task_stream_done(self, task_key: TaskKey) -> None:
+        stream_meta_key = _build_redis_stream_meta_key(task_key)
+        await handle_redis_returns_union_types(
+            self._redis_client_sdk.redis.hset(
+                name=stream_meta_key,
+                key=_CELERY_TASK_STREAM_DONE_KEY,
+                value="1",
+            )
+        )
+
+    async def pull_task_stream_items(
+        self, task_key: TaskKey, limit: int = 20
+    ) -> tuple[list[TaskStreamItem], bool, datetime | None]:
+        stream_key = _build_redis_stream_key(task_key)
+        meta_key = _build_redis_stream_meta_key(task_key)
+
+        async with self._redis_client_sdk.redis.pipeline(transaction=True) as pipe:
+            pipe.lpop(stream_key, limit)
+            pipe.hget(meta_key, _CELERY_TASK_STREAM_DONE_KEY)
+            pipe.hget(meta_key, _CELERY_TASK_STREAM_LAST_UPDATE_KEY)
+            raw_items, done, last_update = await pipe.execute()
+
+        stream_items = (
+            [TaskStreamItem.model_validate_json(item) for item in raw_items]
+            if raw_items
+            else []
+        )
+
+        empty = (
+            await handle_redis_returns_union_types(
+                self._redis_client_sdk.redis.llen(stream_key)
+            )
+            == 0
+        )
+
+        return (
+            stream_items,
+            done == "1" and empty,
+            datetime.fromisoformat(last_update) if last_update else None,
+        )
+
 
 if TYPE_CHECKING:
     _: type[TaskStore] = RedisTaskStore
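
The stream store keeps two sibling Redis keys per task: a list (celery-task-stream-<task_key>) holding JSON-encoded TaskStreamItem entries, and a hash (the ":meta" key) carrying the "done" flag and "last_update" timestamp; every push refreshes a 3-minute TTL on both. A consumer is finished only when the producer has set "done" and the list has been fully drained. A standalone sketch of the same pattern with plain redis.asyncio (key names mirror the constants above; everything else is illustrative):

import asyncio
import json
from datetime import UTC, datetime, timedelta

import redis.asyncio as aioredis

EXPIRY = timedelta(minutes=3)


async def main() -> None:
    r = aioredis.from_url("redis://localhost:6379", decode_responses=True)
    stream_key = "celery-task-stream-demo"
    meta_key = f"{stream_key}:meta"

    # Producer: append items, bump last_update, refresh both TTLs in one batch.
    pipe = r.pipeline()
    pipe.rpush(stream_key, json.dumps({"n": 1}), json.dumps({"n": 2}))
    pipe.hset(meta_key, mapping={"last_update": datetime.now(UTC).isoformat()})
    pipe.expire(stream_key, EXPIRY)
    pipe.expire(meta_key, EXPIRY)
    await pipe.execute()
    await r.hset(meta_key, "done", "1")  # producer marks the stream finished

    # Consumer: pop up to 20 items and read the metadata in one round trip.
    async with r.pipeline(transaction=True) as pipe:
        pipe.lpop(stream_key, 20)
        pipe.hget(meta_key, "done")
        pipe.hget(meta_key, "last_update")
        raw_items, done, last_update = await pipe.execute()

    items = [json.loads(i) for i in raw_items or []]
    drained = await r.llen(stream_key) == 0
    print(items, last_update, done == "1" and drained)  # complete only if both


asyncio.run(main())

The "done and empty" check matters because the producer can set the done flag while undelivered items still sit in the list; reporting completion on the flag alone would drop the tail of the stream.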

packages/celery-library/src/celery_library/rpc/_async_jobs.py

Lines changed: 5 additions & 5 deletions
@@ -2,7 +2,6 @@
 
 import logging
 
-from celery.exceptions import CeleryError  # type: ignore[import-untyped]
 from models_library.api_schemas_rpc_async_jobs.async_jobs import (
     AsyncJobGet,
     AsyncJobId,
@@ -22,6 +21,7 @@
 from servicelib.rabbitmq import RPCRouter
 
 from ..errors import (
+    TaskManagerError,
     TaskNotFoundError,
     TransferrableCeleryError,
     decode_celery_transferrable_error,
@@ -44,7 +44,7 @@ async def cancel(
         )
     except TaskNotFoundError as exc:
         raise JobMissingError(job_id=job_id) from exc
-    except CeleryError as exc:
+    except TaskManagerError as exc:
         raise JobSchedulerError(exc=f"{exc}") from exc
 
 
@@ -62,7 +62,7 @@ async def status(
         )
     except TaskNotFoundError as exc:
         raise JobMissingError(job_id=job_id) from exc
-    except CeleryError as exc:
+    except TaskManagerError as exc:
         raise JobSchedulerError(exc=f"{exc}") from exc
 
     return AsyncJobStatus(
@@ -101,7 +101,7 @@ async def result(
         )
     except TaskNotFoundError as exc:
         raise JobMissingError(job_id=job_id) from exc
-    except CeleryError as exc:
+    except TaskManagerError as exc:
         raise JobSchedulerError(exc=f"{exc}") from exc
 
     if _status.task_state == TaskState.FAILURE:
@@ -136,7 +136,7 @@ async def list_jobs(
         tasks = await task_manager.list_tasks(
             owner_metadata=owner_metadata,
         )
-    except CeleryError as exc:
+    except TaskManagerError as exc:
         raise JobSchedulerError(exc=f"{exc}") from exc
 
     return [
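
Swapping CeleryError for the library's own TaskManagerError means the RPC layer no longer depends on Celery's exception hierarchy; the task-manager abstraction owns the failure mode. Since the same except clause now repeats in four handlers, the translation could be factored into a decorator. A sketch with stand-in exception classes (the real ones live in ..errors and the RPC schemas; illustrative only, and it leaves the per-handler JobMissingError clause alone since that one needs job_id):

import functools
from collections.abc import Awaitable, Callable
from typing import ParamSpec, TypeVar

P = ParamSpec("P")
R = TypeVar("R")


class TaskManagerError(Exception): ...  # stand-in for ..errors.TaskManagerError


class JobSchedulerError(Exception):  # stand-in for the RPC-facing error
    def __init__(self, *, exc: str) -> None:
        super().__init__(exc)


def translates_scheduler_errors(
    fn: Callable[P, Awaitable[R]],
) -> Callable[P, Awaitable[R]]:
    """Re-raise task-manager failures as the RPC-facing scheduler error."""

    @functools.wraps(fn)
    async def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
        try:
            return await fn(*args, **kwargs)
        except TaskManagerError as exc:
            raise JobSchedulerError(exc=f"{exc}") from exc

    return wrapper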
