Skip to content

Commit 32e24fb

Browse files
authored
[GPU] Support for performance profiling (#136021) (#136264)
In order to better understand the performance characteristics of vector indexing with a GPU, this PR introduces 2 changes: - changes to KnnIndexTester (more logging, support different write buffer sizes in input, support async-profiler - more logging in the GPU codec
1 parent 45fa66c commit 32e24fb

File tree

8 files changed

+124
-16
lines changed

8 files changed

+124
-16
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ build/
4646
**/.local*
4747
.vagrant/
4848
/logs/
49+
**/target/
4950

5051
# osx stuff
5152
.DS_Store

qa/vector/build.gradle

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,13 +51,34 @@ tasks.register("checkVec", JavaExec) {
5151
systemProperty "es.logger.out", "console"
5252
systemProperty "es.logger.level", "INFO" // Change to DEBUG if needed
5353
systemProperty 'es.nativelibs.path', TestUtil.getTestLibraryPath(file("../../libs/native/libraries/build/platform/").toString())
54-
jvmArgs '-Xms4g', '-Xmx4g', '-Djava.util.concurrent.ForkJoinPool.common.parallelism=8', '-XX:+UnlockDiagnosticVMOptions', '-XX:+DebugNonSafepoints', '-XX:+HeapDumpOnOutOfMemoryError'
54+
jvmArgs '-Xms16g', '-Xmx16g', '-Djava.util.concurrent.ForkJoinPool.common.parallelism=8', '-XX:+UnlockDiagnosticVMOptions', '-XX:+DebugNonSafepoints', '-XX:+HeapDumpOnOutOfMemoryError'
5555
if (buildParams.getRuntimeJavaVersion().map { it.majorVersion.toInteger() }.get() >= 21) {
5656
jvmArgs '--add-modules=jdk.incubator.vector', '--enable-native-access=ALL-UNNAMED'
5757
}
5858
if (System.getenv("DO_PROFILING") != null) {
5959
jvmArgs '-XX:StartFlightRecording=dumponexit=true,maxsize=250M,filename=knn.jfr,settings=profile.jfc'
6060
}
61+
def asyncProfilerPath = System.getProperty("asyncProfiler.path", null)
62+
if (asyncProfilerPath != null) {
63+
def asyncProfilerEvent = System.getProperty("asyncProfiler.event", "cpu")
64+
if (OS.current().equals(OS.MAC)) {
65+
def asyncProfilerAgent = "${asyncProfilerPath}/lib/libasyncProfiler.dylib"
66+
println "Using async-profiler agent ${asyncProfilerAgent}"
67+
68+
// MacOS implementation of async-profiler does not support wall clock profiling with another event.
69+
// Wall clock times can be obtained separately invoking this task with `-DasyncProfiler.event=wall`
70+
jvmArgs "-agentpath:${asyncProfilerAgent}=start,event=${asyncProfilerEvent},interval=10ms,file=${layout.buildDirectory.asFile.get()}/tmp/elasticsearch-0_%t_%p.jfr"
71+
} else if (OS.current().equals(OS.LINUX)) {
72+
// Linux implementation of async-profiler uses perf_event, which allows wall clock profiling with another event (cpu)
73+
def additionalWallInterval = asyncProfilerEvent.equals("cpu") ? ",wall=50ms" : ""
74+
75+
def asyncProfilerAgent = "${asyncProfilerPath}/lib/libasyncProfiler.so"
76+
println "Using async-profiler agent ${asyncProfilerAgent}"
77+
jvmArgs "-agentpath:${asyncProfilerAgent}=start,event=${asyncProfilerEvent},interval=10ms${additionalWallInterval},file=${layout.buildDirectory.asFile.get()}/tmp/elasticsearch-0_%t_%p.jfr"
78+
} else {
79+
println "Ignoring 'asyncProfiler.path': not available on ${OS.current()}";
80+
}
81+
}
6182
if (buildParams.getIsRuntimeJavaHomeSet()) {
6283
executable = "${buildParams.runtimeJavaHome.get()}/bin/java" + (OS.current() == OS.WINDOWS ? '.exe' : '')
6384
} else {

qa/vector/src/main/java/org/elasticsearch/test/knn/CmdLineArgs.java

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,12 @@
99

1010
package org.elasticsearch.test.knn;
1111

12+
import org.apache.lucene.index.IndexWriterConfig;
1213
import org.apache.lucene.index.VectorEncoding;
1314
import org.apache.lucene.index.VectorSimilarityFunction;
1415
import org.elasticsearch.common.Strings;
1516
import org.elasticsearch.core.PathUtils;
17+
import org.elasticsearch.monitor.jvm.JvmInfo;
1618
import org.elasticsearch.xcontent.ObjectParser;
1719
import org.elasticsearch.xcontent.ParseField;
1820
import org.elasticsearch.xcontent.ToXContentObject;
@@ -53,7 +55,9 @@ record CmdLineArgs(
5355
VectorEncoding vectorEncoding,
5456
int dimensions,
5557
boolean earlyTermination,
56-
KnnIndexTester.MergePolicyType mergePolicy
58+
KnnIndexTester.MergePolicyType mergePolicy,
59+
double writerBufferSizeInMb,
60+
int writerMaxBufferedDocs
5761
) implements ToXContentObject {
5862

5963
static final ParseField DOC_VECTORS_FIELD = new ParseField("doc_vectors");
@@ -82,6 +86,15 @@ record CmdLineArgs(
8286
static final ParseField FILTER_SELECTIVITY_FIELD = new ParseField("filter_selectivity");
8387
static final ParseField SEED_FIELD = new ParseField("seed");
8488
static final ParseField MERGE_POLICY_FIELD = new ParseField("merge_policy");
89+
static final ParseField WRITER_BUFFER_MB_FIELD = new ParseField("writer_buffer_mb");
90+
static final ParseField WRITER_BUFFER_DOCS_FIELD = new ParseField("writer_buffer_docs");
91+
92+
/** By default, in ES the default writer buffer size is 10% of the heap space
93+
* (see {@code IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING}).
94+
* We configure the Java heap size for this tool in {@code build.gradle}; currently we default to 16GB, so in that case
95+
* the buffer size would be 1.6GB.
96+
*/
97+
static final double DEFAULT_WRITER_BUFFER_MB = (JvmInfo.jvmInfo().getMem().getHeapMax().getBytes() / (1024.0 * 1024.0)) * 0.1;
8598

8699
static CmdLineArgs fromXContent(XContentParser parser) throws IOException {
87100
Builder builder = PARSER.apply(parser, null);
@@ -117,6 +130,8 @@ static CmdLineArgs fromXContent(XContentParser parser) throws IOException {
117130
PARSER.declareFloat(Builder::setFilterSelectivity, FILTER_SELECTIVITY_FIELD);
118131
PARSER.declareLong(Builder::setSeed, SEED_FIELD);
119132
PARSER.declareString(Builder::setMergePolicy, MERGE_POLICY_FIELD);
133+
PARSER.declareDouble(Builder::setWriterBufferMb, WRITER_BUFFER_MB_FIELD);
134+
PARSER.declareInt(Builder::setWriterMaxBufferedDocs, WRITER_BUFFER_DOCS_FIELD);
120135
}
121136

122137
@Override
@@ -152,6 +167,8 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
152167
builder.field(EARLY_TERMINATION_FIELD.getPreferredName(), earlyTermination);
153168
builder.field(FILTER_SELECTIVITY_FIELD.getPreferredName(), filterSelectivity);
154169
builder.field(SEED_FIELD.getPreferredName(), seed);
170+
builder.field(WRITER_BUFFER_MB_FIELD.getPreferredName(), writerBufferSizeInMb);
171+
builder.field(WRITER_BUFFER_DOCS_FIELD.getPreferredName(), writerMaxBufferedDocs);
155172
return builder.endObject();
156173
}
157174

@@ -186,6 +203,13 @@ static class Builder {
186203
private float filterSelectivity = 1f;
187204
private long seed = 1751900822751L;
188205
private KnnIndexTester.MergePolicyType mergePolicy = null;
206+
private double writerBufferSizeInMb = DEFAULT_WRITER_BUFFER_MB;
207+
208+
/**
209+
* Elasticsearch does not set this explicitly, and in Lucene this setting is
210+
* disabled by default (writer flushes by RAM usage).
211+
*/
212+
private int writerMaxBufferedDocs = IndexWriterConfig.DISABLE_AUTO_FLUSH;
189213

190214
public Builder setDocVectors(List<String> docVectors) {
191215
if (docVectors == null || docVectors.isEmpty()) {
@@ -316,6 +340,16 @@ public Builder setMergePolicy(String mergePolicy) {
316340
return this;
317341
}
318342

343+
public Builder setWriterBufferMb(double writerBufferSizeInMb) {
344+
this.writerBufferSizeInMb = writerBufferSizeInMb;
345+
return this;
346+
}
347+
348+
public Builder setWriterMaxBufferedDocs(int writerMaxBufferedDocs) {
349+
this.writerMaxBufferedDocs = writerMaxBufferedDocs;
350+
return this;
351+
}
352+
319353
public CmdLineArgs build() {
320354
if (docVectors == null) {
321355
throw new IllegalArgumentException("Document vectors path must be provided");
@@ -350,7 +384,9 @@ public CmdLineArgs build() {
350384
vectorEncoding,
351385
dimensions,
352386
earlyTermination,
353-
mergePolicy
387+
mergePolicy,
388+
writerBufferSizeInMb,
389+
writerMaxBufferedDocs
354390
);
355391
}
356392
}

qa/vector/src/main/java/org/elasticsearch/test/knn/KnnIndexTester.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -240,7 +240,9 @@ public static void main(String[] args) throws Exception {
240240
cmdLineArgs.dimensions(),
241241
cmdLineArgs.vectorSpace(),
242242
cmdLineArgs.numDocs(),
243-
mergePolicy
243+
mergePolicy,
244+
cmdLineArgs.writerBufferSizeInMb(),
245+
cmdLineArgs.writerMaxBufferedDocs()
244246
);
245247
if (cmdLineArgs.reindex() == false && Files.exists(indexPath) == false) {
246248
throw new IllegalArgumentException("Index path does not exist: " + indexPath);
@@ -301,7 +303,14 @@ public String toString() {
301303
return "No results available.";
302304
}
303305

304-
String[] indexingHeaders = { "index_name", "index_type", "num_docs", "index_time(ms)", "force_merge_time(ms)", "num_segments" };
306+
String[] indexingHeaders = {
307+
"index_name",
308+
"index_type",
309+
"num_docs",
310+
"doc_add_time(ms)",
311+
"total_index_time(ms)",
312+
"force_merge_time(ms)",
313+
"num_segments" };
305314

306315
// Define column headers
307316
String[] searchHeaders = {
@@ -327,6 +336,7 @@ public String toString() {
327336
indexResult.indexName,
328337
indexResult.indexType,
329338
Integer.toString(indexResult.numDocs),
339+
Long.toString(indexResult.docAddTimeMS),
330340
Long.toString(indexResult.indexTimeMS),
331341
Long.toString(indexResult.forceMergeTimeMS),
332342
Integer.toString(indexResult.numSegments) };
@@ -409,6 +419,7 @@ private int[] calculateColumnWidths(String[] headers, String[]... data) {
409419

410420
static class Results {
411421
final String indexType, indexName;
422+
public long docAddTimeMS;
412423
int numDocs;
413424
final float filterSelectivity;
414425
long indexTimeMS;

qa/vector/src/main/java/org/elasticsearch/test/knn/KnnIndexer.java

Lines changed: 35 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.apache.lucene.index.ConcurrentMergeScheduler;
3030
import org.apache.lucene.index.IndexWriter;
3131
import org.apache.lucene.index.IndexWriterConfig;
32+
import org.apache.lucene.index.IndexableField;
3233
import org.apache.lucene.index.MergePolicy;
3334
import org.apache.lucene.index.VectorEncoding;
3435
import org.apache.lucene.index.VectorSimilarityFunction;
@@ -65,7 +66,6 @@
6566
import static org.elasticsearch.test.knn.KnnIndexTester.logger;
6667

6768
class KnnIndexer {
68-
private static final double WRITER_BUFFER_MB = 128;
6969
static final String ID_FIELD = "id";
7070
static final String VECTOR_FIELD = "vector";
7171

@@ -78,6 +78,8 @@ class KnnIndexer {
7878
private final int numDocs;
7979
private final int numIndexThreads;
8080
private final MergePolicy mergePolicy;
81+
private final double writerBufferSizeInMb;
82+
private final int writerMaxBufferedDocs;
8183

8284
KnnIndexer(
8385
List<Path> docsPath,
@@ -88,7 +90,9 @@ class KnnIndexer {
8890
int dim,
8991
VectorSimilarityFunction similarityFunction,
9092
int numDocs,
91-
MergePolicy mergePolicy
93+
MergePolicy mergePolicy,
94+
double writerBufferSizeInMb,
95+
int writerMaxBufferedDocs
9296
) {
9397
this.docsPath = docsPath;
9498
this.indexPath = indexPath;
@@ -99,12 +103,15 @@ class KnnIndexer {
99103
this.similarityFunction = similarityFunction;
100104
this.numDocs = numDocs;
101105
this.mergePolicy = mergePolicy;
106+
this.writerBufferSizeInMb = writerBufferSizeInMb;
107+
this.writerMaxBufferedDocs = writerMaxBufferedDocs;
102108
}
103109

104110
void createIndex(KnnIndexTester.Results result) throws IOException, InterruptedException, ExecutionException {
105111
IndexWriterConfig iwc = new IndexWriterConfig().setOpenMode(IndexWriterConfig.OpenMode.CREATE);
106112
iwc.setCodec(codec);
107-
iwc.setRAMBufferSizeMB(WRITER_BUFFER_MB);
113+
iwc.setMaxBufferedDocs(writerMaxBufferedDocs);
114+
iwc.setRAMBufferSizeMB(writerBufferSizeInMb);
108115
iwc.setUseCompoundFile(false);
109116
if (mergePolicy != null) {
110117
iwc.setMergePolicy(mergePolicy);
@@ -183,15 +190,20 @@ public boolean isEnabled(String component) {
183190

184191
VectorReader inReader = VectorReader.create(in, dim, vectorEncoding, offsetByteSize);
185192
try (ExecutorService exec = Executors.newFixedThreadPool(numIndexThreads, r -> new Thread(r, "KnnIndexer-Thread"))) {
186-
List<Future<?>> threads = new ArrayList<>();
193+
List<Future<?>> futures = new ArrayList<>();
194+
List<IndexerThread> threads = new ArrayList<>();
187195
for (int i = 0; i < numIndexThreads; i++) {
188-
Thread t = new IndexerThread(iw, inReader, dim, vectorEncoding, fieldType, numDocsIndexed, numDocs);
196+
var t = new IndexerThread(iw, inReader, dim, vectorEncoding, fieldType, numDocsIndexed, numDocs);
197+
threads.add(t);
189198
t.setDaemon(true);
190-
threads.add(exec.submit(t));
199+
futures.add(exec.submit(t));
191200
}
192-
for (Future<?> t : threads) {
193-
t.get();
201+
for (Future<?> future : futures) {
202+
future.get();
194203
}
204+
result.docAddTimeMS = TimeUnit.NANOSECONDS.toMillis(
205+
threads.stream().mapToLong(x -> x.docAddTime).sum() / numIndexThreads
206+
);
195207
}
196208
}
197209
}
@@ -248,6 +260,9 @@ static class IndexerThread extends Thread {
248260
private final float[] floatVectorBuffer;
249261
private final VectorReader in;
250262

263+
long readTime;
264+
long docAddTime;
265+
251266
private IndexerThread(
252267
IndexWriter iw,
253268
VectorReader in,
@@ -294,23 +309,32 @@ private void _run() throws IOException {
294309
continue;
295310
}
296311

297-
Document doc = new Document();
312+
var startRead = System.nanoTime();
313+
final IndexableField field;
298314
switch (vectorEncoding) {
299315
case BYTE -> {
300316
in.next(byteVectorBuffer);
301-
doc.add(new KnnByteVectorField(VECTOR_FIELD, byteVectorBuffer, fieldType));
317+
field = new KnnByteVectorField(VECTOR_FIELD, byteVectorBuffer, fieldType);
302318
}
303319
case FLOAT32 -> {
304320
in.next(floatVectorBuffer);
305-
doc.add(new KnnFloatVectorField(VECTOR_FIELD, floatVectorBuffer, fieldType));
321+
field = new KnnFloatVectorField(VECTOR_FIELD, floatVectorBuffer, fieldType);
306322
}
323+
default -> throw new UnsupportedOperationException();
307324
}
325+
long endRead = System.nanoTime();
326+
readTime += (endRead - startRead);
327+
328+
Document doc = new Document();
329+
doc.add(field);
308330

309331
if ((id + 1) % 25000 == 0) {
310332
logger.debug("Done indexing " + (id + 1) + " documents.");
311333
}
312334
doc.add(new StoredField(ID_FIELD, id));
313335
iw.addDocument(doc);
336+
337+
docAddTime += (System.nanoTime() - endRead);
314338
}
315339
}
316340
}

server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.elasticsearch.common.unit.MemorySizeValue;
2424
import org.elasticsearch.core.Nullable;
2525
import org.elasticsearch.core.TimeValue;
26+
import org.elasticsearch.core.UpdateForV10;
2627
import org.elasticsearch.index.IndexMode;
2728
import org.elasticsearch.index.IndexSettings;
2829
import org.elasticsearch.index.codec.CodecProvider;
@@ -131,6 +132,7 @@ public Supplier<RetentionLeases> retentionLeasesSupplier() {
131132
* TODO: Remove in 9.0
132133
*/
133134
@Deprecated
135+
@UpdateForV10(owner = UpdateForV10.Owner.DISTRIBUTED_INDEXING)
134136
public static final Setting<Boolean> INDEX_OPTIMIZE_AUTO_GENERATED_IDS = Setting.boolSetting(
135137
"index.optimize_auto_generated_id",
136138
true,
@@ -213,6 +215,7 @@ public EngineConfig(
213215
// Add an escape hatch in case this change proves problematic - it used
214216
// to be a fixed amound of RAM: 256 MB.
215217
// TODO: Remove this escape hatch in 8.x
218+
@UpdateForV10(owner = UpdateForV10.Owner.DISTRIBUTED_INDEXING)
216219
final String escapeHatchProperty = "es.index.memory.max_index_buffer_size";
217220
String maxBufferSize = System.getProperty(escapeHatchProperty);
218221
if (maxBufferSize != null) {

x-pack/plugin/gpu/src/main/java/org/elasticsearch/xpack/gpu/codec/CuVSResourceManager.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,7 @@ private int numLockedResources() {
130130
@Override
131131
public ManagedCuVSResources acquire(int numVectors, int dims, CuVSMatrix.DataType dataType) throws InterruptedException {
132132
try {
133+
var started = System.nanoTime();
133134
lock.lock();
134135

135136
boolean allConditionsMet = false;
@@ -181,6 +182,8 @@ public ManagedCuVSResources acquire(int numVectors, int dims, CuVSMatrix.DataTyp
181182
enoughResourcesCondition.await();
182183
}
183184
}
185+
var elapsed = started - System.nanoTime();
186+
logger.debug("Resource acquired in [{}ms]", elapsed / 1_000_000.0);
184187
res.locked = true;
185188
return res;
186189
} finally {

0 commit comments

Comments
 (0)