Skip to content

Commit 79a4932

Browse files
committed
fix(checkpoint): ensure write_keys_zset uses storage-safe sentinel values (#104)
The make_write_keys_zset_key method was not converting empty checkpoint_ns values to the "__empty__" sentinel, causing a mismatch between keys created during put_writes and keys searched during delete_thread. This resulted in write_keys_zset sorted sets not being cleaned up when delete_thread was called, leaving orphaned keys in Redis. Changes: - Update make_write_keys_zset_key to use to_storage_safe_id and to_storage_safe_str for consistent key generation - Fix key registry initialization to properly initialize after Redis client configuration in setup() method - Add comprehensive integration tests covering sync/async deletion scenarios with empty and non-empty checkpoint namespaces Fixes #104
1 parent e827bf7 commit 79a4932

File tree

3 files changed

+322
-4
lines changed

3 files changed

+322
-4
lines changed

langgraph/checkpoint/redis/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,8 +84,8 @@ def __init__(
8484
self._key_cache: Dict[str, str] = {}
8585
self._key_cache_max_size = 1000 # Configurable limit
8686

87-
# Initialize key registry
88-
self._key_registry = SyncCheckpointKeyRegistry(self._redis)
87+
# Key registry will be initialized in setup()
88+
self._key_registry: Optional[SyncCheckpointKeyRegistry] = None
8989

9090
def configure_client(
9191
self,

langgraph/checkpoint/redis/key_registry.py

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111
from redis.asyncio.cluster import RedisCluster as AsyncRedisCluster
1212
from redis.cluster import RedisCluster
1313

14+
from langgraph.checkpoint.redis.util import to_storage_safe_id, to_storage_safe_str
15+
1416
WRITE_KEYS_ZSET_PREFIX = "write_keys_zset"
1517
REDIS_KEY_SEPARATOR = ":"
1618

@@ -22,9 +24,28 @@ class CheckpointKeyRegistry:
2224
def make_write_keys_zset_key(
2325
thread_id: str, checkpoint_ns: str, checkpoint_id: str
2426
) -> str:
25-
"""Create the key for the write keys sorted set for a specific checkpoint."""
27+
"""Create the key for the write keys sorted set for a specific checkpoint.
28+
29+
Args:
30+
thread_id: Thread identifier
31+
checkpoint_ns: Checkpoint namespace (will be converted to storage-safe format)
32+
checkpoint_id: Checkpoint identifier (will be converted to storage-safe format)
33+
34+
Returns:
35+
The Redis key for the write keys sorted set
36+
"""
37+
# Convert empty strings to sentinel values for RediSearch compatibility
38+
safe_thread_id = to_storage_safe_id(thread_id)
39+
safe_checkpoint_ns = to_storage_safe_str(checkpoint_ns)
40+
safe_checkpoint_id = to_storage_safe_id(checkpoint_id)
41+
2642
return REDIS_KEY_SEPARATOR.join(
27-
[WRITE_KEYS_ZSET_PREFIX, thread_id, checkpoint_ns, checkpoint_id]
43+
[
44+
WRITE_KEYS_ZSET_PREFIX,
45+
safe_thread_id,
46+
safe_checkpoint_ns,
47+
safe_checkpoint_id,
48+
]
2849
)
2950

3051

Lines changed: 297 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,297 @@
1+
"""Test for issue #104 - delete_thread should clean up write_keys_zset keys.
2+
3+
When delete_thread is called, it should remove all related keys including:
4+
- checkpoint keys
5+
- checkpoint_latest pointers
6+
- blob keys
7+
- write keys
8+
- write_keys_zset (registry) keys
9+
10+
The issue reports that write_keys_zset keys are not being deleted properly.
11+
"""
12+
13+
import pytest
14+
from langchain_core.runnables import RunnableConfig
15+
from langgraph.checkpoint.base import Checkpoint, CheckpointMetadata
16+
17+
from langgraph.checkpoint.redis import RedisSaver
18+
from langgraph.checkpoint.redis.aio import AsyncRedisSaver
19+
from langgraph.checkpoint.redis.util import to_storage_safe_id, to_storage_safe_str
20+
21+
22+
def test_delete_thread_cleans_write_keys_zset(redis_url, client):
23+
"""Test that delete_thread removes write_keys_zset keys created by put_writes."""
24+
with RedisSaver.from_conn_string(redis_url) as checkpointer:
25+
checkpointer.setup()
26+
27+
# Create a checkpoint with writes
28+
thread_id = "test-thread-zset-cleanup"
29+
checkpoint_ns = "" # Empty namespace as reported in issue
30+
config: RunnableConfig = {
31+
"configurable": {
32+
"thread_id": thread_id,
33+
"checkpoint_ns": checkpoint_ns,
34+
"checkpoint_id": "1",
35+
}
36+
}
37+
38+
checkpoint = Checkpoint(
39+
v=1,
40+
id="1",
41+
ts="2024-01-01T00:00:00Z",
42+
channel_values={"messages": ["Test"]},
43+
channel_versions={"messages": "1"},
44+
versions_seen={"agent": {"messages": "1"}},
45+
pending_sends=[],
46+
tasks=[],
47+
)
48+
49+
# Store checkpoint
50+
checkpointer.put(
51+
config=config,
52+
checkpoint=checkpoint,
53+
metadata=CheckpointMetadata(source="input", step=0, writes={}),
54+
new_versions={"messages": "1"},
55+
)
56+
57+
# Add writes which should create write_keys_zset entries
58+
checkpointer.put_writes(
59+
config=config,
60+
writes=[("messages", "Write 1"), ("messages", "Write 2")],
61+
task_id="task-1",
62+
)
63+
64+
# Construct the expected write_keys_zset key
65+
# Format: write_keys_zset:thread_id:checkpoint_ns:checkpoint_id
66+
safe_thread_id = to_storage_safe_id(thread_id)
67+
safe_checkpoint_ns = to_storage_safe_str(checkpoint_ns)
68+
safe_checkpoint_id = to_storage_safe_id("1")
69+
70+
zset_key = f"write_keys_zset:{safe_thread_id}:{safe_checkpoint_ns}:{safe_checkpoint_id}"
71+
72+
# Verify the write_keys_zset key exists
73+
zset_exists_before = client.exists(zset_key)
74+
assert (
75+
zset_exists_before == 1
76+
), f"write_keys_zset key should exist before delete: {zset_key}"
77+
78+
# Get the count of items in the zset
79+
zset_count = client.zcard(zset_key)
80+
assert (
81+
zset_count == 2
82+
), f"write_keys_zset should have 2 entries, got {zset_count}"
83+
84+
# Delete the thread
85+
checkpointer.delete_thread(thread_id)
86+
87+
# Verify checkpoint is deleted
88+
result = checkpointer.get_tuple(config)
89+
assert result is None, "Checkpoint should be deleted"
90+
91+
# Verify write_keys_zset key is also deleted (THIS IS THE BUG)
92+
zset_exists_after = client.exists(zset_key)
93+
assert (
94+
zset_exists_after == 0
95+
), f"write_keys_zset key should be deleted: {zset_key}"
96+
97+
98+
def test_delete_thread_cleans_multiple_write_keys_zsets(redis_url, client):
99+
"""Test delete_thread with multiple checkpoints and namespaces."""
100+
with RedisSaver.from_conn_string(redis_url) as checkpointer:
101+
checkpointer.setup()
102+
103+
thread_id = "test-thread-multi-zset"
104+
105+
# Create checkpoints with different namespaces
106+
checkpoints_data = [
107+
("", "1"),
108+
("", "2"),
109+
("ns1", "3"),
110+
("ns2", "4"),
111+
]
112+
113+
zset_keys = []
114+
115+
for checkpoint_ns, checkpoint_id in checkpoints_data:
116+
config: RunnableConfig = {
117+
"configurable": {
118+
"thread_id": thread_id,
119+
"checkpoint_ns": checkpoint_ns,
120+
"checkpoint_id": checkpoint_id,
121+
}
122+
}
123+
124+
checkpoint = Checkpoint(
125+
v=1,
126+
id=checkpoint_id,
127+
ts=f"2024-01-01T00:00:0{checkpoint_id}Z",
128+
channel_values={"messages": ["Test"]},
129+
channel_versions={"messages": "1"},
130+
versions_seen={"agent": {"messages": "1"}},
131+
pending_sends=[],
132+
tasks=[],
133+
)
134+
135+
checkpointer.put(
136+
config=config,
137+
checkpoint=checkpoint,
138+
metadata=CheckpointMetadata(source="input", step=0, writes={}),
139+
new_versions={"messages": "1"},
140+
)
141+
142+
# Add writes
143+
checkpointer.put_writes(
144+
config=config,
145+
writes=[("messages", f"Write for {checkpoint_id}")],
146+
task_id=f"task-{checkpoint_id}",
147+
)
148+
149+
# Track the zset key
150+
safe_thread_id = to_storage_safe_id(thread_id)
151+
safe_checkpoint_ns = to_storage_safe_str(checkpoint_ns)
152+
safe_checkpoint_id = to_storage_safe_id(checkpoint_id)
153+
zset_key = f"write_keys_zset:{safe_thread_id}:{safe_checkpoint_ns}:{safe_checkpoint_id}"
154+
zset_keys.append(zset_key)
155+
156+
# Verify all zset keys exist
157+
for zset_key in zset_keys:
158+
assert client.exists(zset_key) == 1, f"zset key should exist: {zset_key}"
159+
160+
# Delete the thread
161+
checkpointer.delete_thread(thread_id)
162+
163+
# Verify all zset keys are deleted
164+
for zset_key in zset_keys:
165+
assert (
166+
client.exists(zset_key) == 0
167+
), f"zset key should be deleted: {zset_key}"
168+
169+
170+
@pytest.mark.asyncio
171+
async def test_adelete_thread_cleans_write_keys_zset(redis_url, async_client):
172+
"""Test that adelete_thread removes write_keys_zset keys (async version)."""
173+
async with AsyncRedisSaver.from_conn_string(redis_url) as checkpointer:
174+
# Create a checkpoint with writes
175+
thread_id = "test-thread-zset-cleanup-async"
176+
checkpoint_ns = ""
177+
config: RunnableConfig = {
178+
"configurable": {
179+
"thread_id": thread_id,
180+
"checkpoint_ns": checkpoint_ns,
181+
"checkpoint_id": "1",
182+
}
183+
}
184+
185+
checkpoint = Checkpoint(
186+
v=1,
187+
id="1",
188+
ts="2024-01-01T00:00:00Z",
189+
channel_values={"messages": ["Test"]},
190+
channel_versions={"messages": "1"},
191+
versions_seen={"agent": {"messages": "1"}},
192+
pending_sends=[],
193+
tasks=[],
194+
)
195+
196+
# Store checkpoint
197+
await checkpointer.aput(
198+
config=config,
199+
checkpoint=checkpoint,
200+
metadata=CheckpointMetadata(source="input", step=0, writes={}),
201+
new_versions={"messages": "1"},
202+
)
203+
204+
# Add writes
205+
await checkpointer.aput_writes(
206+
config=config,
207+
writes=[("messages", "Write 1"), ("messages", "Write 2")],
208+
task_id="task-1",
209+
)
210+
211+
# Construct the expected write_keys_zset key
212+
safe_thread_id = to_storage_safe_id(thread_id)
213+
safe_checkpoint_ns = to_storage_safe_str(checkpoint_ns)
214+
safe_checkpoint_id = to_storage_safe_id("1")
215+
216+
zset_key = f"write_keys_zset:{safe_thread_id}:{safe_checkpoint_ns}:{safe_checkpoint_id}"
217+
218+
# Verify the write_keys_zset key exists
219+
zset_exists_before = await async_client.exists(zset_key)
220+
assert (
221+
zset_exists_before == 1
222+
), f"write_keys_zset key should exist before delete: {zset_key}"
223+
224+
# Delete the thread
225+
await checkpointer.adelete_thread(thread_id)
226+
227+
# Verify checkpoint is deleted
228+
result = await checkpointer.aget_tuple(config)
229+
assert result is None
230+
231+
# Verify write_keys_zset key is also deleted
232+
zset_exists_after = await async_client.exists(zset_key)
233+
assert (
234+
zset_exists_after == 0
235+
), f"write_keys_zset key should be deleted: {zset_key}"
236+
237+
238+
def test_delete_thread_with_only_thread_id(redis_url, client):
239+
"""Test the exact scenario from issue #104: only using thread_id."""
240+
with RedisSaver.from_conn_string(redis_url) as checkpointer:
241+
checkpointer.setup()
242+
243+
# User only provides thread_id (as mentioned in the issue)
244+
# checkpoint_ns will default to empty string when not provided
245+
thread_id = "simple-thread-id"
246+
config: RunnableConfig = {
247+
"configurable": {
248+
"thread_id": thread_id,
249+
"checkpoint_ns": "", # Explicitly set to empty string like in the issue
250+
}
251+
}
252+
253+
checkpoint = Checkpoint(
254+
v=1,
255+
id="auto-generated-id",
256+
ts="2024-01-01T00:00:00Z",
257+
channel_values={"messages": ["Test"]},
258+
channel_versions={"messages": "1"},
259+
versions_seen={"agent": {"messages": "1"}},
260+
pending_sends=[],
261+
tasks=[],
262+
)
263+
264+
# Store checkpoint
265+
result_config = checkpointer.put(
266+
config=config,
267+
checkpoint=checkpoint,
268+
metadata=CheckpointMetadata(source="input", step=0, writes={}),
269+
new_versions={"messages": "1"},
270+
)
271+
272+
# Extract the actual checkpoint_id that was used
273+
actual_checkpoint_id = result_config["configurable"]["checkpoint_id"]
274+
actual_checkpoint_ns = result_config["configurable"].get("checkpoint_ns", "")
275+
276+
# Add writes
277+
checkpointer.put_writes(
278+
config=result_config,
279+
writes=[("messages", "Some write")],
280+
task_id="task-1",
281+
)
282+
283+
# Construct the expected write_keys_zset key with empty namespace
284+
safe_thread_id = to_storage_safe_id(thread_id)
285+
safe_checkpoint_ns = to_storage_safe_str(actual_checkpoint_ns)
286+
safe_checkpoint_id = to_storage_safe_id(actual_checkpoint_id)
287+
288+
zset_key = f"write_keys_zset:{safe_thread_id}:{safe_checkpoint_ns}:{safe_checkpoint_id}"
289+
290+
# Verify zset key exists
291+
assert client.exists(zset_key) == 1, f"zset key should exist: {zset_key}"
292+
293+
# Delete using only thread_id (as user does in issue)
294+
checkpointer.delete_thread(thread_id)
295+
296+
# Verify zset key is deleted
297+
assert client.exists(zset_key) == 0, f"zset key should be deleted: {zset_key}"

0 commit comments

Comments
 (0)