We read every piece of feedback, and take your input very seriously.
To see all available qualifiers, see our documentation.
1 parent 2368b5f commit 182cf0bCopy full SHA for 182cf0b
faststream/redis/subscriber/usecases/stream_subscriber.py
@@ -219,7 +219,7 @@ def read(
219
220
await super().start(read)
221
222
- async def _get_one_message(self, timeout: float) -> None:
+ async def _get_one_message(self, timeout: float) -> Optional[ReadResponse]:
223
if self.stream_sub.group and self.stream_sub.consumer:
224
def _readgroup_call() -> Awaitable[ReadResponse]:
225
return self._client.xreadgroup(
@@ -274,6 +274,8 @@ def _autoclaim_call() -> Awaitable[Any]:
274
stream=self.stream_sub,
275
)
276
stream_message = await protected_autoclaim()
277
+ if not stream_message:
278
+ return None
279
280
(next_id, messages, _) = stream_message
281
# Update start_id for next call
0 commit comments