Skip to content

Commit 8b8261e

Browse files
committed
fix: handle ClusterPipeline missing nodes_manager attribute (#365)
- Add get_protocol_version wrapper to safely handle ClusterPipeline objects - ClusterPipeline objects lack nodes_manager attribute, causing AttributeError - Fallback to protocol "3" when unable to detect version from cluster pipelines - Handle both sync and async ClusterPipeline cases Fixes #365
1 parent 547085a commit 8b8261e

File tree

5 files changed

+560
-2
lines changed

5 files changed

+560
-2
lines changed

redisvl/index/index.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,8 @@
4949

5050
from redis import __version__ as redis_version
5151
from redis.client import NEVER_DECODE
52-
from redis.commands.helpers import get_protocol_version # type: ignore
52+
53+
from redisvl.utils.redis_protocol import get_protocol_version
5354

5455
# Redis 5.x compatibility (6 fixed the import path)
5556
if redis_version.startswith("5"):

redisvl/redis/utils.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
from redis import __version__ as redis_version
88
from redis.asyncio.cluster import RedisCluster as AsyncRedisCluster
99
from redis.client import NEVER_DECODE, Pipeline
10-
from redis.commands.helpers import get_protocol_version
1110
from redis.commands.search import AsyncSearch, Search
1211
from redis.commands.search.commands import (
1312
CREATE_CMD,
@@ -23,6 +22,8 @@
2322
)
2423
from redis.commands.search.field import Field
2524

25+
from redisvl.utils.redis_protocol import get_protocol_version
26+
2627
# Redis 5.x compatibility (6 fixed the import path)
2728
if redis_version.startswith("5"):
2829
from redis.commands.search.indexDefinition import ( # type: ignore[import-untyped]

redisvl/utils/redis_protocol.py

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
"""
2+
Utilities for handling Redis protocol version detection safely across different client types.
3+
4+
This module provides safe wrappers around redis-py's get_protocol_version function
5+
to handle edge cases with Redis Cluster pipelines.
6+
"""
7+
8+
from typing import Union
9+
10+
from redis.asyncio.cluster import ClusterPipeline as AsyncClusterPipeline
11+
from redis.cluster import ClusterPipeline
12+
from redis.commands.helpers import get_protocol_version as redis_get_protocol_version
13+
14+
from redisvl.utils.log import get_logger
15+
16+
logger = get_logger(__name__)
17+
18+
19+
def get_protocol_version(client) -> str:
20+
"""
21+
Wrapper for redis-py's get_protocol_version that handles ClusterPipeline.
22+
23+
ClusterPipeline doesn't have nodes_manager attribute, so we need to
24+
handle this case specially to avoid AttributeError.
25+
26+
Args:
27+
client: Redis client, pipeline, or cluster pipeline object
28+
29+
Returns:
30+
str: Protocol version ("2" or "3")
31+
32+
Note:
33+
This function addresses issue #365 where get_protocol_version() fails
34+
with ClusterPipeline objects due to missing nodes_manager attribute.
35+
"""
36+
# Handle sync ClusterPipeline
37+
if isinstance(client, ClusterPipeline):
38+
try:
39+
# Try to get protocol from the underlying cluster client
40+
if hasattr(client, "_redis_cluster") and client._redis_cluster:
41+
try:
42+
result = redis_get_protocol_version(client._redis_cluster)
43+
if result is not None:
44+
return result
45+
except (AttributeError, Exception):
46+
# If anything fails, fall back to default
47+
pass
48+
49+
logger.debug(
50+
"ClusterPipeline without valid _redis_cluster, defaulting to protocol 3"
51+
)
52+
return "3"
53+
except AttributeError as e:
54+
logger.debug(
55+
f"Failed to get protocol version from ClusterPipeline: {e}, defaulting to protocol 3"
56+
)
57+
return "3"
58+
59+
# Handle async ClusterPipeline
60+
if isinstance(client, AsyncClusterPipeline):
61+
try:
62+
# Try to get protocol from the underlying cluster client
63+
if hasattr(client, "_redis_cluster") and client._redis_cluster:
64+
try:
65+
result = redis_get_protocol_version(client._redis_cluster)
66+
if result is not None:
67+
return result
68+
except (AttributeError, Exception):
69+
# If anything fails, fall back to default
70+
pass
71+
72+
logger.debug(
73+
"AsyncClusterPipeline without valid _redis_cluster, defaulting to protocol 3"
74+
)
75+
return "3"
76+
except AttributeError as e:
77+
logger.debug(
78+
f"Failed to get protocol version from AsyncClusterPipeline: {e}, defaulting to protocol 3"
79+
)
80+
return "3"
81+
82+
# For all other client types, use the standard function
83+
try:
84+
result = redis_get_protocol_version(client)
85+
if result is None:
86+
logger.warning(
87+
f"get_protocol_version returned None for client {type(client)}, defaulting to protocol 3"
88+
)
89+
return "3"
90+
return result
91+
except AttributeError as e:
92+
logger.warning(
93+
f"Failed to get protocol version from client {type(client)}: {e}, defaulting to protocol 3"
94+
)
95+
return "3"
Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
"""
2+
Integration test for issue #365: ClusterPipeline AttributeError fix
3+
https://github.com/redis/redis-vl-python/issues/365
4+
5+
This test verifies that the safe_get_protocol_version fix prevents the
6+
AttributeError: 'ClusterPipeline' object has no attribute 'nodes_manager'
7+
"""
8+
9+
from unittest.mock import Mock
10+
11+
import pytest
12+
from redis.asyncio.cluster import ClusterPipeline as AsyncClusterPipeline
13+
from redis.cluster import ClusterPipeline
14+
15+
from redisvl.index import SearchIndex
16+
from redisvl.query import FilterQuery
17+
from redisvl.schema import IndexSchema
18+
from redisvl.utils.redis_protocol import get_protocol_version
19+
20+
21+
def test_pipeline_operations_no_nodes_manager_error(redis_url):
22+
"""
23+
Test that pipeline operations don't fail with nodes_manager AttributeError.
24+
25+
Before the fix, operations that use get_protocol_version() internally would fail
26+
with AttributeError when using ClusterPipeline. This test ensures those operations
27+
now work without that specific error.
28+
"""
29+
# Create a simple schema
30+
schema_dict = {
31+
"index": {"name": "test-365-fix", "prefix": "doc", "storage_type": "hash"},
32+
"fields": [{"name": "id", "type": "tag"}, {"name": "text", "type": "text"}],
33+
}
34+
35+
schema = IndexSchema.from_dict(schema_dict)
36+
index = SearchIndex(schema, redis_url=redis_url)
37+
38+
# Create the index
39+
index.create(overwrite=True)
40+
41+
try:
42+
# Test 1: Load with batching (uses pipelines internally)
43+
test_data = [{"id": f"item{i}", "text": f"Document {i}"} for i in range(10)]
44+
45+
# This would fail with AttributeError before the fix
46+
keys = index.load(
47+
data=test_data,
48+
id_field="id",
49+
batch_size=3, # Force multiple pipeline operations
50+
)
51+
52+
assert len(keys) == 10
53+
54+
# Test 2: Batch search (uses safe_get_protocol_version internally)
55+
queries = [FilterQuery(filter_expression=f"@id:{{item{i}}}") for i in range(3)]
56+
57+
try:
58+
# The critical test: no AttributeError about nodes_manager
59+
results = index.batch_search(queries, batch_size=2)
60+
assert len(results) == 3
61+
except Exception as e:
62+
# If there's an error, it must NOT be the nodes_manager AttributeError
63+
assert "nodes_manager" not in str(
64+
e
65+
), f"Got nodes_manager error which indicates fix isn't working: {e}"
66+
67+
# Test 3: TTL operations
68+
try:
69+
index.expire_keys(keys[:3], 3600)
70+
except Exception as e:
71+
# Again, ensure no nodes_manager error
72+
assert "nodes_manager" not in str(e)
73+
74+
finally:
75+
index.delete()
76+
77+
78+
def test_json_storage_no_error(redis_url):
79+
"""Test with JSON storage type."""
80+
schema_dict = {
81+
"index": {"name": "test-365-json", "prefix": "json", "storage_type": "json"},
82+
"fields": [{"name": "id", "type": "tag"}, {"name": "data", "type": "text"}],
83+
}
84+
85+
schema = IndexSchema.from_dict(schema_dict)
86+
index = SearchIndex(schema, redis_url=redis_url)
87+
88+
index.create(overwrite=True)
89+
90+
try:
91+
# Load test data
92+
test_data = [{"id": f"doc{i}", "data": f"Document {i}"} for i in range(5)]
93+
94+
# Should work without nodes_manager AttributeError
95+
keys = index.load(data=test_data, id_field="id", batch_size=2)
96+
97+
assert len(keys) == 5
98+
99+
finally:
100+
index.delete()
101+
102+
103+
def test_clusterpipeline_with_valid_redis_cluster_attribute():
104+
"""
105+
Test get_protocol_version when ClusterPipeline has _redis_cluster attribute.
106+
"""
107+
# Create mock ClusterPipeline with _redis_cluster attribute
108+
mock_pipeline = Mock(spec=ClusterPipeline)
109+
mock_cluster = Mock()
110+
mock_cluster.nodes_manager.connection_kwargs.get.return_value = "3"
111+
mock_pipeline._redis_cluster = mock_cluster
112+
113+
# Should successfully get protocol from _redis_cluster
114+
result = get_protocol_version(mock_pipeline)
115+
assert result == "3"
116+
117+
118+
def test_clusterpipeline_with_none_redis_cluster():
119+
"""
120+
Test get_protocol_version when _redis_cluster is None.
121+
"""
122+
mock_pipeline = Mock(spec=ClusterPipeline)
123+
mock_pipeline._redis_cluster = None
124+
125+
# Should fallback to "3"
126+
result = get_protocol_version(mock_pipeline)
127+
assert result == "3"
128+
129+
130+
def test_async_clusterpipeline_without_nodes_manager():
131+
"""
132+
Test get_protocol_version with AsyncClusterPipeline missing nodes_manager.
133+
"""
134+
mock_pipeline = Mock(spec=AsyncClusterPipeline)
135+
# Ensure no nodes_manager attribute
136+
if hasattr(mock_pipeline, "nodes_manager"):
137+
delattr(mock_pipeline, "nodes_manager")
138+
mock_pipeline._redis_cluster = None
139+
140+
# Should fallback to "3" without error
141+
result = get_protocol_version(mock_pipeline)
142+
assert result == "3"

0 commit comments

Comments
 (0)