diff --git a/.gitignore b/.gitignore
index 5fdbb05..2802026 100644
--- a/.gitignore
+++ b/.gitignore
@@ -221,3 +221,4 @@ libs/redis/docs/.Trash*
 .python-version
 .idea/*
 examples/.Trash*
+.claude
diff --git a/CLAUDE.md b/CLAUDE.md
index df3c001..02d3d68 100644
--- a/CLAUDE.md
+++ b/CLAUDE.md
@@ -5,30 +5,34 @@ This file provides guidance to Claude Code (claude.ai/code) when working with co
 ## Development Commands
 
 ### Setup and Dependencies
+
 ```bash
-make install      # Install all dependencies with poetry
-make redis-start  # Start Redis Stack container (includes RedisJSON and RediSearch)
-make redis-stop   # Stop Redis container
+poetry install --all-extras # Install all dependencies with poetry (from README)
+make redis-start            # Start Redis Stack container (includes RedisJSON and RediSearch)
+make redis-stop             # Stop Redis container
 ```
 
 ### Testing
+
 ```bash
-make test     # Run tests with verbose output
-make test-all # Run all tests including API tests
+make test                                    # Run tests with verbose output
+make test-all                                # Run all tests including API tests
 pytest tests/test_specific.py                # Run specific test file
 pytest tests/test_specific.py::test_function # Run specific test
-pytest --run-api-tests # Include API integration tests
+pytest --run-api-tests                       # Include API integration tests
 ```
 
 ### Code Quality
+
 ```bash
-make lint        # Format code and run type checking
 make format      # Format code with black and isort
+make lint        # Run formatting, type checking, and other linters
 make check-types # Run mypy type checking
 make check       # Run both linting and tests
 ```
 
 ### Development
+
 ```bash
 make clean # Remove cache and build artifacts
 ```
@@ -38,12 +42,14 @@ make clean # Remove cache and build artifacts
 ### Core Components
 
 **Checkpoint Savers** (`langgraph/checkpoint/redis/`):
+
 - `base.py`: `BaseRedisSaver` - Abstract base class with shared Redis operations, schemas, and TTL management
-- `__init__.py`: `RedisSaver` - Standard sync implementation 
+- `__init__.py`: `RedisSaver` - Standard sync implementation
 - `aio.py`: `AsyncRedisSaver` - Async implementation
 - `shallow.py` / `ashallow.py`: Shallow variants that store only latest checkpoint
 
 **Stores** (`langgraph/store/redis/`):
+
 - `base.py`: `BaseRedisStore` - Abstract base with Redis operations, vector search, and TTL support
 - `__init__.py`: `RedisStore` - Sync store with key-value and vector search
 - `aio.py`: `AsyncRedisStore` - Async store implementation
@@ -63,6 +69,7 @@ make clean # Remove cache and build artifacts
 **Type System**: Heavy use of generics (`BaseRedisSaver[RedisClientType, IndexType]`) to maintain type safety across sync/async variants while sharing implementation code.
 
 ### Redis Key Patterns
+
 - Checkpoints: `checkpoint:{thread_id}:{namespace}:{checkpoint_id}`
 - Checkpoint blobs: `checkpoint_blob:{thread_id}:{namespace}:{channel}:{version}`
 - Checkpoint writes: `checkpoint_write:{thread_id}:{namespace}:{checkpoint_id}:{task_id}`
@@ -70,7 +77,9 @@ make clean # Remove cache and build artifacts
 - Store vectors: `store_vectors:{uuid}`
 
 ### Testing Strategy
+
 Tests are organized by functionality:
+
 - `test_sync.py` / `test_async.py`: Core checkpoint functionality
 - `test_store.py` / `test_async_store.py`: Store operations
 - `test_cluster_mode.py`: Redis Cluster specific tests
@@ -79,7 +88,8 @@ Tests are organized by functionality:
 - `test_semantic_search_*.py`: Vector search capabilities
 
 ### Important Dependencies
+
 - Requires Redis with RedisJSON and RediSearch modules
 - Uses `redisvl` for vector operations and search index management
 - Uses `python-ulid` for unique document IDs
-- Integrates with LangGraph's checkpoint and store base classes
\ No newline at end of file
+- Integrates with LangGraph's checkpoint and store base classes
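
The key patterns documented in the CLAUDE.md changes above are plain `:`-separated segments, which is also where the bug fixed later in this patch comes from: an empty namespace produces an empty segment. A minimal sketch of how such a key composes; `make_checkpoint_key` is a hypothetical helper for illustration, not part of the library (the real code goes through `BaseRedisSaver` and `REDIS_KEY_SEPARATOR`):

```python
# Illustrative sketch of the documented key layout.
REDIS_KEY_SEPARATOR = ":"

def make_checkpoint_key(thread_id: str, namespace: str, checkpoint_id: str) -> str:
    # checkpoint:{thread_id}:{namespace}:{checkpoint_id}
    return REDIS_KEY_SEPARATOR.join(["checkpoint", thread_id, namespace, checkpoint_id])

assert make_checkpoint_key("thread-1", "ns", "abc123") == "checkpoint:thread-1:ns:abc123"
# An empty namespace would leave an empty segment -- the case the fix below handles:
assert make_checkpoint_key("thread-1", "", "abc123") == "checkpoint:thread-1::abc123"
```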
diff --git a/langgraph/checkpoint/redis/aio.py b/langgraph/checkpoint/redis/aio.py
index df0178c..316ebe0 100644
--- a/langgraph/checkpoint/redis/aio.py
+++ b/langgraph/checkpoint/redis/aio.py
@@ -1003,7 +1003,7 @@ async def _aload_pending_sends(
             num_results=100,
         )
         res = await self.checkpoint_writes_index.search(parent_writes_query)
-        
+
         # Sort results for deterministic order
         docs = sorted(
             res.docs,
@@ -1013,7 +1013,7 @@ async def _aload_pending_sends(
                 getattr(d, "idx", 0),
             ),
         )
-        
+
         # Convert to expected format
         return [
             (d.type.encode(), blob)
diff --git a/langgraph/checkpoint/redis/ashallow.py b/langgraph/checkpoint/redis/ashallow.py
index 58548c5..ec87515 100644
--- a/langgraph/checkpoint/redis/ashallow.py
+++ b/langgraph/checkpoint/redis/ashallow.py
@@ -34,7 +34,11 @@
     REDIS_KEY_SEPARATOR,
     BaseRedisSaver,
 )
-from langgraph.checkpoint.redis.util import safely_decode
+from langgraph.checkpoint.redis.util import (
+    safely_decode,
+    to_storage_safe_id,
+    to_storage_safe_str,
+)
 
 SCHEMAS = [
     {
@@ -812,7 +816,13 @@ def _make_shallow_redis_checkpoint_blob_key_pattern(
 ) -> str:
     """Create a pattern to match all blob keys for a thread and namespace."""
     return (
-        REDIS_KEY_SEPARATOR.join([CHECKPOINT_BLOB_PREFIX, thread_id, checkpoint_ns])
+        REDIS_KEY_SEPARATOR.join(
+            [
+                CHECKPOINT_BLOB_PREFIX,
+                str(to_storage_safe_id(thread_id)),
+                to_storage_safe_str(checkpoint_ns),
+            ]
+        )
         + ":*"
     )
 
@@ -823,7 +833,11 @@ def _make_shallow_redis_checkpoint_writes_key_pattern(
     """Create a pattern to match all writes keys for a thread and namespace."""
     return (
         REDIS_KEY_SEPARATOR.join(
-            [CHECKPOINT_WRITE_PREFIX, thread_id, checkpoint_ns]
+            [
+                CHECKPOINT_WRITE_PREFIX,
+                str(to_storage_safe_id(thread_id)),
+                to_storage_safe_str(checkpoint_ns),
+            ]
         )
         + ":*"
     )
diff --git a/langgraph/checkpoint/redis/shallow.py b/langgraph/checkpoint/redis/shallow.py
index 5190729..4aad031 100644
--- a/langgraph/checkpoint/redis/shallow.py
+++ b/langgraph/checkpoint/redis/shallow.py
@@ -26,7 +26,11 @@
     REDIS_KEY_SEPARATOR,
     BaseRedisSaver,
 )
-from langgraph.checkpoint.redis.util import safely_decode
+from langgraph.checkpoint.redis.util import (
+    safely_decode,
+    to_storage_safe_id,
+    to_storage_safe_str,
+)
 
 SCHEMAS = [
     {
@@ -713,7 +717,13 @@ def _make_shallow_redis_checkpoint_blob_key_pattern(
 ) -> str:
     """Create a pattern to match all blob keys for a thread and namespace."""
     return (
-        REDIS_KEY_SEPARATOR.join([CHECKPOINT_BLOB_PREFIX, thread_id, checkpoint_ns])
+        REDIS_KEY_SEPARATOR.join(
+            [
+                CHECKPOINT_BLOB_PREFIX,
+                str(to_storage_safe_id(thread_id)),
+                to_storage_safe_str(checkpoint_ns),
+            ]
+        )
         + ":*"
     )
 
@@ -724,7 +734,11 @@ def _make_shallow_redis_checkpoint_writes_key_pattern(
     """Create a pattern to match all writes keys for a thread and namespace."""
     return (
         REDIS_KEY_SEPARATOR.join(
-            [CHECKPOINT_WRITE_PREFIX, thread_id, checkpoint_ns]
+            [
+                CHECKPOINT_WRITE_PREFIX,
+                str(to_storage_safe_id(thread_id)),
+                to_storage_safe_str(checkpoint_ns),
+            ]
         )
         + ":*"
     )
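
The substance of the `shallow.py` / `ashallow.py` changes above: `BaseRedisSaver` writes keys through the storage-safe helpers, while the old cleanup patterns interpolated the raw values, so with an empty `checkpoint_ns` the scan pattern could never match a stored key and stale blobs/writes were never deleted. The sketch below reproduces the mismatch under one assumption taken from the assertions in the tests that follow: that `to_storage_safe_str` maps an empty string to the `__empty__` sentinel. The helper here is a stand-in; the real implementation lives in `langgraph/checkpoint/redis/util.py` and is not shown in this diff.

```python
from fnmatch import fnmatch

def to_storage_safe_str(value: str) -> str:
    # Assumed behavior, inferred from the tests below: empty strings are
    # stored under a sentinel so the key segment is non-empty and searchable.
    return value if value else "__empty__"

# Key written by the base saver for an empty namespace:
stored = ":".join(["checkpoint_blob", "t1", to_storage_safe_str(""), "ch", "1"])
assert stored == "checkpoint_blob:t1:__empty__:ch:1"

old_pattern = "checkpoint_blob:t1::*"                            # raw values
new_pattern = f"checkpoint_blob:t1:{to_storage_safe_str('')}:*"  # storage-safe

# Redis MATCH globbing behaves like fnmatch for this case:
assert not fnmatch(stored, old_pattern)  # old cleanup scan found nothing
assert fnmatch(stored, new_pattern)      # fixed pattern matches stored keys
```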
+ """ + import uuid + + from langgraph.checkpoint.redis.base import CHECKPOINT_BLOB_PREFIX + + with _saver(redis_url) as saver: + # Create test data + thread_id = f"test_thread_{uuid.uuid4()}" + checkpoint_ns = "" # Empty namespace - problematic case + checkpoint_id1 = str(uuid.uuid4()) + checkpoint_id2 = str(uuid.uuid4()) + + config1: RunnableConfig = { + "configurable": { + "thread_id": thread_id, + "checkpoint_ns": checkpoint_ns, + "checkpoint_id": checkpoint_id1, + } + } + + config2: RunnableConfig = { + "configurable": { + "thread_id": thread_id, + "checkpoint_ns": checkpoint_ns, + "checkpoint_id": checkpoint_id2, + } + } + + # Create first checkpoint + checkpoint1 = { + "v": 1, + "ts": "2023-01-01T00:00:00Z", + "id": checkpoint_id1, + "channel_values": { + "test_channel": ["test_value"], + "another_channel": {"key": "value"}, + }, + "channel_versions": {"test_channel": "1", "another_channel": "2"}, + "versions_seen": {}, + "pending_sends": [], + } + metadata1: CheckpointMetadata = {"source": "input", "step": 1} + versions1 = {"test_channel": "1", "another_channel": "2"} + + saver.put(config1, checkpoint1, metadata1, versions1) + + # Check keys after first checkpoint + redis_client = Redis.from_url(redis_url, decode_responses=True) + try: + all_keys = redis_client.keys("*") + test_keys = [k for k in all_keys if thread_id in k] + blob_keys_after_first = [ + k for k in test_keys if k.startswith(CHECKPOINT_BLOB_PREFIX) + ] + + # Should have 2 blob keys + assert len(blob_keys_after_first) == 2 + + # Create second checkpoint with different channel versions + checkpoint2 = { + "v": 1, + "ts": "2023-01-01T00:00:01Z", + "id": checkpoint_id2, + "channel_values": { + "test_channel": ["test_value_new"], + "another_channel": {"key": "value_new"}, + }, + "channel_versions": {"test_channel": "3", "another_channel": "4"}, + "versions_seen": {}, + "pending_sends": [], + } + metadata2: CheckpointMetadata = {"source": "loop", "step": 2} + versions2 = { + "test_channel": "3", + "another_channel": "4", + } # Different versions + + saver.put(config2, checkpoint2, metadata2, versions2) + + # Check keys after second checkpoint + all_keys = redis_client.keys("*") + test_keys = [k for k in all_keys if thread_id in k] + blob_keys_after_second = [ + k for k in test_keys if k.startswith(CHECKPOINT_BLOB_PREFIX) + ] + + # This demonstrates the bug: we should only have 2 blob keys (latest versions) + # but we have 4 because cleanup didn't work due to key generation inconsistency + print(f"Blob keys after first put: {len(blob_keys_after_first)}") + print(f"Blob keys after second put: {len(blob_keys_after_second)}") + print("Blob keys after second put:") + for key in sorted(blob_keys_after_second): + print(f" {key}") + + # This should pass when the bug is fixed - we should only have latest blob versions + assert ( + len(blob_keys_after_second) == 2 + ), f"Bug: old blob versions not cleaned up due to key generation mismatch. Expected 2, got {len(blob_keys_after_second)}" + + finally: + redis_client.close() + + +def test_pr37_incomplete_writes_cleanup(redis_url: str) -> None: + """Test for PR #37: Complete cleanup of writes in ShallowRedisSaver. + + This test verifies that old writes are properly cleaned up + when putting new writes, now that key generation is consistent. 
+ """ + import uuid + + from langgraph.checkpoint.redis.base import CHECKPOINT_WRITE_PREFIX + + with _saver(redis_url) as saver: + # Create test data + thread_id = f"test_thread_{uuid.uuid4()}" + checkpoint_ns = "" # Empty namespace - problematic case + checkpoint_id1 = str(uuid.uuid4()) + checkpoint_id2 = str(uuid.uuid4()) + + config1: RunnableConfig = { + "configurable": { + "thread_id": thread_id, + "checkpoint_ns": checkpoint_ns, + "checkpoint_id": checkpoint_id1, + } + } + + config2: RunnableConfig = { + "configurable": { + "thread_id": thread_id, + "checkpoint_ns": checkpoint_ns, + "checkpoint_id": checkpoint_id2, + } + } + + # Create first checkpoint + checkpoint1 = { + "v": 1, + "ts": "2023-01-01T00:00:00Z", + "id": checkpoint_id1, + "channel_values": {}, + "channel_versions": {}, + "versions_seen": {}, + "pending_sends": [], + } + metadata1: CheckpointMetadata = {"source": "input", "step": 1} + + saver.put(config1, checkpoint1, metadata1, {}) + + # Add writes for first checkpoint + writes1 = [("channel1", "value1"), ("channel2", "value2")] + saver.put_writes(config1, writes1, "task1") + + # Check writes after first checkpoint + redis_client = Redis.from_url(redis_url, decode_responses=True) + try: + all_keys = redis_client.keys("*") + test_keys = [k for k in all_keys if thread_id in k] + write_keys_after_first = [ + k for k in test_keys if k.startswith(CHECKPOINT_WRITE_PREFIX) + ] + + # Should have 2 write keys + assert len(write_keys_after_first) == 2 + + # Create second checkpoint + checkpoint2 = { + "v": 1, + "ts": "2023-01-01T00:00:01Z", + "id": checkpoint_id2, + "channel_values": {}, + "channel_versions": {}, + "versions_seen": {}, + "pending_sends": [], + } + metadata2: CheckpointMetadata = {"source": "loop", "step": 2} + + saver.put(config2, checkpoint2, metadata2, {}) + + # Add writes for second checkpoint + writes2 = [("channel3", "value3"), ("channel4", "value4")] + saver.put_writes(config2, writes2, "task2") + + # Check writes after second checkpoint + all_keys = redis_client.keys("*") + test_keys = [k for k in all_keys if thread_id in k] + write_keys_after_second = [ + k for k in test_keys if k.startswith(CHECKPOINT_WRITE_PREFIX) + ] + + print(f"Write keys after first checkpoint: {len(write_keys_after_first)}") + print(f"Write keys after second checkpoint: {len(write_keys_after_second)}") + print("Write keys after second checkpoint:") + for key in sorted(write_keys_after_second): + print(f" {key}") + + # In a proper shallow implementation, old writes for different checkpoints + # should be cleaned up. This test verifies the cleanup works correctly. + # We expect only the writes from the second checkpoint (2 writes) + assert ( + len(write_keys_after_second) == 2 + ), f"Bug: old write keys not cleaned up properly. Expected 2, got {len(write_keys_after_second)}" + + finally: + redis_client.close()