From c20fa4f7d5df6f0852b7f4c76b2542a075893d25 Mon Sep 17 00:00:00 2001 From: Lorenzo Dematte Date: Mon, 6 Oct 2025 11:55:26 +0200 Subject: [PATCH 1/4] Add write buffer parameter; add log for write time vs doc add time --- .../elasticsearch/test/knn/CmdLineArgs.java | 16 ++++++++-- .../test/knn/KnnIndexTester.java | 3 +- .../elasticsearch/test/knn/KnnIndexer.java | 29 +++++++++++++++---- 3 files changed, 39 insertions(+), 9 deletions(-) diff --git a/qa/vector/src/main/java/org/elasticsearch/test/knn/CmdLineArgs.java b/qa/vector/src/main/java/org/elasticsearch/test/knn/CmdLineArgs.java index 50c8fd107c444..bd06ac68f90fb 100644 --- a/qa/vector/src/main/java/org/elasticsearch/test/knn/CmdLineArgs.java +++ b/qa/vector/src/main/java/org/elasticsearch/test/knn/CmdLineArgs.java @@ -53,7 +53,8 @@ record CmdLineArgs( VectorEncoding vectorEncoding, int dimensions, boolean earlyTermination, - KnnIndexTester.MergePolicyType mergePolicy + KnnIndexTester.MergePolicyType mergePolicy, + double writerBufferSizeInMb ) implements ToXContentObject { static final ParseField DOC_VECTORS_FIELD = new ParseField("doc_vectors"); @@ -82,6 +83,9 @@ record CmdLineArgs( static final ParseField FILTER_SELECTIVITY_FIELD = new ParseField("filter_selectivity"); static final ParseField SEED_FIELD = new ParseField("seed"); static final ParseField MERGE_POLICY_FIELD = new ParseField("merge_policy"); + static final ParseField WRITER_BUFFER_FIELD = new ParseField("writer_buffer_mb"); + + static final double DEFAULT_WRITER_BUFFER_MB = 128; static CmdLineArgs fromXContent(XContentParser parser) throws IOException { Builder builder = PARSER.apply(parser, null); @@ -117,6 +121,7 @@ static CmdLineArgs fromXContent(XContentParser parser) throws IOException { PARSER.declareFloat(Builder::setFilterSelectivity, FILTER_SELECTIVITY_FIELD); PARSER.declareLong(Builder::setSeed, SEED_FIELD); PARSER.declareString(Builder::setMergePolicy, MERGE_POLICY_FIELD); + PARSER.declareDouble(Builder::setWriterBufferMb, WRITER_BUFFER_FIELD); } @Override @@ -186,6 +191,7 @@ static class Builder { private float filterSelectivity = 1f; private long seed = 1751900822751L; private KnnIndexTester.MergePolicyType mergePolicy = null; + private double writerBufferSizeInMb = DEFAULT_WRITER_BUFFER_MB; public Builder setDocVectors(List docVectors) { if (docVectors == null || docVectors.isEmpty()) { @@ -316,6 +322,11 @@ public Builder setMergePolicy(String mergePolicy) { return this; } + public Builder setWriterBufferMb(double writerBufferSizeInMb) { + this.writerBufferSizeInMb = writerBufferSizeInMb; + return this; + } + public CmdLineArgs build() { if (docVectors == null) { throw new IllegalArgumentException("Document vectors path must be provided"); @@ -350,7 +361,8 @@ public CmdLineArgs build() { vectorEncoding, dimensions, earlyTermination, - mergePolicy + mergePolicy, + writerBufferSizeInMb ); } } diff --git a/qa/vector/src/main/java/org/elasticsearch/test/knn/KnnIndexTester.java b/qa/vector/src/main/java/org/elasticsearch/test/knn/KnnIndexTester.java index 9e4dca46f0c18..889481120d069 100644 --- a/qa/vector/src/main/java/org/elasticsearch/test/knn/KnnIndexTester.java +++ b/qa/vector/src/main/java/org/elasticsearch/test/knn/KnnIndexTester.java @@ -240,7 +240,8 @@ public static void main(String[] args) throws Exception { cmdLineArgs.dimensions(), cmdLineArgs.vectorSpace(), cmdLineArgs.numDocs(), - mergePolicy + mergePolicy, + cmdLineArgs.writerBufferSizeInMb() ); if (cmdLineArgs.reindex() == false && Files.exists(indexPath) == false) { throw new IllegalArgumentException("Index path does not exist: " + indexPath); diff --git a/qa/vector/src/main/java/org/elasticsearch/test/knn/KnnIndexer.java b/qa/vector/src/main/java/org/elasticsearch/test/knn/KnnIndexer.java index 1534448abee87..e9056733b283c 100644 --- a/qa/vector/src/main/java/org/elasticsearch/test/knn/KnnIndexer.java +++ b/qa/vector/src/main/java/org/elasticsearch/test/knn/KnnIndexer.java @@ -29,6 +29,7 @@ import org.apache.lucene.index.ConcurrentMergeScheduler; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.index.IndexableField; import org.apache.lucene.index.MergePolicy; import org.apache.lucene.index.VectorEncoding; import org.apache.lucene.index.VectorSimilarityFunction; @@ -65,7 +66,6 @@ import static org.elasticsearch.test.knn.KnnIndexTester.logger; class KnnIndexer { - private static final double WRITER_BUFFER_MB = 128; static final String ID_FIELD = "id"; static final String VECTOR_FIELD = "vector"; @@ -78,6 +78,7 @@ class KnnIndexer { private final int numDocs; private final int numIndexThreads; private final MergePolicy mergePolicy; + private final double writerBufferSizeInMb; KnnIndexer( List docsPath, @@ -88,7 +89,8 @@ class KnnIndexer { int dim, VectorSimilarityFunction similarityFunction, int numDocs, - MergePolicy mergePolicy + MergePolicy mergePolicy, + double writerBufferSizeInMb ) { this.docsPath = docsPath; this.indexPath = indexPath; @@ -99,12 +101,14 @@ class KnnIndexer { this.similarityFunction = similarityFunction; this.numDocs = numDocs; this.mergePolicy = mergePolicy; + this.writerBufferSizeInMb = writerBufferSizeInMb; } void createIndex(KnnIndexTester.Results result) throws IOException, InterruptedException, ExecutionException { IndexWriterConfig iwc = new IndexWriterConfig().setOpenMode(IndexWriterConfig.OpenMode.CREATE); iwc.setCodec(codec); - iwc.setRAMBufferSizeMB(WRITER_BUFFER_MB); + iwc.setMaxBufferedDocs(IndexWriterConfig.DISABLE_AUTO_FLUSH); + iwc.setRAMBufferSizeMB(writerBufferSizeInMb); iwc.setUseCompoundFile(false); if (mergePolicy != null) { iwc.setMergePolicy(mergePolicy); @@ -248,6 +252,9 @@ static class IndexerThread extends Thread { private final float[] floatVectorBuffer; private final VectorReader in; + long readTime; + long docAddTime; + private IndexerThread( IndexWriter iw, VectorReader in, @@ -283,6 +290,7 @@ public void run() { } catch (IOException ioe) { throw new UncheckedIOException(ioe); } + logger.info("Index thread times: [{}] read, [{}] add doc", readTime, docAddTime); } private void _run() throws IOException { @@ -294,23 +302,32 @@ private void _run() throws IOException { continue; } - Document doc = new Document(); + var startRead = System.nanoTime(); + final IndexableField field; switch (vectorEncoding) { case BYTE -> { in.next(byteVectorBuffer); - doc.add(new KnnByteVectorField(VECTOR_FIELD, byteVectorBuffer, fieldType)); + field = new KnnByteVectorField(VECTOR_FIELD, byteVectorBuffer, fieldType); } case FLOAT32 -> { in.next(floatVectorBuffer); - doc.add(new KnnFloatVectorField(VECTOR_FIELD, floatVectorBuffer, fieldType)); + field = new KnnFloatVectorField(VECTOR_FIELD, floatVectorBuffer, fieldType); } + default -> throw new UnsupportedOperationException(); } + long endRead = System.nanoTime(); + readTime += (endRead - startRead); + + Document doc = new Document(); + doc.add(field); if ((id + 1) % 25000 == 0) { logger.debug("Done indexing " + (id + 1) + " documents."); } doc.add(new StoredField(ID_FIELD, id)); iw.addDocument(doc); + + docAddTime += (System.nanoTime() - endRead); } } } From c5224b8782c3764893f113f3baee69d98d7a9b8f Mon Sep 17 00:00:00 2001 From: Lorenzo Dematte Date: Mon, 6 Oct 2025 14:31:49 +0200 Subject: [PATCH 2/4] Ad async profiling; adjust log message and level; adjust gitignore --- .gitignore | 1 + qa/vector/build.gradle | 16 +++++++++++++++- .../org/elasticsearch/test/knn/KnnIndexer.java | 2 +- 3 files changed, 17 insertions(+), 2 deletions(-) diff --git a/.gitignore b/.gitignore index 73eb766462ee5..fd1f158594439 100644 --- a/.gitignore +++ b/.gitignore @@ -46,6 +46,7 @@ build/ **/.local* .vagrant/ /logs/ +**/target/ # osx stuff .DS_Store diff --git a/qa/vector/build.gradle b/qa/vector/build.gradle index b0223791797dd..3ef0d9dcd454f 100644 --- a/qa/vector/build.gradle +++ b/qa/vector/build.gradle @@ -51,13 +51,27 @@ tasks.register("checkVec", JavaExec) { systemProperty "es.logger.out", "console" systemProperty "es.logger.level", "INFO" // Change to DEBUG if needed systemProperty 'es.nativelibs.path', TestUtil.getTestLibraryPath(file("../../libs/native/libraries/build/platform/").toString()) - jvmArgs '-Xms4g', '-Xmx4g', '-Djava.util.concurrent.ForkJoinPool.common.parallelism=8', '-XX:+UnlockDiagnosticVMOptions', '-XX:+DebugNonSafepoints', '-XX:+HeapDumpOnOutOfMemoryError' + jvmArgs '-Xms16g', '-Xmx16g', '-Djava.util.concurrent.ForkJoinPool.common.parallelism=8', '-XX:+UnlockDiagnosticVMOptions', '-XX:+DebugNonSafepoints', '-XX:+HeapDumpOnOutOfMemoryError' if (buildParams.getRuntimeJavaVersion().map { it.majorVersion.toInteger() }.get() >= 21) { jvmArgs '--add-modules=jdk.incubator.vector', '--enable-native-access=ALL-UNNAMED' } if (System.getenv("DO_PROFILING") != null) { jvmArgs '-XX:StartFlightRecording=dumponexit=true,maxsize=250M,filename=knn.jfr,settings=profile.jfc' } + def asyncProfilerPath = System.getProperty("asyncProfiler.path", null) + if (asyncProfilerPath != null) { + if (OS.current().equals(OS.MAC)) { + def asyncProfilerAgent = "${asyncProfilerPath}/lib/libasyncProfiler.dylib" + println "Using async-profiler agent ${asyncProfilerAgent}" + jvmArgs "-agentpath:${asyncProfilerAgent}=start,event=cpu,interval=10ms,file=${layout.buildDirectory.asFile.get()}/tmp/elasticsearch-0_%t_%p.jfr" + } else if (OS.current().equals(OS.LINUX)) { + def asyncProfilerAgent = "${asyncProfilerPath}/lib/libasyncProfiler.so" + println "Using async-profiler agent ${asyncProfilerAgent}" + jvmArgs "-agentpath:${asyncProfilerAgent}=start,event=cpu,interval=10ms,wall=50ms,file=${layout.buildDirectory.asFile.get()}/tmp/elasticsearch-0_%t_%p.jfr" + } else { + println "Ignoring 'asyncProfiler.path': not available on ${OS.current()}"; + } + } if (buildParams.getIsRuntimeJavaHomeSet()) { executable = "${buildParams.runtimeJavaHome.get()}/bin/java" + (OS.current() == OS.WINDOWS ? '.exe' : '') } else { diff --git a/qa/vector/src/main/java/org/elasticsearch/test/knn/KnnIndexer.java b/qa/vector/src/main/java/org/elasticsearch/test/knn/KnnIndexer.java index e9056733b283c..2d5a5733db1b5 100644 --- a/qa/vector/src/main/java/org/elasticsearch/test/knn/KnnIndexer.java +++ b/qa/vector/src/main/java/org/elasticsearch/test/knn/KnnIndexer.java @@ -290,7 +290,7 @@ public void run() { } catch (IOException ioe) { throw new UncheckedIOException(ioe); } - logger.info("Index thread times: [{}] read, [{}] add doc", readTime, docAddTime); + logger.debug("Index thread times: [{}s] read, [{}s] add doc", readTime / 1e-9, docAddTime / 1e-9); } private void _run() throws IOException { From 1bc1b68115f888a18b0303e847937453ceee0361 Mon Sep 17 00:00:00 2001 From: Lorenzo Dematte Date: Mon, 6 Oct 2025 14:53:34 +0200 Subject: [PATCH 3/4] Logging with times for resource acquisition, flush and merge --- .../xpack/gpu/codec/CuVSResourceManager.java | 3 +++ .../xpack/gpu/codec/ES92GpuHnswVectorsWriter.java | 9 +++++++++ 2 files changed, 12 insertions(+) diff --git a/x-pack/plugin/gpu/src/main/java/org/elasticsearch/xpack/gpu/codec/CuVSResourceManager.java b/x-pack/plugin/gpu/src/main/java/org/elasticsearch/xpack/gpu/codec/CuVSResourceManager.java index 44240a848268b..f27108098c8f6 100644 --- a/x-pack/plugin/gpu/src/main/java/org/elasticsearch/xpack/gpu/codec/CuVSResourceManager.java +++ b/x-pack/plugin/gpu/src/main/java/org/elasticsearch/xpack/gpu/codec/CuVSResourceManager.java @@ -130,6 +130,7 @@ private int numLockedResources() { @Override public ManagedCuVSResources acquire(int numVectors, int dims, CuVSMatrix.DataType dataType) throws InterruptedException { try { + var started = System.nanoTime(); lock.lock(); boolean allConditionsMet = false; @@ -181,6 +182,8 @@ public ManagedCuVSResources acquire(int numVectors, int dims, CuVSMatrix.DataTyp enoughResourcesCondition.await(); } } + var elapsed = started - System.nanoTime(); + logger.debug("Resource acquired in [{}ms]", elapsed / 1e-6); res.locked = true; return res; } finally { diff --git a/x-pack/plugin/gpu/src/main/java/org/elasticsearch/xpack/gpu/codec/ES92GpuHnswVectorsWriter.java b/x-pack/plugin/gpu/src/main/java/org/elasticsearch/xpack/gpu/codec/ES92GpuHnswVectorsWriter.java index f848f715f913b..ff828cc9cc829 100644 --- a/x-pack/plugin/gpu/src/main/java/org/elasticsearch/xpack/gpu/codec/ES92GpuHnswVectorsWriter.java +++ b/x-pack/plugin/gpu/src/main/java/org/elasticsearch/xpack/gpu/codec/ES92GpuHnswVectorsWriter.java @@ -162,17 +162,21 @@ public KnnFieldVectorsWriter addField(FieldInfo fieldInfo) throws IOException @Override // TODO: fix sorted index case public void flush(int maxDoc, Sorter.DocMap sortMap) throws IOException { + var started = System.nanoTime(); flatVectorWriter.flush(maxDoc, sortMap); try { flushFieldsWithoutMemoryMappedFile(sortMap); } catch (Throwable t) { throw new IOException("Failed to flush GPU index: ", t); } + var elapsed = started - System.nanoTime(); + logger.debug("Flush total time [{}ms]", elapsed / 1e-6); } private void flushFieldsWithoutMemoryMappedFile(Sorter.DocMap sortMap) throws IOException, InterruptedException { // No tmp file written, or the file cannot be mmapped for (FieldWriter field : fields) { + var started = System.nanoTime(); var fieldInfo = field.fieldInfo; var numVectors = field.flatFieldVectorsWriter.getVectors().size(); @@ -205,6 +209,8 @@ private void flushFieldsWithoutMemoryMappedFile(Sorter.DocMap sortMap) throws IO cuVSResourceManager.release(cuVSResources); } } + var elapsed = started - System.nanoTime(); + logger.debug("Flushed [{}] vectors in [{}ms]", numVectors, elapsed / 1e-6); } } @@ -447,6 +453,7 @@ private static void deleteFilesIgnoringExceptions(Directory dir, String fileName @Override // fix sorted index case public void mergeOneField(FieldInfo fieldInfo, MergeState mergeState) throws IOException { + var started = System.nanoTime(); flatVectorWriter.mergeOneField(fieldInfo, mergeState); final int numVectors; String tempRawVectorsFileName = null; @@ -526,6 +533,8 @@ public void mergeOneField(FieldInfo fieldInfo, MergeState mergeState) throws IOE } finally { deleteFilesIgnoringExceptions(mergeState.segmentInfo.dir, tempRawVectorsFileName); } + var elapsed = started - System.nanoTime(); + logger.debug("Merged [{}] vectors in [{}ms]", numVectors, elapsed / 1e-6); } private ByteVectorValues getMergedByteVectorValues(FieldInfo fieldInfo, MergeState mergeState) throws IOException { From f034d79199cdba0c7e598974cee34bbe5d3bd894 Mon Sep 17 00:00:00 2001 From: Lorenzo Dematte Date: Mon, 6 Oct 2025 16:10:25 +0200 Subject: [PATCH 4/4] Fix timing scale --- .../main/java/org/elasticsearch/test/knn/KnnIndexer.java | 2 +- .../elasticsearch/xpack/gpu/codec/CuVSResourceManager.java | 2 +- .../xpack/gpu/codec/ES92GpuHnswVectorsWriter.java | 6 +++--- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/qa/vector/src/main/java/org/elasticsearch/test/knn/KnnIndexer.java b/qa/vector/src/main/java/org/elasticsearch/test/knn/KnnIndexer.java index 2d5a5733db1b5..cfbf4ea1f9b8d 100644 --- a/qa/vector/src/main/java/org/elasticsearch/test/knn/KnnIndexer.java +++ b/qa/vector/src/main/java/org/elasticsearch/test/knn/KnnIndexer.java @@ -290,7 +290,7 @@ public void run() { } catch (IOException ioe) { throw new UncheckedIOException(ioe); } - logger.debug("Index thread times: [{}s] read, [{}s] add doc", readTime / 1e-9, docAddTime / 1e-9); + logger.debug("Index thread times: [{}ms] read, [{}ms] add doc", readTime / 1_000_000, docAddTime / 1_000_000); } private void _run() throws IOException { diff --git a/x-pack/plugin/gpu/src/main/java/org/elasticsearch/xpack/gpu/codec/CuVSResourceManager.java b/x-pack/plugin/gpu/src/main/java/org/elasticsearch/xpack/gpu/codec/CuVSResourceManager.java index f27108098c8f6..b57d933d77c4b 100644 --- a/x-pack/plugin/gpu/src/main/java/org/elasticsearch/xpack/gpu/codec/CuVSResourceManager.java +++ b/x-pack/plugin/gpu/src/main/java/org/elasticsearch/xpack/gpu/codec/CuVSResourceManager.java @@ -183,7 +183,7 @@ public ManagedCuVSResources acquire(int numVectors, int dims, CuVSMatrix.DataTyp } } var elapsed = started - System.nanoTime(); - logger.debug("Resource acquired in [{}ms]", elapsed / 1e-6); + logger.debug("Resource acquired in [{}ms]", elapsed / 1_000_000.0); res.locked = true; return res; } finally { diff --git a/x-pack/plugin/gpu/src/main/java/org/elasticsearch/xpack/gpu/codec/ES92GpuHnswVectorsWriter.java b/x-pack/plugin/gpu/src/main/java/org/elasticsearch/xpack/gpu/codec/ES92GpuHnswVectorsWriter.java index ff828cc9cc829..3916afe77caf9 100644 --- a/x-pack/plugin/gpu/src/main/java/org/elasticsearch/xpack/gpu/codec/ES92GpuHnswVectorsWriter.java +++ b/x-pack/plugin/gpu/src/main/java/org/elasticsearch/xpack/gpu/codec/ES92GpuHnswVectorsWriter.java @@ -170,7 +170,7 @@ public void flush(int maxDoc, Sorter.DocMap sortMap) throws IOException { throw new IOException("Failed to flush GPU index: ", t); } var elapsed = started - System.nanoTime(); - logger.debug("Flush total time [{}ms]", elapsed / 1e-6); + logger.debug("Flush total time [{}ms]", elapsed / 1_000_000.0); } private void flushFieldsWithoutMemoryMappedFile(Sorter.DocMap sortMap) throws IOException, InterruptedException { @@ -210,7 +210,7 @@ private void flushFieldsWithoutMemoryMappedFile(Sorter.DocMap sortMap) throws IO } } var elapsed = started - System.nanoTime(); - logger.debug("Flushed [{}] vectors in [{}ms]", numVectors, elapsed / 1e-6); + logger.debug("Flushed [{}] vectors in [{}ms]", numVectors, elapsed / 1_000_000.0); } } @@ -534,7 +534,7 @@ public void mergeOneField(FieldInfo fieldInfo, MergeState mergeState) throws IOE deleteFilesIgnoringExceptions(mergeState.segmentInfo.dir, tempRawVectorsFileName); } var elapsed = started - System.nanoTime(); - logger.debug("Merged [{}] vectors in [{}ms]", numVectors, elapsed / 1e-6); + logger.debug("Merged [{}] vectors in [{}ms]", numVectors, elapsed / 1_000_000.0); } private ByteVectorValues getMergedByteVectorValues(FieldInfo fieldInfo, MergeState mergeState) throws IOException {