Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions langgraph/checkpoint/redis/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,7 @@ def list(
combined_filter &= expr

# Construct the Redis query
# Sort by checkpoint_id in descending order to get most recent checkpoints first
query = FilterQuery(
filter_expression=combined_filter,
return_fields=[
Expand All @@ -275,6 +276,7 @@ def list(
"has_writes", # Include has_writes to optimize pending_writes loading
],
num_results=limit or 10000,
sort_by=("checkpoint_id", "DESC"),
)

# Execute the query
Expand Down
2 changes: 2 additions & 0 deletions langgraph/checkpoint/redis/aio.py
Original file line number Diff line number Diff line change
Expand Up @@ -693,6 +693,7 @@ async def alist(
combined_filter &= expr

# Construct the Redis query
# Sort by checkpoint_id in descending order to get most recent checkpoints first
query = FilterQuery(
filter_expression=combined_filter,
return_fields=[
Expand All @@ -705,6 +706,7 @@ async def alist(
"has_writes", # Include has_writes to optimize pending_writes loading
],
num_results=limit or 10000,
sort_by=("checkpoint_id", "DESC"),
)

# Execute the query asynchronously
Expand Down
2 changes: 2 additions & 0 deletions langgraph/checkpoint/redis/ashallow.py
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,7 @@ async def alist(
for expr in query_filter[1:]:
combined_filter &= expr

# Sort by checkpoint_id in descending order to get most recent checkpoints first
query = FilterQuery(
filter_expression=combined_filter,
return_fields=[
Expand All @@ -380,6 +381,7 @@ async def alist(
"ts",
],
num_results=limit or 100, # Set higher limit to retrieve more results
sort_by=("checkpoint_id", "DESC"),
)

results = await self.checkpoints_index.search(query)
Expand Down
2 changes: 2 additions & 0 deletions langgraph/checkpoint/redis/shallow.py
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,7 @@ def list(
combined_filter &= expr

# Get checkpoint data
# Sort by checkpoint_id in descending order to get most recent checkpoints first
query = FilterQuery(
filter_expression=combined_filter,
return_fields=[
Expand All @@ -330,6 +331,7 @@ def list(
"$.metadata",
],
num_results=limit or 10000,
sort_by=("checkpoint_id", "DESC"),
)

# Execute the query
Expand Down
264 changes: 264 additions & 0 deletions tests/test_alist_sort_order.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,264 @@
"""Test for issue #106: alist should sort by checkpoint ID DESC."""

import asyncio
import time
from typing import AsyncGenerator, Generator

import pytest
from langchain_core.runnables import RunnableConfig
from langgraph.checkpoint.base import (
Checkpoint,
CheckpointMetadata,
create_checkpoint,
empty_checkpoint,
)
from ulid import ULID

from langgraph.checkpoint.redis import RedisSaver
from langgraph.checkpoint.redis.aio import AsyncRedisSaver


@pytest.fixture
async def async_saver(redis_url: str) -> AsyncGenerator[AsyncRedisSaver, None]:
"""Async saver fixture."""
saver = AsyncRedisSaver(redis_url)
await saver.asetup()
yield saver


@pytest.fixture
def sync_saver(redis_url: str) -> Generator[RedisSaver, None, None]:
"""Sync saver fixture."""
saver = RedisSaver(redis_url)
saver.setup()
yield saver


@pytest.mark.asyncio
async def test_alist_sorts_by_checkpoint_id_desc(async_saver: AsyncRedisSaver) -> None:
"""Test that alist returns checkpoints sorted by checkpoint ID in descending order.

This is a reproducer for issue #106: when listing checkpoints, they should be
sorted by checkpoint ID (which embeds timestamp via ULID) in descending order,
so that the most recent checkpoints appear first. This allows users to efficiently
find crashed/unfinished sessions after restart.
"""
thread_id = "test-thread-sort"
checkpoint_ns = ""

# Create multiple checkpoints with increasing timestamps
# We'll use explicit checkpoint IDs with different timestamps to ensure ordering
checkpoint_ids = []

# Create 5 checkpoints with small delays between them to ensure different timestamps
for i in range(5):
# Create a checkpoint with a unique ULID
checkpoint_id = str(ULID())
checkpoint_ids.append(checkpoint_id)

config: RunnableConfig = {
"configurable": {
"thread_id": thread_id,
"checkpoint_id": checkpoint_id,
"checkpoint_ns": checkpoint_ns,
}
}

checkpoint: Checkpoint = empty_checkpoint()
checkpoint["id"] = checkpoint_id

metadata: CheckpointMetadata = {
"source": "test",
"step": i,
"writes": {},
}

await async_saver.aput(config, checkpoint, metadata, {})

# Small delay to ensure different ULID timestamps
# ULID has millisecond precision, so we need to wait at least 1ms
await asyncio.sleep(0.01)

# Now list all checkpoints for this thread
config: RunnableConfig = {
"configurable": {
"thread_id": thread_id,
"checkpoint_ns": checkpoint_ns,
}
}

listed_checkpoints = []
async for checkpoint_tuple in async_saver.alist(config):
listed_checkpoints.append(
checkpoint_tuple.config["configurable"]["checkpoint_id"]
)

# Verify we got all checkpoints
assert (
len(listed_checkpoints) == 5
), f"Expected 5 checkpoints, got {len(listed_checkpoints)}"

# Verify they are sorted in descending order (most recent first)
# Since we created them in chronological order, the last one created should be first
# checkpoint_ids[4] should appear first, then checkpoint_ids[3], etc.
expected_order = checkpoint_ids[::-1] # Reverse the list

assert listed_checkpoints == expected_order, (
f"Checkpoints are not sorted in descending order by checkpoint ID.\n"
f"Expected: {expected_order}\n"
f"Got: {listed_checkpoints}"
)


@pytest.mark.asyncio
async def test_alist_sorts_multiple_threads(async_saver: AsyncRedisSaver) -> None:
"""Test that alist sorts correctly when filtering by thread_id."""
# Create checkpoints for two different threads
thread1_ids = []
thread2_ids = []

# Thread 1: Create 3 checkpoints
for i in range(3):
checkpoint_id = str(ULID())
thread1_ids.append(checkpoint_id)

config: RunnableConfig = {
"configurable": {
"thread_id": "thread-1-sort",
"checkpoint_id": checkpoint_id,
"checkpoint_ns": "",
}
}

checkpoint: Checkpoint = empty_checkpoint()
checkpoint["id"] = checkpoint_id

metadata: CheckpointMetadata = {"source": "test", "step": i, "writes": {}}
await async_saver.aput(config, checkpoint, metadata, {})
await asyncio.sleep(0.01)

# Thread 2: Create 3 checkpoints (interleaved with thread 1)
for i in range(3):
checkpoint_id = str(ULID())
thread2_ids.append(checkpoint_id)

config: RunnableConfig = {
"configurable": {
"thread_id": "thread-2-sort",
"checkpoint_id": checkpoint_id,
"checkpoint_ns": "",
}
}

checkpoint: Checkpoint = empty_checkpoint()
checkpoint["id"] = checkpoint_id

metadata: CheckpointMetadata = {"source": "test", "step": i, "writes": {}}
await async_saver.aput(config, checkpoint, metadata, {})
await asyncio.sleep(0.01)

# List checkpoints for thread 1
config1: RunnableConfig = {
"configurable": {
"thread_id": "thread-1-sort",
"checkpoint_ns": "",
}
}

thread1_listed = []
async for checkpoint_tuple in async_saver.alist(config1):
thread1_listed.append(checkpoint_tuple.config["configurable"]["checkpoint_id"])

# Verify thread 1 checkpoints are in descending order
assert thread1_listed == thread1_ids[::-1], (
f"Thread 1 checkpoints not sorted correctly.\n"
f"Expected: {thread1_ids[::-1]}\n"
f"Got: {thread1_listed}"
)

# List checkpoints for thread 2
config2: RunnableConfig = {
"configurable": {
"thread_id": "thread-2-sort",
"checkpoint_ns": "",
}
}

thread2_listed = []
async for checkpoint_tuple in async_saver.alist(config2):
thread2_listed.append(checkpoint_tuple.config["configurable"]["checkpoint_id"])

# Verify thread 2 checkpoints are in descending order
assert thread2_listed == thread2_ids[::-1], (
f"Thread 2 checkpoints not sorted correctly.\n"
f"Expected: {thread2_ids[::-1]}\n"
f"Got: {thread2_listed}"
)


def test_list_sorts_by_checkpoint_id_desc(sync_saver: RedisSaver) -> None:
"""Test that list (sync) returns checkpoints sorted by checkpoint ID in descending order.

This is a sync version of the test for issue #106.
"""
thread_id = "test-thread-sort-sync"
checkpoint_ns = ""

# Create multiple checkpoints with increasing timestamps
checkpoint_ids = []

# Create 5 checkpoints with small delays between them to ensure different timestamps
for i in range(5):
# Create a checkpoint with a unique ULID
checkpoint_id = str(ULID())
checkpoint_ids.append(checkpoint_id)

config: RunnableConfig = {
"configurable": {
"thread_id": thread_id,
"checkpoint_id": checkpoint_id,
"checkpoint_ns": checkpoint_ns,
}
}

checkpoint: Checkpoint = empty_checkpoint()
checkpoint["id"] = checkpoint_id

metadata: CheckpointMetadata = {
"source": "test",
"step": i,
"writes": {},
}

sync_saver.put(config, checkpoint, metadata, {})

# Small delay to ensure different ULID timestamps
time.sleep(0.01)

# Now list all checkpoints for this thread
config: RunnableConfig = {
"configurable": {
"thread_id": thread_id,
"checkpoint_ns": checkpoint_ns,
}
}

listed_checkpoints = []
for checkpoint_tuple in sync_saver.list(config):
listed_checkpoints.append(
checkpoint_tuple.config["configurable"]["checkpoint_id"]
)

# Verify we got all checkpoints
assert (
len(listed_checkpoints) == 5
), f"Expected 5 checkpoints, got {len(listed_checkpoints)}"

# Verify they are sorted in descending order (most recent first)
expected_order = checkpoint_ids[::-1] # Reverse the list

assert listed_checkpoints == expected_order, (
f"Checkpoints are not sorted in descending order by checkpoint ID.\n"
f"Expected: {expected_order}\n"
f"Got: {listed_checkpoints}"
)
6 changes: 4 additions & 2 deletions tests/test_streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,10 @@ def test_streaming_values_with_redis_checkpointer(graph_with_redis_checkpointer)
assert len(results) == 11 # 5 iterations x 2 nodes + initial state

# Check state history from the checkpointer
# Note: states are now sorted DESC (newest first), so states[0] is the most recent
states = list(graph_with_redis_checkpointer.get_state_history(thread_config))
assert len(states) > 0
final_state = states[-1]
final_state = states[0] # First item is now the most recent (DESC order)
assert final_state.values["counter"] == 5
assert len(final_state.values["values"]) == 5

Expand Down Expand Up @@ -97,9 +98,10 @@ def test_streaming_updates_with_redis_checkpointer(graph_with_redis_checkpointer
assert "values" in update["values_node"]

# Check state history from the checkpointer
# Note: states are now sorted DESC (newest first), so states[0] is the most recent
states = list(graph_with_redis_checkpointer.get_state_history(thread_config))
assert len(states) > 0
final_state = states[-1]
final_state = states[0] # First item is now the most recent (DESC order)
assert final_state.values["counter"] == 5
assert len(final_state.values["values"]) == 5

Expand Down