Skip to content

Commit d6c5a57

Browse files
suiseriffLancetnik
andauthored
Feat/redis pipeline (#2270)
* example tests fail * lint * fix: change incr to len(result) * feat(broker): add 'pipeline' parameter to publish function * fix(tests): piblisher.publish & Event.set * docs: add example redis pipeline * fix: raise exception on both param rpc and pipeline * docs: add redis/pipeline.md * fix: update hint abount type of queue * fix(docs): removed snippets * docs: moved from examples to docs_src * docs: add info about publish_batch * docs: autogenerated * fix: raise in-place exception * feat: add pipeline for publish_batch * fix: get_pipline renamed to get_pipe * add tests for docs_src and publish_batch * docs: generate API References * fix: ruff lint * fix: removed mark.pipe * tests: polish tests examples * tests: 3.8 compatible * docs: polish pipeline docs * docs: generate API References --------- Co-authored-by: Nikita Pastukhov <[email protected]> Co-authored-by: Lancetnik <[email protected]>
1 parent 6a818e3 commit d6c5a57

File tree

14 files changed

+290
-8
lines changed

14 files changed

+290
-8
lines changed

docs/docs/SUMMARY.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@ search:
119119
- [Batching](redis/streams/batch.md)
120120
- [Acknowledgement](redis/streams/ack.md)
121121
- [RPC](redis/rpc.md)
122+
- [Pipeline](redis/pipeline.md)
122123
- [Message Information](redis/message.md)
123124
- [Security Configuration](redis/security.md)
124125
- [Reference - Code API](api/index.md)
@@ -1021,6 +1022,8 @@ search:
10211022
- [StreamSub](api/faststream/redis/StreamSub.md)
10221023
- [TestApp](api/faststream/redis/TestApp.md)
10231024
- [TestRedisBroker](api/faststream/redis/TestRedisBroker.md)
1025+
- annotations
1026+
- [get_pipe](api/faststream/redis/annotations/get_pipe.md)
10241027
- broker
10251028
- broker
10261029
- [RedisBroker](api/faststream/redis/broker/broker/RedisBroker.md)
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
---
2+
# 0.5 - API
3+
# 2 - Release
4+
# 3 - Contributing
5+
# 5 - Template Page
6+
# 10 - Default
7+
search:
8+
boost: 0.5
9+
---
10+
11+
::: faststream.redis.annotations.get_pipe

docs/docs/en/redis/pipeline.md

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
---
2+
# 0.5 - API
3+
# 2 - Release
4+
# 3 - Contributing
5+
# 5 - Template Page
6+
# 10 - Default
7+
search:
8+
boost: 10
9+
---
10+
11+
# Redis Pipeline
12+
13+
**FastStream** supports [**Redis** pipelining](https://redis.io/docs/latest/develop/use/pipelining/){.external-link target="_blank"} to optimize performance when publishing multiple messages in a batch. This allows you to queue several **Redis** operations and execute them in one network round-trip, reducing latency significantly.
14+
15+
## Usage Example
16+
17+
```python linenums="1" hl_lines="2 11 19 22"
18+
{! docs_src/redis/pipeline/pipeline.py !}
19+
```
20+
21+
## API
22+
23+
You can pass the `pipeline` parameter to the `publish` method to delay the execution of **Redis** commands. The commands will only be executed after you explicitly call `#!python await pipe.execute()`.
24+
25+
The `pipeline` object is injected by the `Pipeline` annotation:
26+
27+
```python
28+
from faststream.redis.annotations import Pipeline
29+
```
30+
31+
`Pipeline` is a **Redis** pipeline object (`redis.asyncio.client.Pipeline`), which is wrapped in a FastStream dependency and will be automatically available in any subscriber.
32+
33+
## Batch Publishing with Pipeline
34+
35+
When using `#!python broker.publish_batch()` in combination with the `pipeline` parameter, all messages sent through the pipeline are queued and processed by the subscriber as a single batch after calling `#!python await pipe.execute()`. This allows the subscriber to handle all messages sent through the pipeline in a single execution, improving the efficiency of batch processing.
36+
37+
## Notes
38+
39+
- Pipelining is supported for all **Redis** queue types, including channels, lists, and streams.
40+
- You can combine multiple queue types in a single pipeline.
41+
42+
## Benefits
43+
44+
- Reduces network traffic by batching **Redis** commands.
45+
- Improves performance in high-volume scenarios.
46+
- Fully integrates with **FastStream**'s dependency injection system.
47+
- Allows for efficient batch processing when using `#!python broker.publish_batch()` and `pipeline`, as all messages are processed as a single entity by the subscriber after `#!python await pipe.execute()`.

docs/docs/navigation_template.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@ search:
119119
- [Batching](redis/streams/batch.md)
120120
- [Acknowledgement](redis/streams/ack.md)
121121
- [RPC](redis/rpc.md)
122+
- [Pipeline](redis/pipeline.md)
122123
- [Message Information](redis/message.md)
123124
- [Security Configuration](redis/security.md)
124125
- [Reference - Code API](api/index.md)

docs/docs_src/redis/pipeline/__init__.py

Whitespace-only changes.
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
from faststream import FastStream, Logger
2+
from faststream.redis import RedisBroker, Pipeline
3+
4+
broker = RedisBroker()
5+
app = FastStream(broker)
6+
7+
@broker.subscriber("test")
8+
async def handle(
9+
msg: str,
10+
logger: Logger,
11+
pipe: Pipeline,
12+
) -> None:
13+
logger.info(msg)
14+
15+
for i in range(10):
16+
await broker.publish(
17+
f"hello {i}",
18+
channel="test-output", # queue can be channel, list, or stream
19+
pipeline=pipe,
20+
)
21+
22+
results = await pipe.execute() # execute all publish commands
23+
logger.info(results)
24+
25+
@app.after_startup
26+
async def t() -> None:
27+
await broker.publish("Hi!", "test")

faststream/redis/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
try:
22
from faststream.testing.app import TestApp
33

4-
from .annotations import Redis, RedisMessage
4+
from .annotations import Pipeline, Redis, RedisMessage
55
from .broker.broker import RedisBroker
66
from .response import RedisResponse
77
from .router import RedisPublisher, RedisRoute, RedisRouter
@@ -15,6 +15,7 @@
1515

1616
__all__ = (
1717
"ListSub",
18+
"Pipeline",
1819
"PubSub",
1920
"Redis",
2021
"RedisBroker",

faststream/redis/annotations.py

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,28 @@
1-
from redis.asyncio.client import Redis as RedisClient
1+
from typing import TYPE_CHECKING, AsyncIterator
2+
3+
from redis.asyncio.client import Pipeline as _RedisPipeline
4+
from redis.asyncio.client import Redis as _RedisClient
25
from typing_extensions import Annotated
36

7+
from faststream import Depends
48
from faststream.annotations import ContextRepo, Logger, NoCast
59
from faststream.redis.broker.broker import RedisBroker as RB
610
from faststream.redis.message import UnifyRedisMessage
711
from faststream.utils.context import Context
812

13+
if TYPE_CHECKING:
14+
RedisClient = _RedisClient[bytes]
15+
RedisPipeline = _RedisPipeline[bytes]
16+
else:
17+
RedisClient = _RedisClient
18+
RedisPipeline = _RedisPipeline
19+
20+
921
__all__ = (
1022
"ContextRepo",
1123
"Logger",
1224
"NoCast",
25+
"Pipeline",
1326
"Redis",
1427
"RedisBroker",
1528
"RedisMessage",
@@ -18,3 +31,11 @@
1831
RedisMessage = Annotated[UnifyRedisMessage, Context("message")]
1932
RedisBroker = Annotated[RB, Context("broker")]
2033
Redis = Annotated[RedisClient, Context("broker._connection")]
34+
35+
36+
async def get_pipe(redis: Redis) -> AsyncIterator[RedisPipeline]:
37+
async with redis.pipeline() as pipe:
38+
yield pipe
39+
40+
41+
Pipeline = Annotated[RedisPipeline, Depends(get_pipe, cast=False)]

faststream/redis/broker/broker.py

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616

1717
import anyio
1818
from anyio import move_on_after
19-
from redis.asyncio.client import Redis
19+
from redis.asyncio.client import Pipeline, Redis
2020
from redis.asyncio.connection import (
2121
Connection,
2222
ConnectionPool,
@@ -451,6 +451,13 @@ async def publish( # type: ignore[override]
451451
"Argument will be removed in **FastStream 0.6.0**."
452452
),
453453
] = False,
454+
pipeline: Annotated[
455+
Optional["Pipeline[bytes]"],
456+
Doc(
457+
"Optional Redis `Pipeline` object to batch multiple commands. "
458+
"Use it to group Redis operations for optimized execution and reduced latency."
459+
),
460+
] = None,
454461
) -> Optional["DecodedMessage"]:
455462
"""Publish message directly.
456463
@@ -472,6 +479,7 @@ async def publish( # type: ignore[override]
472479
rpc=rpc,
473480
rpc_timeout=rpc_timeout,
474481
raise_timeout=raise_timeout,
482+
pipeline=pipeline,
475483
)
476484

477485
@override
@@ -517,6 +525,13 @@ async def publish_batch(
517525
"**correlation_id** is a useful option to trace messages."
518526
),
519527
] = None,
528+
pipeline: Annotated[
529+
Optional["Pipeline[bytes]"],
530+
Doc(
531+
"Optional Redis `Pipeline` object to batch multiple commands. "
532+
"Use it to group Redis operations for optimized execution and reduced latency."
533+
),
534+
] = None,
520535
) -> None:
521536
"""Publish multiple messages to Redis List by one request."""
522537
assert self._producer, NOT_CONNECTED_YET # nosec B101
@@ -532,6 +547,7 @@ async def publish_batch(
532547
*msgs,
533548
list=list,
534549
correlation_id=correlation_id,
550+
pipeline=pipeline,
535551
)
536552

537553
@override

faststream/redis/publisher/producer.py

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
from faststream.utils.nuid import NUID
1414

1515
if TYPE_CHECKING:
16-
from redis.asyncio.client import PubSub, Redis
16+
from redis.asyncio.client import Pipeline, PubSub, Redis
1717

1818
from faststream.broker.types import (
1919
AsyncCallable,
@@ -62,10 +62,17 @@ async def publish( # type: ignore[override]
6262
rpc: bool = False,
6363
rpc_timeout: Optional[float] = 30.0,
6464
raise_timeout: bool = False,
65+
pipeline: Optional["Pipeline[bytes]"] = None,
6566
) -> Optional[Any]:
6667
if not any((channel, list, stream)):
6768
raise SetupError(INCORRECT_SETUP_MSG)
6869

70+
if pipeline is not None and rpc is True:
71+
raise RuntimeError(
72+
"You cannot use both rpc and pipeline arguments at the same time: "
73+
"select only one delivery mechanism."
74+
)
75+
6976
psub: Optional[PubSub] = None
7077
if rpc:
7178
if reply_to:
@@ -83,12 +90,13 @@ async def publish( # type: ignore[override]
8390
correlation_id=correlation_id,
8491
)
8592

93+
conn = pipeline or self._connection
8694
if channel is not None:
87-
await self._connection.publish(channel, msg)
95+
await conn.publish(channel, msg)
8896
elif list is not None:
89-
await self._connection.rpush(list, msg)
97+
await conn.rpush(list, msg)
9098
elif stream is not None:
91-
await self._connection.xadd(
99+
await conn.xadd(
92100
name=stream,
93101
fields={DATA_KEY: msg},
94102
maxlen=maxlen,
@@ -193,6 +201,7 @@ async def publish_batch(
193201
list: str,
194202
correlation_id: str,
195203
headers: Optional["AnyDict"] = None,
204+
pipeline: Optional["Pipeline[bytes]"] = None,
196205
) -> None:
197206
batch = (
198207
RawMessage.encode(
@@ -203,4 +212,5 @@ async def publish_batch(
203212
)
204213
for msg in msgs
205214
)
206-
await self._connection.rpush(list, *batch)
215+
conn = pipeline or self._connection
216+
await conn.rpush(list, *batch)

0 commit comments

Comments
 (0)