Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions langgraph/checkpoint/redis/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1155,7 +1155,7 @@ def _load_pending_sends(
thread_id: str,
checkpoint_ns: str,
parent_checkpoint_id: str,
) -> List[Tuple[str, bytes]]:
) -> List[Tuple[str, Union[str, bytes]]]:
"""Load pending sends for a parent checkpoint.

Args:
Expand Down Expand Up @@ -1437,7 +1437,7 @@ def _load_pending_sends_with_registry_check(
thread_id: str,
checkpoint_ns: str,
parent_checkpoint_id: str,
) -> List[Tuple[str, bytes]]:
) -> List[Tuple[str, Union[str, bytes]]]:
"""Load pending sends for a parent checkpoint with pre-computed registry check."""
if not parent_checkpoint_id:
return []
Expand Down
8 changes: 4 additions & 4 deletions langgraph/checkpoint/redis/aio.py
Original file line number Diff line number Diff line change
Expand Up @@ -541,7 +541,7 @@ async def aget_tuple(self, config: RunnableConfig) -> Optional[CheckpointTuple]:
if doc_parent_checkpoint_id:
results = await asyncio.gather(*tasks)
channel_values: Dict[str, Any] = results[0]
pending_sends: List[Tuple[str, bytes]] = results[1]
pending_sends: List[Tuple[str, Union[str, bytes]]] = results[1]
pending_writes: List[PendingWrite] = results[2]
else:
# Only channel_values and pending_writes tasks
Expand Down Expand Up @@ -772,7 +772,7 @@ async def alist(
parent_checkpoint_id = doc_data["parent_checkpoint_id"]

# Get pending_sends from batch results
pending_sends: List[Tuple[str, bytes]] = []
pending_sends: List[Tuple[str, Union[str, bytes]]] = []
if parent_checkpoint_id:
batch_key = (thread_id, checkpoint_ns, parent_checkpoint_id)
pending_sends = pending_sends_map.get(batch_key, [])
Expand Down Expand Up @@ -1443,7 +1443,7 @@ async def aget_channel_values(

async def _aload_pending_sends(
self, thread_id: str, checkpoint_ns: str = "", parent_checkpoint_id: str = ""
) -> List[Tuple[str, bytes]]:
) -> List[Tuple[str, Union[str, bytes]]]:
"""Load pending sends for a parent checkpoint.

Args:
Expand Down Expand Up @@ -1640,7 +1640,7 @@ async def _aload_pending_writes(

async def _abatch_load_pending_sends(
self, batch_keys: List[Tuple[str, str, str]]
) -> Dict[Tuple[str, str, str], List[Tuple[str, bytes]]]:
) -> Dict[Tuple[str, str, str], List[Tuple[str, Union[str, bytes]]]]:
"""Batch load pending sends for multiple parent checkpoints.

Args:
Expand Down
141 changes: 141 additions & 0 deletions tests/test_checkpoint_serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,147 @@ def _saver(redis_url: str):
del saver


def test_issue_83_command_resume_no_warning(redis_url: str) -> None:
    """Regression test for issue #83: resuming must not emit a packet-type warning.

    The reported failure: Command(resume={'interrupt_id': {'some': 'result'}})
    produced "Ignoring invalid packet type <class 'dict'> in pending sends".
    This test replays that scenario against the saver and asserts the warning
    no longer appears.
    """
    import warnings

    from langgraph.constants import TASKS

    with _saver(redis_url) as saver:
        # Step 1: persist a checkpoint representing the interrupted run.
        parent_config = {
            "configurable": {
                "thread_id": "test-thread-83",
                "checkpoint_ns": "",
                "checkpoint_id": "interrupted-checkpoint",
            }
        }
        parent_checkpoint = {
            "v": 1,
            "ts": "2024-01-01T00:00:00+00:00",
            "id": "interrupted-checkpoint",
            "channel_values": {"messages": ["before interrupt"]},
            "channel_versions": {},
            "versions_seen": {},
            "pending_sends": [],
        }
        parent_metadata = {"source": "loop", "step": 1, "writes": {}}
        saver.put(parent_config, parent_checkpoint, parent_metadata, {})

        # Step 2: simulate Command(resume={'interrupt_id': {'some': 'result'}})
        # — this lands a raw dict in the TASKS channel.
        saver.put_writes(
            parent_config,
            [(TASKS, {"interrupt_id": {"some": "result"}})],
            task_id="resume_task",
        )

        # Step 3: persist the follow-up checkpoint that references the parent.
        child_config = {
            "configurable": {
                "thread_id": "test-thread-83",
                "checkpoint_ns": "",
                "checkpoint_id": "resumed-checkpoint",
            }
        }
        child_checkpoint = {
            "v": 1,
            "ts": "2024-01-01T00:01:00+00:00",
            "id": "resumed-checkpoint",
            "channel_values": {"messages": ["after resume"]},
            "channel_versions": {},
            "versions_seen": {},
            "pending_sends": [],
        }
        child_metadata = {
            "source": "loop",
            "step": 2,
            "writes": {},
            "parent_config": parent_config,
        }
        saver.put(child_config, child_checkpoint, child_metadata, {})

        # Step 4: reload the resumed checkpoint while recording every warning.
        with warnings.catch_warnings(record=True) as captured:
            warnings.simplefilter("always")

            loaded = saver.get_tuple(child_config)

            # The fix must suppress the "invalid packet type ... dict" warning.
            offending = [
                warning
                for warning in captured
                if "Ignoring invalid packet type" in str(warning.message)
                and "dict" in str(warning.message)
            ]
            assert len(offending) == 0, f"Got warning: {offending}"

        assert loaded is not None
        assert loaded.checkpoint["id"] == "resumed-checkpoint"


def test_issue_83_pending_sends_type_compatibility(redis_url: str) -> None:
    """Verify pending_sends accept string blobs from Redis JSON (issue #83).

    The root cause of issue #83 was an annotation mismatch: the loaders
    actually return List[Tuple[str, Union[str, bytes]]] but were typed as
    List[Tuple[str, bytes]]. This exercises both the string-blob path and
    the serializer's str/bytes round-trip.
    """
    with _saver(redis_url) as saver:
        base_checkpoint = {
            "v": 1,
            "ts": "2024-01-01T00:00:00+00:00",
            "id": "test-checkpoint",
            "channel_versions": {},
            "versions_seen": {},
        }

        # Redis JSON hands back str blobs, not bytes.
        string_blob_sends = [
            ("json", '{"test": "value"}'),
        ]

        # Loading a checkpoint with string blobs must not raise.
        loaded = saver._load_checkpoint(base_checkpoint, {}, string_blob_sends)

        assert "pending_sends" in loaded
        assert len(loaded["pending_sends"]) == 1
        assert loaded["pending_sends"][0] == {"test": "value"}

        # Round-trip through JsonPlusRedisSerializer.
        payload = {"some": "result", "user_input": "continue"}

        type_str, blob = saver.serde.dumps_typed(payload)
        assert isinstance(type_str, str)
        assert isinstance(blob, str)  # JsonPlusRedisSerializer returns strings

        # Deserialization must accept the blob as either str or bytes.
        decoded_from_str = saver.serde.loads_typed((type_str, blob))
        decoded_from_bytes = saver.serde.loads_typed((type_str, blob.encode()))

        assert decoded_from_str == payload
        assert decoded_from_bytes == payload


def test_load_blobs_method(redis_url: str) -> None:
"""Test _load_blobs method with various scenarios.

Expand Down