Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.elasticsearch.search.vectors.KnnSearchBuilder;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.xpack.gpu.GPUPlugin;
import org.elasticsearch.xpack.gpu.GPUSupport;

import java.util.Collection;
import java.util.List;
Expand All @@ -32,6 +33,7 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
}

public void testBasic() {
assumeTrue("cuvs not supported", GPUSupport.isSupported(false));
final int dims = randomIntBetween(4, 128);
final int[] numDocs = new int[] { randomIntBetween(1, 100), 1, 2, randomIntBetween(1, 100) };
createIndex(dims);
Expand All @@ -45,6 +47,7 @@ public void testBasic() {
}

public void testSearchWithoutGPU() {
assumeTrue("cuvs not supported", GPUSupport.isSupported(false));
final int dims = randomIntBetween(4, 128);
final int numDocs = randomIntBetween(1, 500);
createIndex(dims);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@
*/
package org.elasticsearch.xpack.gpu;

import com.nvidia.cuvs.CuVSResources;

import org.elasticsearch.common.util.FeatureFlag;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.mapper.vectors.DenseVectorFieldMapper;
Expand All @@ -34,17 +32,16 @@ public VectorsFormatProvider getVectorsFormatProvider() {
+ "]"
);
}
CuVSResources resources = GPUVectorsFormat.cuVSResourcesOrNull(true);
if (resources == null) {
if (GPUSupport.isSupported(true) == false) {
throw new IllegalArgumentException(
"[index.vectors.indexing.use_gpu] was set to [true], but GPU resources are not accessible on the node."
);
}
return new GPUVectorsFormat();
}
if ((gpuMode == IndexSettings.GpuMode.AUTO)
if (gpuMode == IndexSettings.GpuMode.AUTO
&& vectorIndexTypeSupported(indexOptions.getType())
&& GPUVectorsFormat.cuVSResourcesOrNull(false) != null) {
&& GPUSupport.isSupported(false)) {
return new GPUVectorsFormat();
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.gpu;

import com.nvidia.cuvs.CuVSResources;

import org.elasticsearch.logging.LogManager;
import org.elasticsearch.logging.Logger;

/**
 * Static helpers for probing whether cuvs resources can be created on this node.
 */
public class GPUSupport {

    private static final Logger LOG = LogManager.getLogger(GPUSupport.class);

    /**
     * Tells whether the platform supports cuvs.
     *
     * <p>Support is probed by creating a resources instance and closing it again right away.
     *
     * @param logError whether a failure to create the resources is logged as a warning
     * @return {@code true} if a resources instance could be created, {@code false} otherwise
     */
    public static boolean isSupported(boolean logError) {
        // try-with-resources does not call close() on a null resource, so the unsupported case is safe.
        try (var resources = cuVSResourcesOrNull(logError)) {
            return resources != null;
        }
    }

    /**
     * Returns a newly created resources instance if supported, otherwise null.
     *
     * <p>Any failure thrown by {@link CuVSResources#create()} is swallowed on purpose: callers use
     * the {@code null} return value as the "not supported" signal. The caller owns the returned
     * instance and is responsible for closing it.
     *
     * @param logError whether a creation failure is logged as a warning
     * @return the created resources, or {@code null} if creation failed
     */
    public static CuVSResources cuVSResourcesOrNull(boolean logError) {
        try {
            return CuVSResources.create();
        } catch (UnsupportedOperationException uoe) {
            if (logError) {
                // Without an exception message, the Java feature version is the most useful hint we have.
                final String detail = uoe.getMessage() == null
                    ? "Runtime Java version: " + Runtime.version().feature()
                    : uoe.getMessage();
                LOG.warn("GPU based vector indexing is not supported on this platform or java version; " + detail);
            }
        } catch (Throwable t) {
            if (logError) {
                // Report the underlying cause of a static-initializer failure when there is one;
                // otherwise keep the error itself rather than logging "null".
                Throwable reported = t;
                if (t instanceof ExceptionInInitializerError && t.getCause() != null) {
                    reported = t.getCause();
                }
                LOG.warn("Exception occurred during creation of cuvs resources. " + reported);
            }
        }
        return null;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.gpu.codec;

import com.nvidia.cuvs.CuVSResources;

import org.elasticsearch.xpack.gpu.GPUSupport;

import java.nio.file.Path;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/**
 * A manager of {@link com.nvidia.cuvs.CuVSResources}. There is one manager per GPU.
 *
 * <p>All access to GPU resources is mediated through a manager. A manager helps coordinate usage threads to:
 * <ul>
 * <li>Ensure single-threaded access to any particular resource at a time</li>
 * <li>Control the total number of concurrent operations that may be performed on a GPU</li>
 * <li>Pool resources, to avoid frequent creation and destruction, which are expensive operations</li>
 * </ul>
 *
 * <p>Fundamentally, a resource is used in compute and memory bound operations. The former occurs prior to the latter, e.g.
 * index build (compute), followed by a copy/process of the newly built index (memory). The manager allows the resource
 * user to indicate that compute is complete before releasing the resources. This can help improve parallelism of compute
 * on the GPU - allowing the next compute operation to proceed before releasing the resources.
 */
public interface CuVSResourceManager {

    /**
     * Acquires a resource from the manager.
     *
     * <p>A manager can use the given parameters, numVectors and dims, to estimate the potential
     * effect on GPU memory and compute usage to determine whether to give out
     * another resource or wait for a resource to be returned before giving out another.
     *
     * @param numVectors the number of vectors the caller intends to process
     * @param dims the dimensionality of those vectors
     * @return a managed resource, to be handed back through {@link #release(ManagedCuVSResources)}
     * @throws InterruptedException if the calling thread is interrupted while waiting for a resource
     */
    // numVectors and dims are currently unused, but could be used along with GPU metadata,
    // memory, generation, etc, when acquiring for 10M x 1536 dims, or 100,000 x 128 dims,
    // to give out a resource or not.
    ManagedCuVSResources acquire(int numVectors, int dims) throws InterruptedException;

    /** Marks the resources as finished with regard to compute. */
    void finishedComputation(ManagedCuVSResources resources);

    /** Returns the given resource to the manager. */
    void release(ManagedCuVSResources resources);

    /** Shuts down the manager, releasing all open resources. */
    void shutdown();

    /** Returns the system-wide pooling manager. */
    static CuVSResourceManager pooling() {
        return PoolingCuVSResourceManager.INSTANCE;
    }

    /**
     * A manager that maintains a pool of resources.
     *
     * <p>Resources are created lazily, up to {@code capacity}. Once that many exist, callers of
     * {@link #acquire(int, int)} wait until one is released back to the pool.
     */
    class PoolingCuVSResourceManager implements CuVSResourceManager {

        static final int MAX_RESOURCES = 2;
        static final PoolingCuVSResourceManager INSTANCE = new PoolingCuVSResourceManager(MAX_RESOURCES);

        /** Idle resources, ready to be handed out. */
        final BlockingQueue<ManagedCuVSResources> pool;
        /** Upper bound on the number of resources that may exist at once. */
        final int capacity;
        /** Number of resources created and not yet closed; read and written only while holding this object's monitor. */
        int createdCount;

        /**
         * Creates a pooling manager.
         *
         * @param capacity the maximum number of resources, between 1 and {@link #MAX_RESOURCES} inclusive
         * @throws IllegalArgumentException if {@code capacity} is outside that range
         */
        public PoolingCuVSResourceManager(int capacity) {
            if (capacity < 1 || capacity > MAX_RESOURCES) {
                throw new IllegalArgumentException("Resource count must be between 1 and " + MAX_RESOURCES);
            }
            this.capacity = capacity;
            this.pool = new ArrayBlockingQueue<>(capacity);
        }

        /**
         * Hands out an idle resource if one is pooled, creates a new one while fewer than
         * {@code capacity} exist, and otherwise blocks until a resource is released.
         *
         * @throws NullPointerException if a new resource was needed but {@link #createNew()} returned null
         * @throws InterruptedException if interrupted while waiting for a resource to be released
         */
        @Override
        public ManagedCuVSResources acquire(int numVectors, int dims) throws InterruptedException {
            ManagedCuVSResources pooled = pool.poll();
            if (pooled != null) {
                return pooled;
            }
            synchronized (this) {
                if (createdCount < capacity) {
                    // Count the resource only after it has actually been created. A failed creation
                    // must not consume a capacity slot, or later callers would block forever in take().
                    var created = new ManagedCuVSResources(Objects.requireNonNull(createNew()));
                    createdCount++;
                    return created;
                }
            }
            // Otherwise, wait for one to be released
            return pool.take();
        }

        // visible for testing
        protected CuVSResources createNew() {
            return GPUSupport.cuVSResourcesOrNull(true);
        }

        @Override
        public void finishedComputation(ManagedCuVSResources resources) {
            // currently does nothing, but could allow acquire to return possibly blocked resources
        }

        /**
         * Puts the resource back into the pool.
         *
         * @throws NullPointerException if {@code resources} is null
         */
        @Override
        public void release(ManagedCuVSResources resources) {
            var added = pool.offer(Objects.requireNonNull(resources));
            assert added : "Failed to release resource back to pool";
        }

        /**
         * Closes every resource that is currently idle in the pool. Resources that are still
         * acquired are not touched here.
         */
        @Override
        public void shutdown() {
            // Drain with poll() rather than iterate-then-clear, so a resource released while the
            // shutdown is in progress cannot be dropped from the pool without being closed.
            ManagedCuVSResources idle;
            while ((idle = pool.poll()) != null) {
                synchronized (this) {
                    // A closed resource no longer counts towards capacity.
                    createdCount--;
                }
                idle.delegate.close();
            }
        }
    }

    /** A managed resource. Cannot be closed. */
    final class ManagedCuVSResources implements CuVSResources {

        /** The underlying resources; closed by the manager only. */
        final CuVSResources delegate;

        ManagedCuVSResources(CuVSResources resources) {
            this.delegate = resources;
        }

        @Override
        public ScopedAccess access() {
            return delegate.access();
        }

        /**
         * Always throws: the lifecycle of a managed resource belongs to its manager.
         *
         * @throws UnsupportedOperationException always
         */
        @Override
        public void close() {
            throw new UnsupportedOperationException("this resource is managed, cannot be closed by clients");
        }

        // NOTE(review): returns null instead of delegating to the wrapped resources - presumably
        // intentional, confirm that nothing relies on a temp directory for managed resources.
        @Override
        public Path tempDirectory() {
            return null;
        }

        @Override
        public String toString() {
            return "ManagedCuVSResources[delegate=" + delegate + "]";
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@

import com.nvidia.cuvs.CagraIndex;
import com.nvidia.cuvs.CagraIndexParams;
import com.nvidia.cuvs.CuVSResources;
import com.nvidia.cuvs.Dataset;

import org.apache.lucene.codecs.CodecUtil;
Expand Down Expand Up @@ -68,7 +67,7 @@ final class GPUToHNSWVectorsWriter extends KnnVectorsWriter {
private static final long SHALLOW_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(GPUToHNSWVectorsWriter.class);
private static final int LUCENE99_HNSW_DIRECT_MONOTONIC_BLOCK_SHIFT = 16;

private final CuVSResources cuVSResources;
private final CuVSResourceManager cuVSResourceManager;
private final SegmentWriteState segmentWriteState;
private final IndexOutput meta, vectorIndex;
private final int M;
Expand All @@ -78,10 +77,15 @@ final class GPUToHNSWVectorsWriter extends KnnVectorsWriter {
private final List<FieldWriter> fields = new ArrayList<>();
private boolean finished;

GPUToHNSWVectorsWriter(CuVSResources cuVSResources, SegmentWriteState state, int M, int beamWidth, FlatVectorsWriter flatVectorWriter)
throws IOException {
assert cuVSResources != null : "CuVSResources must not be null";
this.cuVSResources = cuVSResources;
GPUToHNSWVectorsWriter(
CuVSResourceManager cuVSResourceManager,
SegmentWriteState state,
int M,
int beamWidth,
FlatVectorsWriter flatVectorWriter
) throws IOException {
assert cuVSResourceManager != null : "CuVSResources must not be null";
this.cuVSResourceManager = cuVSResourceManager;
this.M = M;
this.flatVectorWriter = flatVectorWriter;
this.beamWidth = beamWidth;
Expand Down Expand Up @@ -267,42 +271,52 @@ private String buildGPUIndex(VectorSimilarityFunction similarityFunction, Datase
.withMetric(distanceType)
.build();

// build index on GPU
long startTime = System.nanoTime();
var index = CagraIndex.newBuilder(cuVSResources).withDataset(dataset).withIndexParams(params).build();
if (logger.isDebugEnabled()) {
logger.debug("Carga index created in: {} ms; #num vectors: {}", (System.nanoTime() - startTime) / 1_000_000.0, dataset.size());
}

// TODO: do serialization through MemorySegment instead of a temp file
// serialize index for CPU consumption to the hnwslib format
startTime = System.nanoTime();
IndexOutput tempCagraHNSW = null;
boolean success = false;
var cuVSResources = cuVSResourceManager.acquire(dataset.size(), dataset.dimensions());
try {
tempCagraHNSW = segmentWriteState.directory.createTempOutput(
vectorIndex.getName(),
"cagra_hnws_temp",
segmentWriteState.context
);
var tempCagraHNSWOutputStream = new IndexOutputOutputStream(tempCagraHNSW);
index.serializeToHNSW(tempCagraHNSWOutputStream);
long startTime = System.nanoTime();
var indexBuilder = CagraIndex.newBuilder(cuVSResources).withDataset(dataset).withIndexParams(params);
var index = indexBuilder.build();
cuVSResourceManager.finishedComputation(cuVSResources);
if (logger.isDebugEnabled()) {
logger.debug("Carga index serialized to hnswlib format in: {} ms", (System.nanoTime() - startTime) / 1_000_000.0);
logger.debug(
"Carga index created in: {} ms; #num vectors: {}",
(System.nanoTime() - startTime) / 1_000_000.0,
dataset.size()
);
}
success = true;
} finally {
index.destroyIndex();
if (success) {
org.elasticsearch.core.IOUtils.close(tempCagraHNSW);
} else {
if (tempCagraHNSW != null) {
IOUtils.closeWhileHandlingException(tempCagraHNSW);
org.apache.lucene.util.IOUtils.deleteFilesIgnoringExceptions(segmentWriteState.directory, tempCagraHNSW.getName());

// TODO: do serialization through MemorySegment instead of a temp file
// serialize index for CPU consumption to the hnswlib format
startTime = System.nanoTime();
IndexOutput tempCagraHNSW = null;
boolean success = false;
try {
tempCagraHNSW = segmentWriteState.directory.createTempOutput(
vectorIndex.getName(),
"cagra_hnws_temp",
segmentWriteState.context
);
var tempCagraHNSWOutputStream = new IndexOutputOutputStream(tempCagraHNSW);
index.serializeToHNSW(tempCagraHNSWOutputStream);
if (logger.isDebugEnabled()) {
logger.debug("Carga index serialized to hnswlib format in: {} ms", (System.nanoTime() - startTime) / 1_000_000.0);
}
success = true;
} finally {
index.destroyIndex();
if (success) {
org.elasticsearch.core.IOUtils.close(tempCagraHNSW);
} else {
if (tempCagraHNSW != null) {
IOUtils.closeWhileHandlingException(tempCagraHNSW);
org.apache.lucene.util.IOUtils.deleteFilesIgnoringExceptions(segmentWriteState.directory, tempCagraHNSW.getName());
}
}
}
return tempCagraHNSW.getName();
} finally {
cuVSResourceManager.release(cuVSResources);
}
return tempCagraHNSW.getName();
}

@SuppressForbidden(reason = "require usage of Lucene's IOUtils#deleteFilesIgnoringExceptions(...)")
Expand Down
Loading