From dda99f8ff33b2c6722a7e12544343c1f85870c55 Mon Sep 17 00:00:00 2001 From: Brian Sam-Bodden Date: Mon, 11 Aug 2025 15:14:32 -0700 Subject: [PATCH] fix: resolve Command(resume) warning about invalid packet type dict (#83) Issue #83: Command with resume argument failing in v0.1.0 Users reported warning 'Ignoring invalid packet type in pending sends' when using Command(resume={'interrupt_id': {'some': 'result'}}) after upgrading from 0.0.8 to 0.1.0. Root cause: Type annotation mismatch in _load_pending_sends methods. They were annotated as returning List[Tuple[str, bytes]] but Redis JSON actually returns strings for blob data, not bytes, causing List[Tuple[str, Union[str, bytes]]]. This type mismatch caused the warning when Command(resume) tried to process pending sends containing dict values through the TASKS channel. Changes: - Updated return type hints for _load_pending_sends methods in both sync and async - Updated _load_pending_sends_with_registry_check type hints - Updated _abatch_load_pending_sends and local variable annotations in async - Added test that simulates Command(resume) scenario and verifies no warning - Added test for type compatibility with Redis JSON string blobs The fix ensures Command(resume) works without warnings while maintaining backward compatibility with code that passes bytes. --- langgraph/checkpoint/redis/__init__.py | 4 +- langgraph/checkpoint/redis/aio.py | 8 +- tests/test_checkpoint_serialization.py | 141 +++++++++++++++++++++++++ 3 files changed, 147 insertions(+), 6 deletions(-) diff --git a/langgraph/checkpoint/redis/__init__.py b/langgraph/checkpoint/redis/__init__.py index dd1f3a5..2d28abd 100644 --- a/langgraph/checkpoint/redis/__init__.py +++ b/langgraph/checkpoint/redis/__init__.py @@ -1155,7 +1155,7 @@ def _load_pending_sends( thread_id: str, checkpoint_ns: str, parent_checkpoint_id: str, - ) -> List[Tuple[str, bytes]]: + ) -> List[Tuple[str, Union[str, bytes]]]: """Load pending sends for a parent checkpoint. Args: @@ -1437,7 +1437,7 @@ def _load_pending_sends_with_registry_check( thread_id: str, checkpoint_ns: str, parent_checkpoint_id: str, - ) -> List[Tuple[str, bytes]]: + ) -> List[Tuple[str, Union[str, bytes]]]: """Load pending sends for a parent checkpoint with pre-computed registry check.""" if not parent_checkpoint_id: return [] diff --git a/langgraph/checkpoint/redis/aio.py b/langgraph/checkpoint/redis/aio.py index f9b207e..879af1c 100644 --- a/langgraph/checkpoint/redis/aio.py +++ b/langgraph/checkpoint/redis/aio.py @@ -541,7 +541,7 @@ async def aget_tuple(self, config: RunnableConfig) -> Optional[CheckpointTuple]: if doc_parent_checkpoint_id: results = await asyncio.gather(*tasks) channel_values: Dict[str, Any] = results[0] - pending_sends: List[Tuple[str, bytes]] = results[1] + pending_sends: List[Tuple[str, Union[str, bytes]]] = results[1] pending_writes: List[PendingWrite] = results[2] else: # Only channel_values and pending_writes tasks @@ -772,7 +772,7 @@ async def alist( parent_checkpoint_id = doc_data["parent_checkpoint_id"] # Get pending_sends from batch results - pending_sends: List[Tuple[str, bytes]] = [] + pending_sends: List[Tuple[str, Union[str, bytes]]] = [] if parent_checkpoint_id: batch_key = (thread_id, checkpoint_ns, parent_checkpoint_id) pending_sends = pending_sends_map.get(batch_key, []) @@ -1443,7 +1443,7 @@ async def aget_channel_values( async def _aload_pending_sends( self, thread_id: str, checkpoint_ns: str = "", parent_checkpoint_id: str = "" - ) -> List[Tuple[str, bytes]]: + ) -> List[Tuple[str, Union[str, bytes]]]: """Load pending sends for a parent checkpoint. Args: @@ -1640,7 +1640,7 @@ async def _aload_pending_writes( async def _abatch_load_pending_sends( self, batch_keys: List[Tuple[str, str, str]] - ) -> Dict[Tuple[str, str, str], List[Tuple[str, bytes]]]: + ) -> Dict[Tuple[str, str, str], List[Tuple[str, Union[str, bytes]]]]: """Batch load pending sends for multiple parent checkpoints. Args: diff --git a/tests/test_checkpoint_serialization.py b/tests/test_checkpoint_serialization.py index 77984f2..4da7863 100644 --- a/tests/test_checkpoint_serialization.py +++ b/tests/test_checkpoint_serialization.py @@ -35,6 +35,147 @@ def _saver(redis_url: str): del saver +def test_issue_83_command_resume_no_warning(redis_url: str) -> None: + """Test that Command(resume={...}) doesn't cause 'invalid packet type' warning (issue #83). + + The user reported that Command(resume={'interrupt_id': {'some': 'result'}}) + caused warning: "Ignoring invalid packet type in pending sends" + This test verifies our fix prevents that warning. + """ + import warnings + + from langgraph.constants import TASKS + + with _saver(redis_url) as saver: + # Create interrupted checkpoint + interrupted_config = { + "configurable": { + "thread_id": "test-thread-83", + "checkpoint_ns": "", + "checkpoint_id": "interrupted-checkpoint", + } + } + + interrupted_checkpoint = { + "v": 1, + "ts": "2024-01-01T00:00:00+00:00", + "id": "interrupted-checkpoint", + "channel_values": {"messages": ["before interrupt"]}, + "channel_versions": {}, + "versions_seen": {}, + "pending_sends": [], + } + + metadata = {"source": "loop", "step": 1, "writes": {}} + + # Save the interrupted checkpoint + saver.put(interrupted_config, interrupted_checkpoint, metadata, {}) + + # Simulate Command(resume={'interrupt_id': {'some': 'result'}}) + resume_data = {"interrupt_id": {"some": "result"}} + saver.put_writes( + interrupted_config, + [(TASKS, resume_data)], # This puts a dict into TASKS + task_id="resume_task", + ) + + # Create resumed checkpoint with parent reference + resumed_config = { + "configurable": { + "thread_id": "test-thread-83", + "checkpoint_ns": "", + "checkpoint_id": "resumed-checkpoint", + } + } + + resumed_checkpoint = { + "v": 1, + "ts": "2024-01-01T00:01:00+00:00", + "id": "resumed-checkpoint", + "channel_values": {"messages": ["after resume"]}, + "channel_versions": {}, + "versions_seen": {}, + "pending_sends": [], + } + + resumed_metadata = { + "source": "loop", + "step": 2, + "writes": {}, + "parent_config": interrupted_config, + } + + saver.put(resumed_config, resumed_checkpoint, resumed_metadata, {}) + + # Load resumed checkpoint - check for warning + with warnings.catch_warnings(record=True) as w: + warnings.simplefilter("always") + + result = saver.get_tuple(resumed_config) + + # Check if we get the warning about invalid packet type + dict_warnings = [ + warning + for warning in w + if "Ignoring invalid packet type" in str(warning.message) + and "dict" in str(warning.message) + ] + + # Our fix should prevent this warning + assert len(dict_warnings) == 0, f"Got warning: {dict_warnings}" + + assert result is not None + assert result.checkpoint["id"] == "resumed-checkpoint" + + +def test_issue_83_pending_sends_type_compatibility(redis_url: str) -> None: + """Test that pending_sends work with string blobs from Redis JSON (issue #83). + + Issue #83 was caused by type mismatch where _load_pending_sends returned + List[Tuple[str, Union[str, bytes]]] but was annotated as List[Tuple[str, bytes]]. + This test verifies the fix works correctly. + """ + with _saver(redis_url) as saver: + checkpoint_dict = { + "v": 1, + "ts": "2024-01-01T00:00:00+00:00", + "id": "test-checkpoint", + "channel_versions": {}, + "versions_seen": {}, + } + + channel_values = {} + + # Test with string blobs (what Redis JSON returns) + pending_sends_with_strings = [ + ("json", '{"test": "value"}'), # String blob from Redis JSON + ] + + # This should work without type errors + result = saver._load_checkpoint( + checkpoint_dict, channel_values, pending_sends_with_strings + ) + + assert "pending_sends" in result + assert len(result["pending_sends"]) == 1 + assert result["pending_sends"][0] == {"test": "value"} + + # Test JsonPlusRedisSerializer compatibility + test_data = {"some": "result", "user_input": "continue"} + + # Serialize + type_str, blob = saver.serde.dumps_typed(test_data) + assert isinstance(type_str, str) + assert isinstance(blob, str) # JsonPlusRedisSerializer returns strings + + # Deserialize - should work with both string and bytes + result1 = saver.serde.loads_typed((type_str, blob)) + result2 = saver.serde.loads_typed((type_str, blob.encode())) # bytes version + + assert result1 == test_data + assert result2 == test_data + + def test_load_blobs_method(redis_url: str) -> None: """Test _load_blobs method with various scenarios.