Skip to content

Commit 4c3343f

Browse files
committed
feat: Added ListQueueSentinelBroker.
1 parent d84e179 commit 4c3343f

File tree

5 files changed

+188
-2
lines changed

5 files changed

+188
-2
lines changed

docker-compose.yml

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,3 +58,24 @@ services:
5858
REDIS_CLUSTER_CREATOR: "yes"
5959
ports:
6060
- 7001:6379
61+
62+
redis-master:
63+
image: bitnami/redis:6.2.5
64+
environment:
65+
ALLOW_EMPTY_PASSWORD: "yes"
66+
healthcheck:
67+
test: ["CMD", "redis-cli", "ping"]
68+
interval: 5s
69+
timeout: 5s
70+
retries: 3
71+
start_period: 10s
72+
73+
redis-sentinel:
74+
image: bitnami/redis-sentinel:latest
75+
depends_on:
76+
- redis-master
77+
environment:
78+
ALLOW_EMPTY_PASSWORD: "yes"
79+
REDIS_MASTER_HOST: "redis-master"
80+
ports:
81+
- 7002:26379

taskiq_redis/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
)
66
from taskiq_redis.redis_broker import ListQueueBroker, PubSubBroker
77
from taskiq_redis.redis_cluster_broker import ListQueueClusterBroker
8+
from taskiq_redis.redis_sentinel_broker import ListQueueSentinelBroker
89
from taskiq_redis.schedule_source import (
910
RedisClusterScheduleSource,
1011
RedisScheduleSource,
@@ -16,6 +17,7 @@
1617
"ListQueueBroker",
1718
"PubSubBroker",
1819
"ListQueueClusterBroker",
20+
"ListQueueSentinelBroker",
1921
"RedisScheduleSource",
2022
"RedisClusterScheduleSource",
2123
]

taskiq_redis/redis_sentinel_broker.py

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
import sys
2+
from contextlib import asynccontextmanager
3+
from logging import getLogger
4+
from typing import (
5+
TYPE_CHECKING,
6+
Any,
7+
AsyncGenerator,
8+
AsyncIterator,
9+
Callable,
10+
List,
11+
Optional,
12+
Tuple,
13+
TypeVar,
14+
)
15+
16+
from redis.asyncio import Redis, Sentinel
17+
from taskiq import AsyncResultBackend, BrokerMessage
18+
from taskiq.abc.broker import AsyncBroker
19+
20+
if sys.version_info >= (3, 10):
21+
from typing import TypeAlias
22+
else:
23+
from typing_extensions import TypeAlias
24+
25+
if TYPE_CHECKING:
26+
_Redis: TypeAlias = Redis[bytes]
27+
else:
28+
_Redis: TypeAlias = Redis
29+
30+
_T = TypeVar("_T")
31+
32+
logger = getLogger("taskiq.redis_sentinel_broker")
33+
34+
35+
class BaseSentinelBroker(AsyncBroker):
36+
"""Base broker that works with Sentinel."""
37+
38+
def __init__(
39+
self,
40+
sentinels: List[Tuple[str, int]],
41+
master_name: str,
42+
result_backend: Optional[AsyncResultBackend[_T]] = None,
43+
task_id_generator: Optional[Callable[[], str]] = None,
44+
queue_name: str = "taskiq",
45+
min_other_sentinels: int = 0,
46+
sentinel_kwargs: Optional[Any] = None,
47+
**connection_kwargs: Any,
48+
) -> None:
49+
super().__init__(
50+
result_backend=result_backend,
51+
task_id_generator=task_id_generator,
52+
)
53+
54+
self.sentinel = Sentinel(
55+
sentinels=sentinels,
56+
min_other_sentinels=min_other_sentinels,
57+
sentinel_kwargs=sentinel_kwargs,
58+
**connection_kwargs,
59+
)
60+
self.master_name = master_name
61+
self.queue_name = queue_name
62+
63+
@asynccontextmanager
64+
async def _acquire_master_conn(self) -> AsyncIterator[_Redis]:
65+
async with self.sentinel.master_for(self.master_name) as redis_conn:
66+
yield redis_conn
67+
68+
69+
class ListQueueSentinelBroker(BaseSentinelBroker):
70+
"""Broker that works with Sentinel and distributes tasks between workers."""
71+
72+
async def kick(self, message: BrokerMessage) -> None:
73+
"""
74+
Put a message in a list.
75+
76+
This method appends a message to the list of all messages.
77+
78+
:param message: message to append.
79+
"""
80+
queue_name = message.labels.get("queue_name") or self.queue_name
81+
async with self._acquire_master_conn() as redis_conn:
82+
await redis_conn.lpush(queue_name, message.message)
83+
84+
async def listen(self) -> AsyncGenerator[bytes, None]:
85+
"""
86+
Listen redis queue for new messages.
87+
88+
This function listens to the queue
89+
and yields new messages if they have BrokerMessage type.
90+
91+
:yields: broker messages.
92+
"""
93+
redis_brpop_data_position = 1
94+
async with self._acquire_master_conn() as redis_conn:
95+
while True:
96+
yield (await redis_conn.brpop(self.queue_name))[
97+
redis_brpop_data_position
98+
]

tests/conftest.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import os
2+
from typing import List, Tuple
23

34
import pytest
45

@@ -40,3 +41,33 @@ def redis_cluster_url() -> str:
4041
:return: URL string.
4142
"""
4243
return os.environ.get("TEST_REDIS_CLUSTER_URL", "redis://localhost:7001")
44+
45+
46+
@pytest.fixture
47+
def redis_sentinels() -> List[Tuple[str, int]]:
48+
"""
49+
List of redis sentinel hosts.
50+
51+
It tries to get it from environ,
52+
and return default one if the variable is
53+
not set.
54+
55+
:return: list of host and port pairs.
56+
"""
57+
sentinels = os.environ.get("TEST_REDIS_SENTINELS", "localhost:7002")
58+
host, _, port = sentinels.partition(":")
59+
return [(host, int(port))]
60+
61+
62+
@pytest.fixture
63+
def redis_sentinel_master_name() -> str:
64+
"""
65+
Redis sentinel master name.
66+
67+
It tries to get it from environ,
68+
and return default one if the variable is
69+
not set.
70+
71+
:return: redis sentinel master name string.
72+
"""
73+
return os.environ.get("TEST_REDIS_SENTINEL_MASTER_NAME", "mymaster")

tests/test_broker.py

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,16 @@
11
import asyncio
22
import uuid
3-
from typing import Union
3+
from typing import List, Tuple, Union
44

55
import pytest
66
from taskiq import AckableMessage, AsyncBroker, BrokerMessage
77

8-
from taskiq_redis import ListQueueBroker, ListQueueClusterBroker, PubSubBroker
8+
from taskiq_redis import (
9+
ListQueueBroker,
10+
ListQueueClusterBroker,
11+
ListQueueSentinelBroker,
12+
PubSubBroker,
13+
)
914

1015

1116
def test_no_url_should_raise_typeerror() -> None:
@@ -169,3 +174,32 @@ async def test_list_queue_cluster_broker(
169174
assert worker_task.result() == valid_broker_message.message
170175
worker_task.cancel()
171176
await broker.shutdown()
177+
178+
179+
@pytest.mark.anyio
180+
async def test_list_queue_sentinel_broker(
181+
valid_broker_message: BrokerMessage,
182+
redis_sentinels: List[Tuple[str, int]],
183+
redis_sentinel_master_name: str,
184+
) -> None:
185+
"""
186+
Test that messages are published and read correctly by ListQueueSentinelBroker.
187+
188+
We create two workers that listen and send a message to them.
189+
Expect only one worker to receive the same message we sent.
190+
"""
191+
broker = ListQueueSentinelBroker(
192+
sentinels=redis_sentinels,
193+
master_name=redis_sentinel_master_name,
194+
queue_name=uuid.uuid4().hex,
195+
)
196+
worker_task = asyncio.create_task(get_message(broker))
197+
await asyncio.sleep(0.3)
198+
199+
await broker.kick(valid_broker_message)
200+
await asyncio.sleep(0.3)
201+
202+
assert worker_task.done()
203+
assert worker_task.result() == valid_broker_message.message
204+
worker_task.cancel()
205+
await broker.shutdown()

0 commit comments

Comments
 (0)