
Commit 1aae10c

🎨 Enhance detection of stalled streamed results (#8685)
1 parent 2aab05d commit 1aae10c

File tree (5 files changed: +41 -10 lines changed)

  packages/celery-library/src/celery_library/backends/redis.py
  packages/celery-library/src/celery_library/task_manager.py
  packages/service-library/src/servicelib/celery/models.py
  packages/service-library/src/servicelib/celery/task_manager.py
  services/storage/src/simcore_service_storage/api/_worker_tasks/_simcore_s3.py
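In short, the commit adds a set_task_stream_last_update heartbeat to the task-stream machinery: the Redis backend gains a method that refreshes the stream's last_update timestamp without pushing items, the Celery task manager and the shared interface declarations expose it, and the storage search task calls it whenever a polling round yields nothing. A consumer that pulls stream items can then distinguish "producer is alive but has nothing yet" from "producer has stalled" by how old last_update is. A minimal consumer-side check along those lines might look like the sketch below; the threshold and the surrounding function are assumptions, only the pull_task_stream_items signature (task_key, limit) -> (items, done, last_update) comes from this diff.

# Sketch of a staleness check built on the backend signature shown below.
# The threshold and function name are assumptions, not part of this commit.
from datetime import UTC, datetime, timedelta

_STALL_THRESHOLD = timedelta(minutes=5)  # assumed cut-off


async def is_stream_stalled(store, task_key: str) -> bool:
    items, done, last_update = await store.pull_task_stream_items(task_key, limit=1)
    if done or items:
        return False  # finished or still producing
    if last_update is None:
        return True  # never heard from the producer
    return datetime.now(tz=UTC) - last_update > _STALL_THRESHOLD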

packages/celery-library/src/celery_library/backends/redis.py

Lines changed: 11 additions & 1 deletion
@@ -179,7 +179,7 @@ async def push_task_stream_items(
         pipe = self._redis_client_sdk.redis.pipeline()
         pipe.rpush(stream_key, *(r.model_dump_json(by_alias=True) for r in result))
         pipe.hset(
-            stream_meta_key, mapping={"last_update": datetime.now(UTC).isoformat()}
+            stream_meta_key, mapping={"last_update": datetime.now(tz=UTC).isoformat()}
         )
         pipe.expire(stream_key, _CELERY_TASK_STREAM_EXPIRY)
         pipe.expire(stream_meta_key, _CELERY_TASK_STREAM_EXPIRY)
@@ -195,6 +195,16 @@ async def set_task_stream_done(self, task_key: TaskKey) -> None:
             )
         )
 
+    async def set_task_stream_last_update(self, task_key: TaskKey) -> None:
+        stream_meta_key = _build_redis_stream_meta_key(task_key)
+        await handle_redis_returns_union_types(
+            self._redis_client_sdk.redis.hset(
+                name=stream_meta_key,
+                key=_CELERY_TASK_STREAM_LAST_UPDATE_KEY,
+                value=datetime.now(tz=UTC).isoformat(),
+            )
+        )
+
     async def pull_task_stream_items(
         self, task_key: TaskKey, limit: int = 20
     ) -> tuple[list[TaskStreamItem], bool, datetime | None]:
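The heartbeat itself is just a field on the per-task stream meta hash: pushes refresh it as part of the pipeline, and the new method refreshes it on its own while reusing the same expiry. For illustration, the read side presumably parses that field back into the datetime returned by pull_task_stream_items; a sketch of such a helper (field name and decode handling are assumptions, not code from the commit):

# Sketch only: reading the heartbeat back from the stream meta hash.
# The field name "last_update" matches the hset above; everything else is assumed.
from datetime import datetime


async def _read_last_update(redis, stream_meta_key: str) -> datetime | None:
    raw = await redis.hget(stream_meta_key, "last_update")
    if raw is None:
        return None
    if isinstance(raw, bytes):  # depends on decode_responses
        raw = raw.decode()
    return datetime.fromisoformat(raw)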

packages/celery-library/src/celery_library/task_manager.py

Lines changed: 20 additions & 8 deletions
@@ -186,9 +186,7 @@ async def get_task_status(
     @handle_celery_errors
     async def list_tasks(self, owner_metadata: OwnerMetadata) -> list[Task]:
         with log_context(
-            _logger,
-            logging.DEBUG,
-            msg=f"Listing tasks: {owner_metadata=}",
+            _logger, logging.DEBUG, "Listing tasks: owner_metadata=%s", owner_metadata
         ):
             return await self._task_store.list_tasks(owner_metadata)
 
@@ -204,23 +202,33 @@ async def set_task_progress(
     @handle_celery_errors
     async def set_task_stream_done(self, task_key: TaskKey) -> None:
         with log_context(
-            _logger,
-            logging.DEBUG,
-            msg=f"Set task stream done: {task_key=}",
+            _logger, logging.DEBUG, "Set task stream done: task_key= %s", task_key
         ):
             if not await self.task_exists(task_key):
                 raise TaskNotFoundError(task_key=task_key)
 
             await self._task_store.set_task_stream_done(task_key)
 
+    @handle_celery_errors
+    async def set_task_stream_last_update(self, task_key: TaskKey) -> None:
+        with log_context(
+            _logger, logging.DEBUG, "Set task stream last update: task_key=%s", task_key
+        ):
+            if not await self.task_exists(task_key):
+                raise TaskNotFoundError(task_key=task_key)
+
+            await self._task_store.set_task_stream_last_update(task_key)
+
     @handle_celery_errors
     async def push_task_stream_items(
         self, task_key: TaskKey, *items: TaskStreamItem
     ) -> None:
         with log_context(
             _logger,
             logging.DEBUG,
-            msg=f"Push task stream items: {task_key=} {items=}",
+            "Push task stream items: task_key=%s items=%s",
+            task_key,
+            items,
         ):
             if not await self.task_exists(task_key):
                 raise TaskNotFoundError(task_key=task_key)
@@ -238,7 +246,11 @@ async def pull_task_stream_items(
         with log_context(
             _logger,
             logging.DEBUG,
-            msg=f"Pull task results: {owner_metadata=} {task_uuid=} {offset=} {limit=}",
+            "Pull task results: owner_metadata=%s task_uuid=%s offset=%s limit=%s",
+            owner_metadata,
+            task_uuid,
+            offset,
+            limit,
         ):
             task_key = owner_metadata.model_dump_task_key(task_uuid=task_uuid)
             if not await self.task_exists(task_key):
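With the manager-level method in place, any long-running producer can keep the stream looking alive even when it has nothing to emit. A hypothetical producer loop using it (everything except the three task_manager calls is made up for illustration):

# Hypothetical producer loop; only push_task_stream_items,
# set_task_stream_last_update and set_task_stream_done come from this diff.
async def stream_results(task_manager, task_key, batches):
    async for batch in batches:
        if batch:
            await task_manager.push_task_stream_items(task_key, *batch)
        else:
            # nothing to push, but refresh the heartbeat so consumers
            # do not mistake a slow producer for a stalled one
            await task_manager.set_task_stream_last_update(task_key)
    await task_manager.set_task_stream_done(task_key)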

packages/service-library/src/servicelib/celery/models.py

Lines changed: 2 additions & 0 deletions
@@ -213,6 +213,8 @@ async def set_task_progress(
 
     async def set_task_stream_done(self, task_key: TaskKey) -> None: ...
 
+    async def set_task_stream_last_update(self, task_key: TaskKey) -> None: ...
+
     async def push_task_stream_items(
         self, task_key: TaskKey, *item: TaskStreamItem
     ) -> None: ...

packages/service-library/src/servicelib/celery/task_manager.py

Lines changed: 2 additions & 0 deletions
@@ -56,4 +56,6 @@ async def pull_task_stream_items(
 
     async def set_task_stream_done(self, task_key: TaskKey) -> None: ...
 
+    async def set_task_stream_last_update(self, task_key: TaskKey) -> None: ...
+
     async def task_exists(self, task_key: TaskKey) -> bool: ...
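The same one-line declaration is added to both servicelib modules so that the Celery implementation above and any other task manager expose the same surface. These declarations read like structural interfaces; a stripped-down sketch of that idea (the real classes and TaskKey type live in servicelib and are not reproduced here):

# Stripped-down illustration of the interface shape; names other than the two
# methods are placeholders, not the real servicelib definitions.
from typing import Protocol

TaskKey = str  # stand-in for the real TaskKey alias


class StreamingTaskManager(Protocol):
    async def set_task_stream_done(self, task_key: TaskKey) -> None: ...
    async def set_task_stream_last_update(self, task_key: TaskKey) -> None: ...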

services/storage/src/simcore_service_storage/api/_worker_tasks/_simcore_s3.py

Lines changed: 6 additions & 1 deletion
@@ -84,7 +84,10 @@ async def export_data(
     with log_context(
         _logger,
         logging.INFO,
-        f"'{task_key}' export data (for {user_id=}) fom selection: {paths_to_export}",
+        "export data task (%s) (for user=%s) from selection: %s",
+        task_key,
+        user_id,
+        paths_to_export,
     ):
         dsm = get_dsm_provider(get_app_server(task.app).app).get(
             SimcoreS3DataManager.get_location_id()
@@ -166,6 +169,8 @@ async def search(
         limit=1,  # NOTE: yield items as they come
     ):
         if not items:
+            # NOTE: still set the last update time to signal progress in search
+            await app_server.task_manager.set_task_stream_last_update(task_key)
             continue
 
         data = [
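In the storage search task the heartbeat fires exactly where a stall would otherwise be indistinguishable from a long empty stretch: every polling round that returns no items now still advances last_update. A small test sketch for that behaviour against the Redis backend (fixtures and timing are assumptions; only the two store methods come from this diff):

# Hypothetical test; "store" is assumed to be an instance of the Redis backend above.
import asyncio


async def test_heartbeat_advances_without_items(store, task_key):
    _, _, before = await store.pull_task_stream_items(task_key)
    await asyncio.sleep(0.01)
    await store.set_task_stream_last_update(task_key)
    _, _, after = await store.pull_task_stream_items(task_key)
    assert after is not None
    assert before is None or after > before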
