diff --git a/x-pack/plugin/gpu/src/main/java/org/elasticsearch/xpack/gpu/codec/ESGpuHnswVectorsWriter.java b/x-pack/plugin/gpu/src/main/java/org/elasticsearch/xpack/gpu/codec/ESGpuHnswVectorsWriter.java index df9b47ee5c62d..099ad22c5002e 100644 --- a/x-pack/plugin/gpu/src/main/java/org/elasticsearch/xpack/gpu/codec/ESGpuHnswVectorsWriter.java +++ b/x-pack/plugin/gpu/src/main/java/org/elasticsearch/xpack/gpu/codec/ESGpuHnswVectorsWriter.java @@ -150,9 +150,9 @@ public void flush(int maxDoc, Sorter.DocMap sortMap) throws IOException { flatVectorWriter.flush(maxDoc, sortMap); for (FieldWriter field : fields) { if (sortMap == null) { - writeField(field); + flushField(field); } else { - writeSortingField(field, sortMap); + flushSortingField(field, sortMap); } } } @@ -185,83 +185,56 @@ public long ramBytesUsed() { return total; } - private static final class DatasetOrVectors { - private final CuVSMatrix dataset; - private final float[][] vectors; - - static DatasetOrVectors fromArray(float[][] vectors) { - return new DatasetOrVectors( - vectors.length < MIN_NUM_VECTORS_FOR_GPU_BUILD ? null : CuVSMatrix.ofArray(vectors), - vectors.length < MIN_NUM_VECTORS_FOR_GPU_BUILD ? vectors : null - ); - } - - static DatasetOrVectors fromDataset(CuVSMatrix dataset) { - return new DatasetOrVectors(dataset, null); - } - - private DatasetOrVectors(CuVSMatrix dataset, float[][] vectors) { - this.dataset = dataset; - this.vectors = vectors; - validateState(); - } - - private void validateState() { - if ((dataset == null && vectors == null) || (dataset != null && vectors != null)) { - throw new IllegalStateException("Exactly one of dataset or vectors must be non-null"); - } - } - - int size() { - return dataset != null ? (int) dataset.size() : vectors.length; - } - - CuVSMatrix getDataset() { - return dataset; - } - - float[][] getVectors() { - return vectors; - } - } - - private void writeField(FieldWriter fieldWriter) throws IOException { + /** + * For FlatFieldVectorWriter we only need to support float[] during flush: during indexing users provide floats[], and pass floats to + * FlatFieldVectorWriter, even when we have a BYTE dataType (i.e. an "int8_hnsw" type). + * During merging, we use quantized data, so we need to support byte[] too (see {@link ESGpuHnswVectorsWriter#mergeOneField}), + * but not here. + * That's how our other current formats work: use floats during indexing, and quantized data to build graph during merging. + */ + private void flushField(FieldWriter fieldWriter) throws IOException { float[][] vectors = fieldWriter.flatFieldVectorsWriter.getVectors().toArray(float[][]::new); - writeFieldInternal(fieldWriter.fieldInfo, DatasetOrVectors.fromArray(vectors)); + try (CuVSMatrix dataset = vectors.length < MIN_NUM_VECTORS_FOR_GPU_BUILD ? null : CuVSMatrix.ofArray(vectors)) { + writeFieldInternal(fieldWriter.fieldInfo, dataset, vectors.length); + } } - private void writeSortingField(FieldWriter fieldData, Sorter.DocMap sortMap) throws IOException { + private void flushSortingField(FieldWriter fieldWriter, Sorter.DocMap sortMap) throws IOException { // The flatFieldVectorsWriter's flush method, called before this, has already sorted the vectors according to the sortMap. // We can now treat them as a simple, sorted list of vectors. - float[][] vectors = fieldData.flatFieldVectorsWriter.getVectors().toArray(float[][]::new); - writeFieldInternal(fieldData.fieldInfo, DatasetOrVectors.fromArray(vectors)); + float[][] vectors = fieldWriter.flatFieldVectorsWriter.getVectors().toArray(float[][]::new); + try (CuVSMatrix dataset = vectors.length < MIN_NUM_VECTORS_FOR_GPU_BUILD ? null : CuVSMatrix.ofArray(vectors)) { + writeFieldInternal(fieldWriter.fieldInfo, dataset, vectors.length); + } } - private void writeFieldInternal(FieldInfo fieldInfo, DatasetOrVectors datasetOrVectors) throws IOException { + private void writeFieldInternal(FieldInfo fieldInfo, CuVSMatrix dataset, int datasetSize) throws IOException { try { long vectorIndexOffset = vectorIndex.getFilePointer(); int[][] graphLevelNodeOffsets = new int[1][]; - HnswGraph mockGraph; - if (datasetOrVectors.getVectors() != null) { - int size = datasetOrVectors.size(); + final HnswGraph graph; + if (dataset == null) { if (logger.isDebugEnabled()) { - logger.debug("Skip building carga index; vectors length {} < {} (min for GPU)", size, MIN_NUM_VECTORS_FOR_GPU_BUILD); + logger.debug( + "Skip building carga index; vectors length {} < {} (min for GPU)", + datasetSize, + MIN_NUM_VECTORS_FOR_GPU_BUILD + ); } - mockGraph = writeGraph(size, graphLevelNodeOffsets); + graph = writeMockGraph(datasetSize, graphLevelNodeOffsets); } else { - var dataset = datasetOrVectors.getDataset(); var cuVSResources = cuVSResourceManager.acquire((int) dataset.size(), (int) dataset.columns(), dataset.dataType()); try { try (var index = buildGPUIndex(cuVSResources, fieldInfo.getVectorSimilarityFunction(), dataset)) { assert index != null : "GPU index should be built for field: " + fieldInfo.name; - mockGraph = writeGraph(index.getGraph(), graphLevelNodeOffsets); + graph = writeGraph(index.getGraph(), graphLevelNodeOffsets); } } finally { cuVSResourceManager.release(cuVSResources); } } long vectorIndexLength = vectorIndex.getFilePointer() - vectorIndexOffset; - writeMeta(fieldInfo, vectorIndexOffset, vectorIndexLength, datasetOrVectors.size(), mockGraph, graphLevelNodeOffsets); + writeMeta(fieldInfo, vectorIndexOffset, vectorIndexLength, datasetSize, graph, graphLevelNodeOffsets); } catch (IOException e) { throw e; } catch (Throwable t) { @@ -337,7 +310,7 @@ private HnswGraph writeGraph(CuVSMatrix cagraGraph, int[][] levelNodeOffsets) th } // create a mock graph where every node is connected to every other node - private HnswGraph writeGraph(int elementCount, int[][] levelNodeOffsets) throws IOException { + private HnswGraph writeMockGraph(int elementCount, int[][] levelNodeOffsets) throws IOException { if (elementCount == 0) { return null; } @@ -435,20 +408,52 @@ public void mergeOneField(FieldInfo fieldInfo, MergeState mergeState) throws IOE } } try (IndexInput in = mergeState.segmentInfo.dir.openInput(tempRawVectorsFileName, IOContext.DEFAULT)) { - DatasetOrVectors datasetOrVectors; var input = FilterIndexInput.unwrapOnlyTest(in); - if (input instanceof MemorySegmentAccessInput memorySegmentAccessInput && numVectors >= MIN_NUM_VECTORS_FOR_GPU_BUILD) { - var ds = DatasetUtils.getInstance() - .fromInput(memorySegmentAccessInput, numVectors, fieldInfo.getVectorDimension(), dataType); - datasetOrVectors = DatasetOrVectors.fromDataset(ds); + + final CuVSMatrix dataset; + if (numVectors >= MIN_NUM_VECTORS_FOR_GPU_BUILD) { + if (input instanceof MemorySegmentAccessInput memorySegmentAccessInput) { + // Direct access to mmapped file + dataset = DatasetUtils.getInstance() + .fromInput(memorySegmentAccessInput, numVectors, fieldInfo.getVectorDimension(), dataType); + } else { + logger.debug( + () -> "Cannot mmap merged raw vectors temporary file. IndexInput type [" + input.getClass().getSimpleName() + "]" + ); + + // Read vector-by-vector + var builder = CuVSMatrix.hostBuilder(numVectors, fieldInfo.getVectorDimension(), dataType); + + // During merging, we use quantized data, so we need to support byte[] too. + // That's how our current formats work: use floats during indexing, and quantized data to build a graph during merging. + if (dataType == CuVSMatrix.DataType.FLOAT) { + float[] vector = new float[fieldInfo.getVectorDimension()]; + for (int i = 0; i < numVectors; ++i) { + input.readFloats(vector, 0, fieldInfo.getVectorDimension()); + builder.addVector(vector); + } + } else { + assert dataType == CuVSMatrix.DataType.BYTE; + byte[] vector = new byte[fieldInfo.getVectorDimension()]; + for (int i = 0; i < numVectors; ++i) { + input.readBytes(vector, 0, fieldInfo.getVectorDimension()); + builder.addVector(vector); + } + } + dataset = builder.build(); + } } else { - // assert numVectors < MIN_NUM_VECTORS_FOR_GPU_BUILD : "numVectors: " + numVectors; // we don't really need real value for vectors here, // we just build a mock graph where every node is connected to every other node - float[][] vectors = new float[numVectors][fieldInfo.getVectorDimension()]; - datasetOrVectors = DatasetOrVectors.fromArray(vectors); + dataset = null; + } + try { + writeFieldInternal(fieldInfo, dataset, numVectors); + } finally { + if (dataset != null) { + dataset.close(); + } } - writeFieldInternal(fieldInfo, datasetOrVectors); } finally { org.apache.lucene.util.IOUtils.deleteFilesIgnoringExceptions(mergeState.segmentInfo.dir, tempRawVectorsFileName); } diff --git a/x-pack/plugin/gpu/src/test/java/org/elasticsearch/xpack/gpu/codec/ESGpuHnswSQVectorsFormatTests.java b/x-pack/plugin/gpu/src/test/java/org/elasticsearch/xpack/gpu/codec/ESGpuHnswSQVectorsFormatTests.java index 7c2dce8adcfec..8d639ccf58d32 100644 --- a/x-pack/plugin/gpu/src/test/java/org/elasticsearch/xpack/gpu/codec/ESGpuHnswSQVectorsFormatTests.java +++ b/x-pack/plugin/gpu/src/test/java/org/elasticsearch/xpack/gpu/codec/ESGpuHnswSQVectorsFormatTests.java @@ -10,11 +10,13 @@ import org.apache.lucene.index.VectorEncoding; import org.apache.lucene.index.VectorSimilarityFunction; import org.apache.lucene.tests.index.BaseKnnVectorsFormatTestCase; +import org.apache.lucene.tests.util.LuceneTestCase; import org.apache.lucene.tests.util.TestUtil; import org.elasticsearch.common.logging.LogConfigurator; import org.elasticsearch.xpack.gpu.GPUSupport; import org.junit.BeforeClass; +@LuceneTestCase.SuppressSysoutChecks(bugUrl = "https://github.com/rapidsai/cuvs/issues/1310") public class ESGpuHnswSQVectorsFormatTests extends BaseKnnVectorsFormatTestCase { static {