Skip to content

Commit 302aca4

Browse files
committed
deregister log streamer via starlette background task
1 parent 4cf8ef1 commit 302aca4

File tree

3 files changed

+9
-9
lines changed

3 files changed

+9
-9
lines changed

services/api-server/src/simcore_service_api_server/api/routes/solvers_jobs_getters.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import logging
44
from collections import deque
55
from collections.abc import Callable
6+
from functools import partial
67
from typing import Annotated, Any, Union
78
from uuid import UUID
89

@@ -16,10 +17,10 @@
1617
from models_library.wallets import ZERO_CREDITS
1718
from pydantic import HttpUrl, NonNegativeInt
1819
from pydantic.types import PositiveInt
19-
from servicelib.fastapi.requests_decorators import cancel_on_disconnect
2020
from servicelib.logging_utils import log_context
2121
from simcore_service_api_server.models.api_resources import parse_resources_ids
2222
from sqlalchemy.ext.asyncio import AsyncEngine
23+
from starlette.background import BackgroundTask
2324

2425
from ..._service_solvers import SolverService
2526
from ...exceptions.custom_errors import InsufficientCreditsError, MissingWalletError
@@ -526,7 +527,6 @@ async def get_job_pricing_unit(
526527
response_class=LogStreamingResponse,
527528
responses=_LOGSTREAM_STATUS_CODES,
528529
)
529-
@cancel_on_disconnect
530530
async def get_log_stream(
531531
request: Request,
532532
solver_key: SolverKeyId,
@@ -553,6 +553,8 @@ async def get_log_stream(
553553
log_distributor=log_distributor,
554554
log_check_timeout=log_check_timeout,
555555
)
556+
await log_distributor.register(job_id, log_streamer.queue)
556557
return LogStreamingResponse(
557558
log_streamer.log_generator(),
559+
background=BackgroundTask(partial(log_distributor.deregister, job_id)),
558560
)

services/api-server/src/simcore_service_api_server/services_http/log_streaming.py

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ async def _distribute_logs(self, data: bytes):
6969
return False
7070

7171
async def register(self, job_id: JobID, queue: Queue[JobLog]):
72+
_logger.debug("Registering log streamer for job_id=%s", job_id)
7273
if job_id in self._log_streamers:
7374
raise LogStreamerRegistrationConflictError(job_id=job_id)
7475
self._log_streamers[job_id] = queue
@@ -77,6 +78,7 @@ async def register(self, job_id: JobID, queue: Queue[JobLog]):
7778
)
7879

7980
async def deregister(self, job_id: JobID):
81+
_logger.debug("Deregistering log streamer for job_id=%s", job_id)
8082
if job_id not in self._log_streamers:
8183
msg = f"No stream was connected to {job_id}."
8284
raise LogStreamerNotRegisteredError(details=msg, job_id=job_id)
@@ -103,7 +105,7 @@ def __init__(
103105
):
104106
self._user_id = user_id
105107
self._director2_api = director2_api
106-
self._queue: Queue[JobLog] = Queue()
108+
self.queue: Queue[JobLog] = Queue()
107109
self._job_id: JobID = job_id
108110
self._log_distributor: LogDistributor = log_distributor
109111
self._log_check_timeout: NonNegativeInt = log_check_timeout
@@ -116,12 +118,11 @@ async def _project_done(self) -> bool:
116118

117119
async def log_generator(self) -> AsyncIterable[str]:
118120
try:
119-
await self._log_distributor.register(self._job_id, self._queue)
120121
done: bool = False
121122
while not done:
122123
try:
123124
log: JobLog = await asyncio.wait_for(
124-
self._queue.get(), timeout=self._log_check_timeout
125+
self.queue.get(), timeout=self._log_check_timeout
125126
)
126127
yield log.model_dump_json() + _NEW_LINE
127128
except TimeoutError:
@@ -145,6 +146,3 @@ async def log_generator(self) -> AsyncIterable[str]:
145146
)
146147
)
147148
yield ErrorGet(errors=[error_msg]).model_dump_json() + _NEW_LINE
148-
149-
finally:
150-
await self._log_distributor.deregister(self._job_id)

services/api-server/tests/unit/test_services_rabbitmq.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -451,7 +451,7 @@ async def test_log_generator(mocker: MockFixture, faker: Faker):
451451
msg = faker.text()
452452
published_logs.append(msg)
453453
job_log.messages = [msg]
454-
await log_streamer._queue.put(job_log)
454+
await log_streamer.queue.put(job_log)
455455

456456
collected_logs: list[str] = []
457457
async for log in log_streamer.log_generator():

0 commit comments

Comments
 (0)