Skip to content

Commit 467bdd6

Browse files
committed
Replaced string with bytes in broker Message.
Signed-off-by: Pavel Kirilin <[email protected]>
1 parent 231d325 commit 467bdd6

File tree

2 files changed

+12
-32
lines changed

2 files changed

+12
-32
lines changed

taskiq_aio_pika/broker.py

Lines changed: 3 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ async def kick(self, message: BrokerMessage) -> None:
191191
raise ValueError("Please run startup before kicking.")
192192
priority = parse_val(int, message.labels.get("priority"))
193193
rmq_msg = Message(
194-
body=message.message.encode(),
194+
body=message.message,
195195
headers={
196196
"task_id": message.task_id,
197197
"task_name": message.task_name,
@@ -214,7 +214,7 @@ async def kick(self, message: BrokerMessage) -> None:
214214
routing_key=self._delay_queue_name,
215215
)
216216

217-
async def listen(self) -> AsyncGenerator[BrokerMessage, None]: # noqa: WPS210
217+
async def listen(self) -> AsyncGenerator[bytes, None]:
218218
"""
219219
Listen to queue.
220220
@@ -231,25 +231,7 @@ async def listen(self) -> AsyncGenerator[BrokerMessage, None]: # noqa: WPS210
231231
async with queue.iterator() as iterator:
232232
async for message in iterator:
233233
async with message.process():
234-
headers = {}
235-
for header_name, header_value in message.headers.items():
236-
headers[header_name] = str(header_value)
237-
try:
238-
broker_message = BrokerMessage(
239-
task_id=headers.pop("task_id"),
240-
task_name=headers.pop("task_name"),
241-
message=message.body,
242-
labels=headers,
243-
)
244-
except (ValueError, LookupError) as exc:
245-
logger.warning(
246-
"Cannot read broker message %s",
247-
exc,
248-
exc_info=True,
249-
)
250-
continue
251-
252-
yield broker_message
234+
yield message.body
253235

254236
async def shutdown(self) -> None:
255237
"""Close all connections on shutdown."""

tests/test_broker.py

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
from taskiq_aio_pika.broker import AioPikaBroker
1010

1111

12-
async def get_first_task(broker: AioPikaBroker) -> BrokerMessage: # type: ignore
12+
async def get_first_task(broker: AioPikaBroker) -> bytes: # type: ignore
1313
"""
1414
Get first message from the queue.
1515
@@ -36,7 +36,7 @@ async def test_kick_success(broker: AioPikaBroker) -> None:
3636
sent = BrokerMessage(
3737
task_id=task_id,
3838
task_name=task_name,
39-
message="my_msg",
39+
message=b"my_msg",
4040
labels={
4141
"label1": "val1",
4242
},
@@ -46,7 +46,7 @@ async def test_kick_success(broker: AioPikaBroker) -> None:
4646

4747
message = await asyncio.wait_for(get_first_task(broker), timeout=0.4)
4848

49-
assert message == sent
49+
assert message == sent.message
5050

5151

5252
@pytest.mark.anyio
@@ -111,10 +111,7 @@ async def test_listen(
111111

112112
message = await asyncio.wait_for(get_first_task(broker), timeout=0.4)
113113

114-
assert message.message == "test_message"
115-
assert message.labels == {"label1": "label_val"}
116-
assert message.task_id == "test_id"
117-
assert message.task_name == "task_name"
114+
assert message == b"test_message"
118115

119116

120117
@pytest.mark.anyio
@@ -124,7 +121,7 @@ async def test_wrong_format(
124121
test_channel: Channel,
125122
) -> None:
126123
"""
127-
Tests that messages with wrong format are ignored.
124+
Tests that messages with wrong format are still received.
128125
129126
:param broker: aio-pika broker.
130127
:param queue_name: test queue name.
@@ -136,8 +133,9 @@ async def test_wrong_format(
136133
routing_key=queue_name,
137134
)
138135

139-
with pytest.raises(asyncio.TimeoutError):
140-
await asyncio.wait_for(get_first_task(broker), 0.4)
136+
msg_bytes = await asyncio.wait_for(get_first_task(broker), 0.4)
137+
138+
assert msg_bytes == b"wrong"
141139

142140
with pytest.raises(QueueEmpty):
143141
await queue.get()
@@ -168,7 +166,7 @@ async def test_delayed_message(
168166
broker_msg = BrokerMessage(
169167
task_id="1",
170168
task_name="name",
171-
message="message",
169+
message=b"message",
172170
labels={"delay": "2"},
173171
)
174172
await broker.kick(broker_msg)

0 commit comments

Comments
 (0)