11import contextlib
22import logging
33from dataclasses import dataclass
4- from datetime import timedelta
4+ from datetime import UTC , datetime , timedelta
55from typing import TYPE_CHECKING , Final
66
77from models_library .progress_bar import ProgressReport
1313 Task ,
1414 TaskKey ,
1515 TaskStore ,
16+ TaskStreamItem ,
1617)
1718from servicelib .redis import RedisClientSDK , handle_redis_returns_union_types
1819
# Separator used when composing compound Redis keys (e.g. "<stream-key>:<meta>").
# NOTE(review): "DELIMTATOR" is a typo (delimiter); renaming would touch all users,
# so it is kept as-is here.
_CELERY_TASK_DELIMTATOR: Final[str] = ":"

# Key prefix for the per-task Redis hash.
_CELERY_TASK_PREFIX: Final[str] = "celery-task-"
_CELERY_TASK_ID_KEY_ENCODING = "utf-8"
# SCAN batch size used when iterating task keys.
_CELERY_TASK_SCAN_COUNT_PER_BATCH: Final[int] = 1000
# Hash field holding the serialized ExecutionMetadata.
_CELERY_TASK_EXEC_METADATA_KEY: Final[str] = "exec-meta"
# Hash field for task progress data.
_CELERY_TASK_PROGRESS_KEY: Final[str] = "progress"

### Redis list to store streamed results
_CELERY_TASK_STREAM_PREFIX: Final[str] = "celery-task-stream-"
# TTL refreshed on every push so abandoned streams expire on their own.
_CELERY_TASK_STREAM_EXPIRY: Final[timedelta] = timedelta(minutes=3)
# Suffix of the metadata hash that accompanies each stream list.
_CELERY_TASK_STREAM_METADATA: Final[str] = "meta"
# Stream-metadata hash fields: completion flag and last producer update time.
_CELERY_TASK_STREAM_DONE_KEY: Final[str] = "done"
_CELERY_TASK_STREAM_LAST_UPDATE_KEY: Final[str] = "last_update"
2534
2635_logger = logging .getLogger (__name__ )
2736
2837
def _build_redis_task_key(task_key: TaskKey) -> str:
    """Return the Redis key under which the task's hash is stored."""
    return _CELERY_TASK_PREFIX + task_key
40+
41+
def _build_redis_stream_key(task_key: TaskKey) -> str:
    """Return the Redis key of the list holding the task's streamed results."""
    return _CELERY_TASK_STREAM_PREFIX + task_key
44+
45+
def _build_redis_stream_meta_key(task_key: TaskKey) -> str:
    """Return the key of the metadata hash attached to the task's stream."""
    return _CELERY_TASK_DELIMTATOR.join(
        (_build_redis_stream_key(task_key), _CELERY_TASK_STREAM_METADATA)
    )
3148
3249
3350@dataclass (frozen = True )
@@ -44,7 +61,7 @@ async def create_task(
4461 await handle_redis_returns_union_types (
4562 self ._redis_client_sdk .redis .hset (
4663 name = redis_key ,
47- key = _CELERY_TASK_METADATA_KEY ,
64+ key = _CELERY_TASK_EXEC_METADATA_KEY ,
4865 value = execution_metadata .model_dump_json (),
4966 )
5067 )
@@ -57,7 +74,7 @@ async def get_task_metadata(self, task_key: TaskKey) -> ExecutionMetadata | None
5774 raw_result = await handle_redis_returns_union_types (
5875 self ._redis_client_sdk .redis .hget (
5976 _build_redis_task_key (task_key ),
60- _CELERY_TASK_METADATA_KEY ,
77+ _CELERY_TASK_EXEC_METADATA_KEY ,
6178 )
6279 )
6380 if not raw_result :
@@ -99,7 +116,7 @@ async def list_tasks(self, owner_metadata: OwnerMetadata) -> list[Task]:
99116 )
100117
101118 keys : list [str ] = []
102- pipeline = self ._redis_client_sdk .redis .pipeline ()
119+ pipe = self ._redis_client_sdk .redis .pipeline ()
103120 async for key in self ._redis_client_sdk .redis .scan_iter (
104121 match = search_key , count = _CELERY_TASK_SCAN_COUNT_PER_BATCH
105122 ):
@@ -110,9 +127,9 @@ async def list_tasks(self, owner_metadata: OwnerMetadata) -> list[Task]:
110127 else key
111128 )
112129 keys .append (_key )
113- pipeline .hget (_key , _CELERY_TASK_METADATA_KEY )
130+ pipe .hget (_key , _CELERY_TASK_EXEC_METADATA_KEY )
114131
115- results = await pipeline .execute ()
132+ results = await pipe .execute ()
116133
117134 tasks = []
118135 for key , raw_metadata in zip (keys , results , strict = True ):
@@ -153,6 +170,62 @@ async def task_exists(self, task_key: TaskKey) -> bool:
153170 assert isinstance (n , int ) # nosec
154171 return n > 0
155172
173+ async def push_task_stream_items (
174+ self , task_key : TaskKey , * result : TaskStreamItem
175+ ) -> None :
176+ stream_key = _build_redis_stream_key (task_key )
177+ stream_meta_key = _build_redis_stream_meta_key (task_key )
178+
179+ pipe = self ._redis_client_sdk .redis .pipeline ()
180+ pipe .rpush (stream_key , * (r .model_dump_json (by_alias = True ) for r in result ))
181+ pipe .hset (
182+ stream_meta_key , mapping = {"last_update" : datetime .now (UTC ).isoformat ()}
183+ )
184+ pipe .expire (stream_key , _CELERY_TASK_STREAM_EXPIRY )
185+ pipe .expire (stream_meta_key , _CELERY_TASK_STREAM_EXPIRY )
186+ await pipe .execute ()
187+
188+ async def set_task_stream_done (self , task_key : TaskKey ) -> None :
189+ stream_meta_key = _build_redis_stream_meta_key (task_key )
190+ await handle_redis_returns_union_types (
191+ self ._redis_client_sdk .redis .hset (
192+ name = stream_meta_key ,
193+ key = _CELERY_TASK_STREAM_DONE_KEY ,
194+ value = "1" ,
195+ )
196+ )
197+
    async def pull_task_stream_items(
        self, task_key: TaskKey, limit: int = 20
    ) -> tuple[list[TaskStreamItem], bool, datetime | None]:
        """Pop up to *limit* items from the task's result stream.

        Returns a tuple of:
        - the popped items (possibly empty),
        - whether the stream is finished (producer flagged it done AND nothing
          remains in the list after this pop),
        - the timestamp of the producer's last push, or ``None`` if unknown.
        """
        stream_key = _build_redis_stream_key(task_key)
        meta_key = _build_redis_stream_meta_key(task_key)

        # Pop a batch and read the stream metadata in one transaction so the
        # "done" flag and the popped items refer to the same point in time.
        async with self._redis_client_sdk.redis.pipeline(transaction=True) as pipe:
            pipe.lpop(stream_key, limit)
            pipe.hget(meta_key, _CELERY_TASK_STREAM_DONE_KEY)
            pipe.hget(meta_key, _CELERY_TASK_STREAM_LAST_UPDATE_KEY)
            raw_items, done, last_update = await pipe.execute()

        # LPOP with a count yields None when the list is missing or empty.
        stream_items = (
            [TaskStreamItem.model_validate_json(item) for item in raw_items]
            if raw_items
            else []
        )

        # Queried after the pop: "done" is only reported once the list has
        # been fully drained, so a consumer never stops with items pending.
        empty = (
            await handle_redis_returns_union_types(
                self._redis_client_sdk.redis.llen(stream_key)
            )
            == 0
        )

        # NOTE(review): comparing against "1" assumes the Redis client decodes
        # responses to str (decode_responses=True) — confirm on the SDK.
        return (
            stream_items,
            done == "1" and empty,
            datetime.fromisoformat(last_update) if last_update else None,
        )
228+
156229
157230if TYPE_CHECKING :
158231 _ : type [TaskStore ] = RedisTaskStore
0 commit comments