Skip to content

Commit 827c6d9

Browse files
committed
Clean up index schema organization
1 parent 5a908ef commit 827c6d9

File tree

5 files changed

+72
-116
lines changed

5 files changed

+72
-116
lines changed

langgraph/checkpoint/redis/__init__.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -119,13 +119,13 @@ def configure_client(
119119

120120
def create_indexes(self) -> None:
121121
self.checkpoints_index = SearchIndex.from_dict(
122-
self.SCHEMAS[0], redis_client=self._redis
122+
self.checkpoints_schema, redis_client=self._redis
123123
)
124124
self.checkpoint_blobs_index = SearchIndex.from_dict(
125-
self.SCHEMAS[1], redis_client=self._redis
125+
self.blobs_schema, redis_client=self._redis
126126
)
127127
self.checkpoint_writes_index = SearchIndex.from_dict(
128-
self.SCHEMAS[2], redis_client=self._redis
128+
self.writes_schema, redis_client=self._redis
129129
)
130130

131131
def _make_redis_checkpoint_key_cached(

langgraph/checkpoint/redis/aio.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -129,13 +129,13 @@ def configure_client(
129129
def create_indexes(self) -> None:
130130
"""Create indexes without connecting to Redis."""
131131
self.checkpoints_index = AsyncSearchIndex.from_dict(
132-
self.SCHEMAS[0], redis_client=self._redis
132+
self.checkpoints_schema, redis_client=self._redis
133133
)
134134
self.checkpoint_blobs_index = AsyncSearchIndex.from_dict(
135-
self.SCHEMAS[1], redis_client=self._redis
135+
self.blobs_schema, redis_client=self._redis
136136
)
137137
self.checkpoint_writes_index = AsyncSearchIndex.from_dict(
138-
self.SCHEMAS[2], redis_client=self._redis
138+
self.writes_schema, redis_client=self._redis
139139
)
140140

141141
def _make_redis_checkpoint_key_cached(

langgraph/checkpoint/redis/ashallow.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -696,14 +696,14 @@ def configure_client(
696696
def create_indexes(self) -> None:
697697
"""Create indexes without connecting to Redis."""
698698
self.checkpoints_index = AsyncSearchIndex.from_dict(
699-
self.SCHEMAS[0], redis_client=self._redis
699+
self.checkpoints_schema, redis_client=self._redis
700700
)
701701
# Shallow implementation doesn't use blobs, but base class requires the attribute
702702
self.checkpoint_blobs_index = AsyncSearchIndex.from_dict(
703-
self.SCHEMAS[1], redis_client=self._redis
703+
self.blobs_schema, redis_client=self._redis
704704
)
705705
self.checkpoint_writes_index = AsyncSearchIndex.from_dict(
706-
self.SCHEMAS[2], redis_client=self._redis
706+
self.writes_schema, redis_client=self._redis
707707
)
708708

709709
def get_tuple(self, config: RunnableConfig) -> Optional[CheckpointTuple]:

langgraph/checkpoint/redis/base.py

Lines changed: 60 additions & 104 deletions
Original file line numberDiff line numberDiff line change
@@ -35,57 +35,6 @@
3535
CHECKPOINT_BLOB_PREFIX = "checkpoint_blob"
3636
CHECKPOINT_WRITE_PREFIX = "checkpoint_write"
3737

38-
SCHEMAS = [
39-
{
40-
"index": {
41-
"name": "checkpoints",
42-
"prefix": CHECKPOINT_PREFIX + REDIS_KEY_SEPARATOR,
43-
"storage_type": "json",
44-
},
45-
"fields": [
46-
{"name": "thread_id", "type": "tag"},
47-
{"name": "checkpoint_ns", "type": "tag"},
48-
{"name": "checkpoint_id", "type": "tag"},
49-
{"name": "parent_checkpoint_id", "type": "tag"},
50-
{"name": "checkpoint_ts", "type": "numeric"},
51-
{"name": "source", "type": "tag"},
52-
{"name": "step", "type": "numeric"},
53-
{"name": "has_writes", "type": "tag"},
54-
],
55-
},
56-
{
57-
"index": {
58-
"name": "checkpoints_blobs",
59-
"prefix": CHECKPOINT_BLOB_PREFIX + REDIS_KEY_SEPARATOR,
60-
"storage_type": "json",
61-
},
62-
"fields": [
63-
{"name": "thread_id", "type": "tag"},
64-
{"name": "checkpoint_ns", "type": "tag"},
65-
{"name": "checkpoint_id", "type": "tag"},
66-
{"name": "channel", "type": "tag"},
67-
{"name": "version", "type": "tag"},
68-
{"name": "type", "type": "tag"},
69-
],
70-
},
71-
{
72-
"index": {
73-
"name": "checkpoint_writes",
74-
"prefix": CHECKPOINT_WRITE_PREFIX + REDIS_KEY_SEPARATOR,
75-
"storage_type": "json",
76-
},
77-
"fields": [
78-
{"name": "thread_id", "type": "tag"},
79-
{"name": "checkpoint_ns", "type": "tag"},
80-
{"name": "checkpoint_id", "type": "tag"},
81-
{"name": "task_id", "type": "tag"},
82-
{"name": "idx", "type": "numeric"},
83-
{"name": "channel", "type": "tag"},
84-
{"name": "type", "type": "tag"},
85-
],
86-
},
87-
]
88-
8938

9039
class BaseRedisSaver(BaseCheckpointSaver[str], Generic[RedisClientType, IndexType]):
9140
"""Base Redis implementation for checkpoint saving.
@@ -96,7 +45,6 @@ class BaseRedisSaver(BaseCheckpointSaver[str], Generic[RedisClientType, IndexTyp
9645
_redis: RedisClientType
9746
_owns_its_client: bool = False
9847
_key_registry: Optional[Any] = None
99-
SCHEMAS = SCHEMAS
10048

10149
checkpoints_index: IndexType
10250
checkpoint_blobs_index: IndexType
@@ -138,58 +86,6 @@ def __init__(
13886
self._checkpoint_blob_prefix = checkpoint_blob_prefix
13987
self._checkpoint_write_prefix = checkpoint_write_prefix
14088

141-
# Create instance-level schemas with custom prefixes
142-
self.SCHEMAS = [
143-
{
144-
"index": {
145-
"name": self._checkpoint_prefix,
146-
"prefix": self._checkpoint_prefix + REDIS_KEY_SEPARATOR,
147-
"storage_type": "json",
148-
},
149-
"fields": [
150-
{"name": "thread_id", "type": "tag"},
151-
{"name": "checkpoint_ns", "type": "tag"},
152-
{"name": "checkpoint_id", "type": "tag"},
153-
{"name": "parent_checkpoint_id", "type": "tag"},
154-
{"name": "checkpoint_ts", "type": "numeric"},
155-
{"name": "source", "type": "tag"},
156-
{"name": "step", "type": "numeric"},
157-
{"name": "has_writes", "type": "tag"},
158-
],
159-
},
160-
{
161-
"index": {
162-
"name": self._checkpoint_blob_prefix,
163-
"prefix": self._checkpoint_blob_prefix + REDIS_KEY_SEPARATOR,
164-
"storage_type": "json",
165-
},
166-
"fields": [
167-
{"name": "thread_id", "type": "tag"},
168-
{"name": "checkpoint_ns", "type": "tag"},
169-
{"name": "checkpoint_id", "type": "tag"},
170-
{"name": "channel", "type": "tag"},
171-
{"name": "version", "type": "tag"},
172-
{"name": "type", "type": "tag"},
173-
],
174-
},
175-
{
176-
"index": {
177-
"name": self._checkpoint_write_prefix,
178-
"prefix": self._checkpoint_write_prefix + REDIS_KEY_SEPARATOR,
179-
"storage_type": "json",
180-
},
181-
"fields": [
182-
{"name": "thread_id", "type": "tag"},
183-
{"name": "checkpoint_ns", "type": "tag"},
184-
{"name": "checkpoint_id", "type": "tag"},
185-
{"name": "task_id", "type": "tag"},
186-
{"name": "idx", "type": "numeric"},
187-
{"name": "channel", "type": "tag"},
188-
{"name": "type", "type": "tag"},
189-
],
190-
},
191-
]
192-
19389
self.configure_client(
19490
redis_url=redis_url,
19591
redis_client=redis_client,
@@ -202,6 +98,66 @@ def __init__(
20298
self.checkpoint_writes_index: IndexType
20399
self.create_indexes()
204100

101+
@property
102+
def checkpoints_schema(self) -> Dict[str, Any]:
103+
"""Schema for the checkpoints index."""
104+
return {
105+
"index": {
106+
"name": "checkpoints",
107+
"prefix": self._checkpoint_prefix + REDIS_KEY_SEPARATOR,
108+
"storage_type": "json",
109+
},
110+
"fields": [
111+
{"name": "thread_id", "type": "tag"},
112+
{"name": "checkpoint_ns", "type": "tag"},
113+
{"name": "checkpoint_id", "type": "tag"},
114+
{"name": "parent_checkpoint_id", "type": "tag"},
115+
{"name": "checkpoint_ts", "type": "numeric"},
116+
{"name": "source", "type": "tag"},
117+
{"name": "step", "type": "numeric"},
118+
{"name": "has_writes", "type": "tag"},
119+
],
120+
}
121+
122+
@property
123+
def blobs_schema(self) -> Dict[str, Any]:
124+
"""Schema for the checkpoint blobs index."""
125+
return {
126+
"index": {
127+
"name": "checkpoints_blobs",
128+
"prefix": self._checkpoint_blob_prefix + REDIS_KEY_SEPARATOR,
129+
"storage_type": "json",
130+
},
131+
"fields": [
132+
{"name": "thread_id", "type": "tag"},
133+
{"name": "checkpoint_ns", "type": "tag"},
134+
{"name": "checkpoint_id", "type": "tag"},
135+
{"name": "channel", "type": "tag"},
136+
{"name": "version", "type": "tag"},
137+
{"name": "type", "type": "tag"},
138+
],
139+
}
140+
141+
@property
142+
def writes_schema(self) -> Dict[str, Any]:
143+
"""Schema for the checkpoint writes index."""
144+
return {
145+
"index": {
146+
"name": "checkpoint_writes",
147+
"prefix": self._checkpoint_write_prefix + REDIS_KEY_SEPARATOR,
148+
"storage_type": "json",
149+
},
150+
"fields": [
151+
{"name": "thread_id", "type": "tag"},
152+
{"name": "checkpoint_ns", "type": "tag"},
153+
{"name": "checkpoint_id", "type": "tag"},
154+
{"name": "task_id", "type": "tag"},
155+
{"name": "idx", "type": "numeric"},
156+
{"name": "channel", "type": "tag"},
157+
{"name": "type", "type": "tag"},
158+
],
159+
}
160+
205161
@abstractmethod
206162
def create_indexes(self) -> None:
207163
"""Create appropriate SearchIndex instances."""

langgraph/checkpoint/redis/shallow.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -445,14 +445,14 @@ def configure_client(
445445

446446
def create_indexes(self) -> None:
447447
self.checkpoints_index = SearchIndex.from_dict(
448-
self.SCHEMAS[0], redis_client=self._redis
448+
self.checkpoints_schema, redis_client=self._redis
449449
)
450450
# Shallow implementation doesn't use blobs, but base class requires the attribute
451451
self.checkpoint_blobs_index = SearchIndex.from_dict(
452-
self.SCHEMAS[1], redis_client=self._redis
452+
self.blobs_schema, redis_client=self._redis
453453
)
454454
self.checkpoint_writes_index = SearchIndex.from_dict(
455-
self.SCHEMAS[2], redis_client=self._redis
455+
self.writes_schema, redis_client=self._redis
456456
)
457457

458458
def setup(self) -> None:

0 commit comments

Comments
 (0)