Commit 67b10ee

feat: add custom store prefix support and fix runtime warnings (#99)
- Add store_prefix and vector_prefix parameters to RedisStore and AsyncRedisStore
- Enable multiple isolated stores in the same Redis instance with custom prefixes
- Replace module-level STORE_PREFIX constants with instance attributes
- Fix RuntimeWarning by moving set_client_info() from BaseRedisStore to concrete implementations
- Fix DeprecationWarning by replacing datetime.utcnow() with datetime.now(timezone.utc)
1 parent 3ccdf77 commit 67b10ee
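
To make the headline change concrete, here is a minimal usage sketch of the new parameters. It assumes a local Redis at redis://localhost:6379; the prefix, namespace, and key values are invented for illustration.

    from langgraph.store.redis import RedisStore

    # Two stores in the same Redis instance, isolated by key prefix:
    # documents land under "agent_a:<uuid>" and "agent_b:<uuid>" respectively.
    with RedisStore.from_conn_string(
        "redis://localhost:6379",
        store_prefix="agent_a",
        vector_prefix="agent_a_vectors",
    ) as store_a, RedisStore.from_conn_string(
        "redis://localhost:6379",
        store_prefix="agent_b",
        vector_prefix="agent_b_vectors",
    ) as store_b:
        store_a.setup()
        store_b.setup()
        store_a.put(("users",), "u1", {"name": "only agent A sees this"})
        assert store_b.get(("users",), "u1") is None  # isolated keyspaces

The defaults ("store" and "store_vectors") appear to preserve the previous key layout, so existing data should remain readable.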

File tree

5 files changed: +406 -67 lines changed

langgraph/checkpoint/redis/message_exporter.py

Lines changed: 2 additions & 2 deletions
@@ -1,6 +1,6 @@
 """Message exporter for extracting conversation messages from checkpoints."""
 
-from datetime import datetime
+from datetime import datetime, timezone
 from typing import Any, Dict, List, Optional, Protocol
 
 import orjson
@@ -162,5 +162,5 @@ def export_thread(self, thread_id: str) -> Dict[str, Any]:
         return {
             "thread_id": thread_id,
             "messages": messages,
-            "export_timestamp": datetime.utcnow().isoformat(),
+            "export_timestamp": datetime.now(timezone.utc).isoformat(),
         }
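
For context on the DeprecationWarning fix above, a standalone sketch of the behavioral difference (datetime.utcnow() is deprecated as of Python 3.12):

    from datetime import datetime, timezone

    naive = datetime.utcnow()           # deprecated; returns a naive datetime, tzinfo is None
    aware = datetime.now(timezone.utc)  # replacement used above; timezone-aware UTC

    print(naive.tzinfo)       # None
    print(aware.tzinfo)       # UTC
    print(aware.isoformat())  # e.g. "2025-01-01T12:00:00+00:00"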

langgraph/store/redis/__init__.py

Lines changed: 37 additions & 15 deletions
@@ -33,8 +33,6 @@
 from langgraph.store.redis.aio import AsyncRedisStore
 from langgraph.store.redis.base import (
     REDIS_KEY_SEPARATOR,
-    STORE_PREFIX,
-    STORE_VECTOR_PREFIX,
     BaseRedisStore,
     RedisDocument,
     _decode_ns,
@@ -86,11 +84,21 @@ def __init__(
         index: Optional[IndexConfig] = None,
         ttl: Optional[TTLConfig] = None,
         cluster_mode: Optional[bool] = None,
+        store_prefix: str = "store",
+        vector_prefix: str = "store_vectors",
     ) -> None:
         BaseStore.__init__(self)
         BaseRedisStore.__init__(
-            self, conn, index=index, ttl=ttl, cluster_mode=cluster_mode
+            self,
+            conn,
+            index=index,
+            ttl=ttl,
+            cluster_mode=cluster_mode,
+            store_prefix=store_prefix,
+            vector_prefix=vector_prefix,
         )
+        # Set client info for monitoring (sync store can call this safely)
+        self.set_client_info()
         # Detection will happen in setup()
 
     @classmethod
@@ -101,14 +109,22 @@ def from_conn_string(
         *,
         index: Optional[IndexConfig] = None,
         ttl: Optional[TTLConfig] = None,
+        store_prefix: str = "store",
+        vector_prefix: str = "store_vectors",
     ) -> Iterator[RedisStore]:
         """Create store from Redis connection string."""
         client = None
         try:
             client = RedisConnectionFactory.get_redis_connection(conn_string)
-            store = cls(client, index=index, ttl=ttl)
-            # Client info will already be set in __init__, but we set it up here
-            # to make the method behavior consistent with AsyncRedisStore
+            store = cls(
+                client,
+                index=index,
+                ttl=ttl,
+                store_prefix=store_prefix,
+                vector_prefix=vector_prefix,
+            )
+            # Client info is set in __init__, but set it again here to ensure
+            # it's available even if called before setup()
             store.set_client_info()
             yield store
         finally:
@@ -259,7 +275,7 @@ def _batch_get_ops(
                     # Also add vector keys for the same document
                     doc_uuid = doc_id.split(":")[-1]
                     vector_key = (
-                        f"{STORE_VECTOR_PREFIX}{REDIS_KEY_SEPARATOR}{doc_uuid}"
+                        f"{self.vector_prefix}{REDIS_KEY_SEPARATOR}{doc_uuid}"
                     )
                     refresh_keys_by_idx[idx].append(vector_key)
 
@@ -338,7 +354,9 @@ def _batch_put_ops(
             doc_ids[(namespace, op.key)] = generated_doc_id
             # Track TTL for this document if specified
             if hasattr(op, "ttl") and op.ttl is not None:
-                main_key = f"{STORE_PREFIX}{REDIS_KEY_SEPARATOR}{generated_doc_id}"
+                main_key = (
+                    f"{self.store_prefix}{REDIS_KEY_SEPARATOR}{generated_doc_id}"
+                )
                 ttl_tracking[main_key] = ([], op.ttl)
 
         # Load store docs with explicit keys
@@ -352,7 +370,7 @@ def _batch_put_ops(
                 doc.pop("expires_at", None)
 
             store_docs.append(doc)
-            redis_key = f"{STORE_PREFIX}{REDIS_KEY_SEPARATOR}{doc_id}"
+            redis_key = f"{self.store_prefix}{REDIS_KEY_SEPARATOR}{doc_id}"
             store_keys.append(redis_key)
 
         if store_docs:
@@ -408,11 +426,11 @@ def _batch_put_ops(
                         "updated_at": datetime.now(timezone.utc).timestamp(),
                     }
                 )
-                redis_vector_key = f"{STORE_VECTOR_PREFIX}{REDIS_KEY_SEPARATOR}{doc_id}"
+                redis_vector_key = f"{self.vector_prefix}{REDIS_KEY_SEPARATOR}{doc_id}"
                 vector_keys.append(redis_vector_key)
 
                 # Add this vector key to the related keys list for TTL
-                main_key = f"{STORE_PREFIX}{REDIS_KEY_SEPARATOR}{doc_id}"
+                main_key = f"{self.store_prefix}{REDIS_KEY_SEPARATOR}{doc_id}"
                 if main_key in ttl_tracking:
                     ttl_tracking[main_key][0].append(redis_vector_key)
 
@@ -472,7 +490,9 @@ def _batch_search_ops(
                         )
                         if doc_id:
                             doc_uuid = doc_id.split(":")[1]
-                            store_key = f"{STORE_PREFIX}{REDIS_KEY_SEPARATOR}{doc_uuid}"
+                            store_key = (
+                                f"{self.store_prefix}{REDIS_KEY_SEPARATOR}{doc_uuid}"
+                            )
                             result_map[store_key] = doc
                             # Fetch individually in cluster mode
                             store_doc_item = self._redis.json().get(store_key)
@@ -489,7 +509,9 @@ def _batch_search_ops(
                         if not doc_id:
                             continue
                         doc_uuid = doc_id.split(":")[1]
-                        store_key = f"{STORE_PREFIX}{REDIS_KEY_SEPARATOR}{doc_uuid}"
+                        store_key = (
+                            f"{self.store_prefix}{REDIS_KEY_SEPARATOR}{doc_uuid}"
+                        )
                         result_map[store_key] = doc
                         pipe.json().get(store_key)
                     # Execute all lookups in one batch
@@ -554,7 +576,7 @@ def _batch_search_ops(
                             # Also find associated vector keys with same ID
                             doc_id = store_key.split(":")[-1]
                             vector_key = (
-                                f"{STORE_VECTOR_PREFIX}{REDIS_KEY_SEPARATOR}{doc_id}"
+                                f"{self.vector_prefix}{REDIS_KEY_SEPARATOR}{doc_id}"
                             )
                             refresh_keys.append(vector_key)
 
@@ -625,7 +647,7 @@ def _batch_search_ops(
                             # Also find associated vector keys with same ID
                             doc_id = doc.id.split(":")[-1]
                             vector_key = (
-                                f"{STORE_VECTOR_PREFIX}{REDIS_KEY_SEPARATOR}{doc_id}"
+                                f"{self.vector_prefix}{REDIS_KEY_SEPARATOR}{doc_id}"
                             )
                             refresh_keys.append(vector_key)
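
Since every key in these methods is now built from instance attributes rather than module constants, stores constructed with different prefixes write to disjoint keyspaces. A tiny sketch of the resulting key shapes, assuming the old constant matched today's default of "store" (the UUID is illustrative; ":" is the separator implied by the doc_id.split(":") calls above):

    REDIS_KEY_SEPARATOR = ":"
    doc_id = "0b1c2d3e-illustrative-uuid"

    # Before: one shared, module-level prefix for every store instance.
    old_key = f"store{REDIS_KEY_SEPARATOR}{doc_id}"           # "store:0b1c2d3e-illustrative-uuid"

    # After: the prefix comes from the instance, so stores can be isolated.
    store_prefix = "agent_a"                                  # per-instance attribute
    new_key = f"{store_prefix}{REDIS_KEY_SEPARATOR}{doc_id}"  # "agent_a:0b1c2d3e-illustrative-uuid"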

langgraph/store/redis/aio.py

Lines changed: 77 additions & 41 deletions
@@ -31,8 +31,6 @@
 
 from langgraph.store.redis.base import (
     REDIS_KEY_SEPARATOR,
-    STORE_PREFIX,
-    STORE_VECTOR_PREFIX,
     BaseRedisStore,
     RedisDocument,
     _decode_ns,
@@ -74,8 +72,10 @@ def __init__(
         redis_client: Optional[AsyncRedis] = None,
         index: Optional[IndexConfig] = None,
         connection_args: Optional[dict[str, Any]] = None,
-        ttl: Optional[dict[str, Any]] = None,
+        ttl: Optional[TTLConfig] = None,
         cluster_mode: Optional[bool] = None,
+        store_prefix: str = "store",
+        vector_prefix: str = "store_vectors",
     ) -> None:
         """Initialize store with Redis connection and optional index config."""
         if redis_url is None and redis_client is None:
@@ -84,29 +84,7 @@ def __init__(
         # Initialize base classes
         AsyncBatchedBaseStore.__init__(self)
 
-        # Set up store configuration
-        self.index_config = index
-        self.ttl_config = ttl
-
-        if self.index_config:
-            self.index_config = self.index_config.copy()
-            self.embeddings = ensure_embeddings(
-                self.index_config.get("embed"),
-            )
-            fields = (
-                self.index_config.get("text_fields", ["$"])
-                or self.index_config.get("fields", ["$"])
-                or []
-            )
-            if isinstance(fields, str):
-                fields = [fields]
-
-            self.index_config["__tokenized_fields"] = [
-                (p, tokenize_path(p)) if p != "$" else (p, p)
-                for p in (self.index_config.get("fields") or ["$"])
-            ]
-
-        # Configure client
+        # Configure client first
         self.configure_client(
             redis_url=redis_url,
             redis_client=redis_client,
@@ -116,16 +94,60 @@ def __init__(
         # Validate and store cluster_mode; None means auto-detect later
         if cluster_mode is not None and not isinstance(cluster_mode, bool):
             raise TypeError("cluster_mode must be a boolean or None")
-        self.cluster_mode: Optional[bool] = cluster_mode
 
-        # Create store index
+        # Initialize BaseRedisStore with prefix parameters
+        BaseRedisStore.__init__(
+            self,
+            conn=self._redis,
+            index=index,
+            ttl=ttl,
+            cluster_mode=cluster_mode,
+            store_prefix=store_prefix,
+            vector_prefix=vector_prefix,
+        )
+
+        # Update store_index to async version
+        from copy import deepcopy
+
+        store_schema = {
+            "index": {
+                "name": self.store_prefix,
+                "prefix": self.store_prefix + REDIS_KEY_SEPARATOR,
+                "storage_type": "json",
+            },
+            "fields": [
+                {"name": "prefix", "type": "text"},
+                {"name": "key", "type": "tag"},
+                {"name": "created_at", "type": "numeric"},
+                {"name": "updated_at", "type": "numeric"},
+                {"name": "ttl_minutes", "type": "numeric"},
+                {"name": "expires_at", "type": "numeric"},
+            ],
+        }
         self.store_index = AsyncSearchIndex.from_dict(
-            self.SCHEMAS[0], redis_client=self._redis
+            store_schema, redis_client=self._redis
         )
 
         # Configure vector index if needed
         if self.index_config:
-            vector_schema = self.SCHEMAS[1].copy()
+            # Create custom vector schema with instance prefix
+            vector_schema = {
+                "index": {
+                    "name": self.vector_prefix,
+                    "prefix": self.vector_prefix + REDIS_KEY_SEPARATOR,
+                    "storage_type": "json",
+                },
+                "fields": [
+                    {"name": "prefix", "type": "text"},
+                    {"name": "key", "type": "tag"},
+                    {"name": "field_name", "type": "tag"},
+                    {"name": "embedding", "type": "vector"},
+                    {"name": "created_at", "type": "numeric"},
+                    {"name": "updated_at", "type": "numeric"},
+                    {"name": "ttl_minutes", "type": "numeric"},
+                    {"name": "expires_at", "type": "numeric"},
+                ],
+            }
             vector_fields = vector_schema.get("fields", [])
             vector_field = None
             for f in vector_fields:
@@ -301,10 +323,18 @@ async def from_conn_string(
         conn_string: str,
         *,
         index: Optional[IndexConfig] = None,
-        ttl: Optional[dict[str, Any]] = None,
+        ttl: Optional[TTLConfig] = None,
+        store_prefix: str = "store",
+        vector_prefix: str = "store_vectors",
     ) -> AsyncIterator[AsyncRedisStore]:
         """Create store from Redis connection string."""
-        async with cls(redis_url=conn_string, index=index, ttl=ttl) as store:
+        async with cls(
+            redis_url=conn_string,
+            index=index,
+            ttl=ttl,
+            store_prefix=store_prefix,
+            vector_prefix=vector_prefix,
+        ) as store:
            await store.setup()
            # Set client information after setup
            await store.aset_client_info()
@@ -460,7 +490,7 @@ async def _batch_get_ops(
                     # Also add vector keys for the same document
                     doc_uuid = doc_id.split(":")[-1]
                     vector_key = (
-                        f"{STORE_VECTOR_PREFIX}{REDIS_KEY_SEPARATOR}{doc_uuid}"
+                        f"{self.vector_prefix}{REDIS_KEY_SEPARATOR}{doc_uuid}"
                     )
                     refresh_keys_by_idx[idx].append(vector_key)
 
@@ -624,7 +654,9 @@ async def _batch_put_ops(
             doc_ids[(namespace, op.key)] = generated_doc_id
             # Track TTL for this document if specified
             if hasattr(op, "ttl") and op.ttl is not None:
-                main_key = f"{STORE_PREFIX}{REDIS_KEY_SEPARATOR}{generated_doc_id}"
+                main_key = (
+                    f"{self.store_prefix}{REDIS_KEY_SEPARATOR}{generated_doc_id}"
+                )
                 ttl_tracking[main_key] = ([], op.ttl)
 
         # Load store docs with explicit keys
@@ -638,7 +670,7 @@ async def _batch_put_ops(
                 doc.pop("expires_at", None)
 
             store_docs.append(doc)
-            redis_key = f"{STORE_PREFIX}{REDIS_KEY_SEPARATOR}{doc_id}"
+            redis_key = f"{self.store_prefix}{REDIS_KEY_SEPARATOR}{doc_id}"
             store_keys.append(redis_key)
 
         if store_docs:
@@ -674,11 +706,11 @@ async def _batch_put_ops(
                         "updated_at": datetime.now(timezone.utc).timestamp(),
                     }
                 )
-                redis_vector_key = f"{STORE_VECTOR_PREFIX}{REDIS_KEY_SEPARATOR}{doc_id}"
+                redis_vector_key = f"{self.vector_prefix}{REDIS_KEY_SEPARATOR}{doc_id}"
                 vector_keys.append(redis_vector_key)
 
                 # Add this vector key to the related keys list for TTL
-                main_key = f"{STORE_PREFIX}{REDIS_KEY_SEPARATOR}{doc_id}"
+                main_key = f"{self.store_prefix}{REDIS_KEY_SEPARATOR}{doc_id}"
                 if main_key in ttl_tracking:
                     ttl_tracking[main_key][0].append(redis_vector_key)
 
@@ -740,7 +772,9 @@ async def _batch_search_ops(
                         )
                         if doc_id:
                             doc_uuid = doc_id.split(":")[1]
-                            store_key = f"{STORE_PREFIX}{REDIS_KEY_SEPARATOR}{doc_uuid}"
+                            store_key = (
+                                f"{self.store_prefix}{REDIS_KEY_SEPARATOR}{doc_uuid}"
+                            )
                             result_map[store_key] = doc
                             # Fetch individually in cluster mode
                             store_doc_item = await self._redis.json().get(store_key)  # type: ignore
@@ -760,7 +794,9 @@ async def _batch_search_ops(
                         )
                         if doc_id:
                             doc_uuid = doc_id.split(":")[1]
-                            store_key = f"{STORE_PREFIX}{REDIS_KEY_SEPARATOR}{doc_uuid}"
+                            store_key = (
+                                f"{self.store_prefix}{REDIS_KEY_SEPARATOR}{doc_uuid}"
+                            )
                             result_map[store_key] = doc
                             pipeline.json().get(store_key)
                     store_docs_raw = await pipeline.execute()
@@ -825,7 +861,7 @@ async def _batch_search_ops(
                             # Also find associated vector keys with same ID
                             doc_id = store_key.split(":")[-1]
                             vector_key = (
-                                f"{STORE_VECTOR_PREFIX}{REDIS_KEY_SEPARATOR}{doc_id}"
+                                f"{self.vector_prefix}{REDIS_KEY_SEPARATOR}{doc_id}"
                             )
                             refresh_keys.append(vector_key)
 
@@ -897,7 +933,7 @@ async def _batch_search_ops(
                             # Also find associated vector keys with same ID
                             doc_id = doc.id.split(":")[-1]
                             vector_key = (
-                                f"{STORE_VECTOR_PREFIX}{REDIS_KEY_SEPARATOR}{doc_id}"
+                                f"{self.vector_prefix}{REDIS_KEY_SEPARATOR}{doc_id}"
                             )
                             refresh_keys.append(vector_key)
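
And the async counterpart, again as a sketch with placeholder names and URL. Per the from_conn_string hunk above, the context manager runs setup() and aset_client_info() itself:

    import asyncio

    from langgraph.store.redis.aio import AsyncRedisStore

    async def main() -> None:
        async with AsyncRedisStore.from_conn_string(
            "redis://localhost:6379",
            store_prefix="agent_b",
            vector_prefix="agent_b_vectors",
        ) as store:
            # Keys land under "agent_b:*" and "agent_b_vectors:*".
            await store.aput(("threads",), "t1", {"status": "open"})
            item = await store.aget(("threads",), "t1")
            print(item.value if item else None)

    asyncio.run(main())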
