diff --git a/langgraph/checkpoint/redis/ashallow.py b/langgraph/checkpoint/redis/ashallow.py index bd55658..32e8bc5 100644 --- a/langgraph/checkpoint/redis/ashallow.py +++ b/langgraph/checkpoint/redis/ashallow.py @@ -646,12 +646,11 @@ async def _aload_pending_sends( # Extract type and blob pairs # Handle both direct attribute access and JSON path access + # Filter out documents where blob is None (similar to AsyncRedisSaver in aio.py) return [ - ( - getattr(doc, "type", ""), - getattr(doc, "$.blob", getattr(doc, "blob", b"")), - ) + (getattr(doc, "type", ""), blob) for doc in sorted_writes + if (blob := getattr(doc, "$.blob", getattr(doc, "blob", None))) is not None ] async def _aload_pending_writes( diff --git a/langgraph/checkpoint/redis/shallow.py b/langgraph/checkpoint/redis/shallow.py index 0d4d0cc..396dc14 100644 --- a/langgraph/checkpoint/redis/shallow.py +++ b/langgraph/checkpoint/redis/shallow.py @@ -675,12 +675,11 @@ def _load_pending_sends( # Extract type and blob pairs # Handle both direct attribute access and JSON path access + # Filter out documents where blob is None (similar to RedisSaver in __init__.py) return [ - ( - getattr(doc, "type", ""), - getattr(doc, "$.blob", getattr(doc, "blob", b"")), - ) + (getattr(doc, "type", ""), blob) for doc in sorted_writes + if (blob := getattr(doc, "$.blob", getattr(doc, "blob", None))) is not None ] def _make_shallow_redis_checkpoint_key_cached( diff --git a/tests/test_blob_encoding_error_handling.py b/tests/test_blob_encoding_error_handling.py index 060c9ae..9f909bd 100644 --- a/tests/test_blob_encoding_error_handling.py +++ b/tests/test_blob_encoding_error_handling.py @@ -248,6 +248,77 @@ def test_load_writes_empty_cases(redis_url: str) -> None: assert len(result2.pending_writes) == 0 +def test_empty_blob_in_pending_sends(redis_url: str) -> None: + """Test handling of empty byte string (b"") in pending sends as per PR #82 review suggestion.""" + with _saver(redis_url) as saver: + thread_id = str(uuid4()) + + # Create a checkpoint + config: RunnableConfig = { + "configurable": { + "thread_id": thread_id, + "checkpoint_ns": "", + "checkpoint_id": "test-checkpoint", + } + } + + checkpoint = create_checkpoint( + checkpoint=empty_checkpoint(), + channels={}, + step=1, + ) + + saved_config = saver.put( + config, checkpoint, {"source": "test", "step": 1, "writes": {}}, {} + ) + + # Test 1: Write with empty byte string blob + from langgraph.constants import TASKS + + # This tests the edge case where blob is b"" + empty_blob_data = b"" + saver.put_writes( + saved_config, + [(TASKS, empty_blob_data)], # Empty blob + task_id="empty_blob_task", + ) + + # Test 2: Write with None-like values that should be handled gracefully + none_like_values = [ + ("channel1", ""), # Empty string + ("channel2", b""), # Empty bytes + ("channel3", {}), # Empty dict + ("channel4", []), # Empty list + ] + + saver.put_writes( + saved_config, + none_like_values, + task_id="empty_values_task", + ) + + # Retrieve and verify all writes are handled correctly + result = saver.get_tuple(saved_config) + assert result is not None + + # Check pending writes + pending_writes = result.pending_writes + + # Find our writes + empty_blob_writes = [w for w in pending_writes if w[0] == "empty_blob_task"] + empty_values_writes = [w for w in pending_writes if w[0] == "empty_values_task"] + + # Verify empty blob is handled correctly + assert len(empty_blob_writes) == 1 + assert empty_blob_writes[0][1] == TASKS + assert empty_blob_writes[0][2] == b"" # Should preserve empty bytes + + # Verify other empty values are handled + assert len(empty_values_writes) == 4 + for write in empty_values_writes: + assert write[2] is not None # Should not be None even for empty values + + def test_checkpoint_with_special_characters(redis_url: str) -> None: """Test handling of special characters and null bytes in data. diff --git a/tests/test_shallow_blob_attribute.py b/tests/test_shallow_blob_attribute.py new file mode 100644 index 0000000..a02456d --- /dev/null +++ b/tests/test_shallow_blob_attribute.py @@ -0,0 +1,391 @@ +"""Test for handling missing blob attribute in shallow checkpoint savers. + +This test verifies the fix for GitHub issue #80 where AsyncShallowRedisSaver +and ShallowRedisSaver would raise AttributeError when encountering documents +without a blob attribute. +""" + +import asyncio +from typing import Any, Dict +from unittest.mock import MagicMock, patch + +import pytest +from langchain_core.runnables import RunnableConfig +from langgraph.checkpoint.base import CheckpointMetadata +from langgraph.constants import TASKS +from testcontainers.redis import RedisContainer + +from langgraph.checkpoint.redis.ashallow import AsyncShallowRedisSaver +from langgraph.checkpoint.redis.shallow import ShallowRedisSaver + + +class MockDocument: + """Mock document that simulates missing blob attribute.""" + + def __init__(self, data: Dict[str, Any]): + self.type = data.get("type", "") + self.task_path = data.get("task_path", "") + self.task_id = data.get("task_id", "") + self.idx = data.get("idx", 0) + # Intentionally NOT setting blob attribute to simulate the issue + # In some cases, we'll set it to None to test that case too + if "blob" in data: + self.blob = data["blob"] + + +@pytest.fixture +def redis_url(): + """Create a Redis container for testing.""" + redis_container = RedisContainer("redis:8") + redis_container.start() + yield f"redis://{redis_container.get_container_host_ip()}:{redis_container.get_exposed_port(6379)}" + redis_container.stop() + + +@pytest.mark.asyncio +async def test_async_shallow_missing_blob_attribute(redis_url: str) -> None: + """Test that AsyncShallowRedisSaver handles documents without blob attribute.""" + async with AsyncShallowRedisSaver(redis_url=redis_url) as saver: + # Create mock search result with documents missing blob attribute + mock_search_result = MagicMock() + mock_search_result.docs = [ + MockDocument( + { + "type": "test_type1", + "task_path": "path1", + "task_id": "task1", + "idx": 0, + } + ), + MockDocument( + { + "type": "test_type2", + "task_path": "path2", + "task_id": "task2", + "idx": 1, + } + ), + MockDocument( + { + "type": "test_type3", + "task_path": "path3", + "task_id": "task3", + "idx": 2, + "blob": None, + } + ), + ] + + # Mock the search method + original_search = saver.checkpoint_writes_index.search + + async def mock_search(_): + return mock_search_result + + saver.checkpoint_writes_index.search = mock_search + + try: + # This should NOT raise AttributeError with the fix + result = await saver._aload_pending_sends("test_thread", "test_ns") + + # Result should be empty list since all docs have missing or None blob + assert ( + result == [] + ), f"Expected empty list when blob is missing/None, got {result}" + + finally: + # Restore original search method + saver.checkpoint_writes_index.search = original_search + + +@pytest.mark.asyncio +async def test_async_shallow_with_valid_blob(redis_url: str) -> None: + """Test that AsyncShallowRedisSaver correctly processes documents with valid blob.""" + async with AsyncShallowRedisSaver(redis_url=redis_url) as saver: + # Create mock search result with documents having valid blob data + mock_search_result = MagicMock() + mock_search_result.docs = [ + MockDocument( + { + "type": "test_type1", + "task_path": "path1", + "task_id": "task1", + "idx": 0, + "blob": b"data1", + } + ), + MockDocument( + { + "type": "test_type2", + "task_path": "path2", + "task_id": "task2", + "idx": 1, + "blob": b"data2", + } + ), + MockDocument( + { + "type": "test_type3", + "task_path": "path3", + "task_id": "task3", + "idx": 2, + "blob": None, + } + ), + MockDocument( + { + "type": "test_type4", + "task_path": "path4", + "task_id": "task4", + "idx": 3, + } + ), # Missing blob + ] + + # Mock the search method + original_search = saver.checkpoint_writes_index.search + + async def mock_search(_): + return mock_search_result + + saver.checkpoint_writes_index.search = mock_search + + try: + result = await saver._aload_pending_sends("test_thread", "test_ns") + + # Should only include documents with valid (non-None) blob + assert ( + len(result) == 2 + ), f"Expected 2 results with valid blobs, got {len(result)}" + assert result[0] == ("test_type1", b"data1") + assert result[1] == ("test_type2", b"data2") + + finally: + saver.checkpoint_writes_index.search = original_search + + +def test_sync_shallow_missing_blob_attribute(redis_url: str) -> None: + """Test that ShallowRedisSaver handles documents without blob attribute.""" + with ShallowRedisSaver.from_conn_string(redis_url) as saver: + # Create mock search result with documents missing blob attribute + mock_search_result = MagicMock() + mock_search_result.docs = [ + MockDocument( + { + "type": "test_type1", + "task_path": "path1", + "task_id": "task1", + "idx": 0, + } + ), + MockDocument( + { + "type": "test_type2", + "task_path": "path2", + "task_id": "task2", + "idx": 1, + } + ), + MockDocument( + { + "type": "test_type3", + "task_path": "path3", + "task_id": "task3", + "idx": 2, + "blob": None, + } + ), + ] + + # Mock the search method + original_search = saver.checkpoint_writes_index.search + saver.checkpoint_writes_index.search = lambda _: mock_search_result + + try: + # This should NOT raise AttributeError with the fix + result = saver._load_pending_sends("test_thread", "test_ns") + + # Result should be empty list since all docs have missing or None blob + assert ( + result == [] + ), f"Expected empty list when blob is missing/None, got {result}" + + finally: + # Restore original search method + saver.checkpoint_writes_index.search = original_search + + +def test_sync_shallow_with_valid_blob(redis_url: str) -> None: + """Test that ShallowRedisSaver correctly processes documents with valid blob.""" + with ShallowRedisSaver.from_conn_string(redis_url) as saver: + # Create mock search result with documents having valid blob data + mock_search_result = MagicMock() + mock_search_result.docs = [ + MockDocument( + { + "type": "test_type1", + "task_path": "path1", + "task_id": "task1", + "idx": 0, + "blob": b"data1", + } + ), + MockDocument( + { + "type": "test_type2", + "task_path": "path2", + "task_id": "task2", + "idx": 1, + "blob": b"data2", + } + ), + MockDocument( + { + "type": "test_type3", + "task_path": "path3", + "task_id": "task3", + "idx": 2, + "blob": None, + } + ), + MockDocument( + { + "type": "test_type4", + "task_path": "path4", + "task_id": "task4", + "idx": 3, + } + ), # Missing blob + ] + + # Mock the search method + original_search = saver.checkpoint_writes_index.search + saver.checkpoint_writes_index.search = lambda _: mock_search_result + + try: + result = saver._load_pending_sends("test_thread", "test_ns") + + # Should only include documents with valid (non-None) blob + assert ( + len(result) == 2 + ), f"Expected 2 results with valid blobs, got {len(result)}" + assert result[0] == ("test_type1", b"data1") + assert result[1] == ("test_type2", b"data2") + + finally: + saver.checkpoint_writes_index.search = original_search + + +@pytest.mark.asyncio +async def test_async_shallow_json_path_blob_attribute(redis_url: str) -> None: + """Test handling of $.blob JSON path attribute in AsyncShallowRedisSaver.""" + async with AsyncShallowRedisSaver(redis_url=redis_url) as saver: + # Create mock documents with $.blob attribute (JSON path syntax) + class MockDocumentWithJsonPath: + def __init__(self, data: Dict[str, Any]): + self.type = data.get("type", "") + self.task_path = data.get("task_path", "") + self.task_id = data.get("task_id", "") + self.idx = data.get("idx", 0) + # Set $.blob attribute (JSON path syntax from Redis) + if "json_blob" in data: + setattr(self, "$.blob", data["json_blob"]) + + mock_search_result = MagicMock() + mock_search_result.docs = [ + MockDocumentWithJsonPath( + { + "type": "test_type1", + "task_path": "p1", + "task_id": "t1", + "idx": 0, + "json_blob": b"json_data1", + } + ), + MockDocumentWithJsonPath( + {"type": "test_type2", "task_path": "p2", "task_id": "t2", "idx": 1} + ), # No blob at all + MockDocumentWithJsonPath( + { + "type": "test_type3", + "task_path": "p3", + "task_id": "t3", + "idx": 2, + "json_blob": None, + } + ), + ] + + original_search = saver.checkpoint_writes_index.search + + async def mock_search(_): + return mock_search_result + + saver.checkpoint_writes_index.search = mock_search + + try: + result = await saver._aload_pending_sends("test_thread", "test_ns") + + # Should only include the document with valid $.blob + assert ( + len(result) == 1 + ), f"Expected 1 result with valid $.blob, got {len(result)}" + assert result[0] == ("test_type1", b"json_data1") + + finally: + saver.checkpoint_writes_index.search = original_search + + +def test_sync_shallow_json_path_blob_attribute(redis_url: str) -> None: + """Test handling of $.blob JSON path attribute in ShallowRedisSaver.""" + with ShallowRedisSaver.from_conn_string(redis_url) as saver: + # Create mock documents with $.blob attribute (JSON path syntax) + class MockDocumentWithJsonPath: + def __init__(self, data: Dict[str, Any]): + self.type = data.get("type", "") + self.task_path = data.get("task_path", "") + self.task_id = data.get("task_id", "") + self.idx = data.get("idx", 0) + # Set $.blob attribute (JSON path syntax from Redis) + if "json_blob" in data: + setattr(self, "$.blob", data["json_blob"]) + + mock_search_result = MagicMock() + mock_search_result.docs = [ + MockDocumentWithJsonPath( + { + "type": "test_type1", + "task_path": "p1", + "task_id": "t1", + "idx": 0, + "json_blob": b"json_data1", + } + ), + MockDocumentWithJsonPath( + {"type": "test_type2", "task_path": "p2", "task_id": "t2", "idx": 1} + ), # No blob at all + MockDocumentWithJsonPath( + { + "type": "test_type3", + "task_path": "p3", + "task_id": "t3", + "idx": 2, + "json_blob": None, + } + ), + ] + + original_search = saver.checkpoint_writes_index.search + saver.checkpoint_writes_index.search = lambda _: mock_search_result + + try: + result = saver._load_pending_sends("test_thread", "test_ns") + + # Should only include the document with valid $.blob + assert ( + len(result) == 1 + ), f"Expected 1 result with valid $.blob, got {len(result)}" + assert result[0] == ("test_type1", b"json_data1") + + finally: + saver.checkpoint_writes_index.search = original_search