Skip to content

Commit a14150d

Browse files
committed
monkeypatch start_listen
1 parent 1b2107b commit a14150d

File tree

1 file changed

+13
-24
lines changed

1 file changed

+13
-24
lines changed

taskiq/instrumentation.py

Lines changed: 13 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@ async def main():
3636

3737

3838
import logging
39-
from functools import partial
4039
from typing import Any, Callable, Collection, Optional
4140
from weakref import WeakSet as _WeakSet
4241

@@ -56,34 +55,23 @@ async def main():
5655
from opentelemetry.instrumentation.utils import unwrap
5756
from opentelemetry.metrics import MeterProvider
5857
from opentelemetry.trace import TracerProvider
59-
from wrapt import wrap_function_wrapper, wrap_object_attribute
58+
from wrapt import wrap_function_wrapper
6059

60+
import taskiq.cli.worker.run
6161
from taskiq import AsyncBroker
62-
from taskiq.cli.worker.process_manager import ProcessManager
6362
from taskiq.middlewares.opentelemetry_middleware import OpenTelemetryMiddleware
6463

6564
logger = logging.getLogger("taskiq.opentelemetry")
6665

6766

68-
def _worker_function_with_initialize(
69-
worker_function: Callable[[WorkerArgs], None],
70-
*args: Any,
71-
**kwargs: Any,
72-
) -> None:
73-
initialize()
74-
return worker_function(*args, **kwargs)
75-
76-
77-
def _worker_function_factory(
78-
worker_function: Callable[[WorkerArgs], None],
79-
) -> Callable[[WorkerArgs], None]:
80-
return partial(_worker_function_with_initialize, worker_function)
81-
82-
8367
class TaskiqInstrumentor(BaseInstrumentor):
8468
"""OpenTelemetry instrumentor for Taskiq."""
8569

8670
_instrumented_brokers: _WeakSet[AsyncBroker] = _WeakSet()
71+
_original_start_listen: Callable[
72+
[WorkerArgs],
73+
None,
74+
] = taskiq.cli.worker.run.start_listen
8775

8876
def __init__(self) -> None:
8977
super().__init__()
@@ -129,6 +117,11 @@ def instrumentation_dependencies(self) -> Collection[str]:
129117
"""This function tells which library this instrumentor instruments."""
130118
return ("taskiq >= 0.0.1",)
131119

120+
@classmethod
121+
def _start_listen_with_initialize(cls, args: WorkerArgs) -> None:
122+
initialize()
123+
cls._original_start_listen(args)
124+
132125
def _instrument(self, **kwargs: Any) -> None:
133126
def broker_init(
134127
init: Callable[[Any], Any],
@@ -141,16 +134,12 @@ def broker_init(
141134
return result
142135

143136
wrap_function_wrapper("taskiq.abc.broker", "AsyncBroker.__init__", broker_init)
144-
wrap_object_attribute(
145-
"taskiq.cli.worker.process_manager",
146-
"ProcessManager.worker_function",
147-
_worker_function_factory,
148-
)
137+
taskiq.cli.worker.run.start_listen = self._start_listen_with_initialize
149138

150139
def _uninstrument(self, **kwargs: Any) -> None:
151140
instances_to_uninstrument = list(self._instrumented_brokers)
152141
for broker in instances_to_uninstrument:
153142
self.uninstrument_broker(broker)
154143
self._instrumented_brokers.clear()
155144
unwrap(AsyncBroker, "__init__")
156-
delattr(ProcessManager, "worker_function")
145+
taskiq.cli.worker.run.start_listen = self._original_start_listen # type: ignore[assignment]

0 commit comments

Comments
 (0)