fix: Recreate group in case redis flush #2656

Open
pbonneaudiabolocom wants to merge 2 commits into ag2ai:main from pbonneaudiabolocom:automatic-re-creation-of-redis-group-in-case-of-flush

Conversation

@pbonneaudiabolocom
@pbonneaudiabolocom pbonneaudiabolocom commented Nov 6, 2025

Description

When using Redis with a consumer group, if another party flushes Redis or the stream is destroyed, FastStream enters an infinite loop: it keeps trying to read the next element but never finds the expected group.

Fixes #2611

Type of change

Please delete options that are not relevant.

  • Documentation (typos, code examples, or any documentation updates)
  • Bug fix (a non-breaking change that resolves an issue)
  • New feature (a non-breaking change that adds functionality)
  • Breaking change (a fix or feature that would disrupt existing functionality)
  • This change requires a documentation update

Checklist

  • My code adheres to the style guidelines of this project (just lint shows no errors)
  • I have conducted a self-review of my own code
  • I have made the necessary changes to the documentation
  • My changes do not generate any new warnings
  • I have added tests to validate the effectiveness of my fix or the functionality of my new feature
  • Both new and existing unit tests pass successfully on my local environment by running just test-coverage
  • I have ensured that static analysis tests are passing by running just static-analysis
  • I have included code examples to illustrate the modifications

@CLAassistant
CLAassistant commented Nov 6, 2025

CLA assistant check
All committers have signed the CLA.

@github-actions github-actions bot added the Redis Issues related to `faststream.redis` module and Redis features label Nov 6, 2025
@pbonneaudiabolocom
Author

I added all the casts needed to pass the linter.
There may be another way, since the next read just returns a coroutine without raising any error. Just tell me what you'd prefer me to do.

Also, I discovered that this part is not tested at all (if I insert an exit in that part of the code, the tests still pass).

If you have a test example to point me to, I would imagine something such as:

create a stream with a group
remove that group in Redis (external command)
check that the group is recreated and that reading works fine.

)
except ResponseError as e:
    if "NOGROUP" in str(e):
        await self._create_group()
Member

Should we change current last_id read position at stream recreation?

Author

Indeed we should. Since the group has been reset, the next ID should restart from scratch.

I also added some tests to make sure it works properly.
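
For readers following the thread, the recovery discussed above can be sketched roughly as follows. This is not the PR's code: `FakeStreamClient`, `GroupReader`, and the local `ResponseError` are hypothetical stand-ins so the example runs without a Redis server, and resetting to the initial read position is only one possible interpretation of "restart from scratch".

```python
import asyncio


class ResponseError(Exception):
    """Stand-in for redis.exceptions.ResponseError (assumption, keeps the sketch standalone)."""


class FakeStreamClient:
    """Hypothetical in-memory client: reads fail with NOGROUP until the group exists."""

    def __init__(self) -> None:
        self.group_exists = False
        self.created_from: list[str] = []

    async def create_group(self, start_id: str) -> None:
        self.group_exists = True
        self.created_from.append(start_id)

    async def read_group(self, last_id: str) -> list[tuple[str, dict[str, str]]]:
        if not self.group_exists:
            raise ResponseError("NOGROUP No such key or consumer group")
        return [("1-0", {"data": "hello"})]


class GroupReader:
    """Hypothetical reader that remembers its initial read position."""

    def __init__(self, client: FakeStreamClient, last_id: str = ">") -> None:
        self.client = client
        self.initial_id = last_id
        self.last_id = last_id

    async def read(self) -> list[tuple[str, dict[str, str]]]:
        try:
            return await self.client.read_group(self.last_id)
        except ResponseError as e:
            if "NOGROUP" not in str(e):
                raise  # unrelated Redis error: do not swallow it
            # The group is gone: recreate it, reset the read position, retry once.
            await self.client.create_group(start_id="0")
            self.last_id = self.initial_id
            return await self.client.read_group(self.last_id)


async def demo() -> tuple[list[tuple[str, dict[str, str]]], str, list[str]]:
    client = FakeStreamClient()  # simulates Redis right after a flush
    reader = GroupReader(client)
    reader.last_id = "5-0"  # pretend the reader had progressed before the flush
    messages = await reader.read()
    return messages, reader.last_id, client.created_from


result = asyncio.run(demo())
```

The retry happens only once per read, so a group that cannot be recreated still surfaces as an error instead of looping.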

@pbonneaudiabolocom pbonneaudiabolocom force-pushed the automatic-re-creation-of-redis-group-in-case-of-flush branch from befd241 to b70b55f Compare November 7, 2025 10:42
@github-actions github-actions bot added the documentation Improvements or additions to documentation label Nov 7, 2025
@pbonneaudiabolocom
Author

My code now conflicts with the changes from 9998c58.

I have no idea what the autoclaim fix does, so it's very hard to understand how to combine it with my code and what the resulting behavior should be.

Could you help, please? @JonathanSerafini

Contributor

not sure if this was intended, but your redis PR also includes this change and an unrelated package version change.

...,
],
]:
async def read_from_group() -> tuple[
Contributor

I'm thinking that at this point you may as well make def read an async function and do without the nested read_from_group; it may make things more readable.

Contributor

this type annotation is likely the first conflict, you can use ReadResponse as a replacement

Contributor

@JonathanSerafini JonathanSerafini Nov 7, 2025

The other conflict probably stems from the fact that there are now 2 stream group readers in this code, one for xreadgroup and another for xautoclaim. I would assume that if you want to re-create the group for the former, you may also want to for the latter.

for the xautoclaim handling, i would just start over from 0-0 i.e the first position and it should be able to take it from there.
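
A rough sketch of that suggestion, assuming the autoclaim loop keeps a cursor that it passes back on each call: when the group is missing, recreate it and restart the cursor at 0-0. `FakeAutoclaimClient`, `autoclaim_step`, and the local `ResponseError` are hypothetical stand-ins, not FastStream or redis-py APIs.

```python
import asyncio


class ResponseError(Exception):
    """Stand-in for redis.exceptions.ResponseError (assumption)."""


class FakeAutoclaimClient:
    """Hypothetical client that records every cursor it is asked to scan from."""

    def __init__(self) -> None:
        self.group_exists = False
        self.calls: list[str] = []

    async def create_group(self) -> None:
        self.group_exists = True

    async def autoclaim(self, start_id: str) -> tuple[str, list[str]]:
        self.calls.append(start_id)
        if not self.group_exists:
            raise ResponseError("NOGROUP No such key or consumer group")
        # Returns (next cursor, claimed messages); "0-0" means the scan is complete.
        return ("0-0", [])


async def autoclaim_step(
    client: FakeAutoclaimClient, cursor: str
) -> tuple[str, list[str]]:
    try:
        return await client.autoclaim(cursor)
    except ResponseError as e:
        if "NOGROUP" not in str(e):
            raise
        # The old cursor is meaningless for a fresh group: start over from 0-0.
        await client.create_group()
        return await client.autoclaim("0-0")


async def demo() -> tuple[tuple[str, list[str]], list[str]]:
    client = FakeAutoclaimClient()  # group missing, as after a flush
    outcome = await autoclaim_step(client, "1700000000000-5")
    return outcome, client.calls


outcome, calls = asyncio.run(demo())
```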

@pbonneaudiabolocom pbonneaudiabolocom force-pushed the automatic-re-creation-of-redis-group-in-case-of-flush branch from b70b55f to cad2597 Compare November 10, 2025 12:45
@pbonneaudiabolocom
Author

Thanks for your feedback.

Based on it, I decided to create a dedicated function that protects any provided method against group removal.

I updated the code accordingly.

Tests are passing.

I still have the following linter errors, and I don't know how to make them disappear. To be honest, I'm totally lost in the return typing used here.

faststream/redis/subscriber/usecases/stream_subscriber.py:199: error: Incompatible types in assignment (expression has type "tuple[bytes, tuple[tuple[bytes, dict[bytes, bytes]], ...]]", variable has type "str")  [assignment]
faststream/redis/subscriber/usecases/stream_subscriber.py:202: error: Incompatible types in assignment (expression has type "tuple[bytes, tuple[tuple[bytes, dict[bytes, bytes]], ...]]", variable has type "bytes")  [assignment]
faststream/redis/subscriber/usecases/stream_subscriber.py:204: error: Non-overlapping equality check (left operand type: "tuple[bytes, tuple[tuple[bytes, dict[bytes, bytes]], ...]]", right operand type: "Literal[b'0-0']")  [comparison-overlap]
faststream/redis/subscriber/usecases/stream_subscriber.py:208: error: Incompatible return value type (got "tuple[tuple[bytes, tuple[bytes, tuple[tuple[bytes, dict[bytes, bytes]], ...]]]]", expected "tuple[tuple[bytes, tuple[tuple[bytes, dict[bytes, bytes]], ...]], ...]")  [return-value]
faststream/redis/subscriber/usecases/stream_subscriber.py:282: error: Incompatible types in assignment (expression has type "tuple[bytes, tuple[tuple[bytes, dict[bytes, bytes]], ...]]", variable has type "bytes")  [assignment]
faststream/redis/subscriber/usecases/stream_subscriber.py:286: error: Too many values to unpack (1 expected, 2 provided)  [misc]
faststream/redis/subscriber/usecases/stream_subscriber.py:347: error: Incompatible types in assignment (expression has type "tuple[bytes, tuple[tuple[bytes, dict[bytes, bytes]], ...]]", variable has type "bytes")  [assignment]
faststream/redis/subscriber/usecases/stream_subscriber.py:351: error: Too many values to unpack (1 expected, 2 provided)  [misc]

Any idea on how to correct that ?

@pbonneaudiabolocom
Author

@JonathanSerafini Do you think it would be possible to merge it?

Last time it was working fine, but the more the codebase evolves, the more I have to redo the work...

@JonathanSerafini
Contributor

@pbonneaudiabolocom I'm not the right one to ask, i'm not a maintainer or affiliated with this project.

That being said re:typing, you have to account for the changes you're making. You've created _xautoclaim_call and stated it returns Any. Then wrap it in a decorator _protect_read_from_group_removal that states it returns ReadResponse. This is not actually true, xautoclaim's output is different.

To account for different input functions you might want to look at how to type a 'parametrized decorator'.

@pbonneaudiabolocom pbonneaudiabolocom force-pushed the automatic-re-creation-of-redis-group-in-case-of-flush branch from 182cf0b to c10161b Compare January 7, 2026 09:00
 - adding a test
 - properly resetting the ID
 - handling another rare case where the group is tampered with by another system and IDs get desynchronized (AI recommendation)
 - fixing group creation in autoclaim + get_one message and iter.
 - adding a test
 - properly resetting the ID
 - handling another rare case where the group is tampered with by another system and IDs get desynchronized (AI recommendation)
 - fixing group creation in autoclaim + get_one message and iter.
@pbonneaudiabolocom
Author

pbonneaudiabolocom commented Jan 7, 2026

Hi,

I updated my code to account for the recent changes.

I'm struggling with typing and mypy validation.

Honestly, I have no idea how to resolve the reported warnings.

I'm far from having that level of expertise in typing. Help would be welcome.

@powersemmi
Contributor

powersemmi commented Jan 25, 2026

@pbonneaudiabolocom
Hi!

I believe the current implementation deviates from the framework's standards. While FastStream supports creating queues on startup (primarily for development convenience), it shouldn't be responsible for managing production infrastructure at runtime. Recreating the stream on the fly effectively hides the underlying infrastructure issue.

The only correct approach here is to fail fast and raise an error indicating that the Redis stream no longer exists. This ensures the service stops predictably, triggering monitoring alerts so we can investigate the incident properly.

Also, it would be great if you could revert uv.lock
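
For illustration, the fail-fast behavior described above could look roughly like this. It is only a sketch: `ConsumerGroupMissingError` and `read_or_fail` are hypothetical names, not existing FastStream APIs, and the local `ResponseError` stands in for the redis-py exception so the example runs without Redis.

```python
import asyncio
from collections.abc import Awaitable, Callable
from typing import TypeVar

T = TypeVar("T")


class ResponseError(Exception):
    """Stand-in for redis.exceptions.ResponseError (assumption)."""


class ConsumerGroupMissingError(RuntimeError):
    """Hypothetical error type signalling that the stream or group is gone."""


async def read_or_fail(
    read: Callable[[], Awaitable[T]], stream: str, group: str
) -> T:
    try:
        return await read()
    except ResponseError as e:
        if "NOGROUP" in str(e):
            # Surface the infrastructure problem instead of retrying forever.
            raise ConsumerGroupMissingError(
                f"Consumer group {group!r} on stream {stream!r} no longer exists"
            ) from e
        raise


async def broken_read() -> list[str]:
    raise ResponseError("NOGROUP No such key 'orders' or consumer group 'workers'")


def demo() -> str:
    try:
        asyncio.run(read_or_fail(broken_read, "orders", "workers"))
    except ConsumerGroupMissingError as e:
        return str(e)
    return "no error"


message = demo()
```

Raising a dedicated error type, rather than letting the raw Redis error propagate, would give monitoring and any outer handler a clear signal to act on.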

@pbonneaudiabolocom
Author

Hi,

For me, whenever a system creates something, it is accountable for it.

That leaves 2 options:

  • either repair when something goes wrong (that's the goal of my PR)
  • or fail quickly and provide a way to trigger an action at the user level.

I'm not against the 2nd approach, but the question then becomes: how can we ensure that the application is able to recover somehow when a stream is dead?
In the simple case where there is one stream, the application will fail, start again, re-create the group, and everything will be OK.
But when there are multiple publishers / subscribers, failing the whole app might have serious consequences for the other ongoing flows.

It would be good to raise an exception and allow the user to catch it somehow and re-run the group creation if that is what is desired.
That raises 2 questions on my side:

  • can we catch / handle an exception raised by FastStream somewhere?
  • can we trigger the group creation / subscriber restart?

What I would like to avoid is group creation done in a separate function, ending up with 2 ways to create a group, one in FastStream and the other in the backup system, which is more likely to create intricate issues in production.

I could adapt the PR to fit that other approach if you want.

Best regards,
Pierre

Labels

documentation Improvements or additions to documentation Redis Issues related to `faststream.redis` module and Redis features

Development

Successfully merging this pull request may close these issues.

Bug: When the redis server is killed, faststream can't recover

5 participants