4
4
import pytest
5
5
from aio_pika import Channel , Message
6
6
from aio_pika .exceptions import QueueEmpty
7
- from taskiq import BrokerMessage
7
+ from taskiq import AckableMessage , BrokerMessage
8
+ from taskiq .utils import maybe_awaitable
8
9
9
10
from taskiq_aio_pika .broker import AioPikaBroker
10
11
11
12
12
- async def get_first_task (broker : AioPikaBroker ) -> bytes : # type: ignore
13
+ async def get_first_task (broker : AioPikaBroker ) -> AckableMessage : # type: ignore
13
14
"""
14
15
Get first message from the queue.
15
16
@@ -46,7 +47,8 @@ async def test_kick_success(broker: AioPikaBroker) -> None:
46
47
47
48
message = await asyncio .wait_for (get_first_task (broker ), timeout = 0.4 )
48
49
49
- assert message == sent .message
50
+ assert message .data == sent .message
51
+ await maybe_awaitable (message .ack ())
50
52
51
53
52
54
@pytest .mark .anyio
@@ -111,7 +113,8 @@ async def test_listen(
111
113
112
114
message = await asyncio .wait_for (get_first_task (broker ), timeout = 0.4 )
113
115
114
- assert message == b"test_message"
116
+ assert message .data == b"test_message"
117
+ await maybe_awaitable (message .ack ())
115
118
116
119
117
120
@pytest .mark .anyio
@@ -133,9 +136,10 @@ async def test_wrong_format(
133
136
routing_key = queue_name ,
134
137
)
135
138
136
- msg_bytes = await asyncio .wait_for (get_first_task (broker ), 0.4 )
139
+ message = await asyncio .wait_for (get_first_task (broker ), 0.4 )
137
140
138
- assert msg_bytes == b"wrong"
141
+ assert message .data == b"wrong"
142
+ await maybe_awaitable (message .ack ())
139
143
140
144
with pytest .raises (QueueEmpty ):
141
145
await queue .get ()
0 commit comments