Skip to content
30 changes: 28 additions & 2 deletions taskiq/instrumentation.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,12 @@ async def main():


import logging
from functools import partial
from typing import Any, Callable, Collection, Optional
from weakref import WeakSet as _WeakSet

from taskiq.cli.worker.args import WorkerArgs

try:
import opentelemetry # noqa: F401
except ImportError as exc:
Expand All @@ -53,14 +56,31 @@ async def main():
from opentelemetry.instrumentation.utils import unwrap
from opentelemetry.metrics import MeterProvider
from opentelemetry.trace import TracerProvider
from wrapt import wrap_function_wrapper
from wrapt import wrap_function_wrapper, wrap_object_attribute

from taskiq import AsyncBroker
from taskiq.cli.worker.process_manager import ProcessManager
from taskiq.middlewares.opentelemetry_middleware import OpenTelemetryMiddleware

logger = logging.getLogger("taskiq.opentelemetry")


def _worker_function_with_sitecustomize(
worker_function: Callable[[WorkerArgs], None],
*args: Any,
**kwargs: Any,
) -> None:
import opentelemetry.instrumentation.auto_instrumentation.sitecustomize # noqa
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
import opentelemetry.instrumentation.auto_instrumentation.sitecustomize # noqa
from opentelemetry.instrumentation.auto_instrumentation import initialize
initialize()


return worker_function(*args, **kwargs)


def _worker_function_factory(
worker_function: Callable[[WorkerArgs], None],
) -> Callable[[WorkerArgs], None]:
return partial(_worker_function_with_sitecustomize, worker_function)


class TaskiqInstrumentor(BaseInstrumentor):
"""OpenTelemetry instrumentor for Taskiq."""

Expand Down Expand Up @@ -121,11 +141,17 @@ def broker_init(
self.instrument_broker(broker)
return result

wrap_function_wrapper("taskiq", "AsyncBroker.__init__", broker_init)
wrap_function_wrapper("taskiq.abc.broker", "AsyncBroker.__init__", broker_init)
wrap_object_attribute(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, I guess it might be simpler to wrap taskiq.cli.worker.run:start_listen. At least it won't require any partial.

"taskiq.cli.worker.process_manager",
"ProcessManager.worker_function",
_worker_function_factory,
)

def _uninstrument(self, **kwargs: Any) -> None:
instances_to_uninstrument = list(self._instrumented_brokers)
for broker in instances_to_uninstrument:
self.uninstrument_broker(broker)
self._instrumented_brokers.clear()
unwrap(AsyncBroker, "__init__")
delattr(ProcessManager, "worker_function")