diff --git a/langgraph/store/redis/__init__.py b/langgraph/store/redis/__init__.py index cf27be0..f664211 100644 --- a/langgraph/store/redis/__init__.py +++ b/langgraph/store/redis/__init__.py @@ -367,7 +367,7 @@ def _batch_search_ops( query_vectors = dict(zip([idx for idx, _ in embedding_requests], vectors)) # Process each search operation - for (idx, op), (query_str, params) in zip(search_ops, queries): + for (idx, op), (query_str, params, limit, offset) in zip(search_ops, queries): if op.query and idx in query_vectors: # Vector similarity search vector = query_vectors[idx] @@ -376,7 +376,7 @@ def _batch_search_ops( vector_field_name="embedding", filter_expression=f"@prefix:{_namespace_to_text(op.namespace_prefix)}*", return_fields=["prefix", "key", "vector_distance"], - num_results=op.limit, + num_results=limit, # Use the user-specified limit ) vector_results = self.vector_index.query(vector_query) @@ -469,8 +469,10 @@ def _batch_search_ops( results[idx] = items else: # Regular search - query = Query(query_str) - # Get all potential matches for filtering + # Create a query with LIMIT and OFFSET parameters + query = Query(query_str).paging(offset, limit) + + # Execute search with limit and offset applied by Redis res = self.store_index.search(query) items = [] refresh_keys = [] # Track keys that need TTL refreshed @@ -505,10 +507,7 @@ def _batch_search_ops( items.append(_row_to_search_item(_decode_ns(data["prefix"]), data)) - # Apply pagination after filtering - if params: - limit, offset = params - items = items[offset : offset + limit] + # Note: Pagination is now handled by Redis, no need to slice items manually # Refresh TTL if requested if op.refresh_ttl and refresh_keys and self.ttl_config: diff --git a/langgraph/store/redis/aio.py b/langgraph/store/redis/aio.py index dae7b21..167370f 100644 --- a/langgraph/store/redis/aio.py +++ b/langgraph/store/redis/aio.py @@ -648,7 +648,7 @@ async def _batch_search_ops( query_vectors = dict(zip([idx for idx, _ in embedding_requests], vectors)) # Process each search operation - for (idx, op), (query_str, params) in zip(search_ops, queries): + for (idx, op), (query_str, params, limit, offset) in zip(search_ops, queries): if op.query and idx in query_vectors: # Vector similarity search vector = query_vectors[idx] @@ -658,7 +658,7 @@ async def _batch_search_ops( vector_field_name="embedding", filter_expression=f"@prefix:{_namespace_to_text(op.namespace_prefix)}*", return_fields=["prefix", "key", "vector_distance"], - num_results=op.limit, + num_results=limit, # Use the user-specified limit ) ) @@ -722,8 +722,10 @@ async def _batch_search_ops( results[idx] = items else: # Regular search - query = Query(query_str) - # Get all potential matches for filtering + # Create a query with LIMIT and OFFSET parameters + query = Query(query_str).paging(offset, limit) + + # Execute search with limit and offset applied by Redis res = await self.store_index.search(query) items = [] @@ -746,12 +748,9 @@ async def _batch_search_ops( continue items.append(_row_to_search_item(_decode_ns(data["prefix"]), data)) - # Apply pagination after filtering - if params: - limit, offset = params - items = items[offset : offset + limit] + # Note: Pagination is now handled by Redis, no need to slice items manually - results[idx] = items + results[idx] = items async def _batch_list_namespaces_ops( self, diff --git a/langgraph/store/redis/base.py b/langgraph/store/redis/base.py index 1ca7381..3deb60a 100644 --- a/langgraph/store/redis/base.py +++ b/langgraph/store/redis/base.py @@ -398,7 +398,7 @@ def _prepare_batch_PUT_queries( def _get_batch_search_queries( self, search_ops: Sequence[tuple[int, SearchOp]], - ) -> tuple[list[tuple[str, list]], list[tuple[int, str]]]: + ) -> tuple[list[tuple[str, list, int, int]], list[tuple[int, str]]]: """Convert search operations into Redis queries.""" queries = [] embedding_requests = [] @@ -413,8 +413,10 @@ def _get_batch_search_queries( embedding_requests.append((idx, op.query)) query = " ".join(filter_conditions) if filter_conditions else "*" - params = [op.limit, op.offset] if op.limit or op.offset else [] - queries.append((query, params)) + limit = op.limit if op.limit is not None else 10 + offset = op.offset if op.offset is not None else 0 + params = [limit, offset] + queries.append((query, params, limit, offset)) return queries, embedding_requests diff --git a/tests/test_async_search_limit.py b/tests/test_async_search_limit.py new file mode 100644 index 0000000..1c42660 --- /dev/null +++ b/tests/test_async_search_limit.py @@ -0,0 +1,73 @@ +"""Tests for AsyncRedisStore search limits.""" + +from __future__ import annotations + +import pytest +import pytest_asyncio + +from langgraph.store.redis import AsyncRedisStore + + +@pytest_asyncio.fixture(scope="function") +async def async_store(redis_url) -> AsyncRedisStore: + """Fixture to create an AsyncRedisStore.""" + async with AsyncRedisStore(redis_url) as store: + await store.setup() # Initialize indices + yield store + + +@pytest.mark.asyncio +async def test_async_search_with_larger_limit(async_store: AsyncRedisStore) -> None: + """Test async search with limit > 10.""" + # Create 15 test documents + for i in range(15): + await async_store.aput( + ("test_namespace",), f"key{i}", {"data": f"value{i}", "index": i} + ) + + # Search with a limit of 15 + results = await async_store.asearch(("test_namespace",), limit=15) + + # Should return all 15 results + assert len(results) == 15, f"Expected 15 results, got {len(results)}" + + # Verify we have all the items + result_keys = {item.key for item in results} + expected_keys = {f"key{i}" for i in range(15)} + assert result_keys == expected_keys + + +@pytest.mark.asyncio +async def test_async_vector_search_with_larger_limit(redis_url) -> None: + """Test async vector search with limit > 10.""" + from tests.embed_test_utils import CharacterEmbeddings + + # Create vector store with embeddings + embeddings = CharacterEmbeddings(dims=4) + index_config = { + "dims": embeddings.dims, + "embed": embeddings, + "distance_type": "cosine", + "fields": ["text"], + } + + async with AsyncRedisStore(redis_url, index=index_config) as store: + await store.setup() + + # Create 15 test documents + for i in range(15): + # Create documents with slightly different texts + await store.aput( + ("test_namespace",), f"key{i}", {"text": f"sample text {i}", "index": i} + ) + + # Search with a limit of 15 + results = await store.asearch(("test_namespace",), query="sample", limit=15) + + # Should return all 15 results + assert len(results) == 15, f"Expected 15 results, got {len(results)}" + + # Verify we have all the items + result_keys = {item.key for item in results} + expected_keys = {f"key{i}" for i in range(15)} + assert result_keys == expected_keys diff --git a/tests/test_search_limit.py b/tests/test_search_limit.py new file mode 100644 index 0000000..49a3098 --- /dev/null +++ b/tests/test_search_limit.py @@ -0,0 +1,68 @@ +"""Tests for RedisStore search limits.""" + +from __future__ import annotations + +import pytest + +from langgraph.store.redis import RedisStore + + +@pytest.fixture(scope="function") +def store(redis_url) -> RedisStore: + """Fixture to create a Redis store.""" + with RedisStore.from_conn_string(redis_url) as store: + store.setup() # Initialize indices + yield store + + +def test_search_with_larger_limit(store: RedisStore) -> None: + """Test search with limit > 10.""" + # Create 15 test documents + for i in range(15): + store.put(("test_namespace",), f"key{i}", {"data": f"value{i}", "index": i}) + + # Search with a limit of 15 + results = store.search(("test_namespace",), limit=15) + + # Should return all 15 results + assert len(results) == 15, f"Expected 15 results, got {len(results)}" + + # Verify we have all the items + result_keys = {item.key for item in results} + expected_keys = {f"key{i}" for i in range(15)} + assert result_keys == expected_keys + + +def test_vector_search_with_larger_limit(redis_url) -> None: + """Test vector search with limit > 10.""" + from tests.embed_test_utils import CharacterEmbeddings + + # Create vector store with embeddings + embeddings = CharacterEmbeddings(dims=4) + index_config = { + "dims": embeddings.dims, + "embed": embeddings, + "distance_type": "cosine", + "fields": ["text"], + } + + with RedisStore.from_conn_string(redis_url, index=index_config) as store: + store.setup() + + # Create 15 test documents + for i in range(15): + # Create documents with slightly different texts + store.put( + ("test_namespace",), f"key{i}", {"text": f"sample text {i}", "index": i} + ) + + # Search with a limit of 15 + results = store.search(("test_namespace",), query="sample", limit=15) + + # Should return all 15 results + assert len(results) == 15, f"Expected 15 results, got {len(results)}" + + # Verify we have all the items + result_keys = {item.key for item in results} + expected_keys = {f"key{i}" for i in range(15)} + assert result_keys == expected_keys