66from contextlib import asynccontextmanager
77from datetime import datetime , timedelta , timezone
88from types import TracebackType
9- from typing import Any , AsyncIterator , Iterable , Optional , Sequence , cast
9+ from typing import Any , AsyncIterator , Iterable , Optional , Sequence , cast , Union
1010
1111from langgraph .store .base import (
12- BaseStore ,
1312 GetOp ,
1413 IndexConfig ,
1514 ListNamespacesOp ,
2221 get_text_at_path ,
2322 tokenize_path ,
2423)
25- from langgraph .store .base .batch import AsyncBatchedBaseStore , _dedupe_ops
24+ from langgraph .store .base .batch import AsyncBatchedBaseStore
2625from redis import ResponseError
2726from redis .asyncio import Redis as AsyncRedis
2827from redis .commands .search .query import Query
5049
5150_token_escaper = TokenEscaper ()
5251_token_unescaper = TokenUnescaper ()
52+ from redis .asyncio .cluster import RedisCluster as AsyncRedisCluster
5353
5454
5555class AsyncRedisStore (
@@ -65,7 +65,8 @@ class AsyncRedisStore(
6565 _async_ttl_stop_event : asyncio .Event | None = None
6666 _ttl_sweeper_task : asyncio .Task | None = None
6767 ttl_config : Optional [TTLConfig ] = None
68- cluster_mode : bool = False
68+ # Whether to assume the Redis server is a cluster; None triggers auto-detection
69+ cluster_mode : Optional [bool ] = None
6970
7071 def __init__ (
7172 self ,
@@ -75,6 +76,7 @@ def __init__(
7576 index : Optional [IndexConfig ] = None ,
7677 connection_args : Optional [dict [str , Any ]] = None ,
7778 ttl : Optional [dict [str , Any ]] = None ,
79+ cluster_mode : Optional [bool ] = None ,
7880 ) -> None :
7981 """Initialize store with Redis connection and optional index config."""
8082 if redis_url is None and redis_client is None :
@@ -112,6 +114,11 @@ def __init__(
112114 connection_args = connection_args or {},
113115 )
114116
117+ # Validate and store cluster_mode; None means auto-detect later
118+ if cluster_mode is not None and not isinstance (cluster_mode , bool ):
119+ raise TypeError ("cluster_mode must be a boolean or None" )
120+ self .cluster_mode : Optional [bool ] = cluster_mode
121+
115122 # Create store index
116123 self .store_index = AsyncSearchIndex .from_dict (
117124 self .SCHEMAS [0 ], redis_client = self ._redis
@@ -184,24 +191,28 @@ async def setup(self) -> None:
184191 self .index_config .get ("embed" ),
185192 )
186193
187- await self ._detect_cluster_mode ()
194+ # Auto-detect cluster mode if not explicitly set
195+ if self .cluster_mode is None :
196+ await self ._detect_cluster_mode ()
197+ else :
198+ logger .info (
199+ f"Redis cluster_mode explicitly set to { self .cluster_mode } , skipping detection."
200+ )
188201
189202 # Create indices in Redis
190203 await self .store_index .create (overwrite = False )
191204 if self .index_config :
192205 await self .vector_index .create (overwrite = False )
193206
194207 async def _detect_cluster_mode (self ) -> None :
195- """Detect if the Redis client is a cluster client."""
196- try :
197- # Try to run a cluster command
198- # This will succeed for cluster clients and fail for non-cluster clients
199- await self ._redis .cluster ("info" )
208+ """Detect if the Redis client is a cluster client by inspecting its class."""
209+ # Determine cluster mode based on client class
210+ if isinstance (self ._redis , AsyncRedisCluster ):
200211 self .cluster_mode = True
201- logger .info ("Redis cluster mode detected for AsyncRedisStore." )
202- except ( ResponseError , AttributeError ) :
212+ logger .info ("Redis cluster client detected for AsyncRedisStore." )
213+ else :
203214 self .cluster_mode = False
204- logger .info ("Redis standalone mode detected for AsyncRedisStore." )
215+ logger .info ("Redis standalone client detected for AsyncRedisStore." )
205216
206217 # This can't be properly typed due to covariance issues with async methods
207218 async def _apply_ttl_to_keys (
0 commit comments