Skip to content

Commit 8602add

Browse files
author
Aymen Tilfani
committed
fix: encode type and handle missing blob in Redis checkpoint writes
1 parent b4fad1c commit 8602add

File tree

1 file changed

+24
-20
lines changed
  • langgraph/checkpoint/redis

1 file changed

+24
-20
lines changed

langgraph/checkpoint/redis/aio.py

Lines changed: 24 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1009,29 +1009,33 @@ async def _aload_pending_sends(
10091009
"""
10101010
# Query checkpoint_writes for parent checkpoint's TASKS channel
10111011
parent_writes_query = FilterQuery(
1012-
filter_expression=(Tag("thread_id") == to_storage_safe_id(thread_id))
1013-
& (Tag("checkpoint_ns") == to_storage_safe_str(checkpoint_ns))
1014-
& (Tag("checkpoint_id") == to_storage_safe_id(parent_checkpoint_id))
1015-
& (Tag("channel") == TASKS),
1016-
return_fields=["type", "blob", "task_path", "task_id", "idx"],
1017-
num_results=100, # Adjust as needed
1018-
)
1019-
parent_writes_results = await self.checkpoint_writes_index.search(
1020-
parent_writes_query
1012+
filter_expression=(
1013+
(Tag("thread_id") == to_storage_safe_id(thread_id))
1014+
& (Tag("checkpoint_ns") == checkpoint_ns)
1015+
& (Tag("checkpoint_id") == to_storage_safe_id(parent_checkpoint_id))
1016+
& (Tag("channel") == TASKS)
1017+
),
1018+
return_fields=["type", "$.blob", "task_path", "task_id", "idx"],
1019+
num_results=100,
10211020
)
1022-
1023-
# Sort results by task_path, task_id, idx (matching Postgres implementation)
1024-
sorted_writes = sorted(
1025-
parent_writes_results.docs,
1026-
key=lambda x: (
1027-
getattr(x, "task_path", ""),
1028-
getattr(x, "task_id", ""),
1029-
getattr(x, "idx", 0),
1021+
res = await self.checkpoint_writes_index.search(parent_writes_query)
1022+
1023+
# Sort results for deterministic order
1024+
docs = sorted(
1025+
res.docs,
1026+
key=lambda d: (
1027+
getattr(d, "task_path", ""),
1028+
getattr(d, "task_id", ""),
1029+
getattr(d, "idx", 0),
10301030
),
10311031
)
1032-
1033-
# Extract type and blob pairs
1034-
return [(doc.type, doc.blob) for doc in sorted_writes]
1032+
1033+
# Convert to expected format
1034+
return [
1035+
(d.type.encode(), blob)
1036+
for d in docs
1037+
if (blob := getattr(d, "$.blob", getattr(d, "blob", None))) is not None
1038+
]
10351039

10361040
async def _aload_pending_writes(
10371041
self,

0 commit comments

Comments
 (0)