Skip to content

Commit fbe4b78

Browse files
committed
add opentelemetry instrumentation, suppress false-positive security checks from pre-commit, fix #417
1 parent 9339f9d commit fbe4b78

File tree

12 files changed

+1078
-111
lines changed

12 files changed

+1078
-111
lines changed

docs/README.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,13 @@ actions:
1414
head:
1515
- - meta
1616
- name: "google-site-verification"
17-
content: "hQCR5w2tmeuOvYIYXsOYU3u4kLNwT86lnqltANYlRQ0"
17+
content: "hQCR5w2tmeuOvYIYXsOYU3u4kLNwT86lnqltANYlRQ0" # pragma: allowlist secret
1818
- - meta
1919
- name: "msvalidate.01"
20-
content: "97DC185FE0A2F5B123861F0790FDFB26"
20+
content: "97DC185FE0A2F5B123861F0790FDFB26" # pragma: allowlist secret
2121
- - meta
2222
- name: "yandex-verification"
23-
content: "9b105f7c58cbc920"
23+
content: "9b105f7c58cbc920" # pragma: allowlist secret
2424
highlights:
2525
- features:
2626
- title: Production ready

poetry.lock

Lines changed: 206 additions & 108 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,10 @@ pycron = "^3.0.0"
3636
taskiq_dependencies = ">=1.3.1,<2"
3737
anyio = ">=3"
3838
packaging = ">=19"
39+
# For opentelemetry instrumentation
40+
opentelemetry-api = { version = "^1.38.0", optional = true }
41+
opentelemetry-instrumentation = { version = "^0.59b0", optional = true}
42+
opentelemetry-semantic-conventions = { version = "^0.59b0", optional = true}
3943
# For prometheus metrics
4044
prometheus_client = { version = "^0", optional = true }
4145
# For ZMQBroker
@@ -69,10 +73,12 @@ pytest-mock = "^3.11.1"
6973
tzlocal = "^5.0.1"
7074
types-tzlocal = "^5.0.1.1"
7175
types-pytz = "^2023.3.1.1"
76+
opentelemetry-test-utils = "^0.59b0"
7277

7378
[tool.poetry.extras]
7479
zmq = ["pyzmq"]
7580
uv = ["uvloop"]
81+
opentelemetry = ["opentelemetry-api", "opentelemetry-instrumentation", "opentelemetry-semantic-conventions"]
7682
metrics = ["prometheus_client"]
7783
reload = ["watchdog", "gitignore-parser"]
7884
orjson = ["orjson"]
@@ -86,6 +92,9 @@ taskiq = "taskiq.__main__:main"
8692
worker = "taskiq.cli.worker.cmd:WorkerCMD"
8793
scheduler = "taskiq.cli.scheduler.cmd:SchedulerCMD"
8894

95+
[tool.poetry.plugins.opentelemetry_instrumentor]
96+
taskiq = "taskiq.instrumentation:TaskiqInstrumentor"
97+
8998
[tool.mypy]
9099
strict = true
91100
ignore_missing_imports = true
@@ -174,6 +183,7 @@ line-length = 88
174183
"SLF001", # Private member accessed
175184
"S311", # Standard pseudo-random generators are not suitable for security/cryptographic purposes
176185
"D101", # Missing docstring in public class
186+
"D102", # Missing docstring in public method
177187
]
178188

179189
[tool.ruff.lint.pydocstyle]

taskiq/instrumentation.py

Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
"""
2+
Instrument `taskiq`_ to trace Taskiq applications.
3+
4+
.. _taskiq: https://pypi.org/project/taskiq/
5+
6+
Usage
7+
-----
8+
9+
* Run instrumented task
10+
11+
.. code:: python
12+
13+
import asyncio
14+
15+
from taskiq import InMemoryBroker, TaskiqEvents, TaskiqState
16+
from taskiq.instrumentation import TaskiqInstrumentor
17+
18+
broker = InMemoryBroker()
19+
20+
@broker.on_event(TaskiqEvents.WORKER_STARTUP)
21+
async def startup(state: TaskiqState) -> None:
22+
TaskiqInstrumentor().instrument()
23+
24+
@broker.task
25+
async def add(x, y):
26+
return x + y
27+
28+
async def main():
29+
await broker.startup()
30+
await my_task.kiq(1, 2)
31+
await broker.shutdown()
32+
33+
if __name__ == "__main__":
34+
asyncio.run(main())
35+
36+
API
37+
---
38+
"""
39+
40+
from __future__ import annotations
41+
42+
import logging
43+
from typing import TYPE_CHECKING, Any, Callable, Collection, Optional
44+
from weakref import WeakSet as _WeakSet
45+
46+
try:
47+
import opentelemetry # noqa: F401
48+
except ImportError as exc:
49+
raise ImportError(
50+
"Cannot instrument. Please install 'taskiq[opentelemetry]'.",
51+
) from exc
52+
53+
54+
from opentelemetry.instrumentation.instrumentor import ( # type: ignore[attr-defined]
55+
BaseInstrumentor,
56+
)
57+
from opentelemetry.instrumentation.utils import unwrap
58+
from opentelemetry.metrics import MeterProvider
59+
from opentelemetry.trace import TracerProvider
60+
from wrapt import wrap_function_wrapper
61+
62+
from taskiq import AsyncBroker
63+
from taskiq.middlewares.opentelemetry_middleware import OpenTelemetryMiddleware
64+
65+
if TYPE_CHECKING:
66+
pass
67+
68+
logger = logging.getLogger("taskiq.opentelemetry")
69+
70+
71+
class TaskiqInstrumentor(BaseInstrumentor):
72+
"""OpenTelemetry instrumentor for Taskiq."""
73+
74+
_instrumented_brokers: _WeakSet[AsyncBroker] = _WeakSet()
75+
76+
def __init__(self) -> None:
77+
super().__init__()
78+
self._middleware = None
79+
80+
def instrument_broker(
81+
self,
82+
broker: AsyncBroker,
83+
tracer_provider: Optional[TracerProvider] = None,
84+
meter_provider: Optional[MeterProvider] = None,
85+
) -> None:
86+
"""Instrument broker."""
87+
if not hasattr(broker, "_is_instrumented_by_opentelemetry"):
88+
broker._is_instrumented_by_opentelemetry = False # type: ignore[attr-defined] # noqa: SLF001
89+
90+
if not getattr(broker, "is_instrumented_by_opentelemetry", False):
91+
broker.middlewares.insert(
92+
0,
93+
OpenTelemetryMiddleware(
94+
tracer_provider=tracer_provider,
95+
meter_provider=meter_provider,
96+
),
97+
)
98+
broker._is_instrumented_by_opentelemetry = True # type: ignore[attr-defined] # noqa: SLF001
99+
if broker not in self._instrumented_brokers:
100+
self._instrumented_brokers.add(broker)
101+
else:
102+
logger.warning(
103+
"Attempting to instrument taskiq broker while already instrumented",
104+
)
105+
106+
def uninstrument_broker(self, broker: AsyncBroker) -> None:
107+
"""Uninstrument broker."""
108+
broker.middlewares = [
109+
middleware
110+
for middleware in broker.middlewares
111+
if not isinstance(middleware, OpenTelemetryMiddleware)
112+
]
113+
broker._is_instrumented_by_opentelemetry = False # type: ignore[attr-defined] # noqa: SLF001
114+
self._instrumented_brokers.discard(broker)
115+
116+
def instrumentation_dependencies(self) -> Collection[str]:
117+
"""This function tells which library this instrumentor instruments."""
118+
return ("taskiq >= 0.0.1",)
119+
120+
def _instrument(self, **kwargs: Any) -> None:
121+
def broker_init(
122+
init: Callable[[Any], Any],
123+
broker: AsyncBroker,
124+
args: Any,
125+
kwargs: Any,
126+
) -> None:
127+
result = init(*args, **kwargs)
128+
self.instrument_broker(broker)
129+
return result
130+
131+
wrap_function_wrapper("taskiq", "AsyncBroker.__init__", broker_init)
132+
133+
def _uninstrument(self, **kwargs: Any) -> None:
134+
instances_to_uninstrument = list(self._instrumented_brokers)
135+
for broker in instances_to_uninstrument:
136+
self.uninstrument_broker(broker)
137+
self._instrumented_brokers.clear()
138+
unwrap(AsyncBroker, "__init__")

0 commit comments

Comments
 (0)