From cd08efb984cecbd55b3342a91670dc606e6bb5f4 Mon Sep 17 00:00:00 2001 From: Brian Sam-Bodden Date: Wed, 18 Jun 2025 13:43:05 -0700 Subject: [PATCH] feat(store): add hash storage support for vector data Add vector_storage_type configuration option to enable Redis hash storage for improved memory efficiency. Supports both "json" (default) and "hash" storage types with automatic vector serialization. - Add vector_storage_type field to IndexConfig for storage type selection - Implement byte string serialization for hash storage using array_to_buffer - Maintain backward compatibility with JSON storage as default - Add comprehensive test coverage for hash storage functionality - Ensure type safety with proper schema copying and type annotations Hash storage provides memory savings for vector data while maintaining full compatibility with existing vector search operations. --- langgraph/store/redis/__init__.py | 27 +++- langgraph/store/redis/base.py | 29 +++- tests/test_store.py | 228 ++++++++++++++++++++++++++++++ 3 files changed, 276 insertions(+), 8 deletions(-) diff --git a/langgraph/store/redis/__init__.py b/langgraph/store/redis/__init__.py index dd6a820..78669e2 100644 --- a/langgraph/store/redis/__init__.py +++ b/langgraph/store/redis/__init__.py @@ -372,17 +372,38 @@ def _batch_put_ops( vector_docs: list[dict[str, Any]] = [] vector_keys: list[str] = [] + + # Check if we're using hash storage for vectors + vector_storage_type = "json" # default + if self.index_config: + index_dict = dict(self.index_config) + vector_storage_type = index_dict.get("vector_storage_type", "json") + for (ns, key, path, _), vector in zip(text_params, vectors): vector_key: tuple[str, str] = (ns, key) doc_id = doc_ids[vector_key] + + # Prepare vector based on storage type + if vector_storage_type == "hash": + # For hash storage, convert vector to byte string + from redisvl.redis.utils import array_to_buffer + + vector_list = ( + vector.tolist() if hasattr(vector, "tolist") else vector + ) + embedding_value = array_to_buffer(vector_list, "float32") + else: + # For JSON storage, keep as list + embedding_value = ( + vector.tolist() if hasattr(vector, "tolist") else vector + ) + vector_docs.append( { "prefix": ns, "key": key, "field_name": path, - "embedding": ( - vector.tolist() if hasattr(vector, "tolist") else vector - ), + "embedding": embedding_value, "created_at": datetime.now(timezone.utc).timestamp(), "updated_at": datetime.now(timezone.utc).timestamp(), } diff --git a/langgraph/store/redis/base.py b/langgraph/store/redis/base.py index a511417..e1204c9 100644 --- a/langgraph/store/redis/base.py +++ b/langgraph/store/redis/base.py @@ -2,11 +2,22 @@ from __future__ import annotations +import copy import logging import threading from collections import defaultdict from datetime import datetime, timedelta, timezone -from typing import Any, Generic, Iterable, Optional, Sequence, TypedDict, TypeVar, Union +from typing import ( + Any, + Dict, + Generic, + Iterable, + Optional, + Sequence, + TypedDict, + TypeVar, + Union, +) from langgraph.store.base import ( GetOp, @@ -222,7 +233,15 @@ def __init__( # Configure vector index if needed if self.index_config: - vector_schema = self.SCHEMAS[1].copy() + # Get storage type from index config, default to "json" for backward compatibility + # Cast to dict to safely access potential extra fields + index_dict = dict(self.index_config) + vector_storage_type = index_dict.get("vector_storage_type", "json") + + vector_schema: Dict[str, Any] = copy.deepcopy(self.SCHEMAS[1]) + # Update storage type in schema + vector_schema["index"]["storage_type"] = vector_storage_type + vector_fields = vector_schema.get("fields", []) vector_field = None for f in vector_fields: @@ -243,14 +262,14 @@ def __init__( "l2": "L2", }[ _ensure_string_or_literal( - self.index_config.get("distance_type", "cosine") + index_dict.get("distance_type", "cosine") ) ], } # Apply any additional vector type config - if "ann_index_config" in self.index_config: - vector_field["attrs"].update(self.index_config["ann_index_config"]) + if "ann_index_config" in index_dict: + vector_field["attrs"].update(index_dict["ann_index_config"]) self.vector_index = SearchIndex.from_dict( vector_schema, redis_client=self._redis diff --git a/tests/test_store.py b/tests/test_store.py index ec3fc64..c2835c3 100644 --- a/tests/test_store.py +++ b/tests/test_store.py @@ -642,3 +642,231 @@ def mock_echo(self, message): finally: client.close() client.connection_pool.disconnect() + + +def test_vector_storage_json(redis_url, fake_embeddings: CharacterEmbeddings) -> None: + """Test JSON vector storage (default behavior).""" + # Test data + docs = [ + ("doc1", {"text": "hello world"}), + ("doc2", {"text": "hello universe"}), + ("doc3", {"text": "goodbye world"}), + ] + + index_config = { + "dims": fake_embeddings.dims, + "embed": fake_embeddings, + "distance_type": "cosine", + "fields": ["text"], + # vector_storage_type defaults to "json" + } + + ttl_config = {"default_ttl": 2, "refresh_on_read": True} + + with RedisStore.from_conn_string( + redis_url, index=index_config, ttl=ttl_config + ) as store: + store.setup() + + # Insert documents + for key, value in docs: + store.put(("test_json",), key, value) + + # Test vector search functionality + results = store.search(("test_json",), query="hello") + assert len(results) >= 2, "Vector search failed for JSON storage" + + # Verify both hello documents are found + doc_keys = [r.key for r in results] + assert "doc1" in doc_keys, "doc1 not found in JSON storage" + assert "doc2" in doc_keys, "doc2 not found in JSON storage" + + # Test that scores are reasonable (should be > 0 for cosine similarity) + for result in results: + if result.key in ["doc1", "doc2"]: + assert ( + result.score > 0 + ), f"Invalid score for JSON storage: {result.score}" + + # Test retrieval by key still works + item = store.get(("test_json",), "doc1") + assert item is not None, "Get operation failed for JSON storage" + assert ( + item.value["text"] == "hello world" + ), "Retrieved wrong value for JSON storage" + + +def test_vector_storage_hash(redis_url, fake_embeddings: CharacterEmbeddings) -> None: + """Test hash vector storage for improved memory efficiency.""" + # Test data + docs = [ + ("doc1", {"text": "hello world"}), + ("doc2", {"text": "hello universe"}), + ("doc3", {"text": "goodbye world"}), + ] + + index_config = { + "dims": fake_embeddings.dims, + "embed": fake_embeddings, + "distance_type": "cosine", + "fields": ["text"], + "vector_storage_type": "hash", # Enable hash storage + } + + ttl_config = {"default_ttl": 2, "refresh_on_read": True} + + with RedisStore.from_conn_string( + redis_url, index=index_config, ttl=ttl_config + ) as store: + store.setup() + + # Insert documents + for key, value in docs: + store.put(("test_hash",), key, value) + + # Test vector search functionality + results = store.search(("test_hash",), query="hello") + assert len(results) >= 2, "Vector search failed for hash storage" + + # Verify both hello documents are found + doc_keys = [r.key for r in results] + assert "doc1" in doc_keys, "doc1 not found in hash storage" + assert "doc2" in doc_keys, "doc2 not found in hash storage" + + # Test that scores are reasonable (should be > 0 for cosine similarity) + for result in results: + if result.key in ["doc1", "doc2"]: + assert ( + result.score > 0 + ), f"Invalid score for hash storage: {result.score}" + + # Test retrieval by key still works + item = store.get(("test_hash",), "doc1") + assert item is not None, "Get operation failed for hash storage" + assert ( + item.value["text"] == "hello world" + ), "Retrieved wrong value for hash storage" + + +def test_vector_search_hash(redis_url, fake_embeddings: CharacterEmbeddings) -> None: + """Test vector search functionality with hash storage.""" + index_config = { + "dims": fake_embeddings.dims, + "embed": fake_embeddings, + "distance_type": "cosine", + "fields": ["text"], + "vector_storage_type": "hash", + } + + ttl_config = {"default_ttl": 2, "refresh_on_read": True} + + with RedisStore.from_conn_string( + redis_url, index=index_config, ttl=ttl_config + ) as store: + store.setup() + + # Insert documents with text that can be embedded + docs = [ + ("doc1", {"text": "short text"}), + ("doc2", {"text": "longer text document"}), + ("doc3", {"text": "longest text document here"}), + ] + + for key, value in docs: + store.put(("test",), key, value) + + # Search with query + results = store.search(("test",), query="longer text") + assert len(results) >= 2 + + # Doc2 and doc3 should be closer matches to "longer text" + doc_keys = [r.key for r in results] + assert "doc2" in doc_keys + assert "doc3" in doc_keys + + +def test_vector_search_with_filters_hash( + redis_url, fake_embeddings: CharacterEmbeddings +) -> None: + """Test vector search with additional filters using hash storage.""" + index_config = { + "dims": fake_embeddings.dims, + "embed": fake_embeddings, + "distance_type": "cosine", + "fields": ["text"], + "vector_storage_type": "hash", + } + + ttl_config = {"default_ttl": 2, "refresh_on_read": True} + + with RedisStore.from_conn_string( + redis_url, index=index_config, ttl=ttl_config + ) as store: + store.setup() + + # Insert test documents + docs = [ + ("doc1", {"text": "red apple", "color": "red", "score": 4.5}), + ("doc2", {"text": "red car", "color": "red", "score": 3.0}), + ("doc3", {"text": "green apple", "color": "green", "score": 4.0}), + ("doc4", {"text": "blue car", "color": "blue", "score": 3.5}), + ] + + for key, value in docs: + store.put(("test",), key, value) + + # Search for "apple" within red items + results = store.search(("test",), query="apple", filter={"color": "red"}) + assert len(results) >= 1 + # Doc1 should be the closest match for "apple" with color=red + assert results[0].key == "doc1" + + # Search for "car" within red items + results = store.search(("test",), query="car", filter={"color": "red"}) + assert len(results) >= 1 + # Doc2 should be the closest match for "car" with color=red + assert results[0].key == "doc2" + + +def test_vector_update_with_score_verification_hash( + redis_url, fake_embeddings: CharacterEmbeddings +) -> None: + """Test that updating items properly updates their embeddings with hash storage.""" + index_config = { + "dims": fake_embeddings.dims, + "embed": fake_embeddings, + "distance_type": "cosine", + "fields": ["text"], + "vector_storage_type": "hash", + } + + ttl_config = {"default_ttl": 2, "refresh_on_read": True} + + with RedisStore.from_conn_string( + redis_url, index=index_config, ttl=ttl_config + ) as store: + store.setup() + + store.put(("test",), "doc1", {"text": "zany zebra xylophone"}) + store.put(("test",), "doc2", {"text": "something about dogs"}) + + # Search for a term similar to doc1's content + results_initial = store.search(("test",), query="zany xylophone") + assert len(results_initial) >= 1 + assert results_initial[0].key == "doc1" + initial_score = results_initial[0].score + + # Update doc1 to be about dogs instead + store.put(("test",), "doc1", {"text": "new text about dogs"}) + + # The original query should now match doc1 less strongly + results_after = store.search(("test",), query="zany xylophone") + assert len(results_after) >= 1 + after_score = next((r.score for r in results_after if r.key == "doc1"), None) + if after_score is not None: + assert after_score < initial_score + + # A dog-related query should now match doc1 more strongly + results_new = store.search(("test",), query="dogs text") + doc1_score = next((r.score for r in results_new if r.key == "doc1"), None) + assert doc1_score is not None