
Commit 552376f

Author: Nitin Kanukolanu
Commit message: Formatting
1 parent: be6b347

File tree: 1 file changed (+54, -55 lines)

redisvl/utils/migration.py

Lines changed: 54 additions & 55 deletions
@@ -26,26 +26,26 @@
 
 class IndexMigrator:
     """Helper class to migrate indices to SVS-VAMANA with compression.
-
+
     This class provides utilities to migrate existing FLAT or HNSW vector indices
     to SVS-VAMANA indices with compression, enabling significant memory savings.
-
+
     Example:
         .. code-block:: python
-
+
             from redisvl.index import SearchIndex
             from redisvl.utils import IndexMigrator
-
+
             # Load existing index
             old_index = SearchIndex.from_existing("my_flat_index")
-
+
             # Migrate to SVS-VAMANA with LVQ compression
             new_index = IndexMigrator.migrate_to_svs(
                 old_index,
                 compression="LVQ4x4",
                 batch_size=1000
             )
-
+
             print(f"Migrated {new_index.info()['num_docs']} documents")
     """
 
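The memory savings the docstring mentions can be checked directly against the Redis server. A minimal sketch using redis-py's INFO command; the client setup and index name here are illustrative and not part of this module, and note that the old index is left in place, so the "after" figure includes both indices:

import redis

from redisvl.index import SearchIndex
from redisvl.utils import IndexMigrator

client = redis.Redis(host="localhost", port=6379)  # assumed local instance

before = client.info("memory")["used_memory"]  # bytes used before migration

old_index = SearchIndex.from_existing("my_flat_index", redis_client=client)
new_index = IndexMigrator.migrate_to_svs(old_index, compression="LVQ4x4")

after = client.info("memory")["used_memory"]  # old index still exists at this point
print(f"used_memory before: {before}, after: {after}")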

@@ -60,10 +60,10 @@ def migrate_to_svs(
         progress_callback: Optional[Callable[[int, int], None]] = None,
     ) -> "SearchIndex":
         """Migrate an existing index to SVS-VAMANA with compression.
-
+
         This method creates a new SVS-VAMANA index and copies all data from the
         old index in batches. The old index is not modified or deleted.
-
+
         Args:
             old_index: The existing SearchIndex to migrate from.
             new_index_name: Name for the new index. If None, uses "{old_name}_svs".
@@ -74,20 +74,20 @@ def migrate_to_svs(
             batch_size: Number of documents to migrate per batch. Default: 1000.
             overwrite: Whether to overwrite the new index if it exists. Default: False.
             progress_callback: Optional callback function(current, total) for progress tracking.
-
+
         Returns:
             SearchIndex: The new SVS-VAMANA index with migrated data.
-
+
         Raises:
             RedisModuleVersionError: If Redis version doesn't support SVS-VAMANA.
             ValueError: If the old index has no vector fields or invalid parameters.
-
+
         Example:
             .. code-block:: python
-
+
                 def progress(current, total):
                     print(f"Migrated {current}/{total} documents")
-
+
                 new_index = IndexMigrator.migrate_to_svs(
                     old_index,
                     compression="LVQ4x4",
@@ -106,35 +106,35 @@ def progress(current, total):
             raise RedisModuleVersionError.for_svs_vamana(
                 caps.redis_version, caps.search_version
             )
-
+
         # Find vector fields in the old schema
         vector_fields = [
             (name, field)
             for name, field in old_index.schema.fields.items()
             if hasattr(field, "attrs") and hasattr(field.attrs, "algorithm")
         ]
-
+
         if not vector_fields:
             raise ValueError("Old index has no vector fields to migrate")
-
+
         # Create new schema based on old schema
         new_schema_dict = old_index.schema.to_dict()
-
+
         # Update index name
         if new_index_name is None:
             new_index_name = f"{old_index.name}_svs"
         new_schema_dict["index"]["name"] = new_index_name
         new_schema_dict["index"]["prefix"] = new_index_name
-
+
         # Update vector fields to use SVS-VAMANA
         for field_dict in new_schema_dict["fields"]:
             if field_dict["type"] == "vector":
                 attrs = field_dict.get("attrs", {})
                 dims = attrs.get("dims")
-
+
                 if dims is None:
                     raise ValueError(f"Vector field '{field_dict['name']}' has no dims")
-
+
                 # Use CompressionAdvisor if compression not specified
                 if compression is None:
                     config = CompressionAdvisor.recommend(
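The loop above only rewrites each vector field's attrs dict; everything else in the schema is carried over. A simplified, standalone version of that rewrite (plain dicts only, with defaults mirrored from the hunks below; construction_window_size is omitted because its default value is not visible in this diff, and the example field values are illustrative):

from typing import Optional

def to_svs_attrs(attrs: dict, compression: str, reduce: Optional[int] = None) -> dict:
    """Rewrite a FLAT/HNSW vector field's attrs for SVS-VAMANA (simplified sketch)."""
    if attrs.get("dims") is None:
        raise ValueError("vector field has no dims")
    new_attrs = dict(attrs)
    new_attrs["algorithm"] = "svs-vamana"
    new_attrs["compression"] = compression
    if reduce is not None:
        new_attrs["reduce"] = reduce
    # Defaults as set by the migration code when not already present
    new_attrs.setdefault("graph_max_degree", 40)
    new_attrs.setdefault("training_threshold", 1024)
    return new_attrs

# For example, a FLAT field from the old schema:
print(to_svs_attrs({"dims": 768, "algorithm": "flat", "distance_metric": "cosine"}, "LVQ4x4"))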
@@ -149,14 +149,14 @@ def progress(current, total):
                     f"CompressionAdvisor recommended: {compression} "
                     f"(reduce={reduce}) for {dims} dims"
                 )
-
+
                 # Update to SVS-VAMANA
                 attrs["algorithm"] = "svs-vamana"
                 attrs["compression"] = compression
-
+
                 if reduce is not None:
                     attrs["reduce"] = reduce
-
+
                 # Set default SVS parameters if not present
                 if "graph_max_degree" not in attrs:
                     attrs["graph_max_degree"] = 40
@@ -168,31 +168,31 @@ def progress(current, total):
                 # Default is 10240, minimum is 1024 (DEFAULT_BLOCK_SIZE)
                 if "training_threshold" not in attrs:
                     attrs["training_threshold"] = 1024
-
+
         # Create new index
         new_schema = IndexSchema.from_dict(new_schema_dict)
         new_index = SearchIndex(schema=new_schema, redis_client=old_index._redis_client)
         new_index.create(overwrite=overwrite)
-
+
         logger.info(f"Created new SVS-VAMANA index: {new_index_name}")
-
+
         # Get total document count
         old_info = old_index.info()
         total_docs = int(old_info.get("num_docs", 0))
-
+
         if total_docs == 0:
             logger.warning("Old index has no documents to migrate")
             return new_index
-
+
         logger.info(f"Migrating {total_docs} documents in batches of {batch_size}")
-
+
         # Migrate data in batches using pagination
         migrated_count = 0
         query = FilterQuery(
             filter_expression=FilterExpression("*"),
             return_fields=list(old_index.schema.fields.keys()),
         )
-
+
         for batch in old_index.paginate(query, page_size=batch_size):
             if batch:
                 # The 'id' field contains the full Redis key (e.g., "prefix:ulid")
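The copy itself is driven by paginate() over a match-all FilterQuery. Most of the loop body falls outside this hunk, but the overall pattern is roughly the following sketch, reusing the names defined above; new_index.load(..., keys=...) and the key-rewriting details are assumptions based only on the comment about the 'id' field, not the commit's actual code:

old_prefix = f"{old_index.schema.index.prefix}:"
for batch in old_index.paginate(query, page_size=batch_size):
    if not batch:
        continue
    keys, data = [], []
    for doc in batch:
        # "id" holds the full Redis key, e.g. "old_prefix:ulid"; re-key under the new prefix
        doc_id = doc["id"][len(old_prefix):] if doc["id"].startswith(old_prefix) else doc["id"]
        keys.append(f"{new_index_name}:{doc_id}")
        data.append({k: v for k, v in doc.items() if k != "id"})
    new_index.load(data, keys=keys)
    migrated_count += len(batch)
    if progress_callback:
        progress_callback(migrated_count, total_docs)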
@@ -223,7 +223,7 @@ def progress(current, total):
                     progress_callback(migrated_count, total_docs)
 
             logger.debug(f"Migrated {migrated_count}/{total_docs} documents")
-
+
         logger.info(f"Migration complete: {migrated_count} documents migrated")
 
         # Verify migration by checking Redis keys (not index count)
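Verifying by Redis keys rather than the index's num_docs (which, per the comment above, may not reflect all documents immediately) can also be done from the client side. A rough check with redis-py's SCAN; the client setup and prefix value are illustrative, with the prefix following the "{old_name}_svs" default:

import redis

client = redis.Redis()  # assumed local instance
new_prefix = "my_flat_index_svs"

migrated_keys = sum(1 for _ in client.scan_iter(match=f"{new_prefix}:*", count=1000))
print(f"{migrated_keys} keys written under prefix '{new_prefix}'")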
@@ -262,10 +262,10 @@ async def migrate_to_svs_async(
         progress_callback: Optional[Callable[[int, int], None]] = None,
     ) -> "AsyncSearchIndex":
         """Asynchronously migrate an existing index to SVS-VAMANA with compression.
-
+
         This is the async version of migrate_to_svs(). See migrate_to_svs() for
         detailed documentation.
-
+
         Args:
             old_index: The existing AsyncSearchIndex to migrate from.
             new_index_name: Name for the new index. If None, uses "{old_name}_svs".
@@ -274,16 +274,16 @@ async def migrate_to_svs_async(
             batch_size: Number of documents to migrate per batch. Default: 1000.
             overwrite: Whether to overwrite the new index if it exists. Default: False.
             progress_callback: Optional callback function(current, total) for progress.
-
+
         Returns:
             AsyncSearchIndex: The new SVS-VAMANA index with migrated data.
-
+
         Example:
             .. code-block:: python
-
+
                 async def progress(current, total):
                     print(f"Migrated {current}/{total} documents")
-
+
                 new_index = await IndexMigrator.migrate_to_svs_async(
                     old_index,
                     compression="LVQ4x4",
@@ -303,34 +303,34 @@ async def progress(current, total):
             raise RedisModuleVersionError.for_svs_vamana(
                 caps.redis_version, caps.search_version
             )
-
+
         # Find vector fields
         vector_fields = [
             (name, field)
             for name, field in old_index.schema.fields.items()
             if hasattr(field, "attrs") and hasattr(field.attrs, "algorithm")
         ]
-
+
         if not vector_fields:
             raise ValueError("Old index has no vector fields to migrate")
-
+
         # Create new schema
         new_schema_dict = old_index.schema.to_dict()
-
+
         if new_index_name is None:
             new_index_name = f"{old_index.name}_svs"
         new_schema_dict["index"]["name"] = new_index_name
         new_schema_dict["index"]["prefix"] = new_index_name
-
+
         # Update vector fields
         for field_dict in new_schema_dict["fields"]:
             if field_dict["type"] == "vector":
                 attrs = field_dict.get("attrs", {})
                 dims = attrs.get("dims")
-
+
                 if dims is None:
                     raise ValueError(f"Vector field '{field_dict['name']}' has no dims")
-
+
                 if compression is None:
                     config = CompressionAdvisor.recommend(
                         dims=dims,
@@ -344,13 +344,13 @@ async def progress(current, total):
                     f"CompressionAdvisor recommended: {compression} "
                     f"(reduce={reduce}) for {dims} dims"
                 )
-
+
                 attrs["algorithm"] = "svs-vamana"
                 attrs["compression"] = compression
-
+
                 if reduce is not None:
                     attrs["reduce"] = reduce
-
+
                 if "graph_max_degree" not in attrs:
                     attrs["graph_max_degree"] = 40
                 if "construction_window_size" not in attrs:
@@ -364,26 +364,26 @@ async def progress(current, total):
         new_schema = IndexSchema.from_dict(new_schema_dict)
         new_index = AsyncSearchIndex(schema=new_schema, redis_client=client)
         await new_index.create(overwrite=overwrite)
-
+
         logger.info(f"Created new SVS-VAMANA index: {new_index_name}")
-
+
         # Get total document count
         old_info = await old_index.info()
         total_docs = int(old_info.get("num_docs", 0))
-
+
         if total_docs == 0:
             logger.warning("Old index has no documents to migrate")
             return new_index
-
+
         logger.info(f"Migrating {total_docs} documents in batches of {batch_size}")
-
+
         # Migrate data in batches
         migrated_count = 0
         query = FilterQuery(
             filter_expression=FilterExpression("*"),
             return_fields=list(old_index.schema.fields.keys()),
         )
-
+
         async for batch in old_index.paginate(query, page_size=batch_size):
             if batch:
                 # Extract document IDs from full Redis keys
@@ -406,7 +406,7 @@ async def progress(current, total):
                     progress_callback(migrated_count, total_docs)
 
             logger.debug(f"Migrated {migrated_count}/{total_docs} documents")
-
+
         logger.info(f"Migration complete: {migrated_count} documents migrated")
 
         # Verify migration by checking Redis keys
@@ -430,4 +430,3 @@ async def progress(current, total):
             )
 
         return new_index
-
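For completeness, driving the async variant end to end looks roughly like the docstring example wrapped in an event loop. A minimal sketch, assuming AsyncSearchIndex.from_existing is available and awaitable as in recent redisvl releases; the index name and Redis URL are illustrative:

import asyncio

from redisvl.index import AsyncSearchIndex
from redisvl.utils import IndexMigrator

async def main():
    old_index = await AsyncSearchIndex.from_existing(
        "my_flat_index", redis_url="redis://localhost:6379"
    )
    new_index = await IndexMigrator.migrate_to_svs_async(
        old_index,
        compression="LVQ4x4",
        batch_size=1000,
    )
    info = await new_index.info()
    print(f"Migrated {info['num_docs']} documents")

asyncio.run(main())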
