99from core .common .models import (
1010 IndexBuildParameters ,
1111 VectorsDataset ,
12+ IndexSerializationMode ,
1213)
1314
1415from core .common .models .index_builder .faiss import (
2021 get_omp_num_threads ,
2122)
2223from core .common .models .index_build_parameters import DataType
23- from core .common .models .index_builder import CagraGraphBuildAlgo
24+ from core .common .models .index_builder import (
25+ CagraGraphBuildAlgo ,
26+ FaissCpuBuildIndexOutput ,
27+ )
2428from core .index_builder .interface import IndexBuildService
2529from timeit import default_timer as timer
2630
31+ from typing import Union , Any
32+ from io import BytesIO
2733import logging
2834
2935logger = logging .getLogger (__name__ )
@@ -42,19 +48,19 @@ def build_index(
4248 self ,
4349 index_build_parameters : IndexBuildParameters ,
4450 vectors_dataset : VectorsDataset ,
45- cpu_index_output_file_path : str ,
46- ) -> None :
51+ ) -> Any :
4752 """
4853 Orchestrates the workflow of
4954 - creating a GPU Index for the specified vectors dataset,
5055 - converting into CPU compatible Index
51- - and writing the CPU Index to disc
5256 Uses the faiss library methods to achieve this.
5357
5458 Args:
5559 vectors_dataset: The set of vectors to index
5660 index_build_parameters: The API Index Build parameters
57- cpu_index_output_file_path: The complete file path on disc to write the cpuIndex to.
61+
62+ Returns:
63+ The CPU compatible Index
5864 """
5965 faiss_gpu_index_cagra_builder = None
6066 faiss_index_hnsw_cagra_builder = None
@@ -139,17 +145,7 @@ def build_index(
139145 f"{ index_conversion_time :.2f} seconds"
140146 )
141147
142- # Step 3: Write CPU Index to persistent storage
143- t1 = timer ()
144- faiss_index_hnsw_cagra_builder .write_cpu_index (
145- faiss_cpu_build_index_output , cpu_index_output_file_path
146- )
147- t2 = timer ()
148- index_write_time = t2 - t1
149- logger .debug (
150- f"Index write time for vector path { index_build_parameters .vector_path } : "
151- f"{ index_write_time :.2f} seconds"
152- )
148+ return faiss_cpu_build_index_output
153149
154150 except Exception as exception :
155151 # Clean up GPU Index Response if orchestrator failed after GPU Index Creation
@@ -161,17 +157,70 @@ def build_index(
161157 f"Failed to clean up GPU index response for vector path "
162158 f"{ index_build_parameters .vector_path } : { e } "
163159 )
160+ raise Exception (
161+ f"Faiss Index Build Service build_index workflow failed: { exception } "
162+ ) from exception
164163
165- # Clean up CPU Index Response if orchestrator failed after CPU Index Creation
166- if faiss_cpu_build_index_output is not None :
167- try :
168- faiss_cpu_build_index_output .cleanup ()
169- except Exception as e :
170- logger .error (
171- f"Failed to clean up CPU index response for vector path "
172- f"{ index_build_parameters .vector_path } : { e } "
173- )
164+ def write_cpu_index (
165+ self ,
166+ cpu_build_index_output : FaissCpuBuildIndexOutput ,
167+ index_build_parameters : IndexBuildParameters ,
168+ index_serialization_mode : IndexSerializationMode ,
169+ output_destination : Union [str , BytesIO ],
170+ ) -> None :
171+ """
172+ Method to write the CPU index and vector dataset id mapping to storage destination,
173+ for uploading later to remote object store.
174+ Uses faiss write_index library method to achieve this
175+
176+ Args:
177+ cpu_build_index_output (FaissCpuBuildIndexOutput): A datamodel containing the created GPU Faiss Index
178+ and dataset Vector Ids components
179+ index_build_parameters: The API Index Build parameters
180+ index_serialization_mode: The serialization mode for the index
181+ output_destination (Union[str, BytesIO]): Output destination for the index.
182+ - str: File path for disk writing
183+ - BytesIO: Existing buffer to write to
184+ """
185+ try :
186+ t1 = timer ()
187+ writer = None
188+ if (
189+ index_serialization_mode == IndexSerializationMode .MEMORY
190+ and isinstance (
191+ output_destination , BytesIO
192+ ) # for resolving mypy errors, we add this check
193+ ):
194+ # Use a faiss callback to serialize directly to the buffer
195+ # We use a faiss callback instead of faiss.serialize_index
196+ # to avoid the extra memory overhead of the c++ vector data structure
197+ # and numpy arrays created in serialize_index method
198+ writer = faiss .PyCallbackIOWriter (output_destination .write )
199+ else :
200+ # Otherwise, treat the output destination like a file path
201+ writer = output_destination
202+ if index_build_parameters .data_type != DataType .BINARY :
203+ faiss .write_index (cpu_build_index_output .index_id_map , writer )
204+ else :
205+ faiss .write_index_binary (cpu_build_index_output .index_id_map , writer )
206+ # Free memory taken by CPU Index
207+ cpu_build_index_output .cleanup ()
208+ t2 = timer ()
209+ index_write_time = t2 - t1
210+ logger .debug (
211+ f"Index write time for vector path { index_build_parameters .vector_path } : "
212+ f"{ index_write_time :.2f} seconds"
213+ )
214+ except Exception as exception :
215+ # Clean up CPU Index Response if write failed
216+ try :
217+ cpu_build_index_output .cleanup ()
218+ except Exception as e :
219+ logger .error (
220+ f"Failed to clean up CPU index response for vector path "
221+ f"{ index_build_parameters .vector_path } : { e } "
222+ )
174223
175224 raise Exception (
176- f"Faiss Index Build Service build_index workflow failed: { exception } "
225+ f"Faiss Index Build Service write_index workflow failed: { exception } "
177226 ) from exception
0 commit comments