11import typing
22import warnings
3+ from collections .abc import Iterable
34from typing import Any , TypeAlias
45
56import anyio
67from faststream ._internal .application import Application
78from faststream .types import SendableMessage
89from taskiq import AsyncBroker
10+ from taskiq .abc .middleware import TaskiqMiddleware
911from taskiq .acks import AckableMessage
1012from taskiq .decor import AsyncTaskiqDecoratedTask
1113
@@ -30,10 +32,22 @@ class BrokerWrapper(AsyncBroker):
3032 task : Register FastStream scheduled task.
3133 """
3234
33- def __init__ (self , broker : Any ) -> None :
35+ def __init__ (
36+ self ,
37+ broker : Any ,
38+ * ,
39+ middlewares : Iterable [TaskiqMiddleware ] = (),
40+ ) -> None :
41+ """Initialize BrokerWrapper.
42+
43+ Args:
44+ broker: FastStream broker instance to wrap.
45+ middlewares: Middlewares to add to the broker.
46+ """
3447 super ().__init__ ()
3548 self .formatter = PatchedFormatter ()
3649 self .broker = broker
50+ self .add_middlewares (* middlewares )
3751
3852 async def startup (self ) -> None :
3953 """Startup wrapped FastStream broker."""
@@ -105,10 +119,22 @@ class AppWrapper(BrokerWrapper):
105119 task : Register FastStream scheduled task.
106120 """
107121
108- def __init__ (self , app : Application ) -> None :
122+ def __init__ (
123+ self ,
124+ app : Application ,
125+ * ,
126+ middlewares : Iterable [TaskiqMiddleware ] = (),
127+ ) -> None :
128+ """Initialize AppWrapper.
129+
130+ Args:
131+ app: FastStream application instance to wrap.
132+ middlewares: Middlewares to add to the broker.
133+ """
109134 super (BrokerWrapper , self ).__init__ ()
110135 self .formatter = PatchedFormatter ()
111136 self .app = app
137+ self .add_middlewares (* middlewares )
112138
113139 async def startup (self ) -> None :
114140 """Startup wrapped FastStream."""
0 commit comments