diff --git a/CHANGELOG.md b/CHANGELOG.md index 9061fdeb1a..8e95bfb8f0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,6 +26,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#3743](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3743)) - Add `rstcheck` to pre-commit to stop introducing invalid RST ([#3777](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3777)) +- `opentelemetry-instrumentation-redis`: Add support for semantic convention opt-in and stable Redis span attributes + ([#3826](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3826)) - `opentelemetry-exporter-credential-provider-gcp`: create this package which provides support for supplying your machine's Application Default Credentials (https://cloud.google.com/docs/authentication/application-default-credentials) to the OTLP Exporters created automatically by OpenTelemetry Python's auto instrumentation. These credentials authorize OTLP traces to be sent to `telemetry.googleapis.com`. [#3766](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3766). - `opentelemetry-instrumentation-psycopg`: Add missing parameter `capture_parameters` to instrumentor. diff --git a/instrumentation/opentelemetry-instrumentation-redis/src/opentelemetry/instrumentation/redis/__init__.py b/instrumentation/opentelemetry-instrumentation-redis/src/opentelemetry/instrumentation/redis/__init__.py index ef61ecec2e..b4c534e3f7 100644 --- a/instrumentation/opentelemetry-instrumentation-redis/src/opentelemetry/instrumentation/redis/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-redis/src/opentelemetry/instrumentation/redis/__init__.py @@ -123,6 +123,12 @@ def response_hook(span, instance, response): from wrapt import wrap_function_wrapper from opentelemetry import trace +from opentelemetry.instrumentation._semconv import ( + _OpenTelemetrySemanticConventionStability, + _OpenTelemetryStabilitySignalType, + _report_new, + _report_old, +) from opentelemetry.instrumentation.instrumentor import BaseInstrumentor from opentelemetry.instrumentation.redis.package import _instruments from opentelemetry.instrumentation.redis.util import ( @@ -185,6 +191,122 @@ def response_hook(span, instance, response): _INSTRUMENTATION_ATTR = "_is_instrumented_by_opentelemetry" +def _get_redis_conn_info(instance): + host, port, db, unix_sock = None, None, None, None + pool = getattr(instance, "connection_pool", None) + if pool: + conn_kwargs = pool.connection_kwargs + host = conn_kwargs.get("host") + port = conn_kwargs.get("port") + db = conn_kwargs.get("db", 0) + unix_sock = conn_kwargs.get("path") + return host, port, db, unix_sock + + +# Helper function to set old semantic convention attributes +def _set_old_semconv_attributes( + span, + instance, + args, + query, + command_stack, + resource, + is_pipeline, +): + span.set_attribute(DB_STATEMENT, query if not is_pipeline else resource) + _set_connection_attributes(span, instance) + if not is_pipeline: + span.set_attribute("db.redis.args_length", len(args)) + else: + span.set_attribute("db.redis.pipeline_length", len(command_stack)) + + +# Helper function to set new semantic convention attributes +def _set_new_semconv_attributes( + span, + instance, + args, + query, + command_stack, + resource, + is_pipeline, +): + span.set_attribute("db.system.name", "redis") + + if not is_pipeline: + if args and len(args) > 0: + span.set_attribute("db.operation.name", args[0]) + else: + span.set_attribute("db.operation.name", "PIPELINE") + if len(command_stack) > 1: + span.set_attribute("db.operation.batch.size", len(command_stack)) + + host, port, db, unix_sock = _get_redis_conn_info(instance) + if db is not None: + span.set_attribute("db.namespace", str(db)) + span.set_attribute("db.query.text", query if not is_pipeline else resource) + if host: + span.set_attribute("server.address", host) + span.set_attribute("network.peer.address", host) + if port: + span.set_attribute("server.port", port) + span.set_attribute("network.peer.port", port) + if unix_sock: + span.set_attribute("network.peer.address", unix_sock) + span.set_attribute("network.transport", "unix") + + # db.stored_procedure.name (only for individual commands) + if not is_pipeline: + if args and args[0] in ("EVALSHA", "FCALL") and len(args) > 1: + span.set_attribute("db.stored_procedure.name", args[1]) + + +# Helper function to set all common span attributes +def _set_span_attributes( + span, + instance, + args, # For individual commands + query, # For individual commands + command_stack, # For pipelines + resource, # For pipelines + semconv_opt_in_mode, +): + if not span.is_recording(): + return + + is_pipeline = command_stack is not None + + if _report_old(semconv_opt_in_mode): + _set_old_semconv_attributes( + span, instance, args, query, command_stack, resource, is_pipeline + ) + + if _report_new(semconv_opt_in_mode): + _set_new_semconv_attributes( + span, instance, args, query, command_stack, resource, is_pipeline + ) + + # Command-specific attributes that depend on span.name (e.g., redis.create_index) + if not is_pipeline and span.name == "redis.create_index": + _add_create_attributes(span, args) + + +# Helper function to set error attributes on a span +def _set_span_error_attributes(span, exc, semconv_opt_in_mode): + if not span.is_recording(): + return + + if _report_new(semconv_opt_in_mode): + error_type = getattr(exc, "args", [None])[0] + if error_type and isinstance(error_type, str): + prefix = error_type.split(" ")[0] + span.set_attribute("db.response.status_code", prefix) + span.set_attribute("error.type", prefix) + else: + span.set_attribute("error.type", type(exc).__qualname__) + span.set_status(StatusCode.ERROR) + + def _traced_execute_factory( tracer: Tracer, request_hook: RequestHook | None = None, @@ -198,21 +320,29 @@ def _traced_execute_command( ) -> R: query = _format_command_args(args) name = _build_span_name(instance, args) + semconv_opt_in_mode = _OpenTelemetrySemanticConventionStability._get_opentelemetry_stability_opt_in_mode( + _OpenTelemetryStabilitySignalType.DATABASE + ) with tracer.start_as_current_span( name, kind=trace.SpanKind.CLIENT ) as span: - if span.is_recording(): - span.set_attribute(DB_STATEMENT, query) - _set_connection_attributes(span, instance) - span.set_attribute("db.redis.args_length", len(args)) - if span.name == "redis.create_index": - _add_create_attributes(span, args) + _set_span_attributes( + span, instance, args, query, None, None, semconv_opt_in_mode + ) + if callable(request_hook): request_hook(span, instance, args, kwargs) - response = func(*args, **kwargs) - if span.is_recording(): - if span.name == "redis.search": - _add_search_attributes(span, response, args) + try: + response = func(*args, **kwargs) + except redis.WatchError as watch_exception: + if span.is_recording(): + span.set_status(StatusCode.UNSET) + raise watch_exception + except Exception as exc: # pylint: disable=broad-exception-caught + _set_span_error_attributes(span, exc, semconv_opt_in_mode) + raise + if span.is_recording() and span.name == "redis.search": + _add_search_attributes(span, response, args) if callable(response_hook): response_hook(span, instance, response) return response @@ -237,22 +367,32 @@ def _traced_execute_pipeline( span_name, ) = _build_span_meta_data_for_pipeline(instance) exception = None + semconv_opt_in_mode = _OpenTelemetrySemanticConventionStability._get_opentelemetry_stability_opt_in_mode( + _OpenTelemetryStabilitySignalType.DATABASE + ) with tracer.start_as_current_span( span_name, kind=trace.SpanKind.CLIENT ) as span: - if span.is_recording(): - span.set_attribute(DB_STATEMENT, resource) - _set_connection_attributes(span, instance) - span.set_attribute( - "db.redis.pipeline_length", len(command_stack) - ) + _set_span_attributes( + span, + instance, + None, + None, + command_stack, + resource, + semconv_opt_in_mode, + ) response = None try: response = func(*args, **kwargs) except redis.WatchError as watch_exception: - span.set_status(StatusCode.UNSET) + if span.is_recording(): + span.set_status(StatusCode.UNSET) exception = watch_exception + except Exception as exc: # pylint: disable=broad-exception-caught + _set_span_error_attributes(span, exc, semconv_opt_in_mode) + exception = exc if callable(response_hook): response_hook(span, instance, response) @@ -278,17 +418,29 @@ async def _async_traced_execute_command( ) -> Awaitable[R]: query = _format_command_args(args) name = _build_span_name(instance, args) - + semconv_opt_in_mode = _OpenTelemetrySemanticConventionStability._get_opentelemetry_stability_opt_in_mode( + _OpenTelemetryStabilitySignalType.DATABASE + ) with tracer.start_as_current_span( name, kind=trace.SpanKind.CLIENT ) as span: - if span.is_recording(): - span.set_attribute(DB_STATEMENT, query) - _set_connection_attributes(span, instance) - span.set_attribute("db.redis.args_length", len(args)) + _set_span_attributes( + span, instance, args, query, None, None, semconv_opt_in_mode + ) + if callable(request_hook): request_hook(span, instance, args, kwargs) - response = await func(*args, **kwargs) + try: + response = await func(*args, **kwargs) + except redis.WatchError as watch_exception: + if span.is_recording(): + span.set_status(StatusCode.UNSET) + raise watch_exception + except Exception as exc: # pylint: disable=broad-exception-caught + _set_span_error_attributes(span, exc, semconv_opt_in_mode) + raise + if span.is_recording() and span.name == "redis.search": + _add_search_attributes(span, response, args) if callable(response_hook): response_hook(span, instance, response) return response @@ -314,28 +466,37 @@ async def _async_traced_execute_pipeline( ) = _build_span_meta_data_for_pipeline(instance) exception = None - + semconv_opt_in_mode = _OpenTelemetrySemanticConventionStability._get_opentelemetry_stability_opt_in_mode( + _OpenTelemetryStabilitySignalType.DATABASE + ) with tracer.start_as_current_span( span_name, kind=trace.SpanKind.CLIENT ) as span: - if span.is_recording(): - span.set_attribute(DB_STATEMENT, resource) - _set_connection_attributes(span, instance) - span.set_attribute( - "db.redis.pipeline_length", len(command_stack) - ) + _set_span_attributes( + span, + instance, + None, + None, + command_stack, + resource, + semconv_opt_in_mode, + ) response = None try: response = await func(*args, **kwargs) except redis.WatchError as watch_exception: - span.set_status(StatusCode.UNSET) + if span.is_recording(): + span.set_status(StatusCode.UNSET) exception = watch_exception + except Exception as exc: # pylint: disable=broad-exception-caught + _set_span_error_attributes(span, exc, semconv_opt_in_mode) + exception = exc if callable(response_hook): response_hook(span, instance, response) - if exception: + if exception is not None: raise exception return response diff --git a/instrumentation/opentelemetry-instrumentation-redis/tests/_test_semconv_helper.py b/instrumentation/opentelemetry-instrumentation-redis/tests/_test_semconv_helper.py new file mode 100644 index 0000000000..36bc57220d --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-redis/tests/_test_semconv_helper.py @@ -0,0 +1,133 @@ +import os +from unittest.mock import patch + +import fakeredis +from redis.exceptions import ResponseError, WatchError + +from opentelemetry import trace +from opentelemetry.instrumentation.redis import RedisInstrumentor +from opentelemetry.semconv._incubating.attributes.db_attributes import ( + DB_STATEMENT, + DB_SYSTEM, +) +from opentelemetry.test.test_base import TestBase + + +class TestRedisSemConv(TestBase): + def setUp(self): + super().setUp() + self.conn_patcher = patch( + "opentelemetry.instrumentation.redis._get_redis_conn_info", + return_value=("localhost", 6379, 0, None), + ) + self.conn_patcher.start() + RedisInstrumentor().instrument(tracer_provider=self.tracer_provider) + self.redis_client = fakeredis.FakeStrictRedis() + self.env_mode = os.getenv("OTEL_SEMCONV_STABILITY_OPT_IN") + + def tearDown(self): + super().tearDown() + self.conn_patcher.stop() + RedisInstrumentor().uninstrument() + + def test_single_command(self): + """Tests attributes for a regular single command.""" + self.redis_client.set("key", "value") + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + + if self.env_mode == "database": + self.assertEqual(span.attributes.get("db.system.name"), "redis") + self.assertEqual(span.attributes.get("db.operation.name"), "SET") + self.assertEqual(span.attributes.get("db.namespace"), "0") + self.assertEqual(span.attributes.get("db.query.text"), "SET ? ?") + self.assertNotIn(DB_SYSTEM, span.attributes) + self.assertNotIn(DB_STATEMENT, span.attributes) + elif self.env_mode == "database/dup": + self.assertEqual(span.attributes.get("db.system.name"), "redis") + self.assertEqual(span.attributes.get(DB_SYSTEM), "redis") + self.assertEqual(span.attributes.get(DB_STATEMENT), "SET ? ?") + else: # Default (old) behavior + self.assertEqual(span.attributes.get(DB_SYSTEM), "redis") + self.assertEqual(span.attributes.get(DB_STATEMENT), "SET ? ?") + self.assertNotIn("db.system.name", span.attributes) + + def test_pipeline_command(self): + """Tests attributes for a pipeline command.""" + with self.redis_client.pipeline(transaction=False) as pipe: + pipe.set("a", 1) + pipe.get("a") + pipe.execute() + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + + if self.env_mode in ("database", "database/dup"): + self.assertEqual( + span.attributes.get("db.operation.name"), "PIPELINE" + ) + self.assertEqual(span.attributes.get("db.operation.batch.size"), 2) + + def test_stored_procedure_command(self): + """Tests attributes for a stored procedure command.""" + with patch.object( + self.redis_client, "parse_response", return_value=b"ok" + ): + self.redis_client.evalsha("some-sha", 0, "key", "value") + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + + if self.env_mode in ("database", "database/dup"): + self.assertEqual( + span.attributes.get("db.stored_procedure.name"), "some-sha" + ) + else: + self.assertNotIn("db.stored_procedure.name", span.attributes) + + def test_generic_error(self): + """Tests attributes for a generic error.""" + with patch.object( + self.redis_client, + "parse_response", + side_effect=ResponseError("ERR unknown command"), + ): + with self.assertRaises(ResponseError): + self.redis_client.execute_command("INVALID_COMMAND") + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + self.assertEqual(span.status.status_code, trace.StatusCode.ERROR) + + if self.env_mode in ("database", "database/dup"): + self.assertEqual("ERR", span.attributes.get("error.type")) + self.assertEqual( + "ERR", span.attributes.get("db.response.status_code") + ) + else: + self.assertNotIn("error.type", span.attributes) + self.assertNotIn("db.response.status_code", span.attributes) + + def test_watch_error(self): + """Tests attributes for a WatchError.""" + with self.assertRaises(WatchError): + with self.redis_client.pipeline(transaction=True) as pipe: + pipe.watch("watched_key") + self.redis_client.set("watched_key", "modified-externally") + pipe.multi() + pipe.set("watched_key", "new-value") + pipe.get("watched_key") + with patch.object(pipe, "execute", side_effect=WatchError): + pipe.execute() + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 2) + + failed_pipeline_span = spans[-1] + self.assertEqual( + failed_pipeline_span.status.status_code, trace.StatusCode.UNSET + ) diff --git a/instrumentation/opentelemetry-instrumentation-redis/tests/test_semconv.py b/instrumentation/opentelemetry-instrumentation-redis/tests/test_semconv.py new file mode 100644 index 0000000000..26e0745556 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-redis/tests/test_semconv.py @@ -0,0 +1,42 @@ +import os +import subprocess +import sys + +import pytest + +# The full relative path from the project root +HELPER_PATH = "instrumentation/opentelemetry-instrumentation-redis/tests/_test_semconv_helper.py" + + +@pytest.mark.parametrize( + "semconv_opt_in, test_name", + [ + (None, "default"), + ("database", "new"), + ("database/dup", "dup"), + ], +) +def test_run_in_subprocess(semconv_opt_in, test_name): + """ + Runs the semantic convention test in a clean subprocess. + The `semconv_opt_in` value is used to set the env var. + """ + env = os.environ.copy() + + if semconv_opt_in is None: + if "OTEL_SEMCONV_STABILITY_OPT_IN" in env: + del env["OTEL_SEMCONV_STABILITY_OPT_IN"] + else: + env["OTEL_SEMCONV_STABILITY_OPT_IN"] = semconv_opt_in + + result = subprocess.run( + [sys.executable, "-m", "pytest", HELPER_PATH], + capture_output=True, + text=True, + env=env, + check=False, # Explicitly set check=False to satisfy pylint + ) + # Use a standard assert + assert ( + result.returncode == 0 + ), f"Subprocess for '{test_name}' mode failed with stdout:\n{result.stdout}\nstderr:\n{result.stderr}"