Skip to content

Commit 62fd939

Browse files
committed
feat(checkpoint)!: sort list/alist by checkpoint ID descending (#106)
Add sort_by parameter to FilterQuery in all list/alist implementations to return checkpoints in descending order (newest first). This matches SQLite checkpointer behavior and enables efficient lookup of recent/ crashed sessions without fetching all checkpoints into memory. Updated implementations: - AsyncRedisSaver.alist() (aio.py:709) - RedisSaver.list() (__init__.py:279) - AsyncShallowRedisSaver.alist() (ashallow.py:384) - ShallowRedisSaver.list() (shallow.py:334) BREAKING CHANGE: Checkpoints are now returned in DESC order (newest first) instead of ASC order (oldest first). Code that accessed the most recent checkpoint with `list(saver.list(config))[-1]` must now use `list(saver.list(config))[0]`. Fixes #106
1 parent 79a4932 commit 62fd939

File tree

6 files changed

+276
-2
lines changed

6 files changed

+276
-2
lines changed

langgraph/checkpoint/redis/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -263,6 +263,7 @@ def list(
263263
combined_filter &= expr
264264

265265
# Construct the Redis query
266+
# Sort by checkpoint_id in descending order to get most recent checkpoints first
266267
query = FilterQuery(
267268
filter_expression=combined_filter,
268269
return_fields=[
@@ -275,6 +276,7 @@ def list(
275276
"has_writes", # Include has_writes to optimize pending_writes loading
276277
],
277278
num_results=limit or 10000,
279+
sort_by=("checkpoint_id", "DESC"),
278280
)
279281

280282
# Execute the query

langgraph/checkpoint/redis/aio.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -693,6 +693,7 @@ async def alist(
693693
combined_filter &= expr
694694

695695
# Construct the Redis query
696+
# Sort by checkpoint_id in descending order to get most recent checkpoints first
696697
query = FilterQuery(
697698
filter_expression=combined_filter,
698699
return_fields=[
@@ -705,6 +706,7 @@ async def alist(
705706
"has_writes", # Include has_writes to optimize pending_writes loading
706707
],
707708
num_results=limit or 10000,
709+
sort_by=("checkpoint_id", "DESC"),
708710
)
709711

710712
# Execute the query asynchronously

langgraph/checkpoint/redis/ashallow.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -368,6 +368,7 @@ async def alist(
368368
for expr in query_filter[1:]:
369369
combined_filter &= expr
370370

371+
# Sort by checkpoint_id in descending order to get most recent checkpoints first
371372
query = FilterQuery(
372373
filter_expression=combined_filter,
373374
return_fields=[
@@ -380,6 +381,7 @@ async def alist(
380381
"ts",
381382
],
382383
num_results=limit or 100, # Set higher limit to retrieve more results
384+
sort_by=("checkpoint_id", "DESC"),
383385
)
384386

385387
results = await self.checkpoints_index.search(query)

langgraph/checkpoint/redis/shallow.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -321,6 +321,7 @@ def list(
321321
combined_filter &= expr
322322

323323
# Get checkpoint data
324+
# Sort by checkpoint_id in descending order to get most recent checkpoints first
324325
query = FilterQuery(
325326
filter_expression=combined_filter,
326327
return_fields=[
@@ -330,6 +331,7 @@ def list(
330331
"$.metadata",
331332
],
332333
num_results=limit or 10000,
334+
sort_by=("checkpoint_id", "DESC"),
333335
)
334336

335337
# Execute the query

tests/test_alist_sort_order.py

Lines changed: 264 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,264 @@
1+
"""Test for issue #106: alist should sort by checkpoint ID DESC."""
2+
3+
import asyncio
4+
import time
5+
from typing import AsyncGenerator, Generator
6+
7+
import pytest
8+
from langchain_core.runnables import RunnableConfig
9+
from langgraph.checkpoint.base import (
10+
Checkpoint,
11+
CheckpointMetadata,
12+
create_checkpoint,
13+
empty_checkpoint,
14+
)
15+
from ulid import ULID
16+
17+
from langgraph.checkpoint.redis import RedisSaver
18+
from langgraph.checkpoint.redis.aio import AsyncRedisSaver
19+
20+
21+
@pytest.fixture
22+
async def async_saver(redis_url: str) -> AsyncGenerator[AsyncRedisSaver, None]:
23+
"""Async saver fixture."""
24+
saver = AsyncRedisSaver(redis_url)
25+
await saver.asetup()
26+
yield saver
27+
28+
29+
@pytest.fixture
30+
def sync_saver(redis_url: str) -> Generator[RedisSaver, None, None]:
31+
"""Sync saver fixture."""
32+
saver = RedisSaver(redis_url)
33+
saver.setup()
34+
yield saver
35+
36+
37+
@pytest.mark.asyncio
38+
async def test_alist_sorts_by_checkpoint_id_desc(async_saver: AsyncRedisSaver) -> None:
39+
"""Test that alist returns checkpoints sorted by checkpoint ID in descending order.
40+
41+
This is a reproducer for issue #106: when listing checkpoints, they should be
42+
sorted by checkpoint ID (which embeds timestamp via ULID) in descending order,
43+
so that the most recent checkpoints appear first. This allows users to efficiently
44+
find crashed/unfinished sessions after restart.
45+
"""
46+
thread_id = "test-thread-sort"
47+
checkpoint_ns = ""
48+
49+
# Create multiple checkpoints with increasing timestamps
50+
# We'll use explicit checkpoint IDs with different timestamps to ensure ordering
51+
checkpoint_ids = []
52+
53+
# Create 5 checkpoints with small delays between them to ensure different timestamps
54+
for i in range(5):
55+
# Create a checkpoint with a unique ULID
56+
checkpoint_id = str(ULID())
57+
checkpoint_ids.append(checkpoint_id)
58+
59+
config: RunnableConfig = {
60+
"configurable": {
61+
"thread_id": thread_id,
62+
"checkpoint_id": checkpoint_id,
63+
"checkpoint_ns": checkpoint_ns,
64+
}
65+
}
66+
67+
checkpoint: Checkpoint = empty_checkpoint()
68+
checkpoint["id"] = checkpoint_id
69+
70+
metadata: CheckpointMetadata = {
71+
"source": "test",
72+
"step": i,
73+
"writes": {},
74+
}
75+
76+
await async_saver.aput(config, checkpoint, metadata, {})
77+
78+
# Small delay to ensure different ULID timestamps
79+
# ULID has millisecond precision, so we need to wait at least 1ms
80+
await asyncio.sleep(0.01)
81+
82+
# Now list all checkpoints for this thread
83+
config: RunnableConfig = {
84+
"configurable": {
85+
"thread_id": thread_id,
86+
"checkpoint_ns": checkpoint_ns,
87+
}
88+
}
89+
90+
listed_checkpoints = []
91+
async for checkpoint_tuple in async_saver.alist(config):
92+
listed_checkpoints.append(
93+
checkpoint_tuple.config["configurable"]["checkpoint_id"]
94+
)
95+
96+
# Verify we got all checkpoints
97+
assert (
98+
len(listed_checkpoints) == 5
99+
), f"Expected 5 checkpoints, got {len(listed_checkpoints)}"
100+
101+
# Verify they are sorted in descending order (most recent first)
102+
# Since we created them in chronological order, the last one created should be first
103+
# checkpoint_ids[4] should appear first, then checkpoint_ids[3], etc.
104+
expected_order = checkpoint_ids[::-1] # Reverse the list
105+
106+
assert listed_checkpoints == expected_order, (
107+
f"Checkpoints are not sorted in descending order by checkpoint ID.\n"
108+
f"Expected: {expected_order}\n"
109+
f"Got: {listed_checkpoints}"
110+
)
111+
112+
113+
@pytest.mark.asyncio
114+
async def test_alist_sorts_multiple_threads(async_saver: AsyncRedisSaver) -> None:
115+
"""Test that alist sorts correctly when filtering by thread_id."""
116+
# Create checkpoints for two different threads
117+
thread1_ids = []
118+
thread2_ids = []
119+
120+
# Thread 1: Create 3 checkpoints
121+
for i in range(3):
122+
checkpoint_id = str(ULID())
123+
thread1_ids.append(checkpoint_id)
124+
125+
config: RunnableConfig = {
126+
"configurable": {
127+
"thread_id": "thread-1-sort",
128+
"checkpoint_id": checkpoint_id,
129+
"checkpoint_ns": "",
130+
}
131+
}
132+
133+
checkpoint: Checkpoint = empty_checkpoint()
134+
checkpoint["id"] = checkpoint_id
135+
136+
metadata: CheckpointMetadata = {"source": "test", "step": i, "writes": {}}
137+
await async_saver.aput(config, checkpoint, metadata, {})
138+
await asyncio.sleep(0.01)
139+
140+
# Thread 2: Create 3 checkpoints (interleaved with thread 1)
141+
for i in range(3):
142+
checkpoint_id = str(ULID())
143+
thread2_ids.append(checkpoint_id)
144+
145+
config: RunnableConfig = {
146+
"configurable": {
147+
"thread_id": "thread-2-sort",
148+
"checkpoint_id": checkpoint_id,
149+
"checkpoint_ns": "",
150+
}
151+
}
152+
153+
checkpoint: Checkpoint = empty_checkpoint()
154+
checkpoint["id"] = checkpoint_id
155+
156+
metadata: CheckpointMetadata = {"source": "test", "step": i, "writes": {}}
157+
await async_saver.aput(config, checkpoint, metadata, {})
158+
await asyncio.sleep(0.01)
159+
160+
# List checkpoints for thread 1
161+
config1: RunnableConfig = {
162+
"configurable": {
163+
"thread_id": "thread-1-sort",
164+
"checkpoint_ns": "",
165+
}
166+
}
167+
168+
thread1_listed = []
169+
async for checkpoint_tuple in async_saver.alist(config1):
170+
thread1_listed.append(checkpoint_tuple.config["configurable"]["checkpoint_id"])
171+
172+
# Verify thread 1 checkpoints are in descending order
173+
assert thread1_listed == thread1_ids[::-1], (
174+
f"Thread 1 checkpoints not sorted correctly.\n"
175+
f"Expected: {thread1_ids[::-1]}\n"
176+
f"Got: {thread1_listed}"
177+
)
178+
179+
# List checkpoints for thread 2
180+
config2: RunnableConfig = {
181+
"configurable": {
182+
"thread_id": "thread-2-sort",
183+
"checkpoint_ns": "",
184+
}
185+
}
186+
187+
thread2_listed = []
188+
async for checkpoint_tuple in async_saver.alist(config2):
189+
thread2_listed.append(checkpoint_tuple.config["configurable"]["checkpoint_id"])
190+
191+
# Verify thread 2 checkpoints are in descending order
192+
assert thread2_listed == thread2_ids[::-1], (
193+
f"Thread 2 checkpoints not sorted correctly.\n"
194+
f"Expected: {thread2_ids[::-1]}\n"
195+
f"Got: {thread2_listed}"
196+
)
197+
198+
199+
def test_list_sorts_by_checkpoint_id_desc(sync_saver: RedisSaver) -> None:
200+
"""Test that list (sync) returns checkpoints sorted by checkpoint ID in descending order.
201+
202+
This is a sync version of the test for issue #106.
203+
"""
204+
thread_id = "test-thread-sort-sync"
205+
checkpoint_ns = ""
206+
207+
# Create multiple checkpoints with increasing timestamps
208+
checkpoint_ids = []
209+
210+
# Create 5 checkpoints with small delays between them to ensure different timestamps
211+
for i in range(5):
212+
# Create a checkpoint with a unique ULID
213+
checkpoint_id = str(ULID())
214+
checkpoint_ids.append(checkpoint_id)
215+
216+
config: RunnableConfig = {
217+
"configurable": {
218+
"thread_id": thread_id,
219+
"checkpoint_id": checkpoint_id,
220+
"checkpoint_ns": checkpoint_ns,
221+
}
222+
}
223+
224+
checkpoint: Checkpoint = empty_checkpoint()
225+
checkpoint["id"] = checkpoint_id
226+
227+
metadata: CheckpointMetadata = {
228+
"source": "test",
229+
"step": i,
230+
"writes": {},
231+
}
232+
233+
sync_saver.put(config, checkpoint, metadata, {})
234+
235+
# Small delay to ensure different ULID timestamps
236+
time.sleep(0.01)
237+
238+
# Now list all checkpoints for this thread
239+
config: RunnableConfig = {
240+
"configurable": {
241+
"thread_id": thread_id,
242+
"checkpoint_ns": checkpoint_ns,
243+
}
244+
}
245+
246+
listed_checkpoints = []
247+
for checkpoint_tuple in sync_saver.list(config):
248+
listed_checkpoints.append(
249+
checkpoint_tuple.config["configurable"]["checkpoint_id"]
250+
)
251+
252+
# Verify we got all checkpoints
253+
assert (
254+
len(listed_checkpoints) == 5
255+
), f"Expected 5 checkpoints, got {len(listed_checkpoints)}"
256+
257+
# Verify they are sorted in descending order (most recent first)
258+
expected_order = checkpoint_ids[::-1] # Reverse the list
259+
260+
assert listed_checkpoints == expected_order, (
261+
f"Checkpoints are not sorted in descending order by checkpoint ID.\n"
262+
f"Expected: {expected_order}\n"
263+
f"Got: {listed_checkpoints}"
264+
)

tests/test_streaming.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,9 +65,10 @@ def test_streaming_values_with_redis_checkpointer(graph_with_redis_checkpointer)
6565
assert len(results) == 11 # 5 iterations x 2 nodes + initial state
6666

6767
# Check state history from the checkpointer
68+
# Note: states are now sorted DESC (newest first), so states[0] is the most recent
6869
states = list(graph_with_redis_checkpointer.get_state_history(thread_config))
6970
assert len(states) > 0
70-
final_state = states[-1]
71+
final_state = states[0] # First item is now the most recent (DESC order)
7172
assert final_state.values["counter"] == 5
7273
assert len(final_state.values["values"]) == 5
7374

@@ -97,9 +98,10 @@ def test_streaming_updates_with_redis_checkpointer(graph_with_redis_checkpointer
9798
assert "values" in update["values_node"]
9899

99100
# Check state history from the checkpointer
101+
# Note: states are now sorted DESC (newest first), so states[0] is the most recent
100102
states = list(graph_with_redis_checkpointer.get_state_history(thread_config))
101103
assert len(states) > 0
102-
final_state = states[-1]
104+
final_state = states[0] # First item is now the most recent (DESC order)
103105
assert final_state.values["counter"] == 5
104106
assert len(final_state.values["values"]) == 5
105107

0 commit comments

Comments
 (0)