Skip to content

Commit b70b55f

Browse files
fix: Recreate group in case of a Redis flush
- adding a test - properly resetting the ID - handling another rare case where the group is tampered with by another system and the IDs get desynchronized. (AI recommendation)
1 parent 0cdaaac commit b70b55f

File tree

3 files changed

+76
-12
lines changed

3 files changed

+76
-12
lines changed

faststream/redis/subscriber/usecases/stream_subscriber.py

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -69,8 +69,11 @@ async def _consume(self, *args: Any, start_signal: "Event") -> None:
6969
start_signal.set()
7070
await super()._consume(*args, start_signal=start_signal)
7171

72-
async def _create_group(self) -> None:
73-
group_create_id = "$" if self.last_id == ">" else self.last_id
72+
async def _create_group(self, reset_counter:bool = False) -> None:
73+
if reset_counter:
74+
group_create_id = "0"
75+
else:
76+
group_create_id = "$" if self.last_id == ">" else self.last_id
7477
try:
7578
await self._client.xgroup_create(
7679
name=self.stream_sub.name,
@@ -155,8 +158,22 @@ async def read_from_group() -> tuple[
155158
noack=stream.no_ack,
156159
)
157160
except ResponseError as e:
158-
if "NOGROUP" in str(e):
159-
await self._create_group()
161+
err_msg = str(e)
162+
known_error:bool = False
163+
if "NOGROUP" in err_msg:
164+
# most likely redis was flushed, so we need to reset our group
165+
await self._create_group(reset_counter=True)
166+
# Important: reset our internal position too
167+
stream.last_id = ">"
168+
known_error = True
169+
if (
170+
"smaller than the first available entry" in err_msg
171+
or "greater than the maximum id" in err_msg
172+
):
173+
# group was modified by third party and we need to reset our position to an existing id
174+
stream.last_id = "$"
175+
known_error = True
176+
if known_error:
160177
return await client.xreadgroup( # type: ignore[no-any-return]
161178
groupname=stream.group,
162179
consumername=stream.consumer,
@@ -165,7 +182,7 @@ async def read_from_group() -> tuple[
165182
block=stream.polling_interval,
166183
noack=stream.no_ack,
167184
)
168-
raise
185+
raise e
169186

170187
return read_from_group()
171188

tests/brokers/redis/test_consume.py

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -844,6 +844,52 @@ async def handler(msg: RedisStreamMessage) -> None:
844844
assert queue_len == 0, (
845845
f"Redis stream must be empty here, found {queue_len} messages"
846846
)
847+
    async def test_consume_from_group(
        self,
        queue: str,
    ) -> None:
        """Verify that a stream-group subscriber recovers after a Redis FLUSHALL.

        Scenario:
        1. Publish to the stream and confirm normal group delivery
           (``xreadgroup`` called, handler fired).
        2. ``FLUSHALL`` the database — this deletes the stream AND its
           consumer group, so the next ``xreadgroup`` raises ``NOGROUP``.
        3. Publish again and assert the message is still delivered, which
           requires the subscriber to transparently recreate the group.

        NOTE(review): ``queue`` is a pytest fixture presumably yielding a
        unique stream name — it is reused here as both stream and consumer
        name. ``spy_decorator`` appears to wrap a coroutine while recording
        calls on ``.mock`` — confirm against the test helpers.
        """
        event = asyncio.Event()

        consume_broker = self.get_broker(apply_types=True)

        @consume_broker.subscriber(
            stream=StreamSub(queue, group="group", consumer=queue),
        )
        async def handler(msg: RedisMessage) -> None:
            event.set()

        async with self.patch_broker(consume_broker) as br:
            await br.start()
            # Raw redis client; private attribute — assumes the test broker
            # exposes the underlying connection here (TODO confirm).
            redis_client = br._connection
            # Spies are installed AFTER br.start(), so the initial group
            # creation at startup is NOT counted — only the recovery path is.
            with (
                patch.object(redis_client, "xreadgroup", spy_decorator(redis_client.xreadgroup)) as m_readgroup,
                patch.object(redis_client, "xgroup_create", spy_decorator(redis_client.xgroup_create)) as m_group_create
            ):
                # First round: normal delivery through the existing group.
                await asyncio.wait(
                    (
                        asyncio.create_task(br.publish("hello", stream=queue)),
                        asyncio.create_task(event.wait()),
                    ),
                    timeout=3,
                )
                # Small grace period so the consumer loop records the call.
                await asyncio.sleep(0.1)
                m_readgroup.mock.assert_called_once()
                assert event.is_set()
                # Wipe the database: stream and consumer group are gone.
                await redis_client.flushall()
                event.clear()
                await asyncio.sleep(0.1)
                # Second round: delivery must survive the flush via group
                # recreation inside the subscriber's error handling.
                await asyncio.wait(
                    (
                        asyncio.create_task(br.publish("hello again", stream=queue)),
                        asyncio.create_task(event.wait()),
                    ),
                    timeout=3,
                )

                await asyncio.sleep(0.1)
                # Exactly one xgroup_create since the spy was installed —
                # i.e. the group was recreated once during recovery.
                m_group_create.mock.assert_called_once()

                assert event.is_set()
847893

848894
async def test_get_one(
849895
self,

uv.lock

Lines changed: 8 additions & 7 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)