Skip to content

Commit 1793644

Browse files
committed
Fixed formats, added log.
Signed-off-by: Pavel Kirilin <[email protected]>
1 parent 555a330 commit 1793644

File tree

1 file changed

+14
-5
lines changed

1 file changed

+14
-5
lines changed

taskiq_aio_pika/taskiq/brokers/aio_pika_broker.py

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
from asyncio import AbstractEventLoop
2+
from logging import getLogger
23
from typing import Any, AsyncGenerator, Optional, TypeVar
34

45
from aio_pika import Channel, ExchangeType, Message, connect_robust
@@ -10,6 +11,8 @@
1011

1112
_T = TypeVar("_T")
1213

14+
logger = getLogger("taskiq.aio_pika_broker")
15+
1316

1417
class AioPikaBroker(AsyncBroker):
1518
def __init__(
@@ -84,12 +87,18 @@ async def listen(self) -> AsyncGenerator[BrokerMessage, None]:
8487
async for rmq_message in queue_iter:
8588
async with rmq_message.process():
8689
try:
87-
yield BrokerMessage.parse_raw(
88-
rmq_message.body,
89-
content_type=rmq_message.content_type or "",
90+
yield BrokerMessage(
91+
task_id=rmq_message.headers["task_id"],
92+
task_name=rmq_message.headers["task_name"],
93+
message=rmq_message.body,
94+
headers=rmq_message.headers,
95+
)
96+
except (ValueError, LookupError) as exc:
97+
logger.debug(
98+
"Cannot read broker message %s",
99+
exc,
100+
exc_info=True,
90101
)
91-
except ValueError:
92-
continue
93102

94103
async def shutdown(self) -> None:
95104
await self.connection_pool.close()

0 commit comments

Comments
 (0)