From c98b9fef9ce978ae35fd90ed4df7a6cf59f66240 Mon Sep 17 00:00:00 2001 From: Lukasz Ciukaj Date: Fri, 10 Oct 2025 00:21:41 -0400 Subject: [PATCH 1/6] feat(redis): support semantic convention opt-in and emit stable Redis attributes --- .../instrumentation/redis/__init__.py | 193 +++++++++++++++--- 1 file changed, 164 insertions(+), 29 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-redis/src/opentelemetry/instrumentation/redis/__init__.py b/instrumentation/opentelemetry-instrumentation-redis/src/opentelemetry/instrumentation/redis/__init__.py index ef61ecec2e..a9fb9b4327 100644 --- a/instrumentation/opentelemetry-instrumentation-redis/src/opentelemetry/instrumentation/redis/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-redis/src/opentelemetry/instrumentation/redis/__init__.py @@ -138,6 +138,12 @@ def response_hook(span, instance, response): from opentelemetry.semconv._incubating.attributes.db_attributes import ( DB_STATEMENT, ) +from opentelemetry.instrumentation._semconv import ( + _OpenTelemetrySemanticConventionStability, + _OpenTelemetryStabilitySignalType, + _report_new, + _report_old, +) from opentelemetry.trace import ( StatusCode, Tracer, @@ -184,6 +190,16 @@ def response_hook(span, instance, response): _INSTRUMENTATION_ATTR = "_is_instrumented_by_opentelemetry" +def _get_redis_conn_info(instance): + host, port, db, unix_sock = None, None, None, None + pool = getattr(instance, "connection_pool", None) + if pool: + conn_kwargs = pool.connection_kwargs + host = conn_kwargs.get("host") + port = conn_kwargs.get("port") + db = conn_kwargs.get("db", 0) + unix_sock = conn_kwargs.get("path") + return host, port, db, unix_sock def _traced_execute_factory( tracer: Tracer, @@ -198,21 +214,58 @@ def _traced_execute_command( ) -> R: query = _format_command_args(args) name = _build_span_name(instance, args) + semconv_opt_in_mode = _OpenTelemetrySemanticConventionStability._get_opentelemetry_stability_opt_in_mode( + _OpenTelemetryStabilitySignalType.DATABASE + ) with tracer.start_as_current_span( name, kind=trace.SpanKind.CLIENT ) as span: if span.is_recording(): - span.set_attribute(DB_STATEMENT, query) - _set_connection_attributes(span, instance) - span.set_attribute("db.redis.args_length", len(args)) + # Old/existing attributes: + if _report_old(semconv_opt_in_mode): + span.set_attribute(DB_STATEMENT, query) + _set_connection_attributes(span, instance) + span.set_attribute("db.redis.args_length", len(args)) + # New semantic conventions: + if _report_new(semconv_opt_in_mode): + span.set_attribute("db.system.name", "redis") + if args and len(args) > 0: + span.set_attribute("db.operation.name", args[0]) + host, port, db, unix_sock = _get_redis_conn_info(instance) + if db is not None: + span.set_attribute("db.namespace", str(db)) + span.set_attribute("db.query.text", query) + if host: + span.set_attribute("server.address", host) + span.set_attribute("network.peer.address", host) + if port: + span.set_attribute("server.port", port) + span.set_attribute("network.peer.port", port) + if unix_sock: + span.set_attribute("network.peer.address", unix_sock) + span.set_attribute("network.transport", "unix") + if args and args[0] in ("EVALSHA", "EVAL") and len(args) > 1: + span.set_attribute("db.stored_procedure.name", args[1]) if span.name == "redis.create_index": _add_create_attributes(span, args) if callable(request_hook): request_hook(span, instance, args, kwargs) - response = func(*args, **kwargs) - if span.is_recording(): - if span.name == "redis.search": - _add_search_attributes(span, response, args) + try: + response = func(*args, **kwargs) + except Exception as exc: + if _report_new(semconv_opt_in_mode) and span.is_recording(): + error_type = getattr(exc, "args", [None])[0] + if error_type and isinstance(error_type, str): + prefix = error_type.split(" ")[0] + span.set_attribute("db.response.status_code", prefix) + span.set_attribute("error.type", prefix) + else: + span.set_attribute("error.type", type(exc).__qualname__) + if span.is_recording(): + span.set_status(StatusCode.ERROR) + raise + if span.is_recording() and span.name == "redis.search": + _add_search_attributes(span, response, args) if callable(response_hook): response_hook(span, instance, response) return response @@ -237,16 +290,35 @@ def _traced_execute_pipeline( span_name, ) = _build_span_meta_data_for_pipeline(instance) exception = None + semconv_opt_in_mode = _OpenTelemetrySemanticConventionStability._get_opentelemetry_stability_opt_in_mode( + _OpenTelemetryStabilitySignalType.DATABASE + ) with tracer.start_as_current_span( span_name, kind=trace.SpanKind.CLIENT ) as span: if span.is_recording(): - span.set_attribute(DB_STATEMENT, resource) - _set_connection_attributes(span, instance) - span.set_attribute( - "db.redis.pipeline_length", len(command_stack) - ) - + if _report_old(semconv_opt_in_mode): + span.set_attribute(DB_STATEMENT, resource) + _set_connection_attributes(span, instance) + span.set_attribute("db.redis.pipeline_length", len(command_stack)) + if _report_new(semconv_opt_in_mode): + span.set_attribute("db.system.name", "redis") + span.set_attribute("db.operation.name", "PIPELINE") + host, port, db, unix_sock = _get_redis_conn_info(instance) + if db is not None: + span.set_attribute("db.namespace", str(db)) + span.set_attribute("db.query.text", resource) + if len(command_stack) > 1: + span.set_attribute("db.operation.batch.size", len(command_stack)) + if host: + span.set_attribute("server.address", host) + span.set_attribute("network.peer.address", host) + if port: + span.set_attribute("server.port", port) + span.set_attribute("network.peer.port", port) + if unix_sock: + span.set_attribute("network.peer.address", unix_sock) + span.set_attribute("network.transport", "unix") response = None try: response = func(*args, **kwargs) @@ -278,17 +350,54 @@ async def _async_traced_execute_command( ) -> Awaitable[R]: query = _format_command_args(args) name = _build_span_name(instance, args) - + semconv_opt_in_mode = _OpenTelemetrySemanticConventionStability._get_opentelemetry_stability_opt_in_mode( + _OpenTelemetryStabilitySignalType.DATABASE + ) with tracer.start_as_current_span( name, kind=trace.SpanKind.CLIENT ) as span: if span.is_recording(): - span.set_attribute(DB_STATEMENT, query) - _set_connection_attributes(span, instance) - span.set_attribute("db.redis.args_length", len(args)) + if _report_old(semconv_opt_in_mode): + span.set_attribute(DB_STATEMENT, query) + _set_connection_attributes(span, instance) + span.set_attribute("db.redis.args_length", len(args)) + if _report_new(semconv_opt_in_mode): + span.set_attribute("db.system.name", "redis") + if args and len(args) > 0: + span.set_attribute("db.operation.name", args[0]) + host, port, db, unix_sock = _get_redis_conn_info(instance) + if db is not None: + span.set_attribute("db.namespace", str(db)) + span.set_attribute("db.query.text", query) + if host: + span.set_attribute("server.address", host) + span.set_attribute("network.peer.address", host) + if port: + span.set_attribute("server.port", port) + span.set_attribute("network.peer.port", port) + if unix_sock: + span.set_attribute("network.peer.address", unix_sock) + span.set_attribute("network.transport", "unix") + if args and args[0] in ("EVALSHA", "EVAL") and len(args) > 1: + span.set_attribute("db.stored_procedure.name", args[1]) if callable(request_hook): request_hook(span, instance, args, kwargs) - response = await func(*args, **kwargs) + try: + response = await func(*args, **kwargs) + except Exception as exc: + if _report_new(semconv_opt_in_mode) and span.is_recording(): + error_type = getattr(exc, "args", [None])[0] + if error_type and isinstance(error_type, str): + prefix = error_type.split(" ")[0] + span.set_attribute("db.response.status_code", prefix) + span.set_attribute("error.type", prefix) + else: + span.set_attribute("error.type", type(exc).__qualname__) + if span.is_recording(): + span.set_status(StatusCode.ERROR) + raise + if span.is_recording() and span.name == "redis.search": + _add_search_attributes(span, response, args) if callable(response_hook): response_hook(span, instance, response) return response @@ -314,24 +423,50 @@ async def _async_traced_execute_pipeline( ) = _build_span_meta_data_for_pipeline(instance) exception = None - + semconv_opt_in_mode = _OpenTelemetrySemanticConventionStability._get_opentelemetry_stability_opt_in_mode( + _OpenTelemetryStabilitySignalType.DATABASE + ) with tracer.start_as_current_span( span_name, kind=trace.SpanKind.CLIENT ) as span: if span.is_recording(): - span.set_attribute(DB_STATEMENT, resource) - _set_connection_attributes(span, instance) - span.set_attribute( - "db.redis.pipeline_length", len(command_stack) - ) - + if _report_old(semconv_opt_in_mode): + span.set_attribute(DB_STATEMENT, resource) + _set_connection_attributes(span, instance) + span.set_attribute("db.redis.pipeline_length", len(command_stack)) + if _report_new(semconv_opt_in_mode): + span.set_attribute("db.system.name", "redis") + span.set_attribute("db.operation.name", "PIPELINE") + host, port, db, unix_sock = _get_redis_conn_info(instance) + if db is not None: + span.set_attribute("db.namespace", str(db)) + span.set_attribute("db.query.text", resource) + if len(command_stack) > 1: + span.set_attribute("db.operation.batch.size", len(command_stack)) + if host: + span.set_attribute("server.address", host) + span.set_attribute("network.peer.address", host) + if port: + span.set_attribute("server.port", port) + span.set_attribute("network.peer.port", port) + if unix_sock: + span.set_attribute("network.peer.address", unix_sock) + span.set_attribute("network.transport", "unix") response = None try: response = await func(*args, **kwargs) - except redis.WatchError as watch_exception: - span.set_status(StatusCode.UNSET) - exception = watch_exception - + except Exception as exc: + if _report_new(semconv_opt_in_mode) and span.is_recording(): + error_type = getattr(exc, "args", [None])[0] + if error_type and isinstance(error_type, str): + prefix = error_type.split(" ")[0] + span.set_attribute("db.response.status_code", prefix) + span.set_attribute("error.type", prefix) + else: + span.set_attribute("error.type", type(exc).__qualname__) + if span.is_recording(): + span.set_status(StatusCode.ERROR) + raise if callable(response_hook): response_hook(span, instance, response) From 5ce2d90f77a9437f77b8fe497f3461b3ec4e02d3 Mon Sep 17 00:00:00 2001 From: Lukasz Ciukaj Date: Fri, 10 Oct 2025 21:01:54 -0400 Subject: [PATCH 2/6] Update CHANGELOG.md to include PR #3826 --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8349f4b942..646dafe6f5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,9 +18,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#3765](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3765)) - Add `rstcheck` to pre-commit to stop introducing invalid RST ([#3777](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3777)) - - `opentelemetry-exporter-credential-provider-gcp`: create this package which provides support for supplying your machine's Application Default Credentials (https://cloud.google.com/docs/authentication/application-default-credentials) to the OTLP Exporters created automatically by OpenTelemetry Python's auto instrumentation. These credentials authorize OTLP traces to be sent to `telemetry.googleapis.com`. [#3766](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3766). +- `opentelemetry-instrumentation-redis`: Add support for semantic convention opt-in and stable Redis span attributes ([#3826](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3826)) ## Version 1.37.0/0.58b0 (2025-09-11) From 2bea23ef6da6c5374dc8f3aabcffa1b8a3a40d50 Mon Sep 17 00:00:00 2001 From: Lukasz Ciukaj Date: Fri, 10 Oct 2025 21:28:39 -0400 Subject: [PATCH 3/6] auto-fix import order and formatting via pre-commit (ruff) (#3826) --- .../instrumentation/redis/__init__.py | 56 +++++++++++++------ 1 file changed, 40 insertions(+), 16 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-redis/src/opentelemetry/instrumentation/redis/__init__.py b/instrumentation/opentelemetry-instrumentation-redis/src/opentelemetry/instrumentation/redis/__init__.py index a9fb9b4327..d73f3d955c 100644 --- a/instrumentation/opentelemetry-instrumentation-redis/src/opentelemetry/instrumentation/redis/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-redis/src/opentelemetry/instrumentation/redis/__init__.py @@ -123,6 +123,12 @@ def response_hook(span, instance, response): from wrapt import wrap_function_wrapper from opentelemetry import trace +from opentelemetry.instrumentation._semconv import ( + _OpenTelemetrySemanticConventionStability, + _OpenTelemetryStabilitySignalType, + _report_new, + _report_old, +) from opentelemetry.instrumentation.instrumentor import BaseInstrumentor from opentelemetry.instrumentation.redis.package import _instruments from opentelemetry.instrumentation.redis.util import ( @@ -138,12 +144,6 @@ def response_hook(span, instance, response): from opentelemetry.semconv._incubating.attributes.db_attributes import ( DB_STATEMENT, ) -from opentelemetry.instrumentation._semconv import ( - _OpenTelemetrySemanticConventionStability, - _OpenTelemetryStabilitySignalType, - _report_new, - _report_old, -) from opentelemetry.trace import ( StatusCode, Tracer, @@ -190,6 +190,7 @@ def response_hook(span, instance, response): _INSTRUMENTATION_ATTR = "_is_instrumented_by_opentelemetry" + def _get_redis_conn_info(instance): host, port, db, unix_sock = None, None, None, None pool = getattr(instance, "connection_pool", None) @@ -201,6 +202,7 @@ def _get_redis_conn_info(instance): unix_sock = conn_kwargs.get("path") return host, port, db, unix_sock + def _traced_execute_factory( tracer: Tracer, request_hook: RequestHook | None = None, @@ -244,7 +246,11 @@ def _traced_execute_command( if unix_sock: span.set_attribute("network.peer.address", unix_sock) span.set_attribute("network.transport", "unix") - if args and args[0] in ("EVALSHA", "EVAL") and len(args) > 1: + if ( + args + and args[0] in ("EVALSHA", "EVAL") + and len(args) > 1 + ): span.set_attribute("db.stored_procedure.name", args[1]) if span.name == "redis.create_index": _add_create_attributes(span, args) @@ -260,7 +266,9 @@ def _traced_execute_command( span.set_attribute("db.response.status_code", prefix) span.set_attribute("error.type", prefix) else: - span.set_attribute("error.type", type(exc).__qualname__) + span.set_attribute( + "error.type", type(exc).__qualname__ + ) if span.is_recording(): span.set_status(StatusCode.ERROR) raise @@ -300,7 +308,9 @@ def _traced_execute_pipeline( if _report_old(semconv_opt_in_mode): span.set_attribute(DB_STATEMENT, resource) _set_connection_attributes(span, instance) - span.set_attribute("db.redis.pipeline_length", len(command_stack)) + span.set_attribute( + "db.redis.pipeline_length", len(command_stack) + ) if _report_new(semconv_opt_in_mode): span.set_attribute("db.system.name", "redis") span.set_attribute("db.operation.name", "PIPELINE") @@ -309,7 +319,9 @@ def _traced_execute_pipeline( span.set_attribute("db.namespace", str(db)) span.set_attribute("db.query.text", resource) if len(command_stack) > 1: - span.set_attribute("db.operation.batch.size", len(command_stack)) + span.set_attribute( + "db.operation.batch.size", len(command_stack) + ) if host: span.set_attribute("server.address", host) span.set_attribute("network.peer.address", host) @@ -378,7 +390,11 @@ async def _async_traced_execute_command( if unix_sock: span.set_attribute("network.peer.address", unix_sock) span.set_attribute("network.transport", "unix") - if args and args[0] in ("EVALSHA", "EVAL") and len(args) > 1: + if ( + args + and args[0] in ("EVALSHA", "EVAL") + and len(args) > 1 + ): span.set_attribute("db.stored_procedure.name", args[1]) if callable(request_hook): request_hook(span, instance, args, kwargs) @@ -392,7 +408,9 @@ async def _async_traced_execute_command( span.set_attribute("db.response.status_code", prefix) span.set_attribute("error.type", prefix) else: - span.set_attribute("error.type", type(exc).__qualname__) + span.set_attribute( + "error.type", type(exc).__qualname__ + ) if span.is_recording(): span.set_status(StatusCode.ERROR) raise @@ -433,7 +451,9 @@ async def _async_traced_execute_pipeline( if _report_old(semconv_opt_in_mode): span.set_attribute(DB_STATEMENT, resource) _set_connection_attributes(span, instance) - span.set_attribute("db.redis.pipeline_length", len(command_stack)) + span.set_attribute( + "db.redis.pipeline_length", len(command_stack) + ) if _report_new(semconv_opt_in_mode): span.set_attribute("db.system.name", "redis") span.set_attribute("db.operation.name", "PIPELINE") @@ -442,7 +462,9 @@ async def _async_traced_execute_pipeline( span.set_attribute("db.namespace", str(db)) span.set_attribute("db.query.text", resource) if len(command_stack) > 1: - span.set_attribute("db.operation.batch.size", len(command_stack)) + span.set_attribute( + "db.operation.batch.size", len(command_stack) + ) if host: span.set_attribute("server.address", host) span.set_attribute("network.peer.address", host) @@ -463,14 +485,16 @@ async def _async_traced_execute_pipeline( span.set_attribute("db.response.status_code", prefix) span.set_attribute("error.type", prefix) else: - span.set_attribute("error.type", type(exc).__qualname__) + span.set_attribute( + "error.type", type(exc).__qualname__ + ) if span.is_recording(): span.set_status(StatusCode.ERROR) raise if callable(response_hook): response_hook(span, instance, response) - if exception: + if exception is not None: raise exception return response From 420b76c58cc4ce4c9bb72c2a1428de6fd0bf9baf Mon Sep 17 00:00:00 2001 From: Lukasz Ciukaj Date: Sat, 11 Oct 2025 18:22:54 -0400 Subject: [PATCH 4/6] feat(redis): Enhance OTel semconv compliance for Redis error handling and script attributes (#3826) --- .../instrumentation/redis/__init__.py | 43 +++++++++++++------ 1 file changed, 29 insertions(+), 14 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-redis/src/opentelemetry/instrumentation/redis/__init__.py b/instrumentation/opentelemetry-instrumentation-redis/src/opentelemetry/instrumentation/redis/__init__.py index d73f3d955c..eb6bcdb988 100644 --- a/instrumentation/opentelemetry-instrumentation-redis/src/opentelemetry/instrumentation/redis/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-redis/src/opentelemetry/instrumentation/redis/__init__.py @@ -248,7 +248,7 @@ def _traced_execute_command( span.set_attribute("network.transport", "unix") if ( args - and args[0] in ("EVALSHA", "EVAL") + and args[0] in ("EVALSHA", "FCALL") and len(args) > 1 ): span.set_attribute("db.stored_procedure.name", args[1]) @@ -258,6 +258,10 @@ def _traced_execute_command( request_hook(span, instance, args, kwargs) try: response = func(*args, **kwargs) + except redis.WatchError as watch_exception: + if span.is_recording(): + span.set_status(StatusCode.UNSET) + raise watch_exception except Exception as exc: if _report_new(semconv_opt_in_mode) and span.is_recording(): error_type = getattr(exc, "args", [None])[0] @@ -392,7 +396,7 @@ async def _async_traced_execute_command( span.set_attribute("network.transport", "unix") if ( args - and args[0] in ("EVALSHA", "EVAL") + and args[0] in ("EVALSHA", "FCALL") and len(args) > 1 ): span.set_attribute("db.stored_procedure.name", args[1]) @@ -400,6 +404,10 @@ async def _async_traced_execute_command( request_hook(span, instance, args, kwargs) try: response = await func(*args, **kwargs) + except redis.WatchError as watch_exception: + if span.is_recording(): + span.set_status(StatusCode.UNSET) + raise watch_exception except Exception as exc: if _report_new(semconv_opt_in_mode) and span.is_recording(): error_type = getattr(exc, "args", [None])[0] @@ -477,20 +485,27 @@ async def _async_traced_execute_pipeline( response = None try: response = await func(*args, **kwargs) - except Exception as exc: - if _report_new(semconv_opt_in_mode) and span.is_recording(): - error_type = getattr(exc, "args", [None])[0] - if error_type and isinstance(error_type, str): - prefix = error_type.split(" ")[0] - span.set_attribute("db.response.status_code", prefix) - span.set_attribute("error.type", prefix) - else: - span.set_attribute( - "error.type", type(exc).__qualname__ - ) + except redis.WatchError as watch_exception: + if span.is_recording(): + span.set_status(StatusCode.UNSET) + exception = watch_exception + except Exception as exc: # pylint: disable=broad-exception-caught if span.is_recording(): + if _report_new(semconv_opt_in_mode): + error_type = getattr(exc, "args", [None])[0] + if error_type and isinstance(error_type, str): + prefix = error_type.split(" ")[0] + span.set_attribute( + "db.response.status_code", prefix + ) + span.set_attribute("error.type", prefix) + else: + span.set_attribute( + "error.type", type(exc).__qualname__ + ) span.set_status(StatusCode.ERROR) - raise + exception = exc + if callable(response_hook): response_hook(span, instance, response) From 2b2670d18aab21adce58c20283f5c66c6a892318 Mon Sep 17 00:00:00 2001 From: Lukasz Ciukaj Date: Sun, 12 Oct 2025 22:03:36 -0400 Subject: [PATCH 5/6] feat(redis): fix lint errors to pass tests (#3826) --- .../instrumentation/redis/__init__.py | 297 +++++++++--------- 1 file changed, 142 insertions(+), 155 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-redis/src/opentelemetry/instrumentation/redis/__init__.py b/instrumentation/opentelemetry-instrumentation-redis/src/opentelemetry/instrumentation/redis/__init__.py index eb6bcdb988..b4c534e3f7 100644 --- a/instrumentation/opentelemetry-instrumentation-redis/src/opentelemetry/instrumentation/redis/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-redis/src/opentelemetry/instrumentation/redis/__init__.py @@ -203,6 +203,110 @@ def _get_redis_conn_info(instance): return host, port, db, unix_sock +# Helper function to set old semantic convention attributes +def _set_old_semconv_attributes( + span, + instance, + args, + query, + command_stack, + resource, + is_pipeline, +): + span.set_attribute(DB_STATEMENT, query if not is_pipeline else resource) + _set_connection_attributes(span, instance) + if not is_pipeline: + span.set_attribute("db.redis.args_length", len(args)) + else: + span.set_attribute("db.redis.pipeline_length", len(command_stack)) + + +# Helper function to set new semantic convention attributes +def _set_new_semconv_attributes( + span, + instance, + args, + query, + command_stack, + resource, + is_pipeline, +): + span.set_attribute("db.system.name", "redis") + + if not is_pipeline: + if args and len(args) > 0: + span.set_attribute("db.operation.name", args[0]) + else: + span.set_attribute("db.operation.name", "PIPELINE") + if len(command_stack) > 1: + span.set_attribute("db.operation.batch.size", len(command_stack)) + + host, port, db, unix_sock = _get_redis_conn_info(instance) + if db is not None: + span.set_attribute("db.namespace", str(db)) + span.set_attribute("db.query.text", query if not is_pipeline else resource) + if host: + span.set_attribute("server.address", host) + span.set_attribute("network.peer.address", host) + if port: + span.set_attribute("server.port", port) + span.set_attribute("network.peer.port", port) + if unix_sock: + span.set_attribute("network.peer.address", unix_sock) + span.set_attribute("network.transport", "unix") + + # db.stored_procedure.name (only for individual commands) + if not is_pipeline: + if args and args[0] in ("EVALSHA", "FCALL") and len(args) > 1: + span.set_attribute("db.stored_procedure.name", args[1]) + + +# Helper function to set all common span attributes +def _set_span_attributes( + span, + instance, + args, # For individual commands + query, # For individual commands + command_stack, # For pipelines + resource, # For pipelines + semconv_opt_in_mode, +): + if not span.is_recording(): + return + + is_pipeline = command_stack is not None + + if _report_old(semconv_opt_in_mode): + _set_old_semconv_attributes( + span, instance, args, query, command_stack, resource, is_pipeline + ) + + if _report_new(semconv_opt_in_mode): + _set_new_semconv_attributes( + span, instance, args, query, command_stack, resource, is_pipeline + ) + + # Command-specific attributes that depend on span.name (e.g., redis.create_index) + if not is_pipeline and span.name == "redis.create_index": + _add_create_attributes(span, args) + + +# Helper function to set error attributes on a span +def _set_span_error_attributes(span, exc, semconv_opt_in_mode): + if not span.is_recording(): + return + + if _report_new(semconv_opt_in_mode): + error_type = getattr(exc, "args", [None])[0] + if error_type and isinstance(error_type, str): + prefix = error_type.split(" ")[0] + span.set_attribute("db.response.status_code", prefix) + span.set_attribute("error.type", prefix) + else: + span.set_attribute("error.type", type(exc).__qualname__) + span.set_status(StatusCode.ERROR) + + def _traced_execute_factory( tracer: Tracer, request_hook: RequestHook | None = None, @@ -222,38 +326,10 @@ def _traced_execute_command( with tracer.start_as_current_span( name, kind=trace.SpanKind.CLIENT ) as span: - if span.is_recording(): - # Old/existing attributes: - if _report_old(semconv_opt_in_mode): - span.set_attribute(DB_STATEMENT, query) - _set_connection_attributes(span, instance) - span.set_attribute("db.redis.args_length", len(args)) - # New semantic conventions: - if _report_new(semconv_opt_in_mode): - span.set_attribute("db.system.name", "redis") - if args and len(args) > 0: - span.set_attribute("db.operation.name", args[0]) - host, port, db, unix_sock = _get_redis_conn_info(instance) - if db is not None: - span.set_attribute("db.namespace", str(db)) - span.set_attribute("db.query.text", query) - if host: - span.set_attribute("server.address", host) - span.set_attribute("network.peer.address", host) - if port: - span.set_attribute("server.port", port) - span.set_attribute("network.peer.port", port) - if unix_sock: - span.set_attribute("network.peer.address", unix_sock) - span.set_attribute("network.transport", "unix") - if ( - args - and args[0] in ("EVALSHA", "FCALL") - and len(args) > 1 - ): - span.set_attribute("db.stored_procedure.name", args[1]) - if span.name == "redis.create_index": - _add_create_attributes(span, args) + _set_span_attributes( + span, instance, args, query, None, None, semconv_opt_in_mode + ) + if callable(request_hook): request_hook(span, instance, args, kwargs) try: @@ -262,19 +338,8 @@ def _traced_execute_command( if span.is_recording(): span.set_status(StatusCode.UNSET) raise watch_exception - except Exception as exc: - if _report_new(semconv_opt_in_mode) and span.is_recording(): - error_type = getattr(exc, "args", [None])[0] - if error_type and isinstance(error_type, str): - prefix = error_type.split(" ")[0] - span.set_attribute("db.response.status_code", prefix) - span.set_attribute("error.type", prefix) - else: - span.set_attribute( - "error.type", type(exc).__qualname__ - ) - if span.is_recording(): - span.set_status(StatusCode.ERROR) + except Exception as exc: # pylint: disable=broad-exception-caught + _set_span_error_attributes(span, exc, semconv_opt_in_mode) raise if span.is_recording() and span.name == "redis.search": _add_search_attributes(span, response, args) @@ -308,39 +373,26 @@ def _traced_execute_pipeline( with tracer.start_as_current_span( span_name, kind=trace.SpanKind.CLIENT ) as span: - if span.is_recording(): - if _report_old(semconv_opt_in_mode): - span.set_attribute(DB_STATEMENT, resource) - _set_connection_attributes(span, instance) - span.set_attribute( - "db.redis.pipeline_length", len(command_stack) - ) - if _report_new(semconv_opt_in_mode): - span.set_attribute("db.system.name", "redis") - span.set_attribute("db.operation.name", "PIPELINE") - host, port, db, unix_sock = _get_redis_conn_info(instance) - if db is not None: - span.set_attribute("db.namespace", str(db)) - span.set_attribute("db.query.text", resource) - if len(command_stack) > 1: - span.set_attribute( - "db.operation.batch.size", len(command_stack) - ) - if host: - span.set_attribute("server.address", host) - span.set_attribute("network.peer.address", host) - if port: - span.set_attribute("server.port", port) - span.set_attribute("network.peer.port", port) - if unix_sock: - span.set_attribute("network.peer.address", unix_sock) - span.set_attribute("network.transport", "unix") + _set_span_attributes( + span, + instance, + None, + None, + command_stack, + resource, + semconv_opt_in_mode, + ) + response = None try: response = func(*args, **kwargs) except redis.WatchError as watch_exception: - span.set_status(StatusCode.UNSET) + if span.is_recording(): + span.set_status(StatusCode.UNSET) exception = watch_exception + except Exception as exc: # pylint: disable=broad-exception-caught + _set_span_error_attributes(span, exc, semconv_opt_in_mode) + exception = exc if callable(response_hook): response_hook(span, instance, response) @@ -372,34 +424,10 @@ async def _async_traced_execute_command( with tracer.start_as_current_span( name, kind=trace.SpanKind.CLIENT ) as span: - if span.is_recording(): - if _report_old(semconv_opt_in_mode): - span.set_attribute(DB_STATEMENT, query) - _set_connection_attributes(span, instance) - span.set_attribute("db.redis.args_length", len(args)) - if _report_new(semconv_opt_in_mode): - span.set_attribute("db.system.name", "redis") - if args and len(args) > 0: - span.set_attribute("db.operation.name", args[0]) - host, port, db, unix_sock = _get_redis_conn_info(instance) - if db is not None: - span.set_attribute("db.namespace", str(db)) - span.set_attribute("db.query.text", query) - if host: - span.set_attribute("server.address", host) - span.set_attribute("network.peer.address", host) - if port: - span.set_attribute("server.port", port) - span.set_attribute("network.peer.port", port) - if unix_sock: - span.set_attribute("network.peer.address", unix_sock) - span.set_attribute("network.transport", "unix") - if ( - args - and args[0] in ("EVALSHA", "FCALL") - and len(args) > 1 - ): - span.set_attribute("db.stored_procedure.name", args[1]) + _set_span_attributes( + span, instance, args, query, None, None, semconv_opt_in_mode + ) + if callable(request_hook): request_hook(span, instance, args, kwargs) try: @@ -408,19 +436,8 @@ async def _async_traced_execute_command( if span.is_recording(): span.set_status(StatusCode.UNSET) raise watch_exception - except Exception as exc: - if _report_new(semconv_opt_in_mode) and span.is_recording(): - error_type = getattr(exc, "args", [None])[0] - if error_type and isinstance(error_type, str): - prefix = error_type.split(" ")[0] - span.set_attribute("db.response.status_code", prefix) - span.set_attribute("error.type", prefix) - else: - span.set_attribute( - "error.type", type(exc).__qualname__ - ) - if span.is_recording(): - span.set_status(StatusCode.ERROR) + except Exception as exc: # pylint: disable=broad-exception-caught + _set_span_error_attributes(span, exc, semconv_opt_in_mode) raise if span.is_recording() and span.name == "redis.search": _add_search_attributes(span, response, args) @@ -455,33 +472,16 @@ async def _async_traced_execute_pipeline( with tracer.start_as_current_span( span_name, kind=trace.SpanKind.CLIENT ) as span: - if span.is_recording(): - if _report_old(semconv_opt_in_mode): - span.set_attribute(DB_STATEMENT, resource) - _set_connection_attributes(span, instance) - span.set_attribute( - "db.redis.pipeline_length", len(command_stack) - ) - if _report_new(semconv_opt_in_mode): - span.set_attribute("db.system.name", "redis") - span.set_attribute("db.operation.name", "PIPELINE") - host, port, db, unix_sock = _get_redis_conn_info(instance) - if db is not None: - span.set_attribute("db.namespace", str(db)) - span.set_attribute("db.query.text", resource) - if len(command_stack) > 1: - span.set_attribute( - "db.operation.batch.size", len(command_stack) - ) - if host: - span.set_attribute("server.address", host) - span.set_attribute("network.peer.address", host) - if port: - span.set_attribute("server.port", port) - span.set_attribute("network.peer.port", port) - if unix_sock: - span.set_attribute("network.peer.address", unix_sock) - span.set_attribute("network.transport", "unix") + _set_span_attributes( + span, + instance, + None, + None, + command_stack, + resource, + semconv_opt_in_mode, + ) + response = None try: response = await func(*args, **kwargs) @@ -490,20 +490,7 @@ async def _async_traced_execute_pipeline( span.set_status(StatusCode.UNSET) exception = watch_exception except Exception as exc: # pylint: disable=broad-exception-caught - if span.is_recording(): - if _report_new(semconv_opt_in_mode): - error_type = getattr(exc, "args", [None])[0] - if error_type and isinstance(error_type, str): - prefix = error_type.split(" ")[0] - span.set_attribute( - "db.response.status_code", prefix - ) - span.set_attribute("error.type", prefix) - else: - span.set_attribute( - "error.type", type(exc).__qualname__ - ) - span.set_status(StatusCode.ERROR) + _set_span_error_attributes(span, exc, semconv_opt_in_mode) exception = exc if callable(response_hook): From e20cea6fa1aaf9320b299c5561a6fba215cb975e Mon Sep 17 00:00:00 2001 From: Lukasz Ciukaj Date: Mon, 13 Oct 2025 00:31:36 -0400 Subject: [PATCH 6/6] feat(redis): add unit tests for semconv opt-in stable Redis attributes (#3826) --- .../tests/_test_semconv_helper.py | 133 ++++++++++++++++++ .../tests/test_semconv.py | 42 ++++++ 2 files changed, 175 insertions(+) create mode 100644 instrumentation/opentelemetry-instrumentation-redis/tests/_test_semconv_helper.py create mode 100644 instrumentation/opentelemetry-instrumentation-redis/tests/test_semconv.py diff --git a/instrumentation/opentelemetry-instrumentation-redis/tests/_test_semconv_helper.py b/instrumentation/opentelemetry-instrumentation-redis/tests/_test_semconv_helper.py new file mode 100644 index 0000000000..36bc57220d --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-redis/tests/_test_semconv_helper.py @@ -0,0 +1,133 @@ +import os +from unittest.mock import patch + +import fakeredis +from redis.exceptions import ResponseError, WatchError + +from opentelemetry import trace +from opentelemetry.instrumentation.redis import RedisInstrumentor +from opentelemetry.semconv._incubating.attributes.db_attributes import ( + DB_STATEMENT, + DB_SYSTEM, +) +from opentelemetry.test.test_base import TestBase + + +class TestRedisSemConv(TestBase): + def setUp(self): + super().setUp() + self.conn_patcher = patch( + "opentelemetry.instrumentation.redis._get_redis_conn_info", + return_value=("localhost", 6379, 0, None), + ) + self.conn_patcher.start() + RedisInstrumentor().instrument(tracer_provider=self.tracer_provider) + self.redis_client = fakeredis.FakeStrictRedis() + self.env_mode = os.getenv("OTEL_SEMCONV_STABILITY_OPT_IN") + + def tearDown(self): + super().tearDown() + self.conn_patcher.stop() + RedisInstrumentor().uninstrument() + + def test_single_command(self): + """Tests attributes for a regular single command.""" + self.redis_client.set("key", "value") + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + + if self.env_mode == "database": + self.assertEqual(span.attributes.get("db.system.name"), "redis") + self.assertEqual(span.attributes.get("db.operation.name"), "SET") + self.assertEqual(span.attributes.get("db.namespace"), "0") + self.assertEqual(span.attributes.get("db.query.text"), "SET ? ?") + self.assertNotIn(DB_SYSTEM, span.attributes) + self.assertNotIn(DB_STATEMENT, span.attributes) + elif self.env_mode == "database/dup": + self.assertEqual(span.attributes.get("db.system.name"), "redis") + self.assertEqual(span.attributes.get(DB_SYSTEM), "redis") + self.assertEqual(span.attributes.get(DB_STATEMENT), "SET ? ?") + else: # Default (old) behavior + self.assertEqual(span.attributes.get(DB_SYSTEM), "redis") + self.assertEqual(span.attributes.get(DB_STATEMENT), "SET ? ?") + self.assertNotIn("db.system.name", span.attributes) + + def test_pipeline_command(self): + """Tests attributes for a pipeline command.""" + with self.redis_client.pipeline(transaction=False) as pipe: + pipe.set("a", 1) + pipe.get("a") + pipe.execute() + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + + if self.env_mode in ("database", "database/dup"): + self.assertEqual( + span.attributes.get("db.operation.name"), "PIPELINE" + ) + self.assertEqual(span.attributes.get("db.operation.batch.size"), 2) + + def test_stored_procedure_command(self): + """Tests attributes for a stored procedure command.""" + with patch.object( + self.redis_client, "parse_response", return_value=b"ok" + ): + self.redis_client.evalsha("some-sha", 0, "key", "value") + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + + if self.env_mode in ("database", "database/dup"): + self.assertEqual( + span.attributes.get("db.stored_procedure.name"), "some-sha" + ) + else: + self.assertNotIn("db.stored_procedure.name", span.attributes) + + def test_generic_error(self): + """Tests attributes for a generic error.""" + with patch.object( + self.redis_client, + "parse_response", + side_effect=ResponseError("ERR unknown command"), + ): + with self.assertRaises(ResponseError): + self.redis_client.execute_command("INVALID_COMMAND") + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + self.assertEqual(span.status.status_code, trace.StatusCode.ERROR) + + if self.env_mode in ("database", "database/dup"): + self.assertEqual("ERR", span.attributes.get("error.type")) + self.assertEqual( + "ERR", span.attributes.get("db.response.status_code") + ) + else: + self.assertNotIn("error.type", span.attributes) + self.assertNotIn("db.response.status_code", span.attributes) + + def test_watch_error(self): + """Tests attributes for a WatchError.""" + with self.assertRaises(WatchError): + with self.redis_client.pipeline(transaction=True) as pipe: + pipe.watch("watched_key") + self.redis_client.set("watched_key", "modified-externally") + pipe.multi() + pipe.set("watched_key", "new-value") + pipe.get("watched_key") + with patch.object(pipe, "execute", side_effect=WatchError): + pipe.execute() + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 2) + + failed_pipeline_span = spans[-1] + self.assertEqual( + failed_pipeline_span.status.status_code, trace.StatusCode.UNSET + ) diff --git a/instrumentation/opentelemetry-instrumentation-redis/tests/test_semconv.py b/instrumentation/opentelemetry-instrumentation-redis/tests/test_semconv.py new file mode 100644 index 0000000000..26e0745556 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-redis/tests/test_semconv.py @@ -0,0 +1,42 @@ +import os +import subprocess +import sys + +import pytest + +# The full relative path from the project root +HELPER_PATH = "instrumentation/opentelemetry-instrumentation-redis/tests/_test_semconv_helper.py" + + +@pytest.mark.parametrize( + "semconv_opt_in, test_name", + [ + (None, "default"), + ("database", "new"), + ("database/dup", "dup"), + ], +) +def test_run_in_subprocess(semconv_opt_in, test_name): + """ + Runs the semantic convention test in a clean subprocess. + The `semconv_opt_in` value is used to set the env var. + """ + env = os.environ.copy() + + if semconv_opt_in is None: + if "OTEL_SEMCONV_STABILITY_OPT_IN" in env: + del env["OTEL_SEMCONV_STABILITY_OPT_IN"] + else: + env["OTEL_SEMCONV_STABILITY_OPT_IN"] = semconv_opt_in + + result = subprocess.run( + [sys.executable, "-m", "pytest", HELPER_PATH], + capture_output=True, + text=True, + env=env, + check=False, # Explicitly set check=False to satisfy pylint + ) + # Use a standard assert + assert ( + result.returncode == 0 + ), f"Subprocess for '{test_name}' mode failed with stdout:\n{result.stdout}\nstderr:\n{result.stderr}"