Skip to content
Merged
Show file tree
Hide file tree
Changes from 196 commits
Commits
Show all changes
215 commits
Select commit Hold shift + click to select a range
6c3da0b
feat: add S3 search_files method
giancarloromeo Aug 11, 2025
5ad9976
fix: typecheck
giancarloromeo Aug 11, 2025
ae61013
feat: add celery task
giancarloromeo Aug 11, 2025
ba8cf52
feat: add rpc interface
giancarloromeo Aug 11, 2025
8dd2b47
feat: add web server rest
giancarloromeo Aug 11, 2025
9471862
fix: search result schema
giancarloromeo Aug 11, 2025
86f9bbb
fix: register pydantic type
giancarloromeo Aug 11, 2025
2d564fc
Merge branch 'master' into is8102/add-search-api-in-storage
giancarloromeo Aug 19, 2025
a00bdfa
Merge branch 'master' into is8102/add-search-api-in-storage
giancarloromeo Aug 22, 2025
a84876c
Merge branch 'master' into is8102/add-search-api-in-storage
giancarloromeo Aug 25, 2025
69fbd61
Merge remote-tracking branch 'upstream/master' into is8102/add-search…
giancarloromeo Aug 25, 2025
ffb9c5c
add Redis stream
giancarloromeo Aug 26, 2025
09cb3dd
Merge branch 'master' into is8102/add-search-api-in-storage
giancarloromeo Sep 12, 2025
d364c74
Merge remote-tracking branch 'upstream/master' into is8102/add-search…
giancarloromeo Sep 15, 2025
02e73ba
fix: job_filter
giancarloromeo Sep 15, 2025
f48694a
Merge remote-tracking branch 'upstream/master' into is8102/add-search…
giancarloromeo Sep 16, 2025
f8b5729
fix: no-stream
giancarloromeo Sep 16, 2025
06c4640
fix: oas
giancarloromeo Sep 16, 2025
42957be
fix: add searchresult
giancarloromeo Sep 16, 2025
06ac008
fix: add validator
giancarloromeo Sep 16, 2025
8d99f15
fix: indent
giancarloromeo Sep 16, 2025
a3cc0cb
fix: search is ephemeral
giancarloromeo Sep 16, 2025
35d57fe
Merge branch 'master' into is8102/add-search-api-in-storage
giancarloromeo Sep 16, 2025
4eeda4f
Merge branch 'master' into is8102/add-search-api-in-storage
giancarloromeo Sep 17, 2025
39d3300
fix: revert
giancarloromeo Sep 17, 2025
b4a3ca1
Merge branch 'is8102/add-search-api-in-storage' of github.com:giancar…
giancarloromeo Sep 17, 2025
692fec7
Merge branch 'master' into is8102/add-search-api-in-storage
giancarloromeo Sep 17, 2025
a322f6e
fix: constant
giancarloromeo Sep 17, 2025
57ba337
fix: include v
giancarloromeo Sep 17, 2025
05bdb76
fix: move validator to common
giancarloromeo Sep 18, 2025
b7d032e
fix: add quotes
giancarloromeo Sep 18, 2025
a37cce7
Merge branch 'master' into is8102/add-search-api-in-storage
giancarloromeo Sep 18, 2025
834c250
fix: add celery-library
giancarloromeo Sep 19, 2025
8e58365
feat: add streaming
giancarloromeo Sep 22, 2025
a2cb434
fix: typecheck
giancarloromeo Sep 22, 2025
fda7808
fix: add default
giancarloromeo Sep 22, 2025
7d24f6b
fix: default
giancarloromeo Sep 22, 2025
03c52b4
fix: import
giancarloromeo Sep 22, 2025
74814c4
fix: typecheck
giancarloromeo Sep 22, 2025
3c2dae1
Merge branch 'master' into is8102/add-search-api-in-storage
giancarloromeo Sep 22, 2025
626b908
fix: constants
giancarloromeo Sep 22, 2025
5cd9e64
fix: add file size
giancarloromeo Sep 22, 2025
e761d74
fix: add items_per_page param
giancarloromeo Sep 22, 2025
974e005
fix: items per page
giancarloromeo Sep 22, 2025
f62433c
fix: param
giancarloromeo Sep 22, 2025
710c4bc
fix: remove stream
giancarloromeo Sep 22, 2025
5f5c4e0
fix: add stream expire
giancarloromeo Sep 22, 2025
fd0416f
fix: schema
giancarloromeo Sep 22, 2025
4919dbe
fix: param name
giancarloromeo Sep 22, 2025
04d7447
fix: revert expiry
giancarloromeo Sep 22, 2025
67cce39
fix: add path
giancarloromeo Sep 23, 2025
10d5952
fix: refactor
giancarloromeo Sep 23, 2025
2cbe462
fix: remove unused
giancarloromeo Sep 23, 2025
3d3e42d
fix: sse content
giancarloromeo Sep 23, 2025
0380294
fix: refactoring
giancarloromeo Sep 23, 2025
e654fcb
fix: max items per page
giancarloromeo Sep 23, 2025
26442ba
enhance test
giancarloromeo Sep 23, 2025
59a2d5b
fix: typecheck
giancarloromeo Sep 23, 2025
40f9210
fix: celery
giancarloromeo Sep 23, 2025
3abb4d3
fix: import
giancarloromeo Sep 23, 2025
32d1955
fix: relative import
giancarloromeo Sep 23, 2025
c800650
fix: import
giancarloromeo Sep 23, 2025
6ffa9dc
fix: add constant
giancarloromeo Sep 23, 2025
c5ee995
fix: async_job_get_result
giancarloromeo Sep 24, 2025
85bf2f2
fix: async_job_get_status
giancarloromeo Sep 24, 2025
61d89b0
fix: async_job_get_status
giancarloromeo Sep 24, 2025
5b6e59e
fix: async_job_list
giancarloromeo Sep 24, 2025
2a2282a
fix: format
giancarloromeo Sep 24, 2025
add0184
fix: format
giancarloromeo Sep 24, 2025
d030517
fix: remove rpc routes
giancarloromeo Sep 24, 2025
108f3cc
fix: minor
giancarloromeo Sep 24, 2025
de0f838
fix: last modified filter
giancarloromeo Sep 25, 2025
409d6f4
fix: comment
giancarloromeo Sep 25, 2025
ce0c86b
fix: remote method name
giancarloromeo Sep 25, 2025
fd598b5
fix: result stream href
giancarloromeo Sep 25, 2025
28fdffb
fix: make openapi-spec
giancarloromeo Sep 25, 2025
04e9a11
fix: validator
giancarloromeo Sep 25, 2025
994b52d
fix: no need for permissions
giancarloromeo Sep 25, 2025
cf6e618
fix: filters
giancarloromeo Sep 25, 2025
2ea58ab
fix: output schema
giancarloromeo Sep 25, 2025
d87ab39
feat: projectId filter
giancarloromeo Sep 25, 2025
cb4c620
fix: capped redis stream
giancarloromeo Sep 26, 2025
4cbefc6
fix: stream
giancarloromeo Sep 26, 2025
1692edb
fix: raise max stream len
giancarloromeo Sep 26, 2025
b48b9c1
fix: add service
giancarloromeo Sep 26, 2025
ac6e866
Merge remote-tracking branch 'upstream/master' into is8102/add-search…
giancarloromeo Sep 26, 2025
8aa2fe7
fix: plugin
giancarloromeo Sep 26, 2025
826e60c
fix: result
giancarloromeo Sep 26, 2025
50a4c5d
fix: typecheck
giancarloromeo Sep 26, 2025
78a32b2
fix test
giancarloromeo Sep 26, 2025
05b1793
fix: test
giancarloromeo Sep 26, 2025
2254e8d
fix: param worker side
giancarloromeo Sep 26, 2025
8adb0f4
fix: openapi-spec
giancarloromeo Sep 26, 2025
36c2ec1
fix: typecheck
giancarloromeo Sep 26, 2025
21f3a9e
Update services/storage/src/simcore_service_storage/simcore_s3_dsm.py
giancarloromeo Sep 26, 2025
7d787ba
Update services/web/server/src/simcore_service_webserver/tasks/_rest.py
giancarloromeo Sep 26, 2025
7471f91
Update services/storage/src/simcore_service_storage/simcore_s3_dsm.py
giancarloromeo Sep 26, 2025
80c8050
Merge branch 'master' into is8102/add-search-api-in-storage
giancarloromeo Sep 26, 2025
4c4d7c4
fix: test
giancarloromeo Sep 26, 2025
84b2262
fix: test
giancarloromeo Sep 26, 2025
9d518ee
fix: test
giancarloromeo Sep 29, 2025
ef18c4a
fix: type
giancarloromeo Sep 29, 2025
1130b59
fix: param
giancarloromeo Sep 29, 2025
2395f18
fix: remove rpc
giancarloromeo Sep 29, 2025
7310621
fix: async iter
giancarloromeo Sep 29, 2025
ee35013
fix: typecheck
giancarloromeo Sep 29, 2025
86d2954
Merge remote-tracking branch 'upstream/master' into is8102/add-search…
giancarloromeo Sep 29, 2025
e552c15
fix: settings
giancarloromeo Sep 29, 2025
df6c8e8
fix: api
giancarloromeo Sep 29, 2025
e4a5322
fix: openapi-spec
giancarloromeo Sep 29, 2025
bb8ed3e
fix: async_jobs router
giancarloromeo Sep 29, 2025
6dc7187
Merge remote-tracking branch 'upstream/master' into is8102/add-search…
giancarloromeo Sep 30, 2025
9bc9c2e
add Celery to requirements
giancarloromeo Sep 30, 2025
1771429
add Celery settings
giancarloromeo Sep 30, 2025
4eea09e
add Celery to Redis client
giancarloromeo Sep 30, 2025
68dce90
add Celery plugin
giancarloromeo Sep 30, 2025
10a74ae
add event-stream support
giancarloromeo Sep 30, 2025
27909c3
fix long-running-tasks api
giancarloromeo Sep 30, 2025
56a8623
fix REST interface in tasks plugin
giancarloromeo Sep 30, 2025
482dc74
fix tasks endpoints
giancarloromeo Sep 30, 2025
c2b6ecb
fix exception
giancarloromeo Sep 30, 2025
c74830b
fix typecheck
giancarloromeo Sep 30, 2025
495a403
fix path op names
giancarloromeo Sep 30, 2025
abd4bd4
fix path op names
giancarloromeo Sep 30, 2025
3a7560b
fix openapi-spec
giancarloromeo Sep 30, 2025
d9617e7
fix async job path op
giancarloromeo Sep 30, 2025
09ed1b2
fix test
giancarloromeo Sep 30, 2025
5536db0
fix test
giancarloromeo Sep 30, 2025
758594c
Merge branch 'master' into is8102/add-celery-task-manager-to-webserver
giancarloromeo Sep 30, 2025
f1b8bc1
Merge remote-tracking branch 'upstream/master' into is8102/add-celery…
giancarloromeo Sep 30, 2025
4471da5
rename
giancarloromeo Sep 30, 2025
8ae38a0
Merge branch 'is8102/add-celery-task-manager-to-webserver' of github.…
giancarloromeo Sep 30, 2025
9b5c02b
fix property name
giancarloromeo Sep 30, 2025
7ef024a
fix mock
giancarloromeo Sep 30, 2025
2b2f64b
disable Celery in wb_auth tests
giancarloromeo Sep 30, 2025
19b1ed8
add async jobs stream
giancarloromeo Sep 30, 2025
71b3a21
change task key prefix
giancarloromeo Sep 30, 2025
80dba0a
fix mock
giancarloromeo Sep 30, 2025
d7bea56
disable Celery
giancarloromeo Sep 30, 2025
f81d9ca
move event_generator logic down to service
giancarloromeo Oct 1, 2025
5e46968
move sse to models
giancarloromeo Oct 1, 2025
5b9642f
remove unused
giancarloromeo Oct 1, 2025
821d1f0
add test
giancarloromeo Oct 1, 2025
f6676de
fix name
giancarloromeo Oct 1, 2025
6ac1607
move tasks tests
giancarloromeo Oct 1, 2025
73cec60
add tests
giancarloromeo Oct 1, 2025
1057281
Merge branch 'master' into is8102/add-celery-task-manager-to-webserver
giancarloromeo Oct 1, 2025
2db3c93
revert
giancarloromeo Oct 1, 2025
ada49e3
fix fixture
giancarloromeo Oct 1, 2025
b4a5c95
typecheck
giancarloromeo Oct 1, 2025
f3bb46c
add cleanup
giancarloromeo Oct 1, 2025
3776869
add exception handling
giancarloromeo Oct 1, 2025
48c107b
reraise
giancarloromeo Oct 1, 2025
aba2a54
relative import
giancarloromeo Oct 2, 2025
97fa75b
add tests
giancarloromeo Oct 2, 2025
f1e2ce3
remove streaming
giancarloromeo Oct 2, 2025
a0a2319
fix test
giancarloromeo Oct 2, 2025
51fd3c2
Merge branch 'master' into is8102/add-celery-task-manager-to-webserver
giancarloromeo Oct 2, 2025
7df82ae
fix mock
giancarloromeo Oct 2, 2025
adf70d7
Merge branch 'is8102/add-celery-task-manager-to-webserver' of github.…
giancarloromeo Oct 2, 2025
e62a523
typecheck
giancarloromeo Oct 2, 2025
302c44a
Merge branch 'master' into is8102/add-search-api-in-storage
giancarloromeo Oct 2, 2025
ad5c6c1
revert
giancarloromeo Oct 2, 2025
cfe2e8d
Merge branch 'is8102/add-tast-manager-to-webserver' into is8102/add-s…
giancarloromeo Oct 2, 2025
bcba2e7
typecheck
giancarloromeo Oct 2, 2025
259f5de
frozen
giancarloromeo Oct 2, 2025
624bcf0
Merge branch 'is8102/add-celery-task-manager-to-webserver' into is810…
giancarloromeo Oct 2, 2025
59868c2
rename
giancarloromeo Oct 2, 2025
7beb995
Merge branch 'is8102/add-celery-task-manager-to-webserver' into is810…
giancarloromeo Oct 2, 2025
2a43ab2
fix
giancarloromeo Oct 3, 2025
6fb6fb7
Merge branch 'is8102/add-celery-task-manager-to-webserver' into is810…
giancarloromeo Oct 6, 2025
48b104f
remove duplicate
giancarloromeo Oct 6, 2025
953f045
add streaming
giancarloromeo Oct 6, 2025
c9f3e5e
Merge remote-tracking branch 'upstream/master' into is8102/add-celery…
giancarloromeo Oct 6, 2025
6890edc
Merge branch 'is8102/add-celery-task-manager-to-webserver' into is810…
giancarloromeo Oct 6, 2025
e75a3bb
fix params
giancarloromeo Oct 6, 2025
2b43bed
add streaming
giancarloromeo Oct 6, 2025
5115678
fix
giancarloromeo Oct 6, 2025
94ac24a
fix methods
giancarloromeo Oct 6, 2025
d55bdc5
fix response
giancarloromeo Oct 7, 2025
3e10364
register input types
giancarloromeo Oct 7, 2025
9223ddc
fix pull
giancarloromeo Oct 7, 2025
a18507b
fix
giancarloromeo Oct 7, 2025
909f550
add support structs
giancarloromeo Oct 7, 2025
507fbdd
fix
giancarloromeo Oct 7, 2025
a8dbb32
typecheck
giancarloromeo Oct 7, 2025
ed5e4d7
typecheck
giancarloromeo Oct 7, 2025
8838c6e
fix tests
giancarloromeo Oct 7, 2025
2a52b5e
fix validator
giancarloromeo Oct 8, 2025
f1e6fc1
remove unused
giancarloromeo Oct 8, 2025
df83878
Merge branch 'master' into is8102/add-search-api-in-storage
giancarloromeo Oct 8, 2025
69fd847
Merge remote-tracking branch 'upstream/master' into is8102/add-search…
giancarloromeo Oct 8, 2025
3385bc5
fix termination
giancarloromeo Oct 9, 2025
76c0ef0
fix api
giancarloromeo Oct 9, 2025
4058fdc
remove sse
giancarloromeo Oct 9, 2025
bf3d629
Merge branch 'master' into is8102/add-search-api-in-storage
giancarloromeo Oct 9, 2025
c1b9e2d
fix
giancarloromeo Oct 13, 2025
9d81595
fix
giancarloromeo Oct 13, 2025
37ffd8a
fix
giancarloromeo Oct 13, 2025
01cac77
fix
giancarloromeo Oct 13, 2025
2006ec6
fix
giancarloromeo Oct 13, 2025
e28185c
fix
giancarloromeo Oct 13, 2025
84e7d5f
fix
giancarloromeo Oct 13, 2025
6f4aecb
fix
giancarloromeo Oct 13, 2025
808ff59
fix
giancarloromeo Oct 13, 2025
e980086
fix
giancarloromeo Oct 13, 2025
c7bf597
fix
giancarloromeo Oct 13, 2025
710eea2
remove sleep
giancarloromeo Oct 13, 2025
e5bbfe6
fix tests
giancarloromeo Oct 13, 2025
40aec56
fix tests
giancarloromeo Oct 13, 2025
fef6e72
fix tests
giancarloromeo Oct 13, 2025
59ef8a5
fix
giancarloromeo Oct 13, 2025
8c920a4
fix endpoint
giancarloromeo Oct 14, 2025
4cf9f24
Merge branch 'master' into is8102/add-search-api-in-storage
giancarloromeo Oct 14, 2025
883cfd0
typecheck
giancarloromeo Oct 14, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions api/specs/web-server/_long_running_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@
from simcore_service_webserver.tasks._controller._rest_exceptions import (
_TO_HTTP_ERROR_MAP,
)
from simcore_service_webserver.tasks._controller._rest_schemas import (
TaskStreamQueryParams,
TaskStreamResponse,
)

router = APIRouter(
prefix=f"/{API_VTAG}",
Expand Down Expand Up @@ -63,3 +67,14 @@ def get_async_job_result(
_path_params: Annotated[_PathParam, Depends()],
):
"""Retrieves the result of a task"""


@router.get(
"/tasks/{task_id}/stream",
response_model=Envelope[TaskStreamResponse],
)
def get_async_job_stream(
_path_params: Annotated[_PathParam, Depends()],
_query_params: Annotated[TaskStreamQueryParams, Depends()],
):
"""Retrieves the stream of a task"""
32 changes: 25 additions & 7 deletions api/specs/web-server/_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,10 @@
# pylint: disable=too-many-arguments


from typing import Annotated, TypeAlias
from typing import Annotated, Any, Final, TypeAlias

from fastapi import APIRouter, Depends, Query, status
from models_library.api_schemas_long_running_tasks.tasks import (
TaskGet,
)
from models_library.api_schemas_long_running_tasks.tasks import TaskGet
from models_library.api_schemas_storage.storage_schemas import (
FileLocation,
FileMetaDataGet,
Expand All @@ -25,6 +23,7 @@
BatchDeletePathsBodyParams,
DataExportPost,
ListPathsQueryParams,
SearchBodyParams,
StorageLocationPathParams,
StoragePathComputeSizeParams,
)
Expand Down Expand Up @@ -220,14 +219,33 @@ async def is_completed_upload_file(
"""Returns state of upload completion"""


_RESPONSES: Final[dict[int | str, dict[str, Any]]] = {
i.status_code: {"model": EnvelopedError} for i in _TO_HTTP_ERROR_MAP.values()
}


@router.post(
"/storage/locations/{location_id}/export-data",
status_code=status.HTTP_202_ACCEPTED,
response_model=Envelope[TaskGet],
name="export_data",
description="Export data",
responses={
i.status_code: {"model": EnvelopedError} for i in _TO_HTTP_ERROR_MAP.values()
},
responses=_RESPONSES,
)
async def export_data(export_data: DataExportPost, location_id: LocationID):
"""Trigger data export. Returns async job id for getting status and results"""


@router.post(
"/storage/locations/{location_id}/search",
status_code=status.HTTP_202_ACCEPTED,
response_model=Envelope[TaskGet],
name="search",
description="Starts a files/folders search",
responses=_RESPONSES,
)
async def search(
_path: Annotated[StorageLocationPathParams, Depends()],
_body: SearchBodyParams,
):
"""Trigger search. Returns async job id for getting status and results"""
93 changes: 86 additions & 7 deletions packages/celery-library/src/celery_library/backends/redis.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import contextlib
import logging
from dataclasses import dataclass
from datetime import timedelta
from datetime import UTC, datetime, timedelta
from typing import TYPE_CHECKING, Final

from models_library.progress_bar import ProgressReport
Expand All @@ -13,15 +13,24 @@
Task,
TaskKey,
TaskStore,
TaskStreamItem,
)
from servicelib.redis import RedisClientSDK, handle_redis_returns_union_types

_CELERY_TASK_DELIMTATOR: Final[str] = ":"

_CELERY_TASK_PREFIX: Final[str] = "celery-task-"
_CELERY_TASK_ID_KEY_ENCODING = "utf-8"
_CELERY_TASK_SCAN_COUNT_PER_BATCH: Final[int] = 1000
_CELERY_TASK_METADATA_KEY: Final[str] = "metadata"
_CELERY_TASK_EXEC_METADATA_KEY: Final[str] = "exec-meta"
_CELERY_TASK_PROGRESS_KEY: Final[str] = "progress"

### Redis list to store streamed results
_CELERY_TASK_STREAM_PREFIX: Final[str] = "celery-task-stream-"
_CELERY_TASK_STREAM_EXPIRY: Final[timedelta] = timedelta(minutes=3)
_CELERY_TASK_STREAM_METADATA: Final[str] = "meta"
_CELERY_TASK_STREAM_DONE_KEY: Final[str] = "done"
_CELERY_TASK_STREAM_LAST_UPDATE_KEY: Final[str] = "last_update"

_logger = logging.getLogger(__name__)

Expand All @@ -30,6 +39,18 @@ def _build_redis_task_key(task_key: TaskKey) -> str:
return _CELERY_TASK_PREFIX + task_key


def _build_redis_stream_key(task_key: TaskKey) -> str:
return _CELERY_TASK_STREAM_PREFIX + task_key


def _build_redis_stream_meta_key(task_key: TaskKey) -> str:
return (
_build_redis_stream_key(task_key)
+ _CELERY_TASK_DELIMTATOR
+ _CELERY_TASK_STREAM_METADATA
)


@dataclass(frozen=True)
class RedisTaskStore:
_redis_client_sdk: RedisClientSDK
Expand All @@ -44,7 +65,7 @@ async def create_task(
await handle_redis_returns_union_types(
self._redis_client_sdk.redis.hset(
name=redis_key,
key=_CELERY_TASK_METADATA_KEY,
key=_CELERY_TASK_EXEC_METADATA_KEY,
value=execution_metadata.model_dump_json(),
)
)
Expand All @@ -57,7 +78,7 @@ async def get_task_metadata(self, task_key: TaskKey) -> ExecutionMetadata | None
raw_result = await handle_redis_returns_union_types(
self._redis_client_sdk.redis.hget(
_build_redis_task_key(task_key),
_CELERY_TASK_METADATA_KEY,
_CELERY_TASK_EXEC_METADATA_KEY,
)
)
if not raw_result:
Expand Down Expand Up @@ -99,7 +120,7 @@ async def list_tasks(self, owner_metadata: OwnerMetadata) -> list[Task]:
)

keys: list[str] = []
pipeline = self._redis_client_sdk.redis.pipeline()
pipe = self._redis_client_sdk.redis.pipeline()
async for key in self._redis_client_sdk.redis.scan_iter(
match=search_key, count=_CELERY_TASK_SCAN_COUNT_PER_BATCH
):
Expand All @@ -110,9 +131,9 @@ async def list_tasks(self, owner_metadata: OwnerMetadata) -> list[Task]:
else key
)
keys.append(_key)
pipeline.hget(_key, _CELERY_TASK_METADATA_KEY)
pipe.hget(_key, _CELERY_TASK_EXEC_METADATA_KEY)

results = await pipeline.execute()
results = await pipe.execute()

tasks = []
for key, raw_metadata in zip(keys, results, strict=True):
Expand Down Expand Up @@ -153,6 +174,64 @@ async def task_exists(self, task_key: TaskKey) -> bool:
assert isinstance(n, int) # nosec
return n > 0

async def push_task_stream_items(
self, task_key: TaskKey, *result: TaskStreamItem
) -> None:
stream_key = _build_redis_stream_key(task_key)
stream_meta_key = _build_redis_stream_meta_key(task_key)

pipe = self._redis_client_sdk.redis.pipeline()
pipe.rpush(stream_key, *[r.model_dump_json(by_alias=True) for r in result])
pipe.hset(
stream_meta_key, mapping={"last_update": datetime.now(UTC).isoformat()}
)
pipe.expire(stream_key, _CELERY_TASK_STREAM_EXPIRY)
pipe.expire(stream_meta_key, _CELERY_TASK_STREAM_EXPIRY)
await pipe.execute()

async def set_task_stream_done(self, task_key: TaskKey) -> None:
stream_meta_key = _build_redis_stream_meta_key(task_key)
await handle_redis_returns_union_types(
self._redis_client_sdk.redis.hset(
name=stream_meta_key,
key=_CELERY_TASK_STREAM_DONE_KEY,
value="1",
)
)

async def pull_task_stream_items(
self, task_key: TaskKey, limit: int = 20
) -> tuple[list[TaskStreamItem], bool, datetime | None]:
stream_key = _build_redis_stream_key(task_key)
meta_key = _build_redis_stream_meta_key(task_key)

async with self._redis_client_sdk.redis.pipeline(transaction=True) as pipe:
pipe.lrange(stream_key, 0, limit - 1)
pipe.ltrim(stream_key, limit, -1)
pipe.hget(meta_key, _CELERY_TASK_STREAM_DONE_KEY)
pipe.hget(meta_key, _CELERY_TASK_STREAM_LAST_UPDATE_KEY)
raw_items, _, done, last_update = await pipe.execute()

stream_items = [TaskStreamItem.model_validate_json(item) for item in raw_items]

if stream_items:
await handle_redis_returns_union_types(
self._redis_client_sdk.redis.ltrim(stream_key, len(stream_items), -1)
)

empty = (
await handle_redis_returns_union_types(
self._redis_client_sdk.redis.llen(stream_key)
)
== 0
)

return (
stream_items,
done == "1" and empty,
datetime.fromisoformat(last_update) if last_update else None,
)


if TYPE_CHECKING:
_: type[TaskStore] = RedisTaskStore
10 changes: 5 additions & 5 deletions packages/celery-library/src/celery_library/rpc/_async_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import logging

from celery.exceptions import CeleryError # type: ignore[import-untyped]
from models_library.api_schemas_rpc_async_jobs.async_jobs import (
AsyncJobGet,
AsyncJobId,
Expand All @@ -22,6 +21,7 @@
from servicelib.rabbitmq import RPCRouter

from ..errors import (
TaskManagerError,
TaskNotFoundError,
TransferrableCeleryError,
decode_celery_transferrable_error,
Expand All @@ -44,7 +44,7 @@ async def cancel(
)
except TaskNotFoundError as exc:
raise JobMissingError(job_id=job_id) from exc
except CeleryError as exc:
except TaskManagerError as exc:
raise JobSchedulerError(exc=f"{exc}") from exc


Expand All @@ -62,7 +62,7 @@ async def status(
)
except TaskNotFoundError as exc:
raise JobMissingError(job_id=job_id) from exc
except CeleryError as exc:
except TaskManagerError as exc:
raise JobSchedulerError(exc=f"{exc}") from exc

return AsyncJobStatus(
Expand Down Expand Up @@ -101,7 +101,7 @@ async def result(
)
except TaskNotFoundError as exc:
raise JobMissingError(job_id=job_id) from exc
except CeleryError as exc:
except TaskManagerError as exc:
raise JobSchedulerError(exc=f"{exc}") from exc

if _status.task_state == TaskState.FAILURE:
Expand Down Expand Up @@ -136,7 +136,7 @@ async def list_jobs(
tasks = await task_manager.list_tasks(
owner_metadata=owner_metadata,
)
except CeleryError as exc:
except TaskManagerError as exc:
raise JobSchedulerError(exc=f"{exc}") from exc

return [
Expand Down
Loading
Loading