3232import org .apache .lucene .index .MergePolicy ;
3333import org .apache .lucene .index .VectorEncoding ;
3434import org .apache .lucene .index .VectorSimilarityFunction ;
35+ import org .apache .lucene .store .Directory ;
3536import org .apache .lucene .store .FSDirectory ;
37+ import org .apache .lucene .store .IOContext ;
38+ import org .apache .lucene .store .IndexInput ;
39+ import org .apache .lucene .store .MMapDirectory ;
40+ import org .apache .lucene .store .NIOFSDirectory ;
41+ import org .apache .lucene .store .NativeFSLockFactory ;
3642import org .apache .lucene .util .PrintStreamInfoStream ;
3743import org .elasticsearch .common .io .Channels ;
44+ import org .elasticsearch .core .IOUtils ;
45+ import org .elasticsearch .index .store .LuceneFilesExtensions ;
46+ import org .elasticsearch .index .store .Store ;
3847
3948import java .io .IOException ;
4049import java .io .UncheckedIOException ;
@@ -124,7 +133,7 @@ public boolean isEnabled(String component) {
124133
125134 long start = System .nanoTime ();
126135 AtomicInteger numDocsIndexed = new AtomicInteger ();
127- try (FSDirectory dir = FSDirectory . open (indexPath ); IndexWriter iw = new IndexWriter (dir , iwc ); ) {
136+ try (Directory dir = getDirectory (indexPath ); IndexWriter iw = new IndexWriter (dir , iwc )) {
128137 for (Path docsPath : this .docsPath ) {
129138 int dim = this .dim ;
130139 try (FileChannel in = FileChannel .open (docsPath )) {
@@ -212,7 +221,7 @@ public boolean isEnabled(String component) {
212221 iwc .setCodec (codec );
213222 logger .debug ("KnnIndexer: forceMerge in {}" , indexPath );
214223 long startNS = System .nanoTime ();
215- try (IndexWriter iw = new IndexWriter (FSDirectory . open (indexPath ), iwc )) {
224+ try (IndexWriter iw = new IndexWriter (getDirectory (indexPath ), iwc )) {
216225 iw .forceMerge (1 );
217226 }
218227 long endNS = System .nanoTime ();
@@ -221,6 +230,14 @@ public boolean isEnabled(String component) {
221230 results .forceMergeTimeMS = TimeUnit .NANOSECONDS .toMillis (elapsedNSec );
222231 }
223232
233+ static Directory getDirectory (Path indexPath ) throws IOException {
234+ Directory dir = FSDirectory .open (indexPath );
235+ if (dir instanceof MMapDirectory mmapDir ) {
236+ return new HybridDirectory (mmapDir );
237+ }
238+ return dir ;
239+ }
240+
224241 static class IndexerThread extends Thread {
225242 private final IndexWriter iw ;
226243 private final AtomicInteger numDocsIndexed ;
@@ -358,4 +375,64 @@ synchronized void next(byte[] dest) throws IOException {
358375 bytes .get (dest );
359376 }
360377 }
378+
379+ // Copy of Elastic's HybridDirectory which extends NIOFSDirectory and uses MMapDirectory for certain files.
380+ static final class HybridDirectory extends NIOFSDirectory {
381+ private final MMapDirectory delegate ;
382+
383+ HybridDirectory (MMapDirectory delegate ) throws IOException {
384+ super (delegate .getDirectory (), NativeFSLockFactory .INSTANCE );
385+ this .delegate = delegate ;
386+ }
387+
388+ @ Override
389+ public IndexInput openInput (String name , IOContext context ) throws IOException {
390+ if (useDelegate (name , context )) {
391+ // we need to do these checks on the outer directory since the inner doesn't know about pending deletes
392+ ensureOpen ();
393+ ensureCanRead (name );
394+ // we switch the context here since mmap checks for the READONCE context by identity
395+ context = context == Store .READONCE_CHECKSUM ? IOContext .READONCE : context ;
396+ // we only use the mmap to open inputs. Everything else is managed by the NIOFSDirectory otherwise
397+ // we might run into trouble with files that are pendingDelete in one directory but still
398+ // listed in listAll() from the other. We on the other hand don't want to list files from both dirs
399+ // and intersect for perf reasons.
400+ return delegate .openInput (name , context );
401+ } else {
402+ return super .openInput (name , context );
403+ }
404+ }
405+
406+ @ Override
407+ public void close () throws IOException {
408+ IOUtils .close (super ::close , delegate );
409+ }
410+
411+ private static String getExtension (String name ) {
412+ // Unlike FileSwitchDirectory#getExtension, we treat `tmp` as a normal file extension, which can have its own rules for mmaping.
413+ final int lastDotIndex = name .lastIndexOf ('.' );
414+ if (lastDotIndex == -1 ) {
415+ return "" ;
416+ } else {
417+ return name .substring (lastDotIndex + 1 );
418+ }
419+ }
420+
421+ static boolean useDelegate (String name , IOContext ioContext ) {
422+ if (ioContext == Store .READONCE_CHECKSUM ) {
423+ // If we're just reading the footer for the checksum then mmap() isn't really necessary, and it's desperately inefficient
424+ // if pre-loading is enabled on this file.
425+ return false ;
426+ }
427+
428+ final LuceneFilesExtensions extension = LuceneFilesExtensions .fromExtension (getExtension (name ));
429+ if (extension == null || extension .shouldMmap () == false ) {
430+ // Other files are either less performance-sensitive (e.g. stored field index, norms metadata)
431+ // or are large and have a random access pattern and mmap leads to page cache trashing
432+ // (e.g. stored fields and term vectors).
433+ return false ;
434+ }
435+ return true ;
436+ }
437+ }
361438}
0 commit comments