Skip to content

Commit 8d8c49a

Browse files
committed
feat: Added PubSubSentinelBroker.
1 parent 62c030d commit 8d8c49a

File tree

3 files changed

+71
-1
lines changed

3 files changed

+71
-1
lines changed

taskiq_redis/__init__.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,10 @@
66
)
77
from taskiq_redis.redis_broker import ListQueueBroker, PubSubBroker
88
from taskiq_redis.redis_cluster_broker import ListQueueClusterBroker
9-
from taskiq_redis.redis_sentinel_broker import ListQueueSentinelBroker
9+
from taskiq_redis.redis_sentinel_broker import (
10+
ListQueueSentinelBroker,
11+
PubSubSentinelBroker,
12+
)
1013
from taskiq_redis.schedule_source import (
1114
RedisClusterScheduleSource,
1215
RedisScheduleSource,
@@ -21,6 +24,7 @@
2124
"PubSubBroker",
2225
"ListQueueClusterBroker",
2326
"ListQueueSentinelBroker",
27+
"PubSubSentinelBroker",
2428
"RedisScheduleSource",
2529
"RedisClusterScheduleSource",
2630
"RedisSentinelScheduleSource",

taskiq_redis/redis_sentinel_broker.py

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,40 @@ async def _acquire_master_conn(self) -> AsyncIterator[_Redis]:
6666
yield redis_conn
6767

6868

69+
class PubSubSentinelBroker(BaseSentinelBroker):
70+
"""Broker that works with Redis and broadcasts tasks to all workers."""
71+
72+
async def kick(self, message: BrokerMessage) -> None:
73+
"""
74+
Publish message over PUBSUB channel.
75+
76+
:param message: message to send.
77+
"""
78+
queue_name = message.labels.get("queue_name") or self.queue_name
79+
async with self._acquire_master_conn() as redis_conn:
80+
await redis_conn.publish(queue_name, message.message)
81+
82+
async def listen(self) -> AsyncGenerator[bytes, None]:
83+
"""
84+
Listen redis queue for new messages.
85+
86+
This function listens to the pubsub channel
87+
and yields all messages with proper types.
88+
89+
:yields: broker messages.
90+
"""
91+
async with self._acquire_master_conn() as redis_conn:
92+
redis_pubsub_channel = redis_conn.pubsub()
93+
await redis_pubsub_channel.subscribe(self.queue_name)
94+
async for message in redis_pubsub_channel.listen():
95+
if not message:
96+
continue
97+
if message["type"] != "message":
98+
logger.debug("Received non-message from redis: %s", message)
99+
continue
100+
yield message["data"]
101+
102+
69103
class ListQueueSentinelBroker(BaseSentinelBroker):
70104
"""Broker that works with Sentinel and distributes tasks between workers."""
71105

tests/test_broker.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
ListQueueClusterBroker,
1111
ListQueueSentinelBroker,
1212
PubSubBroker,
13+
PubSubSentinelBroker,
1314
)
1415

1516

@@ -176,6 +177,37 @@ async def test_list_queue_cluster_broker(
176177
await broker.shutdown()
177178

178179

180+
@pytest.mark.anyio
181+
async def test_pub_sub_sentinel_broker(
182+
valid_broker_message: BrokerMessage,
183+
redis_sentinels: List[Tuple[str, int]],
184+
redis_sentinel_master_name: str,
185+
) -> None:
186+
"""
187+
Test that messages are published and read correctly by PubSubBroker.
188+
189+
We create two workers that listen and send a message to them.
190+
Expect both workers to receive the same message we sent.
191+
"""
192+
broker = PubSubSentinelBroker(
193+
sentinels=redis_sentinels,
194+
master_name=redis_sentinel_master_name,
195+
queue_name=uuid.uuid4().hex,
196+
)
197+
worker1_task = asyncio.create_task(get_message(broker))
198+
worker2_task = asyncio.create_task(get_message(broker))
199+
await asyncio.sleep(0.3)
200+
201+
await broker.kick(valid_broker_message)
202+
await asyncio.sleep(0.3)
203+
204+
message1 = worker1_task.result()
205+
message2 = worker2_task.result()
206+
assert message1 == valid_broker_message.message
207+
assert message1 == message2
208+
await broker.shutdown()
209+
210+
179211
@pytest.mark.anyio
180212
async def test_list_queue_sentinel_broker(
181213
valid_broker_message: BrokerMessage,

0 commit comments

Comments
 (0)