Commit df015e5

Initial stream support.
1 parent 090a00a commit df015e5

10 files changed: +951 additions, -665 deletions

docker-compose.yml

Lines changed: 5 additions & 7 deletions
@@ -1,8 +1,6 @@
-version: '3.2'
-
 services:
   redis:
-    image: bitnami/redis:6.2.5
+    image: bitnami/redis:7.4.2
     environment:
       ALLOW_EMPTY_PASSWORD: "yes"
     healthcheck:
@@ -14,7 +12,7 @@ services:
     ports:
       - 7000:6379
   redis-node-0: &redis-node
-    image: docker.io/bitnami/redis-cluster:7.2
+    image: docker.io/bitnami/redis-cluster:7.4.2
     environment:
       ALLOW_EMPTY_PASSWORD: "yes"
       REDIS_NODES: "redis-node-0 redis-node-1 redis-node-2 redis-node-3 redis-node-4 redis-node-5"
@@ -38,7 +36,7 @@ services:
     <<: *redis-node

   redis-node-5:
-    image: docker.io/bitnami/redis-cluster:7.2
+    image: docker.io/bitnami/redis-cluster:7.4.2
     depends_on:
       - redis-node-0
       - redis-node-1
@@ -60,7 +58,7 @@ services:
       - 7001:6379

   redis-master:
-    image: bitnami/redis:6.2.5
+    image: bitnami/redis:7.4.2
     environment:
       ALLOW_EMPTY_PASSWORD: "yes"
     healthcheck:
@@ -71,7 +69,7 @@ services:
       start_period: 10s

   redis-sentinel:
-    image: bitnami/redis-sentinel:latest
+    image: bitnami/redis-sentinel:7.4.2
    depends_on:
      - redis-master
    environment:
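
The compose file maps the standalone redis service to host port 7000 and a cluster node to 7001, so brokers connect exactly as before the 7.4.2 image bumps. A minimal sketch, assuming the services above are running locally:

    # Assumes `docker compose up` has started the redis service (host port 7000).
    from taskiq_redis import ListQueueBroker

    broker = ListQueueBroker(url="redis://localhost:7000")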

poetry.lock

Lines changed: 504 additions & 619 deletions
Some generated files are not rendered by default.

pyproject.toml

Lines changed: 16 additions & 19 deletions
@@ -25,22 +25,21 @@ keywords = [
 ]

 [tool.poetry.dependencies]
-python = "^3.8.1"
-taskiq = ">=0.11.1,<1"
+python = "^3.9"
+taskiq = ">=0.11.12,<1"
 redis = "^5"

 [tool.poetry.group.dev.dependencies]
-pytest = "^7.0"
+pytest = "^8"
 mypy = "^1"
-black = "^22.3.0"
-pytest-cov = "^3.0.0"
-anyio = "^3.6.1"
-pytest-env = "^0.6.2"
+black = "^25"
+pytest-cov = "^6"
+anyio = "^4"
+pytest-env = "^1"
 fakeredis = "^2"
-pre-commit = "^2.20.0"
-pytest-xdist = { version = "^2.5.0", extras = ["psutil"] }
+pre-commit = "^4"
+pytest-xdist = { version = "^3", extras = ["psutil"] }
 ruff = "^0"
-types-redis = "^4.6.0.20240425"

 [tool.mypy]
 strict = true
@@ -65,7 +64,7 @@ build-backend = "poetry.core.masonry.api"
 [tool.ruff]
 # List of enabled rulesets.
 # See https://docs.astral.sh/ruff/rules/ for more information.
-select = [
+lint.select = [
     "E",  # Error
     "F",  # Pyflakes
     "W",  # Pycodestyle
@@ -92,24 +91,22 @@ select = [
     "PL",   # PyLint checks
     "RUF",  # Specific to Ruff checks
 ]
-ignore = [
+lint.ignore = [
     "D105",    # Missing docstring in magic method
     "D107",    # Missing docstring in __init__
     "D212",    # Multi-line docstring summary should start at the first line
     "D401",    # First line should be in imperative mood
     "D104",    # Missing docstring in public package
     "D100",    # Missing docstring in public module
-    "ANN102",  # Missing type annotation for self in method
-    "ANN101",  # Missing type annotation for argument
     "ANN401",  # typing.Any is disallowed in `**kwargs`
     "PLR0913", # Too many arguments for function call
     "D106",    # Missing docstring in public nested class
 ]
 exclude = [".venv/"]
-mccabe = { max-complexity = 10 }
+lint.mccabe = { max-complexity = 10 }
 line-length = 88

-[tool.ruff.per-file-ignores]
+[tool.ruff.lint.per-file-ignores]
 "tests/*" = [
     "S101",  # Use of assert detected
     "S301",  # Use of pickle detected
@@ -119,12 +116,12 @@ line-length = 88
     "D101",  # Missing docstring in public class
 ]

-[tool.ruff.pydocstyle]
+[tool.ruff.lint.pydocstyle]
 convention = "pep257"
 ignore-decorators = ["typing.overload"]

-[tool.ruff.pylint]
+[tool.ruff.lint.pylint]
 allow-magic-value-types = ["int", "str", "float"]

-[tool.ruff.flake8-bugbear]
+[tool.ruff.lint.flake8-bugbear]
 extend-immutable-calls = ["taskiq_dependencies.Depends", "taskiq.TaskiqDepends"]

taskiq_redis/__init__.py

Lines changed: 15 additions & 7 deletions
@@ -1,14 +1,19 @@
 """Package for redis integration."""
+
 from taskiq_redis.redis_backend import (
     RedisAsyncClusterResultBackend,
     RedisAsyncResultBackend,
     RedisAsyncSentinelResultBackend,
 )
-from taskiq_redis.redis_broker import ListQueueBroker, PubSubBroker
-from taskiq_redis.redis_cluster_broker import ListQueueClusterBroker
+from taskiq_redis.redis_broker import ListQueueBroker, PubSubBroker, RedisStreamBroker
+from taskiq_redis.redis_cluster_broker import (
+    ListQueueClusterBroker,
+    RedisStreamClusterBroker,
+)
 from taskiq_redis.redis_sentinel_broker import (
     ListQueueSentinelBroker,
     PubSubSentinelBroker,
+    RedisStreamSentinelBroker,
 )
 from taskiq_redis.schedule_source import (
     RedisClusterScheduleSource,
@@ -17,15 +22,18 @@
 )

 __all__ = [
-    "RedisAsyncClusterResultBackend",
-    "RedisAsyncResultBackend",
-    "RedisAsyncSentinelResultBackend",
     "ListQueueBroker",
-    "PubSubBroker",
     "ListQueueClusterBroker",
     "ListQueueSentinelBroker",
+    "PubSubBroker",
     "PubSubSentinelBroker",
-    "RedisScheduleSource",
+    "RedisAsyncClusterResultBackend",
+    "RedisAsyncResultBackend",
+    "RedisAsyncSentinelResultBackend",
     "RedisClusterScheduleSource",
+    "RedisScheduleSource",
     "RedisSentinelScheduleSource",
+    "RedisStreamBroker",
+    "RedisStreamClusterBroker",
+    "RedisStreamSentinelBroker",
 ]
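
With these re-exports, all three new stream brokers are importable from the package root. A quick sketch:

    # The stream brokers added by this commit, re-exported at the top level.
    from taskiq_redis import (
        RedisStreamBroker,
        RedisStreamClusterBroker,
        RedisStreamSentinelBroker,
    )

    # Constructor arguments are shown in the redis_broker.py diff below;
    # the URL here is a placeholder.
    broker = RedisStreamBroker(url="redis://localhost:6379")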

taskiq_redis/exceptions.py

Lines changed: 6 additions & 0 deletions
@@ -8,10 +8,16 @@ class TaskIQRedisError(TaskiqError):
 class DuplicateExpireTimeSelectedError(ResultBackendError, TaskIQRedisError):
     """Error if two lifetimes are selected."""

+    __template__ = "Choose either result_ex_time or result_px_time."
+

 class ExpireTimeMustBeMoreThanZeroError(ResultBackendError, TaskIQRedisError):
     """Error if a selected lifetime is less than or equal to zero."""

+    __template__ = (
+        "You must select one expire time param and it must be more than zero."
+    )
+

 class ResultIsMissingError(TaskIQRedisError, ResultGetError):
     """Error if there is no result when trying to get it."""

taskiq_redis/redis_backend.py

Lines changed: 3 additions & 7 deletions
@@ -16,10 +16,10 @@
 from redis.asyncio.cluster import RedisCluster
 from redis.asyncio.connection import Connection
 from taskiq import AsyncResultBackend
-from taskiq.abc.result_backend import TaskiqResult
 from taskiq.abc.serializer import TaskiqSerializer
 from taskiq.compat import model_dump, model_validate
 from taskiq.depends.progress_tracker import TaskProgress
+from taskiq.result import TaskiqResult
 from taskiq.serializers import PickleSerializer

 from taskiq_redis.exceptions import (
@@ -92,14 +92,10 @@ def __init__(
             ),
         )
         if unavailable_conditions:
-            raise ExpireTimeMustBeMoreThanZeroError(
-                "You must select one expire time param and it must be more than zero.",
-            )
+            raise ExpireTimeMustBeMoreThanZeroError

         if self.result_ex_time and self.result_px_time:
-            raise DuplicateExpireTimeSelectedError(
-                "Choose either result_ex_time or result_px_time.",
-            )
+            raise DuplicateExpireTimeSelectedError

     def _task_name(self, task_id: str) -> str:
         if self.prefix_str is None:
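
The validation itself is unchanged; only the message source moved into the error classes. A hypothetical misconfiguration that would now trip the bare raise (the expire-time parameter names come from the diff; redis_url is assumed from the package README):

    from taskiq_redis import RedisAsyncResultBackend
    from taskiq_redis.exceptions import DuplicateExpireTimeSelectedError

    try:
        # Setting both lifetimes at once triggers the duplicate-expire check.
        RedisAsyncResultBackend(
            redis_url="redis://localhost:6379",
            result_ex_time=60,
            result_px_time=60_000,
        )
    except DuplicateExpireTimeSelectedError as err:
        print(err)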

taskiq_redis/redis_broker.py

Lines changed: 121 additions & 3 deletions
@@ -1,8 +1,19 @@
 import sys
+import uuid
 from logging import getLogger
-from typing import TYPE_CHECKING, Any, AsyncGenerator, Callable, Optional, TypeVar
-
-from redis.asyncio import BlockingConnectionPool, Connection, Redis
+from typing import (
+    TYPE_CHECKING,
+    Any,
+    AsyncGenerator,
+    Awaitable,
+    Callable,
+    Dict,
+    Optional,
+    TypeVar,
+)
+
+from redis.asyncio import BlockingConnectionPool, Connection, Redis, ResponseError
+from taskiq import AckableMessage
 from taskiq.abc.broker import AsyncBroker
 from taskiq.abc.result_backend import AsyncResultBackend
 from taskiq.message import BrokerMessage
@@ -132,3 +143,110 @@ async def listen(self) -> AsyncGenerator[bytes, None]:
             except ConnectionError as exc:
                 logger.warning("Redis connection error: %s", exc)
                 continue
+
+
+class RedisStreamBroker(BaseRedisBroker):
+    """
+    Redis broker that uses streams for task distribution.
+
+    You can read more about streams here:
+    https://redis.io/docs/latest/develop/data-types/streams
+
+    This broker supports acknowledgment of messages.
+    """
+
+    def __init__(
+        self,
+        url: str,
+        queue_name: str = "taskiq",
+        max_connection_pool_size: Optional[int] = None,
+        consumer_group_name: str = "taskiq",
+        consumer_name: Optional[str] = None,
+        consumer_id: str = "$",
+        mkstream: bool = True,
+        xread_block: int = 10000,
+        additional_streams: Optional[Dict[str, str]] = None,
+        **connection_kwargs: Any,
+    ) -> None:
+        super().__init__(
+            url,
+            task_id_generator=None,
+            result_backend=None,
+            queue_name=queue_name,
+            max_connection_pool_size=max_connection_pool_size,
+            **connection_kwargs,
+        )
+        self.consumer_group_name = consumer_group_name
+        self.consumer_name = consumer_name or str(uuid.uuid4())
+        self.consumer_id = consumer_id
+        self.mkstream = mkstream
+        self.block = xread_block
+        self.additional_streams = additional_streams or {}
+
+    async def _declare_consumer_group(self) -> None:
+        """
+        Declare consumer group.
+
+        Required for proper work of the broker.
+        """
+        streams = {self.queue_name, *self.additional_streams.keys()}
+        async with Redis(connection_pool=self.connection_pool) as redis_conn:
+            for stream_name in streams:
+                try:
+                    await redis_conn.xgroup_create(
+                        stream_name,
+                        self.consumer_group_name,
+                        id=self.consumer_id,
+                        mkstream=self.mkstream,
+                    )
+                except ResponseError as err:
+                    logger.debug(err)
+
+    async def startup(self) -> None:
+        """Declare consumer group on startup."""
+        await super().startup()
+        await self._declare_consumer_group()
+
+    async def kick(self, message: BrokerMessage) -> None:
+        """
+        Put a message into the stream.
+
+        This method adds a message to the queue's stream.
+
+        :param message: message to append.
+        """
+        async with Redis(connection_pool=self.connection_pool) as redis_conn:
+            await redis_conn.xadd(self.queue_name, {b"data": message.message})
+
+    def _ack_generator(self, id: str) -> Callable[[], Awaitable[None]]:
+        async def _ack() -> None:
+            async with Redis(connection_pool=self.connection_pool) as redis_conn:
+                await redis_conn.xack(
+                    self.queue_name,
+                    self.consumer_group_name,
+                    id,
+                )
+
+        return _ack
+
+    async def listen(self) -> AsyncGenerator[AckableMessage, None]:
+        """Listen to incoming messages."""
+        async with Redis(connection_pool=self.connection_pool) as redis_conn:
+            while True:
+                fetched = await redis_conn.xreadgroup(
+                    self.consumer_group_name,
+                    self.consumer_name,
+                    {
+                        self.queue_name: ">",
+                        **self.additional_streams,
+                    },
+                    block=self.block,
+                    noack=False,
+                )
+                for _, msg_list in fetched:
+                    for msg_id, msg in msg_list:
+                        logger.debug("Received message: %s", msg)
+                        yield AckableMessage(
+                            data=msg[b"data"],
+                            ack=self._ack_generator(msg_id),
+                        )
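
For reference, a minimal end-to-end sketch of the new broker (the URL, stream name, and task body are placeholders): startup declares the consumer group via XGROUP CREATE, kicking a task XADDs it to the stream, and the worker's listen loop reads with XREADGROUP and acknowledges with XACK once the task completes.

    import asyncio

    from taskiq_redis import RedisStreamBroker

    broker = RedisStreamBroker(
        url="redis://localhost:7000",  # standalone node from docker-compose.yml
        queue_name="my_stream",
        consumer_group_name="my_group",
    )


    @broker.task
    async def add_one(value: int) -> int:
        return value + 1


    async def main() -> None:
        await broker.startup()   # declares the consumer group (mkstream=True)
        await add_one.kiq(41)    # XADDs {"data": <message>} to my_stream
        await broker.shutdown()


    asyncio.run(main())

Because listen yields AckableMessage objects, the worker acknowledges a message only after the task runs; entries from a crashed consumer remain in the group's pending list instead of being lost.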
