Skip to content

Commit 9cc92c9

Browse files
authored
Merge branch 'develop' into feat/package_version
2 parents 2fa7799 + 8d18913 commit 9cc92c9

File tree

4 files changed

+22
-17
lines changed

4 files changed

+22
-17
lines changed

poetry.lock

Lines changed: 4 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ keywords = ["taskiq", "tasks", "distributed", "async", "aio-pika"]
2020

2121
[tool.poetry.dependencies]
2222
python = "^3.7"
23-
taskiq = "^0"
23+
taskiq = ">=0.6.0,<1"
2424
aio-pika = "^9.0"
2525
importlib-metadata = {version = "^4.0.0", python = "<3.8"}
2626

taskiq_aio_pika/broker.py

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,7 @@
55

66
from aio_pika import DeliveryMode, ExchangeType, Message, connect_robust
77
from aio_pika.abc import AbstractChannel, AbstractQueue, AbstractRobustConnection
8-
from taskiq.abc.broker import AsyncBroker
9-
from taskiq.abc.result_backend import AsyncResultBackend
10-
from taskiq.message import BrokerMessage
8+
from taskiq import AckableMessage, AsyncBroker, AsyncResultBackend, BrokerMessage
119

1210
_T = TypeVar("_T") # noqa: WPS111
1311

@@ -267,7 +265,7 @@ async def kick(self, message: BrokerMessage) -> None:
267265
routing_key=self._delay_queue_name,
268266
)
269267

270-
async def listen(self) -> AsyncGenerator[bytes, None]:
268+
async def listen(self) -> AsyncGenerator[AckableMessage, None]:
271269
"""
272270
Listen to queue.
273271
@@ -283,5 +281,8 @@ async def listen(self) -> AsyncGenerator[bytes, None]:
283281
queue = await self.declare_queues(self.read_channel)
284282
async with queue.iterator() as iterator:
285283
async for message in iterator:
286-
async with message.process():
287-
yield message.body
284+
yield AckableMessage(
285+
data=message.body,
286+
ack=message.ack,
287+
reject=message.reject,
288+
)

tests/test_broker.py

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,13 @@
44
import pytest
55
from aio_pika import Channel, Message
66
from aio_pika.exceptions import QueueEmpty
7-
from taskiq import BrokerMessage
7+
from taskiq import AckableMessage, BrokerMessage
8+
from taskiq.utils import maybe_awaitable
89

910
from taskiq_aio_pika.broker import AioPikaBroker
1011

1112

12-
async def get_first_task(broker: AioPikaBroker) -> bytes: # type: ignore
13+
async def get_first_task(broker: AioPikaBroker) -> AckableMessage: # type: ignore
1314
"""
1415
Get first message from the queue.
1516
@@ -46,7 +47,8 @@ async def test_kick_success(broker: AioPikaBroker) -> None:
4647

4748
message = await asyncio.wait_for(get_first_task(broker), timeout=0.4)
4849

49-
assert message == sent.message
50+
assert message.data == sent.message
51+
await maybe_awaitable(message.ack())
5052

5153

5254
@pytest.mark.anyio
@@ -111,7 +113,8 @@ async def test_listen(
111113

112114
message = await asyncio.wait_for(get_first_task(broker), timeout=0.4)
113115

114-
assert message == b"test_message"
116+
assert message.data == b"test_message"
117+
await maybe_awaitable(message.ack())
115118

116119

117120
@pytest.mark.anyio
@@ -133,9 +136,10 @@ async def test_wrong_format(
133136
routing_key=queue_name,
134137
)
135138

136-
msg_bytes = await asyncio.wait_for(get_first_task(broker), 0.4)
139+
message = await asyncio.wait_for(get_first_task(broker), 0.4)
137140

138-
assert msg_bytes == b"wrong"
141+
assert message.data == b"wrong"
142+
await maybe_awaitable(message.ack())
139143

140144
with pytest.raises(QueueEmpty):
141145
await queue.get()

0 commit comments

Comments
 (0)