Skip to content

Commit 94d07d3

Browse files
✨ opentelemetry instrument rpc clients (#7642)
1 parent fa9d3a0 commit 94d07d3

File tree

48 files changed

+428
-129
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

48 files changed

+428
-129
lines changed

packages/aws-library/requirements/_base.txt

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,7 @@ opentelemetry-api==1.30.0
145145
# opentelemetry-exporter-otlp-proto-grpc
146146
# opentelemetry-exporter-otlp-proto-http
147147
# opentelemetry-instrumentation
148+
# opentelemetry-instrumentation-aio-pika
148149
# opentelemetry-instrumentation-botocore
149150
# opentelemetry-instrumentation-logging
150151
# opentelemetry-instrumentation-redis
@@ -164,10 +165,13 @@ opentelemetry-exporter-otlp-proto-http==1.30.0
164165
# via opentelemetry-exporter-otlp
165166
opentelemetry-instrumentation==0.51b0
166167
# via
168+
# opentelemetry-instrumentation-aio-pika
167169
# opentelemetry-instrumentation-botocore
168170
# opentelemetry-instrumentation-logging
169171
# opentelemetry-instrumentation-redis
170172
# opentelemetry-instrumentation-requests
173+
opentelemetry-instrumentation-aio-pika==0.51b0
174+
# via -r requirements/../../../packages/service-library/requirements/_base.in
171175
opentelemetry-instrumentation-botocore==0.51b0
172176
# via -r requirements/_base.in
173177
opentelemetry-instrumentation-logging==0.51b0
@@ -431,6 +435,7 @@ wrapt==1.17.2
431435
# aiobotocore
432436
# deprecated
433437
# opentelemetry-instrumentation
438+
# opentelemetry-instrumentation-aio-pika
434439
# opentelemetry-instrumentation-redis
435440
yarl==1.18.3
436441
# via

packages/service-library/requirements/_base.in

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ arrow # date/time
1818
faststream
1919
opentelemetry-api
2020
opentelemetry-exporter-otlp
21+
opentelemetry-instrumentation-aio-pika
2122
opentelemetry-instrumentation-logging
2223
opentelemetry-instrumentation-redis
2324
opentelemetry-instrumentation-requests

packages/service-library/requirements/_base.txt

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ opentelemetry-api==1.30.0
103103
# opentelemetry-exporter-otlp-proto-grpc
104104
# opentelemetry-exporter-otlp-proto-http
105105
# opentelemetry-instrumentation
106+
# opentelemetry-instrumentation-aio-pika
106107
# opentelemetry-instrumentation-logging
107108
# opentelemetry-instrumentation-redis
108109
# opentelemetry-instrumentation-requests
@@ -120,9 +121,12 @@ opentelemetry-exporter-otlp-proto-http==1.30.0
120121
# via opentelemetry-exporter-otlp
121122
opentelemetry-instrumentation==0.51b0
122123
# via
124+
# opentelemetry-instrumentation-aio-pika
123125
# opentelemetry-instrumentation-logging
124126
# opentelemetry-instrumentation-redis
125127
# opentelemetry-instrumentation-requests
128+
opentelemetry-instrumentation-aio-pika==0.51b0
129+
# via -r requirements/_base.in
126130
opentelemetry-instrumentation-logging==0.51b0
127131
# via -r requirements/_base.in
128132
opentelemetry-instrumentation-redis==0.51b0
@@ -297,6 +301,7 @@ wrapt==1.17.2
297301
# via
298302
# deprecated
299303
# opentelemetry-instrumentation
304+
# opentelemetry-instrumentation-aio-pika
300305
# opentelemetry-instrumentation-redis
301306
yarl==1.18.3
302307
# via

packages/service-library/src/servicelib/aiohttp/tracing.py

Lines changed: 62 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
1-
""" Adds aiohttp middleware for tracing using opentelemetry instrumentation.
2-
3-
"""
1+
"""Adds aiohttp middleware for tracing using opentelemetry instrumentation."""
42

53
import logging
4+
from collections.abc import AsyncIterator, Callable
65

76
from aiohttp import web
87
from opentelemetry import trace
@@ -44,8 +43,15 @@
4443
except ImportError:
4544
HAS_REQUESTS = False
4645

46+
try:
47+
from opentelemetry.instrumentation.aio_pika import AioPikaInstrumentor
48+
49+
HAS_AIO_PIKA = True
50+
except ImportError:
51+
HAS_AIO_PIKA = False
52+
4753

48-
def setup_tracing(
54+
def _startup(
4955
app: web.Application,
5056
tracing_settings: TracingSettings,
5157
service_name: str,
@@ -74,7 +80,9 @@ def setup_tracing(
7480
trace.set_tracer_provider(TracerProvider(resource=resource))
7581
tracer_provider: trace.TracerProvider = trace.get_tracer_provider()
7682

77-
tracing_destination: str = f"{URL(opentelemetry_collector_endpoint).with_port(opentelemetry_collector_port).with_path('/v1/traces')}"
83+
tracing_destination: str = (
84+
f"{URL(opentelemetry_collector_endpoint).with_port(opentelemetry_collector_port).with_path('/v1/traces')}"
85+
)
7886

7987
_logger.info(
8088
"Trying to connect service %s to tracing collector at %s.",
@@ -128,3 +136,52 @@ def setup_tracing(
128136
msg="Attempting to add requests opentelemetry autoinstrumentation...",
129137
):
130138
RequestsInstrumentor().instrument()
139+
140+
if HAS_AIO_PIKA:
141+
with log_context(
142+
_logger,
143+
logging.INFO,
144+
msg="Attempting to add aio_pika opentelemetry autoinstrumentation...",
145+
):
146+
AioPikaInstrumentor().instrument()
147+
148+
149+
def _shutdown() -> None:
150+
"""Uninstruments all opentelemetry instrumentors that were instrumented."""
151+
try:
152+
AioHttpClientInstrumentor().uninstrument()
153+
except Exception: # pylint:disable=broad-exception-caught
154+
_logger.exception("Failed to uninstrument AioHttpClientInstrumentor")
155+
if HAS_AIOPG:
156+
try:
157+
AiopgInstrumentor().uninstrument()
158+
except Exception: # pylint:disable=broad-exception-caught
159+
_logger.exception("Failed to uninstrument AiopgInstrumentor")
160+
if HAS_BOTOCORE:
161+
try:
162+
BotocoreInstrumentor().uninstrument()
163+
except Exception: # pylint:disable=broad-exception-caught
164+
_logger.exception("Failed to uninstrument BotocoreInstrumentor")
165+
if HAS_REQUESTS:
166+
try:
167+
RequestsInstrumentor().uninstrument()
168+
except Exception: # pylint:disable=broad-exception-caught
169+
_logger.exception("Failed to uninstrument RequestsInstrumentor")
170+
if HAS_AIO_PIKA:
171+
try:
172+
AioPikaInstrumentor().uninstrument()
173+
except Exception: # pylint:disable=broad-exception-caught
174+
_logger.exception("Failed to uninstrument AioPikaInstrumentor")
175+
176+
177+
def get_tracing_lifespan(
178+
app: web.Application, tracing_settings: TracingSettings, service_name: str
179+
) -> Callable[[web.Application], AsyncIterator]:
180+
_startup(app=app, tracing_settings=tracing_settings, service_name=service_name)
181+
182+
async def tracing_lifespan(app: web.Application):
183+
assert app # nosec
184+
yield
185+
_shutdown()
186+
187+
return tracing_lifespan

packages/service-library/src/servicelib/fastapi/tracing.py

Lines changed: 89 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
1-
""" Adds fastapi middleware for tracing using opentelemetry instrumentation.
2-
3-
"""
1+
"""Adds fastapi middleware for tracing using opentelemetry instrumentation."""
42

53
import logging
4+
from collections.abc import AsyncIterator
65

76
from fastapi import FastAPI
7+
from fastapi_lifespan_manager import State
88
from httpx import AsyncClient, Client
99
from opentelemetry import trace
1010
from opentelemetry.exporter.otlp.proto.http.trace_exporter import (
@@ -60,10 +60,17 @@
6060
except ImportError:
6161
HAS_REQUESTS = False
6262

63+
try:
64+
from opentelemetry.instrumentation.aio_pika.aio_pika_instrumentor import (
65+
AioPikaInstrumentor,
66+
)
6367

64-
def initialize_tracing(
65-
app: FastAPI, tracing_settings: TracingSettings, service_name: str
66-
) -> None:
68+
HAS_AIOPIKA_INSTRUMENTOR = True
69+
except ImportError:
70+
HAS_AIOPIKA_INSTRUMENTOR = False
71+
72+
73+
def _startup(tracing_settings: TracingSettings, service_name: str) -> None:
6774
if (
6875
not tracing_settings.TRACING_OPENTELEMETRY_COLLECTOR_ENDPOINT
6976
and not tracing_settings.TRACING_OPENTELEMETRY_COLLECTOR_PORT
@@ -80,7 +87,9 @@ def initialize_tracing(
8087
f"{tracing_settings.TRACING_OPENTELEMETRY_COLLECTOR_ENDPOINT}"
8188
)
8289

83-
tracing_destination: str = f"{URL(opentelemetry_collector_endpoint).with_port(tracing_settings.TRACING_OPENTELEMETRY_COLLECTOR_PORT).with_path('/v1/traces')}"
90+
tracing_destination: str = (
91+
f"{URL(opentelemetry_collector_endpoint).with_port(tracing_settings.TRACING_OPENTELEMETRY_COLLECTOR_PORT).with_path('/v1/traces')}"
92+
)
8493

8594
_logger.info(
8695
"Trying to connect service %s to opentelemetry tracing collector at %s.",
@@ -91,16 +100,22 @@ def initialize_tracing(
91100
otlp_exporter = OTLPSpanExporterHTTP(endpoint=tracing_destination)
92101
span_processor = BatchSpanProcessor(otlp_exporter)
93102
global_tracer_provider.add_span_processor(span_processor)
94-
# Instrument FastAPI
95-
FastAPIInstrumentor().instrument_app(app)
96103

104+
FastAPIInstrumentor().instrument()
97105
if HAS_AIOPG:
98106
with log_context(
99107
_logger,
100108
logging.INFO,
101109
msg="Attempting to add asyncpg opentelemetry autoinstrumentation...",
102110
):
103111
AiopgInstrumentor().instrument()
112+
if HAS_AIOPIKA_INSTRUMENTOR:
113+
with log_context(
114+
_logger,
115+
logging.INFO,
116+
msg="Attempting to add aio_pika opentelemetry autoinstrumentation...",
117+
):
118+
AioPikaInstrumentor().instrument()
104119
if HAS_ASYNCPG:
105120
with log_context(
106121
_logger,
@@ -131,5 +146,70 @@ def initialize_tracing(
131146
RequestsInstrumentor().instrument()
132147

133148

149+
def _shutdown() -> None:
150+
"""Uninstruments all opentelemetry instrumentors that were instrumented."""
151+
FastAPIInstrumentor().uninstrument()
152+
if HAS_AIOPG:
153+
try:
154+
AiopgInstrumentor().uninstrument()
155+
except Exception: # pylint:disable=broad-exception-caught
156+
_logger.exception("Failed to uninstrument AiopgInstrumentor")
157+
if HAS_AIOPIKA_INSTRUMENTOR:
158+
try:
159+
AioPikaInstrumentor().uninstrument()
160+
except Exception: # pylint:disable=broad-exception-caught
161+
_logger.exception("Failed to uninstrument AioPikaInstrumentor")
162+
if HAS_ASYNCPG:
163+
try:
164+
AsyncPGInstrumentor().uninstrument()
165+
except Exception: # pylint:disable=broad-exception-caught
166+
_logger.exception("Failed to uninstrument AsyncPGInstrumentor")
167+
if HAS_REDIS:
168+
try:
169+
RedisInstrumentor().uninstrument()
170+
except Exception: # pylint:disable=broad-exception-caught
171+
_logger.exception("Failed to uninstrument RedisInstrumentor")
172+
if HAS_BOTOCORE:
173+
try:
174+
BotocoreInstrumentor().uninstrument()
175+
except Exception: # pylint:disable=broad-exception-caught
176+
_logger.exception("Failed to uninstrument BotocoreInstrumentor")
177+
if HAS_REQUESTS:
178+
try:
179+
RequestsInstrumentor().uninstrument()
180+
except Exception: # pylint:disable=broad-exception-caught
181+
_logger.exception("Failed to uninstrument RequestsInstrumentor")
182+
183+
134184
def setup_httpx_client_tracing(client: AsyncClient | Client):
135185
HTTPXClientInstrumentor.instrument_client(client)
186+
187+
188+
def setup_tracing(
189+
app: FastAPI, tracing_settings: TracingSettings, service_name: str
190+
) -> None:
191+
192+
_startup(tracing_settings=tracing_settings, service_name=service_name)
193+
194+
def _on_shutdown() -> None:
195+
_shutdown()
196+
197+
app.add_event_handler("shutdown", _on_shutdown)
198+
199+
200+
def get_tracing_instrumentation_lifespan(
201+
tracing_settings: TracingSettings, service_name: str
202+
):
203+
204+
_startup(tracing_settings=tracing_settings, service_name=service_name)
205+
206+
async def tracing_instrumentation_lifespan(
207+
app: FastAPI,
208+
) -> AsyncIterator[State]:
209+
assert app # nosec
210+
211+
yield {}
212+
213+
_shutdown()
214+
215+
return tracing_instrumentation_lifespan

packages/service-library/src/servicelib/rabbitmq/_client_base.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,5 +70,4 @@ async def ping(self) -> bool:
7070
return False
7171

7272
@abstractmethod
73-
async def close(self) -> None:
74-
...
73+
async def close(self) -> None: ...

packages/service-library/tests/aiohttp/test_tracing.py

Lines changed: 23 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
from aiohttp import web
1212
from aiohttp.test_utils import TestClient
1313
from pydantic import ValidationError
14-
from servicelib.aiohttp.tracing import setup_tracing
14+
from servicelib.aiohttp.tracing import get_tracing_lifespan
1515
from settings_library.tracing import TracingSettings
1616

1717

@@ -24,14 +24,23 @@ def tracing_settings_in(request):
2424
def set_and_clean_settings_env_vars(
2525
monkeypatch: pytest.MonkeyPatch, tracing_settings_in
2626
):
27+
endpoint_mocked = False
2728
if tracing_settings_in[0]:
29+
endpoint_mocked = True
2830
monkeypatch.setenv(
2931
"TRACING_OPENTELEMETRY_COLLECTOR_ENDPOINT", f"{tracing_settings_in[0]}"
3032
)
33+
port_mocked = False
3134
if tracing_settings_in[1]:
35+
port_mocked = True
3236
monkeypatch.setenv(
3337
"TRACING_OPENTELEMETRY_COLLECTOR_PORT", f"{tracing_settings_in[1]}"
3438
)
39+
yield
40+
if endpoint_mocked:
41+
monkeypatch.delenv("TRACING_OPENTELEMETRY_COLLECTOR_ENDPOINT")
42+
if port_mocked:
43+
monkeypatch.delenv("TRACING_OPENTELEMETRY_COLLECTOR_PORT")
3544

3645

3746
@pytest.mark.parametrize(
@@ -50,11 +59,10 @@ async def test_valid_tracing_settings(
5059
app = web.Application()
5160
service_name = "simcore_service_webserver"
5261
tracing_settings = TracingSettings()
53-
setup_tracing(
54-
app,
55-
service_name=service_name,
56-
tracing_settings=tracing_settings,
57-
)
62+
async for _ in get_tracing_lifespan(
63+
app, service_name=service_name, tracing_settings=tracing_settings
64+
)(app):
65+
pass
5866

5967

6068
@pytest.mark.parametrize(
@@ -128,14 +136,15 @@ async def test_tracing_setup_package_detection(
128136
app = web.Application()
129137
service_name = "simcore_service_webserver"
130138
tracing_settings = TracingSettings()
131-
setup_tracing(
132-
app,
133-
service_name=service_name,
134-
tracing_settings=tracing_settings,
135-
)
136-
# idempotency
137-
setup_tracing(
139+
async for _ in get_tracing_lifespan(
138140
app,
139141
service_name=service_name,
140142
tracing_settings=tracing_settings,
141-
)
143+
)(app):
144+
# idempotency
145+
async for _ in get_tracing_lifespan(
146+
app,
147+
service_name=service_name,
148+
tracing_settings=tracing_settings,
149+
)(app):
150+
pass

0 commit comments

Comments
 (0)