Skip to content

Commit 618f494

Browse files
committed
feat: add MessageExporter for extracting conversation messages from checkpoints (#73)
- Implement MessageExporter class for extracting messages from Redis checkpoints - Add LangChainRecipe for handling LangChain message formats (HumanMessage, AIMessage, etc.) - Support both single checkpoint export and full thread export - Use orjson for efficient JSON processing
1 parent 36b503a commit 618f494

File tree

4 files changed

+777
-0
lines changed

4 files changed

+777
-0
lines changed

langgraph/checkpoint/redis/__init__.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,11 @@
3535
BaseRedisSaver,
3636
)
3737
from langgraph.checkpoint.redis.key_registry import SyncCheckpointKeyRegistry
38+
from langgraph.checkpoint.redis.message_exporter import (
39+
LangChainRecipe,
40+
MessageExporter,
41+
MessageRecipe,
42+
)
3843
from langgraph.checkpoint.redis.shallow import ShallowRedisSaver
3944
from langgraph.checkpoint.redis.util import (
4045
EMPTY_ID_SENTINEL,
@@ -1652,4 +1657,7 @@ def delete_thread(self, thread_id: str) -> None:
16521657
"BaseRedisSaver",
16531658
"ShallowRedisSaver",
16541659
"AsyncShallowRedisSaver",
1660+
"MessageExporter",
1661+
"LangChainRecipe",
1662+
"MessageRecipe",
16551663
]
Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
"""Message exporter for extracting conversation messages from checkpoints."""
2+
3+
from datetime import datetime
4+
from typing import Any, Dict, List, Optional, Protocol
5+
6+
import orjson
7+
8+
9+
class MessageRecipe(Protocol):
10+
"""Protocol for message extraction recipes.
11+
12+
Implement this interface to support custom message formats.
13+
"""
14+
15+
def extract(self, message: Any) -> Optional[Dict[str, Any]]:
16+
"""Extract structured data from a message.
17+
18+
Args:
19+
message: The message to extract data from.
20+
21+
Returns:
22+
Dict with at least 'role' and 'content' keys, or None if message cannot be extracted.
23+
"""
24+
...
25+
26+
27+
class LangChainRecipe:
28+
"""Default recipe for extracting LangChain messages."""
29+
30+
def extract(self, message: Any) -> Optional[Dict[str, Any]]:
31+
"""Extract data from LangChain message objects."""
32+
try:
33+
from langchain_core.messages import BaseMessage
34+
35+
if isinstance(message, BaseMessage):
36+
# Handle actual message objects
37+
return {
38+
"role": message.__class__.__name__.replace("Message", "").lower(),
39+
"content": message.content,
40+
"type": message.__class__.__name__,
41+
"id": getattr(message, "id", None),
42+
"metadata": {
43+
"name": getattr(message, "name", None),
44+
"tool_calls": getattr(message, "tool_calls", None),
45+
"additional_kwargs": getattr(message, "additional_kwargs", {}),
46+
},
47+
}
48+
except ImportError:
49+
# langchain_core not available, handle as dict
50+
pass
51+
52+
if isinstance(message, dict):
53+
# Handle serialized LangChain format
54+
if message.get("lc") and message.get("type") == "constructor":
55+
kwargs = message.get("kwargs", {})
56+
message_type = (
57+
message.get("id", ["unknown"])[-1]
58+
if isinstance(message.get("id"), list)
59+
else "unknown"
60+
)
61+
return {
62+
"role": message_type.replace("Message", "").lower(),
63+
"content": kwargs.get("content", ""),
64+
"type": message_type,
65+
"id": kwargs.get("id"),
66+
"metadata": kwargs,
67+
}
68+
# Handle simple dict format
69+
elif "role" in message and "content" in message:
70+
return message
71+
elif isinstance(message, str):
72+
# Plain string message
73+
return {"role": "unknown", "content": message, "type": "string"}
74+
75+
return None
76+
77+
78+
class MessageExporter:
79+
"""Export messages from Redis checkpoints."""
80+
81+
def __init__(
82+
self, redis_saver: Any, recipe: Optional[MessageRecipe] = None
83+
) -> None:
84+
self.saver = redis_saver
85+
self.recipe = recipe or LangChainRecipe()
86+
87+
def export(
88+
self, thread_id: str, checkpoint_id: Optional[str] = None
89+
) -> List[Dict[str, Any]]:
90+
"""Export messages from checkpoint data.
91+
92+
Args:
93+
thread_id: The conversation thread ID
94+
checkpoint_id: Specific checkpoint ID (latest if None)
95+
96+
Returns:
97+
List of extracted message dictionaries
98+
"""
99+
# Get checkpoint
100+
if checkpoint_id:
101+
config = {
102+
"configurable": {"thread_id": thread_id, "checkpoint_id": checkpoint_id}
103+
}
104+
checkpoint = self.saver.get(config)
105+
else:
106+
# Get latest checkpoint
107+
checkpoint_tuple = self.saver.get_tuple(
108+
{"configurable": {"thread_id": thread_id}}
109+
)
110+
checkpoint = checkpoint_tuple.checkpoint if checkpoint_tuple else None
111+
112+
if not checkpoint:
113+
return []
114+
115+
# Extract messages from channel_values
116+
messages = checkpoint.get("channel_values", {}).get("messages", [])
117+
118+
extracted = []
119+
for msg in messages:
120+
extracted_msg = self.recipe.extract(msg)
121+
if extracted_msg:
122+
extracted.append(extracted_msg)
123+
124+
return extracted
125+
126+
def export_thread(self, thread_id: str) -> Dict[str, Any]:
127+
"""Export all messages from all checkpoints in a thread.
128+
129+
Args:
130+
thread_id: The conversation thread ID
131+
132+
Returns:
133+
Dict with thread_id, messages, and export timestamp
134+
"""
135+
messages = []
136+
seen_ids = set()
137+
138+
# Get all checkpoints for thread
139+
for checkpoint_tuple in self.saver.list(
140+
{"configurable": {"thread_id": thread_id}}
141+
):
142+
checkpoint_messages = checkpoint_tuple.checkpoint.get(
143+
"channel_values", {}
144+
).get("messages", [])
145+
146+
for msg in checkpoint_messages:
147+
extracted = self.recipe.extract(msg)
148+
if extracted:
149+
# Add checkpoint metadata
150+
extracted["checkpoint_id"] = checkpoint_tuple.checkpoint.get("id")
151+
extracted["checkpoint_ts"] = checkpoint_tuple.checkpoint.get("ts")
152+
153+
# Deduplicate by message ID if available
154+
msg_id = extracted.get("id")
155+
if msg_id:
156+
if msg_id in seen_ids:
157+
continue
158+
seen_ids.add(msg_id)
159+
160+
messages.append(extracted)
161+
162+
return {
163+
"thread_id": thread_id,
164+
"messages": messages,
165+
"export_timestamp": datetime.utcnow().isoformat(),
166+
}

0 commit comments

Comments
 (0)