Skip to content

Commit cad2597

Browse files
fix: Recreate group in case redis flush
- adding a test - reset properly the id - adding another rare case when group is tempered by another system, and IDs gets desynchronized. (AI recommendation)
1 parent 9998c58 commit cad2597

File tree

2 files changed

+54
-7
lines changed

2 files changed

+54
-7
lines changed

tests/brokers/redis/test_consume.py

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -844,6 +844,52 @@ async def handler(msg: RedisStreamMessage) -> None:
844844
assert queue_len == 0, (
845845
f"Redis stream must be empty here, found {queue_len} messages"
846846
)
847+
async def test_consume_from_group(
848+
self,
849+
queue: str,
850+
) -> None:
851+
event = asyncio.Event()
852+
853+
consume_broker = self.get_broker(apply_types=True)
854+
855+
@consume_broker.subscriber(
856+
stream=StreamSub(queue, group="group", consumer=queue),
857+
)
858+
async def handler(msg: RedisMessage) -> None:
859+
event.set()
860+
861+
async with self.patch_broker(consume_broker) as br:
862+
await br.start()
863+
redis_client = br._connection
864+
with (
865+
patch.object(redis_client, "xreadgroup", spy_decorator(redis_client.xreadgroup)) as m_readgroup,
866+
patch.object(redis_client, "xgroup_create", spy_decorator(redis_client.xgroup_create)) as m_group_create
867+
):
868+
await asyncio.wait(
869+
(
870+
asyncio.create_task(br.publish("hello", stream=queue)),
871+
asyncio.create_task(event.wait()),
872+
),
873+
timeout=3,
874+
)
875+
await asyncio.sleep(0.1)
876+
m_readgroup.mock.assert_called_once()
877+
assert event.is_set()
878+
await redis_client.flushall()
879+
event.clear()
880+
await asyncio.sleep(0.1)
881+
await asyncio.wait(
882+
(
883+
asyncio.create_task(br.publish("hello again", stream=queue)),
884+
asyncio.create_task(event.wait()),
885+
),
886+
timeout=3,
887+
)
888+
889+
await asyncio.sleep(0.1)
890+
m_group_create.mock.assert_called_once()
891+
892+
assert event.is_set()
847893

848894
async def test_get_one(
849895
self,

uv.lock

Lines changed: 8 additions & 7 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)