Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from ..log_streaming_errors import (
LogStreamerNotRegisteredError,
LogStreamerRegistionConflictError,
LogStreamerRegistrationConflictError,
LogStreamingBaseError,
)
from ._utils import create_error_json_response
Expand All @@ -18,7 +18,7 @@ async def log_handling_error_handler(request: Request, exc: Exception) -> JSONRe
status_code: int = status.HTTP_500_INTERNAL_SERVER_ERROR
if isinstance(exc, LogStreamerNotRegisteredError):
status_code = status.HTTP_500_INTERNAL_SERVER_ERROR
elif isinstance(exc, LogStreamerRegistionConflictError):
elif isinstance(exc, LogStreamerRegistrationConflictError):
status_code = status.HTTP_409_CONFLICT

return create_error_json_response(msg, status_code=status_code)
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,5 @@ class LogStreamerNotRegisteredError(LogStreamingBaseError):
msg_template = "{msg}"


class LogStreamerRegistionConflictError(LogStreamingBaseError):
class LogStreamerRegistrationConflictError(LogStreamingBaseError):
msg_template = "A stream was already connected to {job_id}. Only a single stream can be connected at the time"
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from ..exceptions.backend_errors import BaseBackEndError
from ..exceptions.log_streaming_errors import (
LogStreamerNotRegisteredError,
LogStreamerRegistionConflictError,
LogStreamerRegistrationConflictError,
)
from ..models.schemas.errors import ErrorGet
from ..models.schemas.jobs import JobID, JobLog
Expand Down Expand Up @@ -70,7 +70,7 @@ async def _distribute_logs(self, data: bytes):

async def register(self, job_id: JobID, queue: Queue[JobLog]):
if job_id in self._log_streamers:
raise LogStreamerRegistionConflictError(job_id=job_id)
raise LogStreamerRegistrationConflictError(job_id=job_id)
self._log_streamers[job_id] = queue
await self._rabbit_client.add_topics(
LoggerRabbitMessage.get_channel_name(), topics=[f"{job_id}.*"]
Expand Down Expand Up @@ -126,26 +126,24 @@ async def log_generator(self) -> AsyncIterable[str]:
except TimeoutError:
done = await self._project_done()

except BaseBackEndError as exc:
_logger.info("%s", f"{exc}")
yield ErrorGet(errors=[f"{exc}"]).model_dump_json() + _NEW_LINE
except (BaseBackEndError, LogStreamerRegistrationConflictError) as exc:
error_msg = f"{exc}"

_logger.info("%s: %s", exc.code, error_msg)
yield ErrorGet(errors=[error_msg]).model_dump_json() + _NEW_LINE

except Exception as exc: # pylint: disable=W0718
error_code = create_error_code(exc)
user_error_msg = (
MSG_INTERNAL_ERROR_USER_FRIENDLY_TEMPLATE + f" [{error_code}]"
)
error_msg = MSG_INTERNAL_ERROR_USER_FRIENDLY_TEMPLATE + f" [{error_code}]"

_logger.exception(
**create_troubleshotting_log_kwargs(
user_error_msg,
error_msg,
error=exc,
error_code=error_code,
)
)
yield ErrorGet(
errors=[
MSG_INTERNAL_ERROR_USER_FRIENDLY_TEMPLATE + f" (OEC: {error_code})"
]
).model_dump_json() + _NEW_LINE
yield ErrorGet(errors=[error_msg]).model_dump_json() + _NEW_LINE

finally:
await self._log_distributor.deregister(self._job_id)
4 changes: 2 additions & 2 deletions services/api-server/tests/unit/test_services_rabbitmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
from simcore_service_api_server.services.log_streaming import (
LogDistributor,
LogStreamer,
LogStreamerRegistionConflictError,
LogStreamerRegistrationConflictError,
)
from tenacity import AsyncRetrying, retry_if_not_exception_type, stop_after_delay

Expand Down Expand Up @@ -219,7 +219,7 @@ async def _(job_log: JobLog):
pass

await log_distributor.register(project_id, _)
with pytest.raises(LogStreamerRegistionConflictError):
with pytest.raises(LogStreamerRegistrationConflictError):
await log_distributor.register(project_id, _)
await log_distributor.deregister(project_id)

Expand Down
Loading