Skip to content

Commit cf6a202

Browse files
authored
fix: deserialize LangChain messages in async checkpoint operations (#87)
Add _recursive_deserialize to AsyncRedisSaver.aget_tuple() and alist() methods to properly deserialize LangChain messages from their stored format, matching the sync implementation behavior.
1 parent 618f494 commit cf6a202

File tree

2 files changed

+614
-3
lines changed

2 files changed

+614
-3
lines changed

langgraph/checkpoint/redis/aio.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -540,13 +540,13 @@ async def aget_tuple(self, config: RunnableConfig) -> Optional[CheckpointTuple]:
540540
# Execute all tasks in parallel - pending_sends is optional
541541
if doc_parent_checkpoint_id:
542542
results = await asyncio.gather(*tasks)
543-
channel_values: Dict[str, Any] = results[0]
543+
channel_values: Dict[str, Any] = self._recursive_deserialize(results[0])
544544
pending_sends: List[Tuple[str, Union[str, bytes]]] = results[1]
545545
pending_writes: List[PendingWrite] = results[2]
546546
else:
547547
# Only channel_values and pending_writes tasks
548548
results = await asyncio.gather(*tasks)
549-
channel_values = results[0]
549+
channel_values = self._recursive_deserialize(results[0])
550550
pending_sends = []
551551
pending_writes = results[1]
552552

@@ -709,7 +709,7 @@ async def alist(
709709
if isinstance(checkpoint_data, dict)
710710
else orjson.loads(checkpoint_data)
711711
)
712-
channel_values = checkpoint_dict.get("channel_values", {})
712+
channel_values = self._recursive_deserialize(checkpoint_dict.get("channel_values", {}))
713713
else:
714714
# If checkpoint data is missing, the document is corrupted
715715
# Set empty channel values rather than attempting a fallback

0 commit comments

Comments
 (0)