Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#2753](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2753))
- `opentelemetry-instrumentation-grpc` Fix grpc supported version
([#2845](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2845))
- `opentelemetry-instrumentation-redis` handle connection attributes of redis.cluster.RedisCluster
([#2626](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2626)

## Version 1.26.0/0.47b0 (2024-07-23)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,11 +127,15 @@ def response_hook(span, instance, response):
_REDIS_ASYNCIO_CLUSTER_VERSION = (4, 3, 2)


def _set_connection_attributes(span, conn):
if not span.is_recording() or not hasattr(conn, "connection_pool"):
def _set_connection_attributes(span, instance):
if hasattr(instance, "nodes_manager") and hasattr(
instance.nodes_manager.default_node, "redis_connection"
):
instance = instance.nodes_manager.default_node.redis_connection
if not span.is_recording() or not hasattr(instance, "connection_pool"):
return
for key, value in _extract_conn_attributes(
conn.connection_pool.connection_kwargs
instance.connection_pool.connection_kwargs
).items():
span.set_attribute(key, value)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,65 @@
from opentelemetry.test.test_base import TestBase
from opentelemetry.trace import SpanKind

default_cluster_slots = [
[0, 8191, ["1.1.1.1", 6380, "node_0"], ["1.1.1.1", 6383, "node_3"]],
[8192, 16383, ["1.1.1.1", 6381, "node_1"], ["1.1.1.1", 6382, "node_2"]],
]


def get_mocked_redis_cluster_client(
*args, func=None, cluster_slots_raise_error=False, **kwargs
): # noqa
"""
Return a stable RedisCluster object that have deterministic
nodes and slots setup to remove the problem of different IP addresses
on different installations and machines.
"""
cluster_slots = kwargs.pop("cluster_slots", default_cluster_slots)
coverage_res = kwargs.pop("coverage_result", "yes")
cluster_enabled = kwargs.pop("cluster_enabled", True)
with mock.patch.object(
redis.Redis, "execute_command"
) as execute_command_mock:

def execute_command(*_args, **_kwargs):
if _args[0] == "CLUSTER SLOTS":
if cluster_slots_raise_error:
raise redis.exceptions.ResponseError()
mock_cluster_slots = cluster_slots
return mock_cluster_slots
if _args[0] == "COMMAND":
return {"get": [], "set": []}
if _args[0] == "INFO":
return {"cluster_enabled": cluster_enabled}
if len(_args) > 1 and _args[1] == "cluster-require-full-coverage":
return {"cluster-require-full-coverage": coverage_res}
if func is not None:
return func(*args, **kwargs)
return execute_command_mock(*_args, **_kwargs)

execute_command_mock.side_effect = execute_command

with mock.patch.object(
redis._parsers.CommandsParser, "initialize", autospec=True
) as cmd_parser_initialize:

def cmd_init_mock(self, r):
self.commands = {
"get": {
"name": "get",
"arity": 2,
"flags": ["readonly", "fast"],
"first_key_pos": 1,
"last_key_pos": 1,
"step_count": 1,
}
}

cmd_parser_initialize.side_effect = cmd_init_mock

return redis.RedisCluster(*args, **kwargs)


class TestRedis(TestBase):
def setUp(self):
Expand Down Expand Up @@ -317,6 +376,68 @@ def test_attributes_unix_socket(self):
NetTransportValues.OTHER.value,
)

def test_attributes_redis_cluster(self):
with mock.patch.object(redis.RedisCluster, "from_url") as from_url:

def from_url_mocked(_url, **_kwargs):
return get_mocked_redis_cluster_client(url=_url, **_kwargs)

from_url.side_effect = from_url_mocked
redis_client = redis.RedisCluster.from_url(
"redis://foo:[email protected]:6380/0"
)

with mock.patch.object(
redis._parsers.CommandsParser, "initialize", autospec=True
) as cmd_parser_initialize:

def cmd_init_mock(self, r):
self.commands = {
"get": {
"name": "get",
"arity": 2,
"flags": ["readonly", "fast"],
"first_key_pos": 1,
"last_key_pos": 1,
"step_count": 1,
},
"set": {
"name": "set",
"arity": -3,
"flags": ["write", "denyoom"],
"first_key_pos": 1,
"last_key_pos": 1,
"step_count": 1,
},
}

cmd_parser_initialize.side_effect = cmd_init_mock
with mock.patch.object(
redis.connection.ConnectionPool, "get_connection"
) as get_connection:
get_connection.return_value = mock.MagicMock()
redis_client.set("key", "value")

spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)

span = spans[0]
self.assertEqual(
span.attributes[SpanAttributes.DB_SYSTEM],
DbSystemValues.REDIS.value,
)
self.assertEqual(
span.attributes[SpanAttributes.DB_REDIS_DATABASE_INDEX], 0
)
self.assertEqual(
span.attributes[SpanAttributes.NET_PEER_NAME], "1.1.1.1"
)
self.assertEqual(span.attributes[SpanAttributes.NET_PEER_PORT], 6380)
self.assertEqual(
span.attributes[SpanAttributes.NET_TRANSPORT],
NetTransportValues.IP_TCP.value,
)

def test_connection_error(self):
server = fakeredis.FakeServer()
server.connected = False
Expand Down Expand Up @@ -422,3 +543,4 @@ async def redis_operations():
self.assertEqual(span.attributes.get("db.statement"), "SET ? ?")
self.assertEqual(span.kind, SpanKind.CLIENT)
self.assertEqual(span.status.status_code, trace.StatusCode.UNSET)