2929import org .apache .lucene .index .ConcurrentMergeScheduler ;
3030import org .apache .lucene .index .IndexWriter ;
3131import org .apache .lucene .index .IndexWriterConfig ;
32+ import org .apache .lucene .index .IndexableField ;
3233import org .apache .lucene .index .MergePolicy ;
3334import org .apache .lucene .index .VectorEncoding ;
3435import org .apache .lucene .index .VectorSimilarityFunction ;
6566import static org .elasticsearch .test .knn .KnnIndexTester .logger ;
6667
6768class KnnIndexer {
68- private static final double WRITER_BUFFER_MB = 128 ;
6969 static final String ID_FIELD = "id" ;
7070 static final String VECTOR_FIELD = "vector" ;
7171
@@ -78,6 +78,7 @@ class KnnIndexer {
7878 private final int numDocs ;
7979 private final int numIndexThreads ;
8080 private final MergePolicy mergePolicy ;
81+ private final double writerBufferSizeInMb ;
8182
8283 KnnIndexer (
8384 List <Path > docsPath ,
@@ -88,7 +89,8 @@ class KnnIndexer {
8889 int dim ,
8990 VectorSimilarityFunction similarityFunction ,
9091 int numDocs ,
91- MergePolicy mergePolicy
92+ MergePolicy mergePolicy ,
93+ double writerBufferSizeInMb
9294 ) {
9395 this .docsPath = docsPath ;
9496 this .indexPath = indexPath ;
@@ -99,12 +101,14 @@ class KnnIndexer {
99101 this .similarityFunction = similarityFunction ;
100102 this .numDocs = numDocs ;
101103 this .mergePolicy = mergePolicy ;
104+ this .writerBufferSizeInMb = writerBufferSizeInMb ;
102105 }
103106
104107 void createIndex (KnnIndexTester .Results result ) throws IOException , InterruptedException , ExecutionException {
105108 IndexWriterConfig iwc = new IndexWriterConfig ().setOpenMode (IndexWriterConfig .OpenMode .CREATE );
106109 iwc .setCodec (codec );
107- iwc .setRAMBufferSizeMB (WRITER_BUFFER_MB );
110+ iwc .setMaxBufferedDocs (IndexWriterConfig .DISABLE_AUTO_FLUSH );
111+ iwc .setRAMBufferSizeMB (writerBufferSizeInMb );
108112 iwc .setUseCompoundFile (false );
109113 if (mergePolicy != null ) {
110114 iwc .setMergePolicy (mergePolicy );
@@ -248,6 +252,9 @@ static class IndexerThread extends Thread {
248252 private final float [] floatVectorBuffer ;
249253 private final VectorReader in ;
250254
255+ long readTime ;
256+ long docAddTime ;
257+
251258 private IndexerThread (
252259 IndexWriter iw ,
253260 VectorReader in ,
@@ -283,6 +290,7 @@ public void run() {
283290 } catch (IOException ioe ) {
284291 throw new UncheckedIOException (ioe );
285292 }
293+ logger .info ("Index thread times: [{}] read, [{}] add doc" , readTime , docAddTime );
286294 }
287295
288296 private void _run () throws IOException {
@@ -294,23 +302,32 @@ private void _run() throws IOException {
294302 continue ;
295303 }
296304
297- Document doc = new Document ();
305+ var startRead = System .nanoTime ();
306+ final IndexableField field ;
298307 switch (vectorEncoding ) {
299308 case BYTE -> {
300309 in .next (byteVectorBuffer );
301- doc . add ( new KnnByteVectorField (VECTOR_FIELD , byteVectorBuffer , fieldType ) );
310+ field = new KnnByteVectorField (VECTOR_FIELD , byteVectorBuffer , fieldType );
302311 }
303312 case FLOAT32 -> {
304313 in .next (floatVectorBuffer );
305- doc . add ( new KnnFloatVectorField (VECTOR_FIELD , floatVectorBuffer , fieldType ) );
314+ field = new KnnFloatVectorField (VECTOR_FIELD , floatVectorBuffer , fieldType );
306315 }
316+ default -> throw new UnsupportedOperationException ();
307317 }
318+ long endRead = System .nanoTime ();
319+ readTime += (endRead - startRead );
320+
321+ Document doc = new Document ();
322+ doc .add (field );
308323
309324 if ((id + 1 ) % 25000 == 0 ) {
310325 logger .debug ("Done indexing " + (id + 1 ) + " documents." );
311326 }
312327 doc .add (new StoredField (ID_FIELD , id ));
313328 iw .addDocument (doc );
329+
330+ docAddTime += (System .nanoTime () - endRead );
314331 }
315332 }
316333 }
0 commit comments