Commit 5175164

🐛 Fix log streaming issues (#5104)
1 parent 70efe70 commit 5175164

File tree: 11 files changed (+421 −164 lines)

Lines changed: 5 additions & 74 deletions

@@ -1,86 +1,17 @@
-import asyncio
-from asyncio.queues import Queue
-from typing import Annotated, AsyncIterable, Final, cast
-from uuid import UUID
+from typing import Annotated, cast
 
 from fastapi import Depends, FastAPI
-from models_library.projects import ProjectID
-from models_library.rabbitmq_messages import LoggerRabbitMessage
-from pydantic import PositiveInt
 from servicelib.fastapi.dependencies import get_app
 from servicelib.rabbitmq import RabbitMQClient
 
-from ...models.schemas.jobs import JobLog
-from ...services.director_v2 import DirectorV2Api
-from ..dependencies.authentication import get_current_user_id
-from ..dependencies.services import get_api_client
-
-_NEW_LINE: Final[str] = "\n"
+from ...services.log_streaming import LogDistributor
 
 
 def get_rabbitmq_client(app: Annotated[FastAPI, Depends(get_app)]) -> RabbitMQClient:
     assert app.state.rabbitmq_client  # nosec
     return cast(RabbitMQClient, app.state.rabbitmq_client)
 
 
-class LogListener:
-    _queue: Queue[JobLog]
-    _queue_name: str
-    _rabbit_consumer: RabbitMQClient
-    _project_id: ProjectID
-    _user_id: PositiveInt
-    _director2_api: DirectorV2Api
-
-    def __init__(
-        self,
-        user_id: Annotated[PositiveInt, Depends(get_current_user_id)],
-        rabbit_consumer: Annotated[RabbitMQClient, Depends(get_rabbitmq_client)],
-        director2_api: Annotated[DirectorV2Api, Depends(get_api_client(DirectorV2Api))],
-    ):
-
-        self._rabbit_consumer = rabbit_consumer
-        self._user_id = user_id
-        self._director2_api = director2_api
-        self._queue: Queue[JobLog] = Queue(50)
-
-    async def listen(
-        self,
-        project_id: UUID,
-    ):
-        self._project_id = project_id
-
-        self._queue_name = await self._rabbit_consumer.subscribe(
-            LoggerRabbitMessage.get_channel_name(),
-            self._add_logs_to_queue,
-            exclusive_queue=True,
-            topics=[f"{self._project_id}.*"],
-        )
-
-    async def stop_listening(self):
-        await self._rabbit_consumer.unsubscribe(self._queue_name)
-
-    async def _add_logs_to_queue(self, data: bytes):
-        got = LoggerRabbitMessage.parse_raw(data)
-        item = JobLog(
-            job_id=got.project_id,
-            node_id=got.node_id,
-            log_level=got.log_level,
-            messages=got.messages,
-        )
-        await self._queue.put(item)
-        return True
-
-    async def _project_done(self) -> bool:
-        task = await self._director2_api.get_computation(
-            self._project_id, self._user_id
-        )
-        return not task.stopped is None
-
-    async def log_generator(self) -> AsyncIterable[str]:
-        while True:
-            while self._queue.empty():
-                if await self._project_done():
-                    return
-                await asyncio.sleep(5)
-            log: JobLog = await self._queue.get()
-            yield log.json() + _NEW_LINE
+def get_log_distributor(app: Annotated[FastAPI, Depends(get_app)]) -> LogDistributor:
+    assert app.state.log_distributor  # nosec
+    return cast(LogDistributor, app.state.log_distributor)
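
The old per-request LogListener is gone; routes now look up a single application-wide LogDistributor from app.state via get_log_distributor. The startup code that actually populates app.state.log_distributor is not shown in this excerpt; the following is only a sketch of how such wiring could look, assuming an event hook and an already-connected RabbitMQClient on app.state (the function name and hook placement are assumptions, not code from this commit).

```python
# Illustrative wiring only -- hook name and placement are assumptions.
from fastapi import FastAPI
from servicelib.rabbitmq import RabbitMQClient

from simcore_service_api_server.services.log_streaming import LogDistributor


def setup_log_distributor(app: FastAPI) -> None:
    async def _on_startup() -> None:
        rabbitmq_client: RabbitMQClient = app.state.rabbitmq_client
        # One distributor per application: it owns a single RabbitMQ queue and
        # fans incoming log messages out to the registered per-job streamers.
        app.state.log_distributor = LogDistributor(rabbitmq_client)
        await app.state.log_distributor.setup()

    async def _on_shutdown() -> None:
        await app.state.log_distributor.teardown()

    app.add_event_handler("startup", _on_startup)
    app.add_event_handler("shutdown", _on_shutdown)
```
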
Lines changed: 23 additions & 0 deletions

@@ -0,0 +1,23 @@
+from fastapi import status
+from starlette.requests import Request
+from starlette.responses import JSONResponse
+
+from ...services.log_streaming import (
+    LogDistributionBaseException,
+    LogStreamerNotRegistered,
+    LogStreamerRegistionConflict,
+)
+from .http_error import create_error_json_response
+
+
+async def log_handling_error_handler(
+    _: Request, exc: LogDistributionBaseException
+) -> JSONResponse:
+    msg = f"{exc}"
+    status_code: int = 500
+    if isinstance(exc, LogStreamerNotRegistered):
+        status_code = status.HTTP_500_INTERNAL_SERVER_ERROR
+    elif isinstance(exc, LogStreamerRegistionConflict):
+        status_code = status.HTTP_409_CONFLICT
+
+    return create_error_json_response(msg, status_code=status_code)
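
The new handler translates log-streaming failures into error responses: a LogStreamerRegistionConflict becomes HTTP 409, while LogStreamerNotRegistered and any other LogDistributionBaseException fall back to HTTP 500. Below is a test-style sketch of that mapping, assuming the api-server package is importable; the bare ASGI scope only exists to satisfy the Request parameter and is not taken from this commit.

```python
# Sketch: exercising the handler's status-code mapping directly.
import asyncio

from starlette.requests import Request

from simcore_service_api_server.api.errors.log_handling_error import (
    log_handling_error_handler,
)
from simcore_service_api_server.services.log_streaming import (
    LogStreamerRegistionConflict,
)


async def _demo() -> None:
    dummy_request = Request({"type": "http", "method": "GET", "path": "/", "headers": []})
    response = await log_handling_error_handler(
        dummy_request,
        LogStreamerRegistionConflict("a stream is already attached to this job"),
    )
    assert response.status_code == 409  # registration conflicts map to HTTP 409


asyncio.run(_demo())
```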

services/api-server/src/simcore_service_api_server/api/routes/solvers_jobs_getters.py

Lines changed: 17 additions & 12 deletions

@@ -8,31 +8,33 @@
 
 from fastapi import APIRouter, Depends, status
 from fastapi.exceptions import HTTPException
-from fastapi.responses import RedirectResponse, StreamingResponse
+from fastapi.responses import RedirectResponse
 from fastapi_pagination.api import create_page
 from models_library.api_schemas_webserver.projects import ProjectGet
 from models_library.api_schemas_webserver.resource_usage import PricingUnitGet
 from models_library.api_schemas_webserver.wallets import WalletGetWithAvailableCredits
 from models_library.projects_nodes_io import BaseFileLink
+from models_library.users import UserID
 from pydantic.types import PositiveInt
 from servicelib.logging_utils import log_context
 from starlette.background import BackgroundTask
 
-from ...models.basic_types import VersionStr
+from ...models.basic_types import LogStreamingResponse, VersionStr
 from ...models.pagination import Page, PaginationParams
 from ...models.schemas.files import File
 from ...models.schemas.jobs import ArgumentTypes, Job, JobID, JobMetadata, JobOutputs
 from ...models.schemas.solvers import SolverKeyId
 from ...services.catalog import CatalogApi
 from ...services.director_v2 import DirectorV2Api, DownloadLink, NodeName
+from ...services.log_streaming import LogDistributor, LogStreamer
 from ...services.solver_job_models_converters import create_job_from_project
 from ...services.solver_job_outputs import ResultsTypes, get_solver_output_results
 from ...services.storage import StorageApi, to_file_api_model
 from ...services.webserver import ProjectNotFoundError
 from ..dependencies.application import get_reverse_url_mapper
 from ..dependencies.authentication import get_current_user_id, get_product_name
 from ..dependencies.database import Engine, get_db_engine
-from ..dependencies.rabbitmq import LogListener
+from ..dependencies.rabbitmq import get_log_distributor
 from ..dependencies.services import get_api_client
 from ..dependencies.webserver import AuthSession, get_webserver_session
 from ..errors.http_error import create_error_json_response

@@ -360,24 +362,27 @@ async def get_job_pricing_unit(
 
 @router.get(
     "/{solver_key:path}/releases/{version}/jobs/{job_id:uuid}/logstream",
-    response_class=StreamingResponse,
+    response_class=LogStreamingResponse,
     include_in_schema=API_SERVER_DEV_FEATURES_ENABLED,
 )
 async def get_log_stream(
     solver_key: SolverKeyId,
     version: VersionStr,
     job_id: JobID,
-    log_listener: Annotated[LogListener, Depends(LogListener)],
     webserver_api: Annotated[AuthSession, Depends(get_webserver_session)],
+    director2_api: Annotated[DirectorV2Api, Depends(get_api_client(DirectorV2Api))],
+    log_distributor: Annotated[LogDistributor, Depends(get_log_distributor)],
+    user_id: Annotated[UserID, Depends(get_current_user_id)],
 ):
     job_name = _compose_job_resource_name(solver_key, version, job_id)
-    with log_context(_logger, logging.DEBUG, "Begin streaming logs"):
-        _logger.debug("job: %s", job_name)
+    with log_context(
+        _logger, logging.DEBUG, f"Streaming logs for {job_name=} and {user_id=}"
+    ):
         project: ProjectGet = await webserver_api.get_project(project_id=job_id)
         _raise_if_job_not_associated_with_solver(solver_key, version, project)
-        await log_listener.listen(job_id)
-        return StreamingResponse(
-            log_listener.log_generator(),
-            media_type="application/x-ndjson",
-            background=BackgroundTask(log_listener.stop_listening),
+        log_streamer = LogStreamer(user_id, director2_api, job_id, log_distributor)
+        await log_streamer.setup()
+        return LogStreamingResponse(
+            log_streamer.log_generator(),
+            background=BackgroundTask(log_streamer.teardown),
         )
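
For clients, the endpoint still streams newline-delimited JSON, one JobLog document per line; only the internals changed. A hedged client-side sketch of consuming the stream follows (the base URL, the /v0/solvers prefix, credentials, and identifiers are placeholders, not values from this commit).

```python
# Client-side sketch: reading the NDJSON log stream line by line.
# URL prefix, credentials, and identifiers are placeholders.
import json

import httpx

SOLVER_KEY = "simcore/services/comp/my-solver"    # placeholder
VERSION = "1.0.0"                                 # placeholder
JOB_ID = "145beae4-a3a8-4fde-adbb-4e8257c2c083"   # placeholder
URL = (
    "https://api.example.com/v0/solvers/"
    f"{SOLVER_KEY}/releases/{VERSION}/jobs/{JOB_ID}/logstream"
)

with httpx.stream("GET", URL, auth=("api-key", "api-secret"), timeout=None) as response:
    response.raise_for_status()
    for line in response.iter_lines():
        if not line:
            continue
        job_log = json.loads(line)  # one JobLog document per NDJSON line
        print(job_log["log_level"], *job_log["messages"])
```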

services/api-server/src/simcore_service_api_server/core/application.py

Lines changed: 7 additions & 0 deletions

@@ -6,6 +6,12 @@
 from httpx import HTTPStatusError
 from models_library.basic_types import BootModeEnum
 from servicelib.logging_utils import config_all_loggers
+from simcore_service_api_server.api.errors.log_handling_error import (
+    log_handling_error_handler,
+)
+from simcore_service_api_server.services.log_streaming import (
+    LogDistributionBaseException,
+)
 from starlette import status
 from starlette.exceptions import HTTPException

@@ -98,6 +104,7 @@ def init_app(settings: ApplicationSettings | None = None) -> FastAPI:
     app.add_exception_handler(HTTPException, http_error_handler)
     app.add_exception_handler(RequestValidationError, http422_error_handler)
     app.add_exception_handler(HTTPStatusError, httpx_client_error_handler)
+    app.add_exception_handler(LogDistributionBaseException, log_handling_error_handler)
 
     # SEE https://docs.python.org/3/library/exceptions.html#exception-hierarchy
     app.add_exception_handler(

services/api-server/src/simcore_service_api_server/models/basic_types.py

Lines changed: 5 additions & 0 deletions

@@ -1,5 +1,6 @@
 import re
 
+from fastapi.responses import StreamingResponse
 from models_library.basic_regex import VERSION_RE
 from pydantic import ConstrainedStr
 

@@ -11,3 +12,7 @@ class VersionStr(ConstrainedStr):
 
 class FileNameStr(ConstrainedStr):
     strip_whitespace = True
+
+
+class LogStreamingResponse(StreamingResponse):
+    media_type = "application/x-ndjson"
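
Because Starlette responses fall back to the class-level media_type when none is passed to the constructor, subclassing is all it takes for every LogStreamingResponse to advertise application/x-ndjson; the route no longer repeats the media type. A minimal check, with a stand-in generator:

```python
# Minimal illustration: the subclass defaults to NDJSON without callers
# passing media_type explicitly. The generator below is a stand-in.
from simcore_service_api_server.models.basic_types import LogStreamingResponse


async def _fake_logs():
    yield '{"messages": ["hello"]}\n'


response = LogStreamingResponse(_fake_logs())
assert response.headers["content-type"] == "application/x-ndjson"
```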

services/api-server/src/simcore_service_api_server/models/schemas/jobs.py

Lines changed: 11 additions & 0 deletions

@@ -1,5 +1,6 @@
 import datetime
 import hashlib
+import logging
 from typing import Any, ClassVar, TypeAlias
 from uuid import UUID, uuid4
 

@@ -306,3 +307,13 @@ class JobLog(BaseModel):
     node_id: NodeID | None
     log_level: LogLevelInt
     messages: list[LogMessageStr]
+
+    class Config(BaseConfig):
+        schema_extra: ClassVar[dict[str, Any]] = {
+            "example": {
+                "job_id": "145beae4-a3a8-4fde-adbb-4e8257c2c083",
+                "node_id": "3742215e-6756-48d2-8b73-4d043065309f",
+                "log_level": logging.DEBUG,
+                "messages": ["PROGRESS: 5/10"],
+            }
+        }
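
The added OpenAPI example also documents what one streamed NDJSON line contains, since the new LogStreamer yields JobLog.json() plus a newline. A quick sketch, assuming the api-server package is importable:

```python
# Sketch: the OpenAPI example doubles as a sample of one streamed log line.
import logging

from simcore_service_api_server.models.schemas.jobs import JobLog

job_log = JobLog(
    job_id="145beae4-a3a8-4fde-adbb-4e8257c2c083",
    node_id="3742215e-6756-48d2-8b73-4d043065309f",
    log_level=logging.DEBUG,
    messages=["PROGRESS: 5/10"],
)
print(job_log.json())  # this is what one line of the log stream looks like
```
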
Lines changed: 135 additions & 0 deletions

@@ -0,0 +1,135 @@
+import asyncio
+from asyncio import Queue
+from typing import AsyncIterable, Awaitable, Callable, Final
+
+from models_library.rabbitmq_messages import LoggerRabbitMessage
+from models_library.users import UserID
+from pydantic import PositiveInt
+from servicelib.rabbitmq import RabbitMQClient
+
+from ..models.schemas.jobs import JobID, JobLog
+from .director_v2 import DirectorV2Api
+
+_NEW_LINE: Final[str] = "\n"
+_SLEEP_SECONDS_BEFORE_CHECK_JOB_STATUS: Final[PositiveInt] = 10
+
+
+class LogDistributionBaseException(Exception):
+    pass
+
+
+class LogStreamerNotRegistered(LogDistributionBaseException):
+    pass
+
+
+class LogStreamerRegistionConflict(LogDistributionBaseException):
+    pass
+
+
+class LogDistributor:
+    def __init__(self, rabbitmq_client: RabbitMQClient):
+        self._rabbit_client = rabbitmq_client
+        self._log_streamers: dict[JobID, Callable[[JobLog], Awaitable[None]]] = {}
+        self._queue_name: str
+
+    async def setup(self):
+        self._queue_name = await self._rabbit_client.subscribe(
+            LoggerRabbitMessage.get_channel_name(),
+            self._distribute_logs,
+            exclusive_queue=True,
+            topics=[],
+        )
+
+    async def teardown(self):
+        await self._rabbit_client.unsubscribe(self._queue_name)
+
+    async def __aenter__(self):
+        await self.setup()
+        return self
+
+    async def __aexit__(self, exc_type, exc, tb):
+        await self.teardown()
+
+    async def _distribute_logs(self, data: bytes):
+        got = LoggerRabbitMessage.parse_raw(data)
+        item = JobLog(
+            job_id=got.project_id,
+            node_id=got.node_id,
+            log_level=got.log_level,
+            messages=got.messages,
+        )
+        callback = self._log_streamers.get(item.job_id)
+        if callback is None:
+            raise LogStreamerNotRegistered(
+                f"Could not forward log because a logstreamer associated with job_id={item.job_id} was not registered"
+            )
+        await callback(item)
+        return True
+
+    async def register(
+        self, job_id: JobID, callback: Callable[[JobLog], Awaitable[None]]
+    ):
+        if job_id in self._log_streamers:
+            raise LogStreamerRegistionConflict(
+                f"A stream was already connected to {job_id=}. Only a single stream can be connected at the time"
+            )
+        self._log_streamers[job_id] = callback
+        await self._rabbit_client.add_topics(
+            LoggerRabbitMessage.get_channel_name(), topics=[f"{job_id}.*"]
+        )
+
+    async def deregister(self, job_id: JobID):
+        if job_id not in self._log_streamers:
+            raise LogStreamerNotRegistered(f"No stream was connected to {job_id=}.")
+        await self._rabbit_client.remove_topics(
+            LoggerRabbitMessage.get_channel_name(), topics=[f"{job_id}.*"]
+        )
+        del self._log_streamers[job_id]
+
+
+class LogStreamer:
+    def __init__(
+        self,
+        user_id: UserID,
+        director2_api: DirectorV2Api,
+        job_id: JobID,
+        log_distributor: LogDistributor,
+    ):
+        self._user_id = user_id
+        self._director2_api = director2_api
+        self._queue: Queue[JobLog] = Queue()
+        self._job_id: JobID = job_id
+        self._log_distributor: LogDistributor = log_distributor
+        self._is_registered: bool = False
+
+    async def setup(self):
+        await self._log_distributor.register(self._job_id, self._queue.put)
+        self._is_registered = True
+
+    async def teardown(self):
+        await self._log_distributor.deregister(self._job_id)
+        self._is_registered = False
+
+    async def __aenter__(self):
+        await self.setup()
+        return self
+
+    async def __aexit__(self, exc_type, exc, tb):
+        await self.teardown()
+
+    async def _project_done(self) -> bool:
+        task = await self._director2_api.get_computation(self._job_id, self._user_id)
+        return not task.stopped is None
+
+    async def log_generator(self) -> AsyncIterable[str]:
+        if not self._is_registered:
+            raise LogStreamerNotRegistered(
+                f"LogStreamer for job_id={self._job_id} is not correctly registered"
+            )
+        while True:
+            while self._queue.empty():
+                if await self._project_done():
+                    return
+                await asyncio.sleep(_SLEEP_SECONDS_BEFORE_CHECK_JOB_STATUS)
+            log: JobLog = await self._queue.get()
+            yield log.json() + _NEW_LINE
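
Both new classes double as async context managers (setup on enter, teardown on exit). Below is a usage sketch that assumes already-initialized rabbitmq_client and director2_api instances from the running application; note that in the api-server a single application-wide LogDistributor is reused rather than created per call, and only one LogStreamer may be registered per job_id at a time.

```python
# Usage sketch: not a standalone script; rabbitmq_client, director2_api,
# user_id, and job_id are assumed to come from the running application.
async def stream_job_logs(rabbitmq_client, director2_api, user_id, job_id) -> None:
    async with LogDistributor(rabbitmq_client) as log_distributor:
        # Registering a second streamer for the same job_id raises
        # LogStreamerRegistionConflict.
        async with LogStreamer(user_id, director2_api, job_id, log_distributor) as streamer:
            # The generator returns once director-v2 reports the job as
            # stopped and the internal queue has been drained.
            async for line in streamer.log_generator():
                print(line, end="")  # each line is one JobLog JSON document
```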
