Skip to content

Commit 446fba7

Browse files
authored
Add CuVSResourceManager for controlling parallelism and pooling of cuVS resources (#132670)
1 parent 9ad0367 commit 446fba7

File tree

10 files changed

+398
-80
lines changed

10 files changed

+398
-80
lines changed

x-pack/plugin/gpu/src/internalClusterTest/java/org/elasticsearch/plugin/gpu/GPUIndexIT.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import org.elasticsearch.search.vectors.KnnSearchBuilder;
1616
import org.elasticsearch.test.ESIntegTestCase;
1717
import org.elasticsearch.xpack.gpu.GPUPlugin;
18+
import org.elasticsearch.xpack.gpu.GPUSupport;
1819

1920
import java.util.Collection;
2021
import java.util.List;
@@ -32,6 +33,7 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
3233
}
3334

3435
public void testBasic() {
36+
assumeTrue("cuvs not supported", GPUSupport.isSupported(false));
3537
final int dims = randomIntBetween(4, 128);
3638
final int[] numDocs = new int[] { randomIntBetween(1, 100), 1, 2, randomIntBetween(1, 100) };
3739
createIndex(dims);
@@ -45,6 +47,7 @@ public void testBasic() {
4547
}
4648

4749
public void testSearchWithoutGPU() {
50+
assumeTrue("cuvs not supported", GPUSupport.isSupported(false));
4851
final int dims = randomIntBetween(4, 128);
4952
final int numDocs = randomIntBetween(1, 500);
5053
createIndex(dims);

x-pack/plugin/gpu/src/main/java/org/elasticsearch/xpack/gpu/GPUPlugin.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,6 @@
66
*/
77
package org.elasticsearch.xpack.gpu;
88

9-
import com.nvidia.cuvs.CuVSResources;
10-
119
import org.elasticsearch.common.util.FeatureFlag;
1210
import org.elasticsearch.index.IndexSettings;
1311
import org.elasticsearch.index.mapper.vectors.DenseVectorFieldMapper;
@@ -34,17 +32,16 @@ public VectorsFormatProvider getVectorsFormatProvider() {
3432
+ "]"
3533
);
3634
}
37-
CuVSResources resources = GPUVectorsFormat.cuVSResourcesOrNull(true);
38-
if (resources == null) {
35+
if (GPUSupport.isSupported(true) == false) {
3936
throw new IllegalArgumentException(
4037
"[index.vectors.indexing.use_gpu] was set to [true], but GPU resources are not accessible on the node."
4138
);
4239
}
4340
return new GPUVectorsFormat();
4441
}
45-
if ((gpuMode == IndexSettings.GpuMode.AUTO)
42+
if (gpuMode == IndexSettings.GpuMode.AUTO
4643
&& vectorIndexTypeSupported(indexOptions.getType())
47-
&& GPUVectorsFormat.cuVSResourcesOrNull(false) != null) {
44+
&& GPUSupport.isSupported(false)) {
4845
return new GPUVectorsFormat();
4946
}
5047
}
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.gpu;
9+
10+
import com.nvidia.cuvs.CuVSResources;
11+
12+
import org.elasticsearch.logging.LogManager;
13+
import org.elasticsearch.logging.Logger;
14+
15+
public class GPUSupport {
16+
17+
private static final Logger LOG = LogManager.getLogger(GPUSupport.class);
18+
19+
/** Tells whether the platform supports cuvs. */
20+
public static boolean isSupported(boolean logError) {
21+
try (var resources = cuVSResourcesOrNull(logError)) {
22+
if (resources != null) {
23+
return true;
24+
}
25+
}
26+
return false;
27+
}
28+
29+
/** Returns a resources if supported, otherwise null. */
30+
public static CuVSResources cuVSResourcesOrNull(boolean logError) {
31+
try {
32+
var resources = CuVSResources.create();
33+
return resources;
34+
} catch (UnsupportedOperationException uoe) {
35+
if (logError) {
36+
String msg = "";
37+
if (uoe.getMessage() == null) {
38+
msg = "Runtime Java version: " + Runtime.version().feature();
39+
} else {
40+
msg = ": " + uoe.getMessage();
41+
}
42+
LOG.warn("GPU based vector indexing is not supported on this platform or java version; " + msg);
43+
}
44+
} catch (Throwable t) {
45+
if (logError) {
46+
if (t instanceof ExceptionInInitializerError ex) {
47+
t = ex.getCause();
48+
}
49+
LOG.warn("Exception occurred during creation of cuvs resources. " + t);
50+
}
51+
}
52+
return null;
53+
}
54+
}
Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.gpu.codec;
9+
10+
import com.nvidia.cuvs.CuVSResources;
11+
12+
import org.elasticsearch.xpack.gpu.GPUSupport;
13+
14+
import java.nio.file.Path;
15+
import java.util.Objects;
16+
import java.util.concurrent.ArrayBlockingQueue;
17+
import java.util.concurrent.BlockingQueue;
18+
19+
/**
20+
* A manager of {@link com.nvidia.cuvs.CuVSResources}. There is one manager per GPU.
21+
*
22+
* <p>All access to GPU resources is mediated through a manager. A manager helps coordinate resource usage across threads to:
23+
* <ul>
24+
* <li>ensure single-threaded access to any particular resource at a time</li>
25+
* <li>Control the total number of concurrent operations that may be performed on a GPU</li>
26+
* <li>Pool resources, to avoid frequent creation and destruction, which are expensive operations. </li>
27+
* </ul>
28+
*
29+
* <p> Fundamentally, a resource is used in compute and memory bound operations. The former occurs prior to the latter, e.g.
30+
* index build (compute), followed by a copy/process of the newly built index (memory). The manager allows the resource
31+
* user to indicate that compute is complete before releasing the resources. This can help improve parallelism of compute
32+
* on the GPU - allowing the next compute operation to proceed before releasing the resources.
33+
*
34+
*/
35+
public interface CuVSResourceManager {
36+
37+
/**
38+
* Acquires a resource from the manager.
39+
*
40+
* <p>A manager can use the given parameters, numVectors and dims, to estimate the potential
41+
* effect on GPU memory and compute usage to determine whether to give out
42+
* another resource or wait for a resource to be returned before giving out another.
43+
*/
44+
// numVectors and dims are currently unused, but could be used along with GPU metadata,
45+
// memory, generation, etc, when acquiring for 10M x 1536 dims, or 100,000 x 128 dims,
46+
// to decide whether or not to give out a resource.
47+
ManagedCuVSResources acquire(int numVectors, int dims) throws InterruptedException;
48+
49+
/** Marks the resources as finished with regard to compute. */
50+
void finishedComputation(ManagedCuVSResources resources);
51+
52+
/** Returns the given resource to the manager. */
53+
void release(ManagedCuVSResources resources);
54+
55+
/** Shuts down the manager, releasing all open resources. */
56+
void shutdown();
57+
58+
/** Returns the system-wide pooling manager. */
59+
static CuVSResourceManager pooling() {
60+
return PoolingCuVSResourceManager.INSTANCE;
61+
}
62+
63+
/**
64+
* A manager that maintains a pool of resources.
65+
*/
66+
class PoolingCuVSResourceManager implements CuVSResourceManager {
67+
68+
static final int MAX_RESOURCES = 2;
69+
static final PoolingCuVSResourceManager INSTANCE = new PoolingCuVSResourceManager(MAX_RESOURCES);
70+
71+
final BlockingQueue<ManagedCuVSResources> pool;
72+
final int capacity;
73+
int createdCount;
74+
75+
public PoolingCuVSResourceManager(int capacity) {
76+
if (capacity < 1 || capacity > MAX_RESOURCES) {
77+
throw new IllegalArgumentException("Resource count must be between 1 and " + MAX_RESOURCES);
78+
}
79+
this.capacity = capacity;
80+
this.pool = new ArrayBlockingQueue<>(capacity);
81+
}
82+
83+
@Override
84+
public ManagedCuVSResources acquire(int numVectors, int dims) throws InterruptedException {
85+
ManagedCuVSResources res = pool.poll();
86+
if (res != null) {
87+
return res;
88+
}
89+
synchronized (this) {
90+
if (createdCount < capacity) {
91+
createdCount++;
92+
return new ManagedCuVSResources(Objects.requireNonNull(createNew()));
93+
}
94+
}
95+
// Otherwise, wait for one to be released
96+
return pool.take();
97+
}
98+
99+
// visible for testing
100+
protected CuVSResources createNew() {
101+
return GPUSupport.cuVSResourcesOrNull(true);
102+
}
103+
104+
@Override
105+
public void finishedComputation(ManagedCuVSResources resources) {
106+
// currently does nothing, but could allow acquire to return possibly blocked resources
107+
}
108+
109+
@Override
110+
public void release(ManagedCuVSResources resources) {
111+
var added = pool.offer(Objects.requireNonNull(resources));
112+
assert added : "Failed to release resource back to pool";
113+
}
114+
115+
@Override
116+
public void shutdown() {
117+
for (ManagedCuVSResources res : pool) {
118+
res.delegate.close();
119+
}
120+
pool.clear();
121+
}
122+
}
123+
124+
/** A managed resource. Cannot be closed. */
125+
final class ManagedCuVSResources implements CuVSResources {
126+
127+
final CuVSResources delegate;
128+
129+
ManagedCuVSResources(CuVSResources resources) {
130+
this.delegate = resources;
131+
}
132+
133+
@Override
134+
public ScopedAccess access() {
135+
return delegate.access();
136+
}
137+
138+
@Override
139+
public void close() {
140+
throw new UnsupportedOperationException("this resource is managed, cannot be closed by clients");
141+
}
142+
143+
@Override
144+
public Path tempDirectory() {
145+
return null;
146+
}
147+
148+
@Override
149+
public String toString() {
150+
return "ManagedCuVSResources[delegate=" + delegate + "]";
151+
}
152+
}
153+
}

x-pack/plugin/gpu/src/main/java/org/elasticsearch/xpack/gpu/codec/GPUToHNSWVectorsWriter.java

Lines changed: 50 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99

1010
import com.nvidia.cuvs.CagraIndex;
1111
import com.nvidia.cuvs.CagraIndexParams;
12-
import com.nvidia.cuvs.CuVSResources;
1312
import com.nvidia.cuvs.Dataset;
1413

1514
import org.apache.lucene.codecs.CodecUtil;
@@ -68,7 +67,7 @@ final class GPUToHNSWVectorsWriter extends KnnVectorsWriter {
6867
private static final long SHALLOW_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(GPUToHNSWVectorsWriter.class);
6968
private static final int LUCENE99_HNSW_DIRECT_MONOTONIC_BLOCK_SHIFT = 16;
7069

71-
private final CuVSResources cuVSResources;
70+
private final CuVSResourceManager cuVSResourceManager;
7271
private final SegmentWriteState segmentWriteState;
7372
private final IndexOutput meta, vectorIndex;
7473
private final int M;
@@ -78,10 +77,15 @@ final class GPUToHNSWVectorsWriter extends KnnVectorsWriter {
7877
private final List<FieldWriter> fields = new ArrayList<>();
7978
private boolean finished;
8079

81-
GPUToHNSWVectorsWriter(CuVSResources cuVSResources, SegmentWriteState state, int M, int beamWidth, FlatVectorsWriter flatVectorWriter)
82-
throws IOException {
83-
assert cuVSResources != null : "CuVSResources must not be null";
84-
this.cuVSResources = cuVSResources;
80+
GPUToHNSWVectorsWriter(
81+
CuVSResourceManager cuVSResourceManager,
82+
SegmentWriteState state,
83+
int M,
84+
int beamWidth,
85+
FlatVectorsWriter flatVectorWriter
86+
) throws IOException {
87+
assert cuVSResourceManager != null : "CuVSResources must not be null";
88+
this.cuVSResourceManager = cuVSResourceManager;
8589
this.M = M;
8690
this.flatVectorWriter = flatVectorWriter;
8791
this.beamWidth = beamWidth;
@@ -267,42 +271,52 @@ private String buildGPUIndex(VectorSimilarityFunction similarityFunction, Datase
267271
.withMetric(distanceType)
268272
.build();
269273

270-
// build index on GPU
271-
long startTime = System.nanoTime();
272-
var index = CagraIndex.newBuilder(cuVSResources).withDataset(dataset).withIndexParams(params).build();
273-
if (logger.isDebugEnabled()) {
274-
logger.debug("Carga index created in: {} ms; #num vectors: {}", (System.nanoTime() - startTime) / 1_000_000.0, dataset.size());
275-
}
276-
277-
// TODO: do serialization through MemorySegment instead of a temp file
278-
// serialize index for CPU consumption to the hnwslib format
279-
startTime = System.nanoTime();
280-
IndexOutput tempCagraHNSW = null;
281-
boolean success = false;
274+
var cuVSResources = cuVSResourceManager.acquire(dataset.size(), dataset.dimensions());
282275
try {
283-
tempCagraHNSW = segmentWriteState.directory.createTempOutput(
284-
vectorIndex.getName(),
285-
"cagra_hnws_temp",
286-
segmentWriteState.context
287-
);
288-
var tempCagraHNSWOutputStream = new IndexOutputOutputStream(tempCagraHNSW);
289-
index.serializeToHNSW(tempCagraHNSWOutputStream);
276+
long startTime = System.nanoTime();
277+
var indexBuilder = CagraIndex.newBuilder(cuVSResources).withDataset(dataset).withIndexParams(params);
278+
var index = indexBuilder.build();
279+
cuVSResourceManager.finishedComputation(cuVSResources);
290280
if (logger.isDebugEnabled()) {
291-
logger.debug("Carga index serialized to hnswlib format in: {} ms", (System.nanoTime() - startTime) / 1_000_000.0);
281+
logger.debug(
282+
"Carga index created in: {} ms; #num vectors: {}",
283+
(System.nanoTime() - startTime) / 1_000_000.0,
284+
dataset.size()
285+
);
292286
}
293-
success = true;
294-
} finally {
295-
index.destroyIndex();
296-
if (success) {
297-
org.elasticsearch.core.IOUtils.close(tempCagraHNSW);
298-
} else {
299-
if (tempCagraHNSW != null) {
300-
IOUtils.closeWhileHandlingException(tempCagraHNSW);
301-
org.apache.lucene.util.IOUtils.deleteFilesIgnoringExceptions(segmentWriteState.directory, tempCagraHNSW.getName());
287+
288+
// TODO: do serialization through MemorySegment instead of a temp file
289+
// serialize index for CPU consumption to the hnswlib format
290+
startTime = System.nanoTime();
291+
IndexOutput tempCagraHNSW = null;
292+
boolean success = false;
293+
try {
294+
tempCagraHNSW = segmentWriteState.directory.createTempOutput(
295+
vectorIndex.getName(),
296+
"cagra_hnws_temp",
297+
segmentWriteState.context
298+
);
299+
var tempCagraHNSWOutputStream = new IndexOutputOutputStream(tempCagraHNSW);
300+
index.serializeToHNSW(tempCagraHNSWOutputStream);
301+
if (logger.isDebugEnabled()) {
302+
logger.debug("Carga index serialized to hnswlib format in: {} ms", (System.nanoTime() - startTime) / 1_000_000.0);
303+
}
304+
success = true;
305+
} finally {
306+
index.destroyIndex();
307+
if (success) {
308+
org.elasticsearch.core.IOUtils.close(tempCagraHNSW);
309+
} else {
310+
if (tempCagraHNSW != null) {
311+
IOUtils.closeWhileHandlingException(tempCagraHNSW);
312+
org.apache.lucene.util.IOUtils.deleteFilesIgnoringExceptions(segmentWriteState.directory, tempCagraHNSW.getName());
313+
}
302314
}
303315
}
316+
return tempCagraHNSW.getName();
317+
} finally {
318+
cuVSResourceManager.release(cuVSResources);
304319
}
305-
return tempCagraHNSW.getName();
306320
}
307321

308322
@SuppressForbidden(reason = "require usage of Lucene's IOUtils#deleteFilesIgnoringExceptions(...)")

0 commit comments

Comments
 (0)