Skip to content

Commit ce56550

Browse files
committed
Add CuVSResourceManager for controlling parallelism and pooling of cuVS resources
1 parent 9ad0367 commit ce56550

File tree

9 files changed

+268
-80
lines changed

9 files changed

+268
-80
lines changed

x-pack/plugin/gpu/src/internalClusterTest/java/org/elasticsearch/plugin/gpu/GPUIndexIT.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import org.elasticsearch.search.vectors.KnnSearchBuilder;
1616
import org.elasticsearch.test.ESIntegTestCase;
1717
import org.elasticsearch.xpack.gpu.GPUPlugin;
18+
import org.elasticsearch.xpack.gpu.GPUSupport;
1819

1920
import java.util.Collection;
2021
import java.util.List;
@@ -32,6 +33,7 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
3233
}
3334

3435
public void testBasic() {
36+
assumeTrue("cuvs not supported", GPUSupport.isSupported(false));
3537
final int dims = randomIntBetween(4, 128);
3638
final int[] numDocs = new int[] { randomIntBetween(1, 100), 1, 2, randomIntBetween(1, 100) };
3739
createIndex(dims);
@@ -45,6 +47,7 @@ public void testBasic() {
4547
}
4648

4749
public void testSearchWithoutGPU() {
50+
assumeTrue("cuvs not supported", GPUSupport.isSupported(false));
4851
final int dims = randomIntBetween(4, 128);
4952
final int numDocs = randomIntBetween(1, 500);
5053
createIndex(dims);

x-pack/plugin/gpu/src/main/java/org/elasticsearch/xpack/gpu/GPUPlugin.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,6 @@
66
*/
77
package org.elasticsearch.xpack.gpu;
88

9-
import com.nvidia.cuvs.CuVSResources;
10-
119
import org.elasticsearch.common.util.FeatureFlag;
1210
import org.elasticsearch.index.IndexSettings;
1311
import org.elasticsearch.index.mapper.vectors.DenseVectorFieldMapper;
@@ -34,17 +32,16 @@ public VectorsFormatProvider getVectorsFormatProvider() {
3432
+ "]"
3533
);
3634
}
37-
CuVSResources resources = GPUVectorsFormat.cuVSResourcesOrNull(true);
38-
if (resources == null) {
35+
if (GPUSupport.isSupported(true) == false) {
3936
throw new IllegalArgumentException(
4037
"[index.vectors.indexing.use_gpu] was set to [true], but GPU resources are not accessible on the node."
4138
);
4239
}
4340
return new GPUVectorsFormat();
4441
}
45-
if ((gpuMode == IndexSettings.GpuMode.AUTO)
42+
if (gpuMode == IndexSettings.GpuMode.AUTO
4643
&& vectorIndexTypeSupported(indexOptions.getType())
47-
&& GPUVectorsFormat.cuVSResourcesOrNull(false) != null) {
44+
&& GPUSupport.isSupported(false)) {
4845
return new GPUVectorsFormat();
4946
}
5047
}
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.gpu;
9+
10+
import com.nvidia.cuvs.CuVSResources;
11+
12+
import org.elasticsearch.logging.LogManager;
13+
import org.elasticsearch.logging.Logger;
14+
15+
public class GPUSupport {
16+
17+
private static final Logger LOG = LogManager.getLogger(GPUSupport.class);
18+
19+
/** Tells whether the platform supports cuvs. */
20+
public static boolean isSupported(boolean logError) {
21+
try (var resources = cuVSResourcesOrNull(logError)) {
22+
if (resources != null) {
23+
return true;
24+
}
25+
}
26+
return false;
27+
}
28+
29+
/** Returns a resources if supported, otherwise null. */
30+
public static CuVSResources cuVSResourcesOrNull(boolean logError) {
31+
try {
32+
var resources = CuVSResources.create();
33+
return resources;
34+
} catch (UnsupportedOperationException uoe) {
35+
if (logError) {
36+
String msg = "";
37+
if (uoe.getMessage() == null) {
38+
msg = "Runtime Java version: " + Runtime.version().feature();
39+
} else {
40+
msg = ": " + uoe.getMessage();
41+
}
42+
LOG.warn("GPU based vector indexing is not supported on this platform or java version; " + msg);
43+
}
44+
} catch (Throwable t) {
45+
if (logError) {
46+
if (t instanceof ExceptionInInitializerError ex) {
47+
t = ex.getCause();
48+
}
49+
LOG.warn("Exception occurred during creation of cuvs resources. " + t);
50+
}
51+
}
52+
return null;
53+
}
54+
}
Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.gpu.codec;
9+
10+
import com.nvidia.cuvs.CuVSResources;
11+
12+
import org.elasticsearch.xpack.gpu.GPUSupport;
13+
14+
import java.nio.file.Path;
15+
import java.util.Objects;
16+
import java.util.concurrent.ArrayBlockingQueue;
17+
import java.util.concurrent.BlockingQueue;
18+
19+
/**
20+
* A manager of {@link com.nvidia.cuvs.CuVSResources}. There is one manager per GPU.
21+
*
22+
* <p>All access to GPU resources is mediated through a manager. A manager helps coordinate usage threads to:
23+
* <ul>
24+
* <li>ensure single-threaded access to any particular resource at a time</li>
25+
* <li>Control the total number of concurrent operations that may be performed on a GPU</li>
26+
* <li>Pool resources, to avoid frequent creation and destruction, which are expensive operations. </li>
27+
* </ul>
28+
*
29+
* <p> Fundamentally, a resource is used in compute and memory bound operations. The former occurs prior to the latter, e.g.
30+
* index build (compute), followed by a copy/process of the newly built index (memory). The manager allows the resource
31+
* user to indicate that compute is complete before releasing the resources. This can help improve parallelism of compute
32+
* on the GPU - allowing the next compute operation to proceed before releasing the resources.
33+
*
34+
*/
35+
public interface CuVSResourceManager {
36+
37+
/**
38+
* Acquires a resource from the manager.
39+
*
40+
* <p>A manager can use the given parameters, numVectors and dims, to estimate the potential
41+
* effect on GPU memory and compute usage to determine whether to give out
42+
* another resource or wait for a resources to be returned before giving out another.
43+
*/
44+
// numVectors and dims are currently unused
45+
ManagedCuVSResources acquire(int numVectors, int dims) throws InterruptedException;
46+
47+
/** Marks the resources as finished with regard to compute. */
48+
void finishedComputation(ManagedCuVSResources resources);
49+
50+
/** Returns the given resource to the manager. */
51+
void release(ManagedCuVSResources resources);
52+
53+
/** Shuts down the manager, releasing all open resources. */
54+
void shutdown();
55+
56+
/** Returns the system-wide pooling manager. */
57+
static CuVSResourceManager pooling() {
58+
return PoolingCuVSResourceManager.INSTANCE;
59+
}
60+
61+
/**
62+
* A manager that maintains a pool of resources.
63+
*/
64+
final class PoolingCuVSResourceManager implements CuVSResourceManager {
65+
66+
static final int MAX_RESOURCES = 4;
67+
static final PoolingCuVSResourceManager INSTANCE = new PoolingCuVSResourceManager(MAX_RESOURCES);
68+
69+
final BlockingQueue<ManagedCuVSResources> pool;
70+
final int capacity;
71+
int createdCount;
72+
73+
public PoolingCuVSResourceManager(int capacity) {
74+
if (capacity < 1 || capacity > MAX_RESOURCES) {
75+
throw new IllegalArgumentException("Resource count must be between 1 and " + MAX_RESOURCES);
76+
}
77+
this.capacity = capacity;
78+
this.pool = new ArrayBlockingQueue<>(capacity);
79+
}
80+
81+
// GPU metadata - memory, generation, ...
82+
// // acquiring for 10M x 1536 dims
83+
// // acquiring for 100,000 x 128 dims
84+
@Override
85+
public ManagedCuVSResources acquire(int numVectors, int dims) throws InterruptedException {
86+
ManagedCuVSResources res = pool.poll();
87+
if (res != null) {
88+
return res;
89+
}
90+
synchronized (this) {
91+
if (createdCount < capacity) {
92+
createdCount++;
93+
var r = Objects.requireNonNull(GPUSupport.cuVSResourcesOrNull(true));
94+
return new ManagedCuVSResources(r);
95+
}
96+
}
97+
// Otherwise, wait for one to be released
98+
return pool.take();
99+
}
100+
101+
@Override
102+
public void finishedComputation(ManagedCuVSResources resources) {
103+
// currently does nothing, but could allow acquire to return possibly blocked resources
104+
}
105+
106+
@Override
107+
public void release(ManagedCuVSResources resources) {
108+
var added = pool.offer(Objects.requireNonNull(resources));
109+
assert added : "Failed to release resource back to pool";
110+
}
111+
112+
@Override
113+
public void shutdown() {
114+
for (ManagedCuVSResources res : pool) {
115+
res.delegate.close();
116+
}
117+
pool.clear();
118+
}
119+
}
120+
121+
final class ManagedCuVSResources implements CuVSResources {
122+
123+
final CuVSResources delegate;
124+
125+
ManagedCuVSResources(CuVSResources resources) {
126+
this.delegate = resources;
127+
}
128+
129+
@Override
130+
public ScopedAccess access() {
131+
return delegate.access();
132+
}
133+
134+
@Override
135+
public void close() {
136+
throw new UnsupportedOperationException("this resource is managed, cannot be closed by clients");
137+
}
138+
139+
@Override
140+
public Path tempDirectory() {
141+
return null;
142+
}
143+
}
144+
}

x-pack/plugin/gpu/src/main/java/org/elasticsearch/xpack/gpu/codec/GPUToHNSWVectorsWriter.java

Lines changed: 50 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99

1010
import com.nvidia.cuvs.CagraIndex;
1111
import com.nvidia.cuvs.CagraIndexParams;
12-
import com.nvidia.cuvs.CuVSResources;
1312
import com.nvidia.cuvs.Dataset;
1413

1514
import org.apache.lucene.codecs.CodecUtil;
@@ -68,7 +67,7 @@ final class GPUToHNSWVectorsWriter extends KnnVectorsWriter {
6867
private static final long SHALLOW_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(GPUToHNSWVectorsWriter.class);
6968
private static final int LUCENE99_HNSW_DIRECT_MONOTONIC_BLOCK_SHIFT = 16;
7069

71-
private final CuVSResources cuVSResources;
70+
private final CuVSResourceManager cuVSResourceManager;
7271
private final SegmentWriteState segmentWriteState;
7372
private final IndexOutput meta, vectorIndex;
7473
private final int M;
@@ -78,10 +77,15 @@ final class GPUToHNSWVectorsWriter extends KnnVectorsWriter {
7877
private final List<FieldWriter> fields = new ArrayList<>();
7978
private boolean finished;
8079

81-
GPUToHNSWVectorsWriter(CuVSResources cuVSResources, SegmentWriteState state, int M, int beamWidth, FlatVectorsWriter flatVectorWriter)
82-
throws IOException {
83-
assert cuVSResources != null : "CuVSResources must not be null";
84-
this.cuVSResources = cuVSResources;
80+
GPUToHNSWVectorsWriter(
81+
CuVSResourceManager cuVSResourceManager,
82+
SegmentWriteState state,
83+
int M,
84+
int beamWidth,
85+
FlatVectorsWriter flatVectorWriter
86+
) throws IOException {
87+
assert cuVSResourceManager != null : "CuVSResources must not be null";
88+
this.cuVSResourceManager = cuVSResourceManager;
8589
this.M = M;
8690
this.flatVectorWriter = flatVectorWriter;
8791
this.beamWidth = beamWidth;
@@ -267,42 +271,52 @@ private String buildGPUIndex(VectorSimilarityFunction similarityFunction, Datase
267271
.withMetric(distanceType)
268272
.build();
269273

270-
// build index on GPU
271-
long startTime = System.nanoTime();
272-
var index = CagraIndex.newBuilder(cuVSResources).withDataset(dataset).withIndexParams(params).build();
273-
if (logger.isDebugEnabled()) {
274-
logger.debug("Carga index created in: {} ms; #num vectors: {}", (System.nanoTime() - startTime) / 1_000_000.0, dataset.size());
275-
}
276-
277-
// TODO: do serialization through MemorySegment instead of a temp file
278-
// serialize index for CPU consumption to the hnwslib format
279-
startTime = System.nanoTime();
280-
IndexOutput tempCagraHNSW = null;
281-
boolean success = false;
274+
var cuVSResources = cuVSResourceManager.acquire(dataset.size(), dataset.dimensions());
282275
try {
283-
tempCagraHNSW = segmentWriteState.directory.createTempOutput(
284-
vectorIndex.getName(),
285-
"cagra_hnws_temp",
286-
segmentWriteState.context
287-
);
288-
var tempCagraHNSWOutputStream = new IndexOutputOutputStream(tempCagraHNSW);
289-
index.serializeToHNSW(tempCagraHNSWOutputStream);
276+
long startTime = System.nanoTime();
277+
var indexBuilder = CagraIndex.newBuilder(cuVSResources).withDataset(dataset).withIndexParams(params);
278+
var index = indexBuilder.build();
279+
cuVSResourceManager.finishedComputation(cuVSResources);
290280
if (logger.isDebugEnabled()) {
291-
logger.debug("Carga index serialized to hnswlib format in: {} ms", (System.nanoTime() - startTime) / 1_000_000.0);
281+
logger.debug(
282+
"Carga index created in: {} ms; #num vectors: {}",
283+
(System.nanoTime() - startTime) / 1_000_000.0,
284+
dataset.size()
285+
);
292286
}
293-
success = true;
294-
} finally {
295-
index.destroyIndex();
296-
if (success) {
297-
org.elasticsearch.core.IOUtils.close(tempCagraHNSW);
298-
} else {
299-
if (tempCagraHNSW != null) {
300-
IOUtils.closeWhileHandlingException(tempCagraHNSW);
301-
org.apache.lucene.util.IOUtils.deleteFilesIgnoringExceptions(segmentWriteState.directory, tempCagraHNSW.getName());
287+
288+
// TODO: do serialization through MemorySegment instead of a temp file
289+
// serialize index for CPU consumption to the hnswlib format
290+
startTime = System.nanoTime();
291+
IndexOutput tempCagraHNSW = null;
292+
boolean success = false;
293+
try {
294+
tempCagraHNSW = segmentWriteState.directory.createTempOutput(
295+
vectorIndex.getName(),
296+
"cagra_hnws_temp",
297+
segmentWriteState.context
298+
);
299+
var tempCagraHNSWOutputStream = new IndexOutputOutputStream(tempCagraHNSW);
300+
index.serializeToHNSW(tempCagraHNSWOutputStream);
301+
if (logger.isDebugEnabled()) {
302+
logger.debug("Carga index serialized to hnswlib format in: {} ms", (System.nanoTime() - startTime) / 1_000_000.0);
303+
}
304+
success = true;
305+
} finally {
306+
index.destroyIndex();
307+
if (success) {
308+
org.elasticsearch.core.IOUtils.close(tempCagraHNSW);
309+
} else {
310+
if (tempCagraHNSW != null) {
311+
IOUtils.closeWhileHandlingException(tempCagraHNSW);
312+
org.apache.lucene.util.IOUtils.deleteFilesIgnoringExceptions(segmentWriteState.directory, tempCagraHNSW.getName());
313+
}
302314
}
303315
}
316+
return tempCagraHNSW.getName();
317+
} finally {
318+
cuVSResourceManager.release(cuVSResources);
304319
}
305-
return tempCagraHNSW.getName();
306320
}
307321

308322
@SuppressForbidden(reason = "require usage of Lucene's IOUtils#deleteFilesIgnoringExceptions(...)")

0 commit comments

Comments
 (0)