diff --git a/CHANGELOG.md b/CHANGELOG.md index 7fa1ab2ba3..caa5776580 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -59,6 +59,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#2753](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2753)) - `opentelemetry-instrumentation-grpc` Fix grpc supported version ([#2845](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2845)) +- `opentelemetry-instrumentation-redis` handle connection attributes of redis.cluster.RedisCluster + ([#2626](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2626) ## Version 1.26.0/0.47b0 (2024-07-23) diff --git a/instrumentation/opentelemetry-instrumentation-redis/src/opentelemetry/instrumentation/redis/__init__.py b/instrumentation/opentelemetry-instrumentation-redis/src/opentelemetry/instrumentation/redis/__init__.py index 08337c2d4a..959df483bc 100644 --- a/instrumentation/opentelemetry-instrumentation-redis/src/opentelemetry/instrumentation/redis/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-redis/src/opentelemetry/instrumentation/redis/__init__.py @@ -127,11 +127,15 @@ def response_hook(span, instance, response): _REDIS_ASYNCIO_CLUSTER_VERSION = (4, 3, 2) -def _set_connection_attributes(span, conn): - if not span.is_recording() or not hasattr(conn, "connection_pool"): +def _set_connection_attributes(span, instance): + if hasattr(instance, "nodes_manager") and hasattr( + instance.nodes_manager.default_node, "redis_connection" + ): + instance = instance.nodes_manager.default_node.redis_connection + if not span.is_recording() or not hasattr(instance, "connection_pool"): return for key, value in _extract_conn_attributes( - conn.connection_pool.connection_kwargs + instance.connection_pool.connection_kwargs ).items(): span.set_attribute(key, value) diff --git a/instrumentation/opentelemetry-instrumentation-redis/tests/test_redis.py b/instrumentation/opentelemetry-instrumentation-redis/tests/test_redis.py index c436589adb..f747f0008a 100644 --- a/instrumentation/opentelemetry-instrumentation-redis/tests/test_redis.py +++ b/instrumentation/opentelemetry-instrumentation-redis/tests/test_redis.py @@ -33,6 +33,65 @@ from opentelemetry.test.test_base import TestBase from opentelemetry.trace import SpanKind +default_cluster_slots = [ + [0, 8191, ["1.1.1.1", 6380, "node_0"], ["1.1.1.1", 6383, "node_3"]], + [8192, 16383, ["1.1.1.1", 6381, "node_1"], ["1.1.1.1", 6382, "node_2"]], +] + + +def get_mocked_redis_cluster_client( + *args, func=None, cluster_slots_raise_error=False, **kwargs +): # noqa + """ + Return a stable RedisCluster object that have deterministic + nodes and slots setup to remove the problem of different IP addresses + on different installations and machines. + """ + cluster_slots = kwargs.pop("cluster_slots", default_cluster_slots) + coverage_res = kwargs.pop("coverage_result", "yes") + cluster_enabled = kwargs.pop("cluster_enabled", True) + with mock.patch.object( + redis.Redis, "execute_command" + ) as execute_command_mock: + + def execute_command(*_args, **_kwargs): + if _args[0] == "CLUSTER SLOTS": + if cluster_slots_raise_error: + raise redis.exceptions.ResponseError() + mock_cluster_slots = cluster_slots + return mock_cluster_slots + if _args[0] == "COMMAND": + return {"get": [], "set": []} + if _args[0] == "INFO": + return {"cluster_enabled": cluster_enabled} + if len(_args) > 1 and _args[1] == "cluster-require-full-coverage": + return {"cluster-require-full-coverage": coverage_res} + if func is not None: + return func(*args, **kwargs) + return execute_command_mock(*_args, **_kwargs) + + execute_command_mock.side_effect = execute_command + + with mock.patch.object( + redis._parsers.CommandsParser, "initialize", autospec=True + ) as cmd_parser_initialize: + + def cmd_init_mock(self, r): + self.commands = { + "get": { + "name": "get", + "arity": 2, + "flags": ["readonly", "fast"], + "first_key_pos": 1, + "last_key_pos": 1, + "step_count": 1, + } + } + + cmd_parser_initialize.side_effect = cmd_init_mock + + return redis.RedisCluster(*args, **kwargs) + class TestRedis(TestBase): def setUp(self): @@ -317,6 +376,68 @@ def test_attributes_unix_socket(self): NetTransportValues.OTHER.value, ) + def test_attributes_redis_cluster(self): + with mock.patch.object(redis.RedisCluster, "from_url") as from_url: + + def from_url_mocked(_url, **_kwargs): + return get_mocked_redis_cluster_client(url=_url, **_kwargs) + + from_url.side_effect = from_url_mocked + redis_client = redis.RedisCluster.from_url( + "redis://foo:bar@1.1.1.1:6380/0" + ) + + with mock.patch.object( + redis._parsers.CommandsParser, "initialize", autospec=True + ) as cmd_parser_initialize: + + def cmd_init_mock(self, r): + self.commands = { + "get": { + "name": "get", + "arity": 2, + "flags": ["readonly", "fast"], + "first_key_pos": 1, + "last_key_pos": 1, + "step_count": 1, + }, + "set": { + "name": "set", + "arity": -3, + "flags": ["write", "denyoom"], + "first_key_pos": 1, + "last_key_pos": 1, + "step_count": 1, + }, + } + + cmd_parser_initialize.side_effect = cmd_init_mock + with mock.patch.object( + redis.connection.ConnectionPool, "get_connection" + ) as get_connection: + get_connection.return_value = mock.MagicMock() + redis_client.set("key", "value") + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + + span = spans[0] + self.assertEqual( + span.attributes[SpanAttributes.DB_SYSTEM], + DbSystemValues.REDIS.value, + ) + self.assertEqual( + span.attributes[SpanAttributes.DB_REDIS_DATABASE_INDEX], 0 + ) + self.assertEqual( + span.attributes[SpanAttributes.NET_PEER_NAME], "1.1.1.1" + ) + self.assertEqual(span.attributes[SpanAttributes.NET_PEER_PORT], 6380) + self.assertEqual( + span.attributes[SpanAttributes.NET_TRANSPORT], + NetTransportValues.IP_TCP.value, + ) + def test_connection_error(self): server = fakeredis.FakeServer() server.connected = False @@ -422,3 +543,4 @@ async def redis_operations(): self.assertEqual(span.attributes.get("db.statement"), "SET ? ?") self.assertEqual(span.kind, SpanKind.CLIENT) self.assertEqual(span.status.status_code, trace.StatusCode.UNSET) +