Skip to content

Commit f81d9ca

Browse files
move event_generator logic down to service
1 parent d7bea56 commit f81d9ca

File tree

2 files changed

+40
-15
lines changed

2 files changed

+40
-15
lines changed

services/web/server/src/simcore_service_webserver/tasks/_controller/_rest.py

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
create_data_response,
2222
create_event_stream_response,
2323
)
24-
from servicelib.celery.models import OwnerMetadata, TaskEventType, TaskStatusValue
24+
from servicelib.celery.models import OwnerMetadata
2525
from servicelib.long_running_tasks import lrt_api
2626
from servicelib.sse.models import SSEEvent, SSEHeaders
2727
from simcore_service_webserver.tasks._controller._rest_schemas import TaskPathParams
@@ -60,7 +60,7 @@ async def get_async_jobs(request: web.Request) -> web.Response:
6060
_req_ctx = AuthenticatedRequestContext.model_validate(request)
6161

6262
tasks = await _tasks_service.list_tasks(
63-
task_manager=get_task_manager(request.app),
63+
get_task_manager(request.app),
6464
owner_metadata=OwnerMetadata.model_validate(
6565
WebServerOwnerMetadata(
6666
user_id=_req_ctx.user_id,
@@ -105,7 +105,7 @@ async def get_async_job_status(request: web.Request) -> web.Response:
105105
_path_params = parse_request_path_parameters_as(TaskPathParams, request)
106106

107107
task_status = await _tasks_service.get_task_status(
108-
task_manager=get_task_manager(request.app),
108+
get_task_manager(request.app),
109109
owner_metadata=OwnerMetadata.model_validate(
110110
WebServerOwnerMetadata(
111111
user_id=_req_ctx.user_id,
@@ -140,7 +140,7 @@ async def cancel_async_job(request: web.Request) -> web.Response:
140140
_path_params = parse_request_path_parameters_as(TaskPathParams, request)
141141

142142
await _tasks_service.cancel_task(
143-
task_manager=get_task_manager(request.app),
143+
get_task_manager(request.app),
144144
owner_metadata=OwnerMetadata.model_validate(
145145
WebServerOwnerMetadata(
146146
user_id=_req_ctx.user_id,
@@ -165,7 +165,7 @@ async def get_async_job_result(request: web.Request) -> web.Response:
165165
_path_params = parse_request_path_parameters_as(TaskPathParams, request)
166166

167167
task_result = await _tasks_service.get_task_result(
168-
task_manager=get_task_manager(request.app),
168+
get_task_manager(request.app),
169169
owner_metadata=OwnerMetadata.model_validate(
170170
WebServerOwnerMetadata(
171171
user_id=_req_ctx.user_id,
@@ -203,17 +203,8 @@ async def event_generator():
203203
task_uuid=_path_params.task_id,
204204
last_id=_header_params.last_event_id,
205205
):
206-
if (
207-
event.type == TaskEventType.STATUS
208-
and event.data == TaskStatusValue.CREATED
209-
):
210-
continue
211-
212206
yield SSEEvent(
213207
id=event_id, event=event.type, data=[json_dumps(event.data)]
214208
).serialize()
215209

216-
if event.type == TaskEventType.STATUS and event.is_done():
217-
break
218-
219210
return create_event_stream_response(event_generator=event_generator)

services/web/server/src/simcore_service_webserver/tasks/_tasks_service.py

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import logging
2+
from collections.abc import AsyncIterator
23

34
from celery_library.errors import (
45
TaskManagerError,
@@ -17,7 +18,15 @@
1718
JobNotDoneError,
1819
JobSchedulerError,
1920
)
20-
from servicelib.celery.models import OwnerMetadata, TaskState, TaskUUID
21+
from servicelib.celery.models import (
22+
OwnerMetadata,
23+
TaskEvent,
24+
TaskEventID,
25+
TaskEventType,
26+
TaskState,
27+
TaskStatusValue,
28+
TaskUUID,
29+
)
2130
from servicelib.celery.task_manager import TaskManager
2231
from servicelib.logging_utils import log_catch
2332

@@ -26,6 +35,7 @@
2635

2736
async def cancel_task(
2837
task_manager: TaskManager,
38+
*,
2939
owner_metadata: OwnerMetadata,
3040
task_uuid: TaskUUID,
3141
):
@@ -42,6 +52,7 @@ async def cancel_task(
4252

4353
async def get_task_result(
4454
task_manager: TaskManager,
55+
*,
4556
owner_metadata: OwnerMetadata,
4657
task_uuid: TaskUUID,
4758
) -> AsyncJobResult:
@@ -84,6 +95,7 @@ async def get_task_result(
8495

8596
async def get_task_status(
8697
task_manager: TaskManager,
98+
*,
8799
owner_metadata: OwnerMetadata,
88100
task_uuid: TaskUUID,
89101
) -> AsyncJobStatus:
@@ -106,6 +118,7 @@ async def get_task_status(
106118

107119
async def list_tasks(
108120
task_manager: TaskManager,
121+
*,
109122
owner_metadata: OwnerMetadata,
110123
) -> list[AsyncJobGet]:
111124
try:
@@ -118,3 +131,24 @@ async def list_tasks(
118131
return [
119132
AsyncJobGet(job_id=task.uuid, job_name=task.metadata.name) for task in tasks
120133
]
134+
135+
136+
async def consume_task_events(
137+
task_manager: TaskManager,
138+
*,
139+
owner_metadata: OwnerMetadata,
140+
task_uuid: TaskUUID,
141+
last_event_id: TaskEventID | None = None,
142+
) -> AsyncIterator[tuple[TaskEventID, TaskEvent]]:
143+
async for event_id, event in task_manager.consume_task_events(
144+
owner_metadata=owner_metadata,
145+
task_uuid=task_uuid,
146+
last_id=last_event_id,
147+
):
148+
if event.type == TaskEventType.STATUS and event.data == TaskStatusValue.CREATED:
149+
continue
150+
151+
yield event_id, event
152+
153+
if event.type == TaskEventType.STATUS and event.is_done():
154+
break

0 commit comments

Comments
 (0)