Skip to content

Commit 2515ef2

Browse files
authored
Changed listen signature for brokers. (#100)
1 parent d9a5867 commit 2515ef2

File tree

10 files changed

+61
-46
lines changed

10 files changed

+61
-46
lines changed

docs/examples/extending/broker.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,12 @@ async def shutdown(self) -> None:
2525
return await super().shutdown()
2626

2727
async def kick(self, message: BrokerMessage) -> None:
28-
# Send a message.
28+
# Send a message.message.
2929
pass
3030

31-
async def listen(self) -> AsyncGenerator[BrokerMessage, None]:
31+
async def listen(self) -> AsyncGenerator[bytes, None]:
3232
while True:
3333
# Get new message.
34-
new_message: BrokerMessage = ... # type: ignore
34+
new_message: bytes = ... # type: ignore
3535
# Yield it!
3636
yield new_message

docs/extending-taskiq/broker.md

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,20 @@ Here is a template for new brokers:
1111

1212
@[code python](../examples/extending/broker.py)
1313

14+
15+
# About kick and listen
16+
17+
The `kick` method takes a `BrokerMessage` as a parameter. The `BrokerMessage` class is a handy helper class for brokers. You can use information from the BrokerMessage to alter the delivery method.
18+
19+
::: warning "cool warning!"
20+
21+
As a broker developer, please send only raw bytes from the `message` field of a BrokerMessage if possible. Serializing it to the string may result in a problem if message bytes are not utf-8 compatible.
22+
23+
:::
24+
25+
26+
The `listen` method should yield raw bytes that were sent over the network.
27+
1428
## Conventions
1529

1630
For brokers, we have several conventions. It's good if your broker implements them.

taskiq/abc/broker.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,11 +158,14 @@ async def kick(
158158
Using this method tasks are sent to
159159
workers.
160160
161+
You don't need to send broker message. It's helper for brokers,
162+
please send only bytes from message.message.
163+
161164
:param message: name of a task.
162165
"""
163166

164167
@abstractmethod
165-
def listen(self) -> AsyncGenerator[BrokerMessage, None]:
168+
def listen(self) -> AsyncGenerator[bytes, None]:
166169
"""
167170
This function listens to new messages and yields them.
168171

taskiq/abc/formatter.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ def dumps(self, message: TaskiqMessage) -> BrokerMessage:
1616
"""
1717

1818
@abstractmethod
19-
def loads(self, message: BrokerMessage) -> TaskiqMessage:
19+
def loads(self, message: bytes) -> TaskiqMessage:
2020
"""
2121
Parses broker message to TaskiqMessage.
2222

taskiq/brokers/inmemory_broker.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -141,11 +141,11 @@ async def kick(self, message: BrokerMessage) -> None:
141141
target_task.original_func,
142142
)
143143

144-
task = asyncio.create_task(self.receiver.callback(message=message))
144+
task = asyncio.create_task(self.receiver.callback(message=message.message))
145145
self._running_tasks.add(task)
146146
task.add_done_callback(self._running_tasks.discard)
147147

148-
def listen(self) -> AsyncGenerator[BrokerMessage, None]:
148+
def listen(self) -> AsyncGenerator[bytes, None]:
149149
"""
150150
Inmemory broker cannot listen.
151151

taskiq/brokers/shared_broker.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ async def kick(self, message: BrokerMessage) -> None:
5959
"without setting the default_broker.",
6060
)
6161

62-
async def listen(self) -> AsyncGenerator[BrokerMessage, None]: # type: ignore
62+
async def listen(self) -> AsyncGenerator[bytes, None]: # type: ignore
6363
"""
6464
Shared broker cannot listen to tasks.
6565

taskiq/brokers/zmq_broker.py

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import math
12
from logging import getLogger
23
from typing import AsyncGenerator, Callable, Optional, TypeVar
34

@@ -58,21 +59,23 @@ async def kick(self, message: BrokerMessage) -> None:
5859
5960
:param message: message to publish.
6061
"""
62+
part_len = 100
63+
parts = [
64+
message.message[
65+
idx * part_len : min(idx * part_len + part_len, len(message.message))
66+
]
67+
for idx in range(math.ceil(len(message.message) / part_len))
68+
]
6169
with self.socket.connect(self.sub_host) as sock:
62-
await sock.send_string(message.json())
70+
await sock.send_multipart(parts)
6371

64-
async def listen(self) -> AsyncGenerator[BrokerMessage, None]:
72+
async def listen(self) -> AsyncGenerator[bytes, None]:
6573
"""
6674
Start accepting new messages.
6775
6876
:yields: incoming messages.
6977
"""
7078
with self.socket.connect(self.sub_host) as sock:
71-
while True:
72-
received_str = await sock.recv_string()
73-
try:
74-
broker_msg = BrokerMessage.parse_raw(received_str)
75-
except ValueError:
76-
logger.warning("Cannot parse received message %s", received_str)
77-
continue
78-
yield broker_msg
79+
while True: # noqa: WPS457
80+
data = await sock.recv_multipart()
81+
yield b"".join(data)

taskiq/formatters/json_formatter.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,11 @@ def dumps(self, message: TaskiqMessage) -> BrokerMessage:
1919
labels=message.labels,
2020
)
2121

22-
def loads(self, message: BrokerMessage) -> TaskiqMessage:
22+
def loads(self, message: bytes) -> TaskiqMessage:
2323
"""
2424
Loads json from message.
2525
2626
:param message: broker's message.
2727
:return: parsed taskiq message.
2828
"""
29-
return TaskiqMessage.parse_raw(message.message)
29+
return TaskiqMessage.parse_raw(message)

taskiq/receiver/receiver.py

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
from taskiq.abc.broker import AsyncBroker
1111
from taskiq.abc.middleware import TaskiqMiddleware
1212
from taskiq.context import Context
13-
from taskiq.message import BrokerMessage, TaskiqMessage
13+
from taskiq.message import TaskiqMessage
1414
from taskiq.receiver.params_parser import parse_params
1515
from taskiq.result import TaskiqResult
1616
from taskiq.state import TaskiqState
@@ -64,7 +64,7 @@ def __init__(
6464

6565
async def callback( # noqa: C901, WPS213
6666
self,
67-
message: BrokerMessage,
67+
message: bytes,
6868
raise_err: bool = False,
6969
) -> None:
7070
"""
@@ -79,17 +79,6 @@ async def callback( # noqa: C901, WPS213
7979
:param raise_err: raise an error if cannot save result in
8080
result_backend.
8181
"""
82-
logger.debug(f"Received message: {message}")
83-
if message.task_name not in self.broker.available_tasks:
84-
logger.warning(
85-
'task "%s" is not found. Maybe you forgot to import it?',
86-
message.task_name,
87-
)
88-
return
89-
logger.debug(
90-
"Function for task %s is resolved. Executing...",
91-
message.task_name,
92-
)
9382
try:
9483
taskiq_msg = self.broker.formatter.loads(message=message)
9584
except Exception as exc:
@@ -100,6 +89,17 @@ async def callback( # noqa: C901, WPS213
10089
exc_info=True,
10190
)
10291
return
92+
logger.debug(f"Received message: {taskiq_msg}")
93+
if taskiq_msg.task_name not in self.broker.available_tasks:
94+
logger.warning(
95+
'task "%s" is not found. Maybe you forgot to import it?',
96+
taskiq_msg.task_name,
97+
)
98+
return
99+
logger.debug(
100+
"Function for task %s is resolved. Executing...",
101+
taskiq_msg.task_name,
102+
)
103103
for middleware in self.broker.middlewares:
104104
if middleware.__class__.pre_execute != TaskiqMiddleware.pre_execute:
105105
taskiq_msg = await maybe_awaitable(
@@ -114,14 +114,14 @@ async def callback( # noqa: C901, WPS213
114114
taskiq_msg.task_id,
115115
)
116116
result = await self.run_task(
117-
target=self.broker.available_tasks[message.task_name].original_func,
117+
target=self.broker.available_tasks[taskiq_msg.task_name].original_func,
118118
message=taskiq_msg,
119119
)
120120
for middleware in self.broker.middlewares:
121121
if middleware.__class__.post_execute != TaskiqMiddleware.post_execute:
122122
await maybe_awaitable(middleware.post_execute(taskiq_msg, result))
123123
try:
124-
await self.broker.result_backend.set_result(message.task_id, result)
124+
await self.broker.result_backend.set_result(taskiq_msg.task_id, result)
125125
for middleware in self.broker.middlewares:
126126
if middleware.__class__.post_save != TaskiqMiddleware.post_save:
127127
await maybe_awaitable(middleware.post_save(taskiq_msg, result))

tests/cli/worker/test_receiver.py

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
from taskiq.abc.middleware import TaskiqMiddleware
1010
from taskiq.abc.result_backend import AsyncResultBackend
1111
from taskiq.brokers.inmemory_broker import InMemoryBroker
12-
from taskiq.message import BrokerMessage, TaskiqMessage
12+
from taskiq.message import TaskiqMessage
1313
from taskiq.receiver import Receiver
1414
from taskiq.result import TaskiqResult
1515

@@ -28,9 +28,9 @@ def __init__(
2828
)
2929
self.to_send: "List[TaskiqMessage]" = []
3030

31-
async def listen(self) -> AsyncGenerator[BrokerMessage, None]:
31+
async def listen(self) -> AsyncGenerator[bytes, None]:
3232
for message in self.to_send:
33-
yield self.formatter.dumps(message)
33+
yield self.formatter.dumps(message).message
3434

3535

3636
def get_receiver(
@@ -186,7 +186,7 @@ async def my_task() -> int:
186186
),
187187
)
188188

189-
await receiver.callback(broker_message)
189+
await receiver.callback(broker_message.message)
190190
assert called_times == 1
191191

192192

@@ -196,12 +196,7 @@ async def test_callback_wrong_format() -> None:
196196
receiver = get_receiver()
197197

198198
await receiver.callback(
199-
BrokerMessage(
200-
task_id="",
201-
task_name="my_task.task_name",
202-
message='{"aaaa": "bbb"}',
203-
labels={},
204-
),
199+
b"{some wrong bytes}",
205200
)
206201

207202

@@ -221,7 +216,7 @@ async def test_callback_unknown_task() -> None:
221216
),
222217
)
223218

224-
await receiver.callback(broker_message)
219+
await receiver.callback(broker_message.message)
225220

226221

227222
@pytest.mark.anyio

0 commit comments

Comments
 (0)