Skip to content

Commit c98b9fe

Browse files
committed
feat(redis): support semantic convention opt-in and emit stable Redis attributes
1 parent 229d969 commit c98b9fe

File tree

1 file changed

+164
-29
lines changed
  • instrumentation/opentelemetry-instrumentation-redis/src/opentelemetry/instrumentation/redis

1 file changed

+164
-29
lines changed

instrumentation/opentelemetry-instrumentation-redis/src/opentelemetry/instrumentation/redis/__init__.py

Lines changed: 164 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,12 @@ def response_hook(span, instance, response):
138138
from opentelemetry.semconv._incubating.attributes.db_attributes import (
139139
DB_STATEMENT,
140140
)
141+
from opentelemetry.instrumentation._semconv import (
142+
_OpenTelemetrySemanticConventionStability,
143+
_OpenTelemetryStabilitySignalType,
144+
_report_new,
145+
_report_old,
146+
)
141147
from opentelemetry.trace import (
142148
StatusCode,
143149
Tracer,
@@ -184,6 +190,16 @@ def response_hook(span, instance, response):
184190

185191
_INSTRUMENTATION_ATTR = "_is_instrumented_by_opentelemetry"
186192

193+
def _get_redis_conn_info(instance):
194+
host, port, db, unix_sock = None, None, None, None
195+
pool = getattr(instance, "connection_pool", None)
196+
if pool:
197+
conn_kwargs = pool.connection_kwargs
198+
host = conn_kwargs.get("host")
199+
port = conn_kwargs.get("port")
200+
db = conn_kwargs.get("db", 0)
201+
unix_sock = conn_kwargs.get("path")
202+
return host, port, db, unix_sock
187203

188204
def _traced_execute_factory(
189205
tracer: Tracer,
@@ -198,21 +214,58 @@ def _traced_execute_command(
198214
) -> R:
199215
query = _format_command_args(args)
200216
name = _build_span_name(instance, args)
217+
semconv_opt_in_mode = _OpenTelemetrySemanticConventionStability._get_opentelemetry_stability_opt_in_mode(
218+
_OpenTelemetryStabilitySignalType.DATABASE
219+
)
201220
with tracer.start_as_current_span(
202221
name, kind=trace.SpanKind.CLIENT
203222
) as span:
204223
if span.is_recording():
205-
span.set_attribute(DB_STATEMENT, query)
206-
_set_connection_attributes(span, instance)
207-
span.set_attribute("db.redis.args_length", len(args))
224+
# Old/existing attributes:
225+
if _report_old(semconv_opt_in_mode):
226+
span.set_attribute(DB_STATEMENT, query)
227+
_set_connection_attributes(span, instance)
228+
span.set_attribute("db.redis.args_length", len(args))
229+
# New semantic conventions:
230+
if _report_new(semconv_opt_in_mode):
231+
span.set_attribute("db.system.name", "redis")
232+
if args and len(args) > 0:
233+
span.set_attribute("db.operation.name", args[0])
234+
host, port, db, unix_sock = _get_redis_conn_info(instance)
235+
if db is not None:
236+
span.set_attribute("db.namespace", str(db))
237+
span.set_attribute("db.query.text", query)
238+
if host:
239+
span.set_attribute("server.address", host)
240+
span.set_attribute("network.peer.address", host)
241+
if port:
242+
span.set_attribute("server.port", port)
243+
span.set_attribute("network.peer.port", port)
244+
if unix_sock:
245+
span.set_attribute("network.peer.address", unix_sock)
246+
span.set_attribute("network.transport", "unix")
247+
if args and args[0] in ("EVALSHA", "EVAL") and len(args) > 1:
248+
span.set_attribute("db.stored_procedure.name", args[1])
208249
if span.name == "redis.create_index":
209250
_add_create_attributes(span, args)
210251
if callable(request_hook):
211252
request_hook(span, instance, args, kwargs)
212-
response = func(*args, **kwargs)
213-
if span.is_recording():
214-
if span.name == "redis.search":
215-
_add_search_attributes(span, response, args)
253+
try:
254+
response = func(*args, **kwargs)
255+
except Exception as exc:
256+
if _report_new(semconv_opt_in_mode) and span.is_recording():
257+
error_type = getattr(exc, "args", [None])[0]
258+
if error_type and isinstance(error_type, str):
259+
prefix = error_type.split(" ")[0]
260+
span.set_attribute("db.response.status_code", prefix)
261+
span.set_attribute("error.type", prefix)
262+
else:
263+
span.set_attribute("error.type", type(exc).__qualname__)
264+
if span.is_recording():
265+
span.set_status(StatusCode.ERROR)
266+
raise
267+
if span.is_recording() and span.name == "redis.search":
268+
_add_search_attributes(span, response, args)
216269
if callable(response_hook):
217270
response_hook(span, instance, response)
218271
return response
@@ -237,16 +290,35 @@ def _traced_execute_pipeline(
237290
span_name,
238291
) = _build_span_meta_data_for_pipeline(instance)
239292
exception = None
293+
semconv_opt_in_mode = _OpenTelemetrySemanticConventionStability._get_opentelemetry_stability_opt_in_mode(
294+
_OpenTelemetryStabilitySignalType.DATABASE
295+
)
240296
with tracer.start_as_current_span(
241297
span_name, kind=trace.SpanKind.CLIENT
242298
) as span:
243299
if span.is_recording():
244-
span.set_attribute(DB_STATEMENT, resource)
245-
_set_connection_attributes(span, instance)
246-
span.set_attribute(
247-
"db.redis.pipeline_length", len(command_stack)
248-
)
249-
300+
if _report_old(semconv_opt_in_mode):
301+
span.set_attribute(DB_STATEMENT, resource)
302+
_set_connection_attributes(span, instance)
303+
span.set_attribute("db.redis.pipeline_length", len(command_stack))
304+
if _report_new(semconv_opt_in_mode):
305+
span.set_attribute("db.system.name", "redis")
306+
span.set_attribute("db.operation.name", "PIPELINE")
307+
host, port, db, unix_sock = _get_redis_conn_info(instance)
308+
if db is not None:
309+
span.set_attribute("db.namespace", str(db))
310+
span.set_attribute("db.query.text", resource)
311+
if len(command_stack) > 1:
312+
span.set_attribute("db.operation.batch.size", len(command_stack))
313+
if host:
314+
span.set_attribute("server.address", host)
315+
span.set_attribute("network.peer.address", host)
316+
if port:
317+
span.set_attribute("server.port", port)
318+
span.set_attribute("network.peer.port", port)
319+
if unix_sock:
320+
span.set_attribute("network.peer.address", unix_sock)
321+
span.set_attribute("network.transport", "unix")
250322
response = None
251323
try:
252324
response = func(*args, **kwargs)
@@ -278,17 +350,54 @@ async def _async_traced_execute_command(
278350
) -> Awaitable[R]:
279351
query = _format_command_args(args)
280352
name = _build_span_name(instance, args)
281-
353+
semconv_opt_in_mode = _OpenTelemetrySemanticConventionStability._get_opentelemetry_stability_opt_in_mode(
354+
_OpenTelemetryStabilitySignalType.DATABASE
355+
)
282356
with tracer.start_as_current_span(
283357
name, kind=trace.SpanKind.CLIENT
284358
) as span:
285359
if span.is_recording():
286-
span.set_attribute(DB_STATEMENT, query)
287-
_set_connection_attributes(span, instance)
288-
span.set_attribute("db.redis.args_length", len(args))
360+
if _report_old(semconv_opt_in_mode):
361+
span.set_attribute(DB_STATEMENT, query)
362+
_set_connection_attributes(span, instance)
363+
span.set_attribute("db.redis.args_length", len(args))
364+
if _report_new(semconv_opt_in_mode):
365+
span.set_attribute("db.system.name", "redis")
366+
if args and len(args) > 0:
367+
span.set_attribute("db.operation.name", args[0])
368+
host, port, db, unix_sock = _get_redis_conn_info(instance)
369+
if db is not None:
370+
span.set_attribute("db.namespace", str(db))
371+
span.set_attribute("db.query.text", query)
372+
if host:
373+
span.set_attribute("server.address", host)
374+
span.set_attribute("network.peer.address", host)
375+
if port:
376+
span.set_attribute("server.port", port)
377+
span.set_attribute("network.peer.port", port)
378+
if unix_sock:
379+
span.set_attribute("network.peer.address", unix_sock)
380+
span.set_attribute("network.transport", "unix")
381+
if args and args[0] in ("EVALSHA", "EVAL") and len(args) > 1:
382+
span.set_attribute("db.stored_procedure.name", args[1])
289383
if callable(request_hook):
290384
request_hook(span, instance, args, kwargs)
291-
response = await func(*args, **kwargs)
385+
try:
386+
response = await func(*args, **kwargs)
387+
except Exception as exc:
388+
if _report_new(semconv_opt_in_mode) and span.is_recording():
389+
error_type = getattr(exc, "args", [None])[0]
390+
if error_type and isinstance(error_type, str):
391+
prefix = error_type.split(" ")[0]
392+
span.set_attribute("db.response.status_code", prefix)
393+
span.set_attribute("error.type", prefix)
394+
else:
395+
span.set_attribute("error.type", type(exc).__qualname__)
396+
if span.is_recording():
397+
span.set_status(StatusCode.ERROR)
398+
raise
399+
if span.is_recording() and span.name == "redis.search":
400+
_add_search_attributes(span, response, args)
292401
if callable(response_hook):
293402
response_hook(span, instance, response)
294403
return response
@@ -314,24 +423,50 @@ async def _async_traced_execute_pipeline(
314423
) = _build_span_meta_data_for_pipeline(instance)
315424

316425
exception = None
317-
426+
semconv_opt_in_mode = _OpenTelemetrySemanticConventionStability._get_opentelemetry_stability_opt_in_mode(
427+
_OpenTelemetryStabilitySignalType.DATABASE
428+
)
318429
with tracer.start_as_current_span(
319430
span_name, kind=trace.SpanKind.CLIENT
320431
) as span:
321432
if span.is_recording():
322-
span.set_attribute(DB_STATEMENT, resource)
323-
_set_connection_attributes(span, instance)
324-
span.set_attribute(
325-
"db.redis.pipeline_length", len(command_stack)
326-
)
327-
433+
if _report_old(semconv_opt_in_mode):
434+
span.set_attribute(DB_STATEMENT, resource)
435+
_set_connection_attributes(span, instance)
436+
span.set_attribute("db.redis.pipeline_length", len(command_stack))
437+
if _report_new(semconv_opt_in_mode):
438+
span.set_attribute("db.system.name", "redis")
439+
span.set_attribute("db.operation.name", "PIPELINE")
440+
host, port, db, unix_sock = _get_redis_conn_info(instance)
441+
if db is not None:
442+
span.set_attribute("db.namespace", str(db))
443+
span.set_attribute("db.query.text", resource)
444+
if len(command_stack) > 1:
445+
span.set_attribute("db.operation.batch.size", len(command_stack))
446+
if host:
447+
span.set_attribute("server.address", host)
448+
span.set_attribute("network.peer.address", host)
449+
if port:
450+
span.set_attribute("server.port", port)
451+
span.set_attribute("network.peer.port", port)
452+
if unix_sock:
453+
span.set_attribute("network.peer.address", unix_sock)
454+
span.set_attribute("network.transport", "unix")
328455
response = None
329456
try:
330457
response = await func(*args, **kwargs)
331-
except redis.WatchError as watch_exception:
332-
span.set_status(StatusCode.UNSET)
333-
exception = watch_exception
334-
458+
except Exception as exc:
459+
if _report_new(semconv_opt_in_mode) and span.is_recording():
460+
error_type = getattr(exc, "args", [None])[0]
461+
if error_type and isinstance(error_type, str):
462+
prefix = error_type.split(" ")[0]
463+
span.set_attribute("db.response.status_code", prefix)
464+
span.set_attribute("error.type", prefix)
465+
else:
466+
span.set_attribute("error.type", type(exc).__qualname__)
467+
if span.is_recording():
468+
span.set_status(StatusCode.ERROR)
469+
raise
335470
if callable(response_hook):
336471
response_hook(span, instance, response)
337472

0 commit comments

Comments
 (0)