Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions langgraph/checkpoint/redis/jsonplus_redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,16 @@ class JsonPlusRedisSerializer(JsonPlusSerializer):

def dumps(self, obj: Any) -> bytes:
"""Use orjson for simple objects, fallback to parent for complex objects."""
try:
# Check if this is an Interrupt object that needs special handling
from langgraph.types import Interrupt

if isinstance(obj, Interrupt):
# Serialize Interrupt as a constructor format for proper deserialization
return super().dumps(obj)
except ImportError:
pass

try:
# Fast path: Use orjson for JSON-serializable objects
return orjson.dumps(obj)
Expand Down Expand Up @@ -67,6 +77,10 @@ def _revive_if_needed(self, obj: Any) -> Any:
'lc', 'type', and 'constructor' fields, causing errors when the application
expects actual message objects with 'role' and 'content' attributes.

This also handles Interrupt objects that may be stored as plain dictionaries
with 'value' and 'id' keys, reconstructing them as proper Interrupt instances
to prevent AttributeError when accessing the 'id' attribute.

Args:
obj: The object to potentially revive, which may be a dict, list, or primitive.

Expand All @@ -80,6 +94,25 @@ def _revive_if_needed(self, obj: Any) -> Any:
# This converts {'lc': 1, 'type': 'constructor', ...} back to
# the actual LangChain object (e.g., HumanMessage, AIMessage)
return self._reviver(obj)

# Check if this looks like an Interrupt object stored as a plain dict
# Interrupt objects have 'value' and 'id' keys, and possibly nothing else
# We need to be careful not to accidentally convert other dicts
if (
"value" in obj
and "id" in obj
and len(obj) == 2
and isinstance(obj.get("id"), str)
):
# Try to reconstruct as an Interrupt object
try:
from langgraph.types import Interrupt

return Interrupt(value=obj["value"], id=obj["id"]) # type: ignore[call-arg]
except (ImportError, TypeError, ValueError):
# If we can't import or construct Interrupt, fall through
pass

# Recursively process nested dicts
return {k: self._revive_if_needed(v) for k, v in obj.items()}
elif isinstance(obj, list):
Expand Down
8 changes: 1 addition & 7 deletions tests/test_async_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,7 @@
from uuid import uuid4

import pytest
from langgraph.store.base import (
GetOp,
Item,
ListNamespacesOp,
PutOp,
SearchOp,
)
from langgraph.store.base import GetOp, Item, ListNamespacesOp, PutOp, SearchOp

from langgraph.store.redis import AsyncRedisStore
from tests.embed_test_utils import CharacterEmbeddings
Expand Down
5 changes: 1 addition & 4 deletions tests/test_crossslot_integration.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
"""Integration tests for CrossSlot error fix in checkpoint operations."""

from langgraph.checkpoint.base import (
create_checkpoint,
empty_checkpoint,
)
from langgraph.checkpoint.base import create_checkpoint, empty_checkpoint

from langgraph.checkpoint.redis import RedisSaver

Expand Down
Loading