diff --git a/langgraph/checkpoint/redis/aio.py b/langgraph/checkpoint/redis/aio.py index a6c7fe8..1edb067 100644 --- a/langgraph/checkpoint/redis/aio.py +++ b/langgraph/checkpoint/redis/aio.py @@ -1009,29 +1009,33 @@ async def _aload_pending_sends( """ # Query checkpoint_writes for parent checkpoint's TASKS channel parent_writes_query = FilterQuery( - filter_expression=(Tag("thread_id") == to_storage_safe_id(thread_id)) - & (Tag("checkpoint_ns") == to_storage_safe_str(checkpoint_ns)) - & (Tag("checkpoint_id") == to_storage_safe_id(parent_checkpoint_id)) - & (Tag("channel") == TASKS), - return_fields=["type", "blob", "task_path", "task_id", "idx"], - num_results=100, # Adjust as needed - ) - parent_writes_results = await self.checkpoint_writes_index.search( - parent_writes_query + filter_expression=( + (Tag("thread_id") == to_storage_safe_id(thread_id)) + & (Tag("checkpoint_ns") == checkpoint_ns) + & (Tag("checkpoint_id") == to_storage_safe_id(parent_checkpoint_id)) + & (Tag("channel") == TASKS) + ), + return_fields=["type", "$.blob", "task_path", "task_id", "idx"], + num_results=100, ) - - # Sort results by task_path, task_id, idx (matching Postgres implementation) - sorted_writes = sorted( - parent_writes_results.docs, - key=lambda x: ( - getattr(x, "task_path", ""), - getattr(x, "task_id", ""), - getattr(x, "idx", 0), + res = await self.checkpoint_writes_index.search(parent_writes_query) + + # Sort results for deterministic order + docs = sorted( + res.docs, + key=lambda d: ( + getattr(d, "task_path", ""), + getattr(d, "task_id", ""), + getattr(d, "idx", 0), ), ) - - # Extract type and blob pairs - return [(doc.type, doc.blob) for doc in sorted_writes] + + # Convert to expected format + return [ + (d.type.encode(), blob) + for d in docs + if (blob := getattr(d, "$.blob", getattr(d, "blob", None))) is not None + ] async def _aload_pending_writes( self,