Skip to content

Commit 6f5f268

Browse files
authored
[GPU] Support for performance profiling (#136021)
In order to better understand the performance characteristics of vector indexing with a GPU, this PR introduces 2 changes: - changes to KnnIndexTester (more logging, support different write buffer sizes in input, support async-profiler) - more logging in the GPU codec
1 parent 7663396 commit 6f5f268

File tree

8 files changed

+124
-16
lines changed

8 files changed

+124
-16
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ build/
4646
**/.local*
4747
.vagrant/
4848
/logs/
49+
**/target/
4950

5051
# osx stuff
5152
.DS_Store

qa/vector/build.gradle

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,13 +51,34 @@ tasks.register("checkVec", JavaExec) {
5151
systemProperty "es.logger.out", "console"
5252
systemProperty "es.logger.level", "INFO" // Change to DEBUG if needed
5353
systemProperty 'es.nativelibs.path', TestUtil.getTestLibraryPath(file("../../libs/native/libraries/build/platform/").toString())
54-
jvmArgs '-Xms4g', '-Xmx4g', '-Djava.util.concurrent.ForkJoinPool.common.parallelism=8', '-XX:+UnlockDiagnosticVMOptions', '-XX:+DebugNonSafepoints', '-XX:+HeapDumpOnOutOfMemoryError'
54+
jvmArgs '-Xms16g', '-Xmx16g', '-Djava.util.concurrent.ForkJoinPool.common.parallelism=8', '-XX:+UnlockDiagnosticVMOptions', '-XX:+DebugNonSafepoints', '-XX:+HeapDumpOnOutOfMemoryError'
5555
if (buildParams.getRuntimeJavaVersion().map { it.majorVersion.toInteger() }.get() >= 21) {
5656
jvmArgs '--add-modules=jdk.incubator.vector', '--enable-native-access=ALL-UNNAMED'
5757
}
5858
if (System.getenv("DO_PROFILING") != null) {
5959
jvmArgs '-XX:StartFlightRecording=dumponexit=true,maxsize=250M,filename=knn.jfr,settings=profile.jfc'
6060
}
61+
def asyncProfilerPath = System.getProperty("asyncProfiler.path", null)
62+
if (asyncProfilerPath != null) {
63+
def asyncProfilerEvent = System.getProperty("asyncProfiler.event", "cpu")
64+
if (OS.current().equals(OS.MAC)) {
65+
def asyncProfilerAgent = "${asyncProfilerPath}/lib/libasyncProfiler.dylib"
66+
println "Using async-profiler agent ${asyncProfilerAgent}"
67+
68+
// MacOS implementation of async-profiler does not support wall clock profiling with another event.
69+
// Wall clock times can be obtained separately by invoking this task with `-DasyncProfiler.event=wall`
70+
jvmArgs "-agentpath:${asyncProfilerAgent}=start,event=${asyncProfilerEvent},interval=10ms,file=${layout.buildDirectory.asFile.get()}/tmp/elasticsearch-0_%t_%p.jfr"
71+
} else if (OS.current().equals(OS.LINUX)) {
72+
// Linux implementation of async-profiler uses perf_event, which allows wall clock profiling with another event (cpu)
73+
def additionalWallInterval = asyncProfilerEvent.equals("cpu") ? ",wall=50ms" : ""
74+
75+
def asyncProfilerAgent = "${asyncProfilerPath}/lib/libasyncProfiler.so"
76+
println "Using async-profiler agent ${asyncProfilerAgent}"
77+
jvmArgs "-agentpath:${asyncProfilerAgent}=start,event=${asyncProfilerEvent},interval=10ms${additionalWallInterval},file=${layout.buildDirectory.asFile.get()}/tmp/elasticsearch-0_%t_%p.jfr"
78+
} else {
79+
println "Ignoring 'asyncProfiler.path': not available on ${OS.current()}";
80+
}
81+
}
6182
if (buildParams.getIsRuntimeJavaHomeSet()) {
6283
executable = "${buildParams.runtimeJavaHome.get()}/bin/java" + (OS.current() == OS.WINDOWS ? '.exe' : '')
6384
} else {

qa/vector/src/main/java/org/elasticsearch/test/knn/CmdLineArgs.java

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,12 @@
99

1010
package org.elasticsearch.test.knn;
1111

12+
import org.apache.lucene.index.IndexWriterConfig;
1213
import org.apache.lucene.index.VectorEncoding;
1314
import org.apache.lucene.index.VectorSimilarityFunction;
1415
import org.elasticsearch.common.Strings;
1516
import org.elasticsearch.core.PathUtils;
17+
import org.elasticsearch.monitor.jvm.JvmInfo;
1618
import org.elasticsearch.xcontent.ObjectParser;
1719
import org.elasticsearch.xcontent.ParseField;
1820
import org.elasticsearch.xcontent.ToXContentObject;
@@ -53,7 +55,9 @@ record CmdLineArgs(
5355
VectorEncoding vectorEncoding,
5456
int dimensions,
5557
boolean earlyTermination,
56-
KnnIndexTester.MergePolicyType mergePolicy
58+
KnnIndexTester.MergePolicyType mergePolicy,
59+
double writerBufferSizeInMb,
60+
int writerMaxBufferedDocs
5761
) implements ToXContentObject {
5862

5963
static final ParseField DOC_VECTORS_FIELD = new ParseField("doc_vectors");
@@ -82,6 +86,15 @@ record CmdLineArgs(
8286
static final ParseField FILTER_SELECTIVITY_FIELD = new ParseField("filter_selectivity");
8387
static final ParseField SEED_FIELD = new ParseField("seed");
8488
static final ParseField MERGE_POLICY_FIELD = new ParseField("merge_policy");
89+
static final ParseField WRITER_BUFFER_MB_FIELD = new ParseField("writer_buffer_mb");
90+
static final ParseField WRITER_BUFFER_DOCS_FIELD = new ParseField("writer_buffer_docs");
91+
92+
/** In ES, the default writer buffer size is 10% of the heap space
93+
* (see {@code IndexingMemoryController.INDEX_BUFFER_SIZE_SETTING}).
94+
* We configure the Java heap size for this tool in {@code build.gradle}; currently we default to 16GB, so in that case
95+
* the buffer size would be 1.6GB.
96+
*/
97+
static final double DEFAULT_WRITER_BUFFER_MB = (JvmInfo.jvmInfo().getMem().getHeapMax().getBytes() / (1024.0 * 1024.0)) * 0.1;
8598

8699
static CmdLineArgs fromXContent(XContentParser parser) throws IOException {
87100
Builder builder = PARSER.apply(parser, null);
@@ -117,6 +130,8 @@ static CmdLineArgs fromXContent(XContentParser parser) throws IOException {
117130
PARSER.declareFloat(Builder::setFilterSelectivity, FILTER_SELECTIVITY_FIELD);
118131
PARSER.declareLong(Builder::setSeed, SEED_FIELD);
119132
PARSER.declareString(Builder::setMergePolicy, MERGE_POLICY_FIELD);
133+
PARSER.declareDouble(Builder::setWriterBufferMb, WRITER_BUFFER_MB_FIELD);
134+
PARSER.declareInt(Builder::setWriterMaxBufferedDocs, WRITER_BUFFER_DOCS_FIELD);
120135
}
121136

122137
@Override
@@ -152,6 +167,8 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
152167
builder.field(EARLY_TERMINATION_FIELD.getPreferredName(), earlyTermination);
153168
builder.field(FILTER_SELECTIVITY_FIELD.getPreferredName(), filterSelectivity);
154169
builder.field(SEED_FIELD.getPreferredName(), seed);
170+
builder.field(WRITER_BUFFER_MB_FIELD.getPreferredName(), writerBufferSizeInMb);
171+
builder.field(WRITER_BUFFER_DOCS_FIELD.getPreferredName(), writerMaxBufferedDocs);
155172
return builder.endObject();
156173
}
157174

@@ -186,6 +203,13 @@ static class Builder {
186203
private float filterSelectivity = 1f;
187204
private long seed = 1751900822751L;
188205
private KnnIndexTester.MergePolicyType mergePolicy = null;
206+
private double writerBufferSizeInMb = DEFAULT_WRITER_BUFFER_MB;
207+
208+
/**
209+
* Elasticsearch does not set this explicitly, and in Lucene this setting is
210+
* disabled by default (writer flushes by RAM usage).
211+
*/
212+
private int writerMaxBufferedDocs = IndexWriterConfig.DISABLE_AUTO_FLUSH;
189213

190214
public Builder setDocVectors(List<String> docVectors) {
191215
if (docVectors == null || docVectors.isEmpty()) {
@@ -316,6 +340,16 @@ public Builder setMergePolicy(String mergePolicy) {
316340
return this;
317341
}
318342

343+
public Builder setWriterBufferMb(double writerBufferSizeInMb) {
344+
this.writerBufferSizeInMb = writerBufferSizeInMb;
345+
return this;
346+
}
347+
348+
public Builder setWriterMaxBufferedDocs(int writerMaxBufferedDocs) {
349+
this.writerMaxBufferedDocs = writerMaxBufferedDocs;
350+
return this;
351+
}
352+
319353
public CmdLineArgs build() {
320354
if (docVectors == null) {
321355
throw new IllegalArgumentException("Document vectors path must be provided");
@@ -350,7 +384,9 @@ public CmdLineArgs build() {
350384
vectorEncoding,
351385
dimensions,
352386
earlyTermination,
353-
mergePolicy
387+
mergePolicy,
388+
writerBufferSizeInMb,
389+
writerMaxBufferedDocs
354390
);
355391
}
356392
}

qa/vector/src/main/java/org/elasticsearch/test/knn/KnnIndexTester.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -240,7 +240,9 @@ public static void main(String[] args) throws Exception {
240240
cmdLineArgs.dimensions(),
241241
cmdLineArgs.vectorSpace(),
242242
cmdLineArgs.numDocs(),
243-
mergePolicy
243+
mergePolicy,
244+
cmdLineArgs.writerBufferSizeInMb(),
245+
cmdLineArgs.writerMaxBufferedDocs()
244246
);
245247
if (cmdLineArgs.reindex() == false && Files.exists(indexPath) == false) {
246248
throw new IllegalArgumentException("Index path does not exist: " + indexPath);
@@ -301,7 +303,14 @@ public String toString() {
301303
return "No results available.";
302304
}
303305

304-
String[] indexingHeaders = { "index_name", "index_type", "num_docs", "index_time(ms)", "force_merge_time(ms)", "num_segments" };
306+
String[] indexingHeaders = {
307+
"index_name",
308+
"index_type",
309+
"num_docs",
310+
"doc_add_time(ms)",
311+
"total_index_time(ms)",
312+
"force_merge_time(ms)",
313+
"num_segments" };
305314

306315
// Define column headers
307316
String[] searchHeaders = {
@@ -327,6 +336,7 @@ public String toString() {
327336
indexResult.indexName,
328337
indexResult.indexType,
329338
Integer.toString(indexResult.numDocs),
339+
Long.toString(indexResult.docAddTimeMS),
330340
Long.toString(indexResult.indexTimeMS),
331341
Long.toString(indexResult.forceMergeTimeMS),
332342
Integer.toString(indexResult.numSegments) };
@@ -409,6 +419,7 @@ private int[] calculateColumnWidths(String[] headers, String[]... data) {
409419

410420
static class Results {
411421
final String indexType, indexName;
422+
public long docAddTimeMS;
412423
int numDocs;
413424
final float filterSelectivity;
414425
long indexTimeMS;

qa/vector/src/main/java/org/elasticsearch/test/knn/KnnIndexer.java

Lines changed: 35 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.apache.lucene.index.ConcurrentMergeScheduler;
3030
import org.apache.lucene.index.IndexWriter;
3131
import org.apache.lucene.index.IndexWriterConfig;
32+
import org.apache.lucene.index.IndexableField;
3233
import org.apache.lucene.index.MergePolicy;
3334
import org.apache.lucene.index.VectorEncoding;
3435
import org.apache.lucene.index.VectorSimilarityFunction;
@@ -60,7 +61,6 @@
6061
import static org.elasticsearch.test.knn.KnnIndexTester.logger;
6162

6263
class KnnIndexer {
63-
private static final double WRITER_BUFFER_MB = 128;
6464
static final String ID_FIELD = "id";
6565
static final String VECTOR_FIELD = "vector";
6666

@@ -73,6 +73,8 @@ class KnnIndexer {
7373
private final int numDocs;
7474
private final int numIndexThreads;
7575
private final MergePolicy mergePolicy;
76+
private final double writerBufferSizeInMb;
77+
private final int writerMaxBufferedDocs;
7678

7779
KnnIndexer(
7880
List<Path> docsPath,
@@ -83,7 +85,9 @@ class KnnIndexer {
8385
int dim,
8486
VectorSimilarityFunction similarityFunction,
8587
int numDocs,
86-
MergePolicy mergePolicy
88+
MergePolicy mergePolicy,
89+
double writerBufferSizeInMb,
90+
int writerMaxBufferedDocs
8791
) {
8892
this.docsPath = docsPath;
8993
this.indexPath = indexPath;
@@ -94,12 +98,15 @@ class KnnIndexer {
9498
this.similarityFunction = similarityFunction;
9599
this.numDocs = numDocs;
96100
this.mergePolicy = mergePolicy;
101+
this.writerBufferSizeInMb = writerBufferSizeInMb;
102+
this.writerMaxBufferedDocs = writerMaxBufferedDocs;
97103
}
98104

99105
void createIndex(KnnIndexTester.Results result) throws IOException, InterruptedException, ExecutionException {
100106
IndexWriterConfig iwc = new IndexWriterConfig().setOpenMode(IndexWriterConfig.OpenMode.CREATE);
101107
iwc.setCodec(codec);
102-
iwc.setRAMBufferSizeMB(WRITER_BUFFER_MB);
108+
iwc.setMaxBufferedDocs(writerMaxBufferedDocs);
109+
iwc.setRAMBufferSizeMB(writerBufferSizeInMb);
103110
iwc.setUseCompoundFile(false);
104111
if (mergePolicy != null) {
105112
iwc.setMergePolicy(mergePolicy);
@@ -178,15 +185,20 @@ public boolean isEnabled(String component) {
178185

179186
VectorReader inReader = VectorReader.create(in, dim, vectorEncoding, offsetByteSize);
180187
try (ExecutorService exec = Executors.newFixedThreadPool(numIndexThreads, r -> new Thread(r, "KnnIndexer-Thread"))) {
181-
List<Future<?>> threads = new ArrayList<>();
188+
List<Future<?>> futures = new ArrayList<>();
189+
List<IndexerThread> threads = new ArrayList<>();
182190
for (int i = 0; i < numIndexThreads; i++) {
183-
Thread t = new IndexerThread(iw, inReader, dim, vectorEncoding, fieldType, numDocsIndexed, numDocs);
191+
var t = new IndexerThread(iw, inReader, dim, vectorEncoding, fieldType, numDocsIndexed, numDocs);
192+
threads.add(t);
184193
t.setDaemon(true);
185-
threads.add(exec.submit(t));
194+
futures.add(exec.submit(t));
186195
}
187-
for (Future<?> t : threads) {
188-
t.get();
196+
for (Future<?> future : futures) {
197+
future.get();
189198
}
199+
result.docAddTimeMS = TimeUnit.NANOSECONDS.toMillis(
200+
threads.stream().mapToLong(x -> x.docAddTime).sum() / numIndexThreads
201+
);
190202
}
191203
}
192204
}
@@ -243,6 +255,9 @@ static class IndexerThread extends Thread {
243255
private final float[] floatVectorBuffer;
244256
private final VectorReader in;
245257

258+
long readTime;
259+
long docAddTime;
260+
246261
private IndexerThread(
247262
IndexWriter iw,
248263
VectorReader in,
@@ -289,23 +304,32 @@ private void _run() throws IOException {
289304
continue;
290305
}
291306

292-
Document doc = new Document();
307+
var startRead = System.nanoTime();
308+
final IndexableField field;
293309
switch (vectorEncoding) {
294310
case BYTE -> {
295311
in.next(byteVectorBuffer);
296-
doc.add(new KnnByteVectorField(VECTOR_FIELD, byteVectorBuffer, fieldType));
312+
field = new KnnByteVectorField(VECTOR_FIELD, byteVectorBuffer, fieldType);
297313
}
298314
case FLOAT32 -> {
299315
in.next(floatVectorBuffer);
300-
doc.add(new KnnFloatVectorField(VECTOR_FIELD, floatVectorBuffer, fieldType));
316+
field = new KnnFloatVectorField(VECTOR_FIELD, floatVectorBuffer, fieldType);
301317
}
318+
default -> throw new UnsupportedOperationException();
302319
}
320+
long endRead = System.nanoTime();
321+
readTime += (endRead - startRead);
322+
323+
Document doc = new Document();
324+
doc.add(field);
303325

304326
if ((id + 1) % 25000 == 0) {
305327
logger.debug("Done indexing " + (id + 1) + " documents.");
306328
}
307329
doc.add(new StoredField(ID_FIELD, id));
308330
iw.addDocument(doc);
331+
332+
docAddTime += (System.nanoTime() - endRead);
309333
}
310334
}
311335
}

server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.elasticsearch.common.unit.MemorySizeValue;
2424
import org.elasticsearch.core.Nullable;
2525
import org.elasticsearch.core.TimeValue;
26+
import org.elasticsearch.core.UpdateForV10;
2627
import org.elasticsearch.index.IndexMode;
2728
import org.elasticsearch.index.IndexSettings;
2829
import org.elasticsearch.index.codec.CodecProvider;
@@ -131,6 +132,7 @@ public Supplier<RetentionLeases> retentionLeasesSupplier() {
131132
* TODO: Remove in 9.0
132133
*/
133134
@Deprecated
135+
@UpdateForV10(owner = UpdateForV10.Owner.DISTRIBUTED_INDEXING)
134136
public static final Setting<Boolean> INDEX_OPTIMIZE_AUTO_GENERATED_IDS = Setting.boolSetting(
135137
"index.optimize_auto_generated_id",
136138
true,
@@ -213,6 +215,7 @@ public EngineConfig(
213215
// Add an escape hatch in case this change proves problematic - it used
214216
// to be a fixed amount of RAM: 256 MB.
215217
// TODO: Remove this escape hatch in 8.x
218+
@UpdateForV10(owner = UpdateForV10.Owner.DISTRIBUTED_INDEXING)
216219
final String escapeHatchProperty = "es.index.memory.max_index_buffer_size";
217220
String maxBufferSize = System.getProperty(escapeHatchProperty);
218221
if (maxBufferSize != null) {

x-pack/plugin/gpu/src/main/java/org/elasticsearch/xpack/gpu/codec/CuVSResourceManager.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,7 @@ private int numLockedResources() {
130130
@Override
131131
public ManagedCuVSResources acquire(int numVectors, int dims, CuVSMatrix.DataType dataType) throws InterruptedException {
132132
try {
133+
var started = System.nanoTime();
133134
lock.lock();
134135

135136
boolean allConditionsMet = false;
@@ -181,6 +182,8 @@ public ManagedCuVSResources acquire(int numVectors, int dims, CuVSMatrix.DataTyp
181182
enoughResourcesCondition.await();
182183
}
183184
}
185+
var elapsed = System.nanoTime() - started; // now minus start, so the logged duration is non-negative
186+
logger.debug("Resource acquired in [{}ms]", elapsed / 1_000_000.0);
184187
res.locked = true;
185188
return res;
186189
} finally {

0 commit comments

Comments
 (0)