Skip to content

Commit f3bb46c

Browse files
add cleanup
1 parent b4a5c95 commit f3bb46c

File tree

2 files changed

+30
-18
lines changed
  • packages/celery-library/src/celery_library/backends
  • services/web/server/src/simcore_service_webserver/tasks/_controller

2 files changed

+30
-18
lines changed

packages/celery-library/src/celery_library/backends/redis.py

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import asyncio
12
import contextlib
23
import logging
34
from collections.abc import AsyncIterator
@@ -186,11 +187,15 @@ async def consume_task_events(
186187
) -> AsyncIterator[tuple[TaskEventID, TaskEvent]]:
187188
stream_key = _build_stream_key(task_id)
188189
while True:
189-
messages = await self._redis_client_sdk.redis.xread(
190-
{stream_key: last_id or _CELERY_TASK_STREAM_DEFAULT_ID},
191-
block=_CELERY_TASK_STREAM_BLOCK_TIMEOUT,
192-
count=_CELERY_TASK_STREAM_COUNT,
193-
)
190+
try:
191+
messages = await self._redis_client_sdk.redis.xread(
192+
{stream_key: last_id or _CELERY_TASK_STREAM_DEFAULT_ID},
193+
block=_CELERY_TASK_STREAM_BLOCK_TIMEOUT,
194+
count=_CELERY_TASK_STREAM_COUNT,
195+
)
196+
except asyncio.CancelledError:
197+
break
198+
194199
if not messages:
195200
continue
196201
for _, events in messages:
@@ -204,6 +209,7 @@ async def consume_task_events(
204209
raw_event
205210
)
206211
yield msg_id, event
212+
last_id = msg_id
207213
except ValidationError:
208214
continue
209215

services/web/server/src/simcore_service_webserver/tasks/_controller/_rest.py

Lines changed: 19 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import asyncio
12
import logging
23
from typing import Final
34

@@ -193,18 +194,23 @@ async def get_async_job_stream(request: web.Request) -> web.Response:
193194
_path_params = parse_request_path_parameters_as(TaskPathParams, request)
194195

195196
async def event_generator():
196-
async for event_id, event in get_task_manager(request.app).consume_task_events(
197-
owner_metadata=OwnerMetadata.model_validate(
198-
WebServerOwnerMetadata(
199-
user_id=_req_ctx.user_id,
200-
product_name=_req_ctx.product_name,
201-
).model_dump()
202-
),
203-
task_uuid=_path_params.task_id,
204-
last_id=_header_params.last_event_id,
205-
):
206-
yield SSEEvent(
207-
id=event_id, event=event.type, data=[json_dumps(event.data)]
208-
).serialize()
197+
try:
198+
async for event_id, event in get_task_manager(
199+
request.app
200+
).consume_task_events(
201+
owner_metadata=OwnerMetadata.model_validate(
202+
WebServerOwnerMetadata(
203+
user_id=_req_ctx.user_id,
204+
product_name=_req_ctx.product_name,
205+
).model_dump()
206+
),
207+
task_uuid=_path_params.task_id,
208+
last_id=_header_params.last_event_id,
209+
):
210+
yield SSEEvent(
211+
id=event_id, event=event.type, data=[json_dumps(event.data)]
212+
).serialize()
213+
except asyncio.CancelledError:
214+
return
209215

210216
return create_event_stream_response(event_generator=event_generator)

0 commit comments

Comments
 (0)