Skip to content

Commit a877aa1

Browse files
committed
wrap worker_function to use sitecustomize
1 parent 4ae2e65 commit a877aa1

File tree

1 file changed

+28
-2
lines changed

1 file changed

+28
-2
lines changed

taskiq/instrumentation.py

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,12 @@ async def main():
3636

3737

3838
import logging
39+
from functools import partial
3940
from typing import Any, Callable, Collection, Optional
4041
from weakref import WeakSet as _WeakSet
4142

43+
from taskiq.cli.worker.args import WorkerArgs
44+
4245
try:
4346
import opentelemetry # noqa: F401
4447
except ImportError as exc:
@@ -53,14 +56,31 @@ async def main():
5356
from opentelemetry.instrumentation.utils import unwrap
5457
from opentelemetry.metrics import MeterProvider
5558
from opentelemetry.trace import TracerProvider
56-
from wrapt import wrap_function_wrapper
59+
from wrapt import wrap_function_wrapper, wrap_object_attribute
5760

5861
from taskiq import AsyncBroker
62+
from taskiq.cli.worker.process_manager import ProcessManager
5963
from taskiq.middlewares.opentelemetry_middleware import OpenTelemetryMiddleware
6064

6165
logger = logging.getLogger("taskiq.opentelemetry")
6266

6367

68+
def _worker_function_with_sitecustomize(
69+
worker_function: Callable[[WorkerArgs], None],
70+
*args: Any,
71+
**kwargs: Any,
72+
) -> None:
73+
import opentelemetry.instrumentation.auto_instrumentation.sitecustomize # noqa
74+
75+
return worker_function(*args, **kwargs)
76+
77+
78+
def _worker_function_factory(
79+
worker_function: Callable[[WorkerArgs], None],
80+
) -> Callable[[WorkerArgs], None]:
81+
return partial(_worker_function_with_sitecustomize, worker_function)
82+
83+
6484
class TaskiqInstrumentor(BaseInstrumentor):
6585
"""OpenTelemetry instrumentor for Taskiq."""
6686

@@ -121,11 +141,17 @@ def broker_init(
121141
self.instrument_broker(broker)
122142
return result
123143

124-
wrap_function_wrapper("taskiq", "AsyncBroker.__init__", broker_init)
144+
wrap_function_wrapper("taskiq.abc.broker", "AsyncBroker.__init__", broker_init)
145+
wrap_object_attribute(
146+
"taskiq.cli.worker.process_manager",
147+
"ProcessManager.worker_function",
148+
_worker_function_factory,
149+
)
125150

126151
def _uninstrument(self, **kwargs: Any) -> None:
127152
instances_to_uninstrument = list(self._instrumented_brokers)
128153
for broker in instances_to_uninstrument:
129154
self.uninstrument_broker(broker)
130155
self._instrumented_brokers.clear()
131156
unwrap(AsyncBroker, "__init__")
157+
delattr(ProcessManager, "worker_function")

0 commit comments

Comments
 (0)