|
1 | 1 | import asyncio |
| 2 | +import logging |
2 | 3 | from asyncio import Queue |
3 | 4 | from datetime import datetime, timezone |
4 | 5 | from typing import AsyncIterable, Awaitable, Callable, Final |
5 | 6 |
|
6 | 7 | from models_library.rabbitmq_messages import LoggerRabbitMessage |
7 | 8 | from models_library.users import UserID |
8 | | -from pydantic import NonNegativeInt, PositiveInt |
| 9 | +from pydantic import NonNegativeInt, ValidationError |
9 | 10 | from servicelib.rabbitmq import RabbitMQClient |
10 | 11 |
|
11 | 12 | from ..models.schemas.jobs import JobID, JobLog |
12 | 13 | from .director_v2 import DirectorV2Api |
13 | 14 |
|
| 15 | +_logger = logging.getLogger(__name__) |
| 16 | + |
14 | 17 | _NEW_LINE: Final[str] = "\n" |
15 | | -_SLEEP_SECONDS_BEFORE_CHECK_JOB_STATUS: Final[PositiveInt] = 10 |
16 | 18 |
|
17 | 19 |
|
18 | 20 | class LogDistributionBaseException(Exception): |
@@ -52,7 +54,19 @@ async def __aexit__(self, exc_type, exc, tb): |
52 | 54 | await self.teardown() |
53 | 55 |
|
54 | 56 | async def _distribute_logs(self, data: bytes): |
55 | | - got = LoggerRabbitMessage.parse_raw(data) |
| 57 | + try: |
| 58 | + got = LoggerRabbitMessage.parse_raw( |
| 59 | + data |
| 60 | + ) # rabbitmq client safe_nacks the message if this deserialization fails |
| 61 | + except ValidationError as e: |
| 62 | + _logger.debug( |
| 63 | + "Could not parse log message from RabbitMQ in LogDistributor._distribute_logs" |
| 64 | + ) |
| 65 | + raise e |
| 66 | + _logger.debug( |
| 67 | + "LogDistributor._distribute_logs received message message from RabbitMQ: %s", |
| 68 | + got.json(), |
| 69 | + ) |
56 | 70 | item = JobLog( |
57 | 71 | job_id=got.project_id, |
58 | 72 | node_id=got.node_id, |
|
0 commit comments