Skip to content

Commit b047675

Browse files
authored
deps: bump taskiq to 0.12 and support OTEL (#121) (#124)
2 parents eaec6d9 + 3b492c3 commit b047675

File tree

3 files changed

+95
-4
lines changed

3 files changed

+95
-4
lines changed

README.md

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,12 @@ pip install taskiq-faststream[nats]
4646
pip install taskiq-faststream[redis]
4747
```
4848

49+
For **OpenTelemetry** distributed tracing support:
50+
51+
```bash
52+
pip install taskiq-faststream[otel]
53+
```
54+
4955
## Usage
5056

5157
The package gives you two classes: `AppWrapper` and `BrokerWrapper`
@@ -141,3 +147,37 @@ taskiq_broker.task(
141147
...,
142148
)
143149
```
150+
151+
## OpenTelemetry Support
152+
153+
**taskiq-faststream** supports distributed tracing with OpenTelemetry. To enable it, install the `otel` extra and pass `enable_otel=True` when creating the broker wrapper:
154+
155+
```python
156+
from faststream.nats import NatsBroker
157+
from taskiq_faststream import BrokerWrapper
158+
159+
broker = NatsBroker()
160+
161+
# Enable OpenTelemetry middleware
162+
taskiq_broker = BrokerWrapper(broker, enable_otel=True)
163+
```
164+
165+
This will automatically add OpenTelemetry middleware to track task execution, providing insights into:
166+
- Task execution spans
167+
- Task dependencies and call chains
168+
- Performance metrics
169+
- Error tracking
170+
171+
Make sure to configure your OpenTelemetry exporter (e.g., Jaeger, Zipkin) according to your monitoring setup.
172+
173+
The same applies to `AppWrapper`:
174+
175+
```python
176+
from faststream import FastStream
177+
from taskiq_faststream import AppWrapper
178+
179+
app = FastStream(broker)
180+
181+
# Enable OpenTelemetry middleware
182+
taskiq_broker = AppWrapper(app, enable_otel=True)
183+
```

pyproject.toml

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[project]
22
name = "taskiq-faststream"
3-
version = "0.3.2"
3+
version = "0.4.0"
44
description = "FastStream - taskiq integration to schedule FastStream tasks"
55
readme = "README.md"
66
license = "MIT"
@@ -40,7 +40,7 @@ classifiers = [
4040

4141

4242
dependencies = [
43-
"taskiq>=0.11.0,<0.12.0",
43+
"taskiq>=0.12.1,<0.13.0",
4444
"faststream>=0.3.14,<0.7",
4545
]
4646

@@ -65,6 +65,10 @@ redis = [
6565
"faststream[redis]"
6666
]
6767

68+
otel = [
69+
"taskiq[opentelemetry]>=0.12.1,<0.13.0"
70+
]
71+
6872
[dependency-groups]
6973
test = [
7074
"taskiq-faststream[nats]",

taskiq_faststream/broker.py

Lines changed: 49 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,11 @@
1313
from taskiq_faststream.types import ScheduledTask
1414
from taskiq_faststream.utils import resolve_msg
1515

16+
try:
17+
from taskiq.middlewares.otel_middleware import OpenTelemetryMiddleware
18+
except ImportError:
19+
OpenTelemetryMiddleware = None # type: ignore[assignment,misc]
20+
1621
PublishParameters: TypeAlias = typing.Any
1722

1823

@@ -30,11 +35,32 @@ class BrokerWrapper(AsyncBroker):
3035
task : Register FastStream scheduled task.
3136
"""
3237

33-
def __init__(self, broker: Any) -> None:
38+
def __init__(
39+
self,
40+
broker: Any,
41+
*,
42+
enable_otel: bool = False,
43+
) -> None:
44+
"""Initialize BrokerWrapper.
45+
46+
Args:
47+
broker: FastStream broker instance to wrap.
48+
enable_otel: Enable OpenTelemetry middleware for distributed tracing.
49+
Requires taskiq[otel] to be installed.
50+
"""
3451
super().__init__()
3552
self.formatter = PatchedFormatter()
3653
self.broker = broker
3754

55+
if enable_otel:
56+
if OpenTelemetryMiddleware is None:
57+
msg = (
58+
"OpenTelemetry middleware requires taskiq[otel] to be installed. "
59+
"Install it with: pip install taskiq-faststream[otel]"
60+
)
61+
raise ImportError(msg)
62+
self.middlewares.append(OpenTelemetryMiddleware())
63+
3864
async def startup(self) -> None:
3965
"""Startup wrapped FastStream broker."""
4066
await super().startup()
@@ -105,11 +131,32 @@ class AppWrapper(BrokerWrapper):
105131
task : Register FastStream scheduled task.
106132
"""
107133

108-
def __init__(self, app: Application) -> None:
134+
def __init__(
135+
self,
136+
app: Application,
137+
*,
138+
enable_otel: bool = False,
139+
) -> None:
140+
"""Initialize AppWrapper.
141+
142+
Args:
143+
app: FastStream application instance to wrap.
144+
enable_otel: Enable OpenTelemetry middleware for distributed tracing.
145+
Requires taskiq[otel] to be installed.
146+
"""
109147
super(BrokerWrapper, self).__init__()
110148
self.formatter = PatchedFormatter()
111149
self.app = app
112150

151+
if enable_otel:
152+
if OpenTelemetryMiddleware is None:
153+
msg = (
154+
"OpenTelemetry middleware requires taskiq[otel] to be installed. "
155+
"Install it with: pip install taskiq-faststream[otel]"
156+
)
157+
raise ImportError(msg)
158+
self.middlewares.append(OpenTelemetryMiddleware())
159+
113160
async def startup(self) -> None:
114161
"""Startup wrapped FastStream."""
115162
await super(BrokerWrapper, self).startup()

0 commit comments

Comments
 (0)