Skip to content

Commit 9998c58

Browse files
redis: fix xautoclaim consumption (#2628)
* fixes redis xautoclain * revert
1 parent 671d5bb commit 9998c58

File tree

2 files changed

+61
-101
lines changed

2 files changed

+61
-101
lines changed

faststream/redis/subscriber/usecases/stream_subscriber.py

Lines changed: 48 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
1+
import asyncio
12
import math
2-
from collections.abc import AsyncIterator, Awaitable, Callable, Coroutine
3+
from collections.abc import AsyncIterator, Awaitable, Callable
34
from typing import TYPE_CHECKING, Any, Optional, TypeAlias
45

56
from redis.exceptions import ResponseError
@@ -34,6 +35,21 @@
3435
TopicName: TypeAlias = bytes
3536
Offset: TypeAlias = bytes
3637

38+
ReadResponse = tuple[
39+
tuple[
40+
TopicName,
41+
tuple[
42+
tuple[
43+
Offset,
44+
dict[bytes, bytes],
45+
],
46+
...,
47+
],
48+
],
49+
...,
50+
]
51+
ReadCallable = Callable[[str], Awaitable[ReadResponse]]
52+
3753

3854
class _StreamHandlerMixin(LogicSubscriber):
3955
def __init__(
@@ -80,24 +96,7 @@ async def start(self) -> None:
8096

8197
stream = self.stream_sub
8298

83-
read: Callable[
84-
[str],
85-
Awaitable[
86-
tuple[
87-
tuple[
88-
TopicName,
89-
tuple[
90-
tuple[
91-
Offset,
92-
dict[bytes, bytes],
93-
],
94-
...,
95-
],
96-
],
97-
...,
98-
],
99-
],
100-
]
99+
read: ReadCallable
101100

102101
if stream.group and stream.consumer:
103102
group_create_id = "$" if self.last_id == ">" else self.last_id
@@ -112,81 +111,23 @@ async def start(self) -> None:
112111
if "already exists" not in str(e):
113112
raise
114113

115-
def read(
116-
_: str,
117-
) -> Awaitable[
118-
tuple[
119-
tuple[
120-
TopicName,
121-
tuple[
122-
tuple[
123-
Offset,
124-
dict[bytes, bytes],
125-
],
126-
...,
127-
],
128-
],
129-
...,
130-
],
131-
]:
132-
return client.xreadgroup(
133-
groupname=stream.group,
134-
consumername=stream.consumer,
135-
streams={stream.name: stream.last_id},
136-
count=stream.max_records,
137-
block=stream.polling_interval,
138-
noack=stream.no_ack,
139-
)
140-
141-
elif self.stream_sub.min_idle_time is None:
142-
143-
def read(
144-
last_id: str,
145-
) -> Awaitable[
146-
tuple[
147-
tuple[
148-
TopicName,
149-
tuple[
150-
tuple[
151-
Offset,
152-
dict[bytes, bytes],
153-
],
154-
...,
155-
],
156-
],
157-
...,
158-
],
159-
]:
160-
return client.xread(
161-
{stream.name: last_id},
162-
block=stream.polling_interval,
163-
count=stream.max_records,
164-
)
114+
if stream.min_idle_time is None:
115+
116+
def read(
117+
_: str,
118+
) -> Awaitable[ReadResponse]:
119+
return client.xreadgroup(
120+
groupname=stream.group,
121+
consumername=stream.consumer,
122+
streams={stream.name: stream.last_id},
123+
count=stream.max_records,
124+
block=stream.polling_interval,
125+
noack=stream.no_ack,
126+
)
165127

166-
else:
128+
else:
167129

168-
def read(
169-
_: str,
170-
) -> Coroutine[
171-
Any,
172-
Any,
173-
tuple[
174-
tuple[
175-
TopicName,
176-
tuple[
177-
tuple[
178-
Offset,
179-
dict[bytes, bytes],
180-
],
181-
...,
182-
],
183-
],
184-
...,
185-
],
186-
]:
187-
async def xautoclaim() -> tuple[
188-
tuple[TopicName, tuple[tuple[Offset, dict[bytes, bytes]], ...]], ...
189-
]:
130+
async def read(_: str) -> ReadResponse:
190131
stream_message = await client.xautoclaim(
191132
name=self.stream_sub.name,
192133
groupname=self.stream_sub.group,
@@ -197,13 +138,26 @@ async def xautoclaim() -> tuple[
197138
)
198139
stream_name = self.stream_sub.name.encode()
199140
(next_id, messages, _) = stream_message
141+
200142
# Update start_id for next call
201143
self.autoclaim_start_id = next_id
202-
if not messages:
144+
145+
if next_id == b"0-0" and not messages:
146+
await asyncio.sleep(stream.polling_interval / 1000) # ms to s
203147
return ()
148+
204149
return ((stream_name, messages),)
205150

206-
return xautoclaim()
151+
else:
152+
153+
def read(
154+
last_id: str,
155+
) -> Awaitable[ReadResponse]:
156+
return client.xread(
157+
{stream.name: last_id},
158+
block=stream.polling_interval,
159+
count=stream.max_records,
160+
)
207161

208162
await super().start(read)
209163

tests/brokers/redis/test_autoclaim.py

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,8 @@
44

55
import pytest
66

7-
from faststream.redis import (
8-
StreamSub,
9-
)
7+
from faststream.exceptions import NackMessage
8+
from faststream.redis import StreamSub
109
from tests.brokers.base.consume import BrokerRealConsumeTestcase
1110

1211
from .basic import RedisTestcaseConfig
@@ -27,6 +26,16 @@ async def test_consume_stream_with_min_idle_time(
2726

2827
consume_broker = self.get_broker(apply_types=True)
2928

29+
@consume_broker.subscriber(
30+
stream=StreamSub(
31+
queue,
32+
group="test_group",
33+
consumer="consumer1",
34+
),
35+
)
36+
async def regular(msg: str) -> None:
37+
raise NackMessage
38+
3039
@consume_broker.subscriber(
3140
stream=StreamSub(
3241
queue,
@@ -35,7 +44,7 @@ async def test_consume_stream_with_min_idle_time(
3544
min_idle_time=100, # 100ms
3645
),
3746
)
38-
async def handler(msg: str) -> None:
47+
async def retry(msg: str) -> None:
3948
mock(msg)
4049
event.set()
4150

@@ -45,9 +54,6 @@ async def handler(msg: str) -> None:
4554
# First, publish a message and let it become pending
4655
await br.publish("pending_message", stream=queue)
4756

48-
# Wait a bit to ensure message becomes idle
49-
await asyncio.sleep(0.2)
50-
5157
# The subscriber with XAUTOCLAIM should reclaim it
5258
await asyncio.wait(
5359
(asyncio.create_task(event.wait()),),

0 commit comments

Comments
 (0)