32
32
import org .apache .lucene .index .MergePolicy ;
33
33
import org .apache .lucene .index .VectorEncoding ;
34
34
import org .apache .lucene .index .VectorSimilarityFunction ;
35
+ import org .apache .lucene .store .Directory ;
35
36
import org .apache .lucene .store .FSDirectory ;
37
+ import org .apache .lucene .store .IOContext ;
38
+ import org .apache .lucene .store .IndexInput ;
39
+ import org .apache .lucene .store .MMapDirectory ;
40
+ import org .apache .lucene .store .NIOFSDirectory ;
41
+ import org .apache .lucene .store .NativeFSLockFactory ;
36
42
import org .apache .lucene .util .PrintStreamInfoStream ;
37
43
import org .elasticsearch .common .io .Channels ;
44
+ import org .elasticsearch .core .IOUtils ;
45
+ import org .elasticsearch .index .store .LuceneFilesExtensions ;
46
+ import org .elasticsearch .index .store .Store ;
38
47
39
48
import java .io .IOException ;
40
49
import java .io .UncheckedIOException ;
@@ -124,7 +133,7 @@ public boolean isEnabled(String component) {
124
133
125
134
long start = System .nanoTime ();
126
135
AtomicInteger numDocsIndexed = new AtomicInteger ();
127
- try (FSDirectory dir = FSDirectory . open (indexPath ); IndexWriter iw = new IndexWriter (dir , iwc ); ) {
136
+ try (Directory dir = getDirectory (indexPath ); IndexWriter iw = new IndexWriter (dir , iwc )) {
128
137
for (Path docsPath : this .docsPath ) {
129
138
int dim = this .dim ;
130
139
try (FileChannel in = FileChannel .open (docsPath )) {
@@ -212,7 +221,7 @@ public boolean isEnabled(String component) {
212
221
iwc .setCodec (codec );
213
222
logger .debug ("KnnIndexer: forceMerge in {}" , indexPath );
214
223
long startNS = System .nanoTime ();
215
- try (IndexWriter iw = new IndexWriter (FSDirectory . open (indexPath ), iwc )) {
224
+ try (IndexWriter iw = new IndexWriter (getDirectory (indexPath ), iwc )) {
216
225
iw .forceMerge (1 );
217
226
}
218
227
long endNS = System .nanoTime ();
@@ -221,6 +230,14 @@ public boolean isEnabled(String component) {
221
230
results .forceMergeTimeMS = TimeUnit .NANOSECONDS .toMillis (elapsedNSec );
222
231
}
223
232
233
+ static Directory getDirectory (Path indexPath ) throws IOException {
234
+ Directory dir = FSDirectory .open (indexPath );
235
+ if (dir instanceof MMapDirectory mmapDir ) {
236
+ return new HybridDirectory (mmapDir );
237
+ }
238
+ return dir ;
239
+ }
240
+
224
241
static class IndexerThread extends Thread {
225
242
private final IndexWriter iw ;
226
243
private final AtomicInteger numDocsIndexed ;
@@ -358,4 +375,64 @@ synchronized void next(byte[] dest) throws IOException {
358
375
bytes .get (dest );
359
376
}
360
377
}
378
+
379
+ // Copy of Elastic's HybridDirectory which extends NIOFSDirectory and uses MMapDirectory for certain files.
380
+ static final class HybridDirectory extends NIOFSDirectory {
381
+ private final MMapDirectory delegate ;
382
+
383
+ HybridDirectory (MMapDirectory delegate ) throws IOException {
384
+ super (delegate .getDirectory (), NativeFSLockFactory .INSTANCE );
385
+ this .delegate = delegate ;
386
+ }
387
+
388
+ @ Override
389
+ public IndexInput openInput (String name , IOContext context ) throws IOException {
390
+ if (useDelegate (name , context )) {
391
+ // we need to do these checks on the outer directory since the inner doesn't know about pending deletes
392
+ ensureOpen ();
393
+ ensureCanRead (name );
394
+ // we switch the context here since mmap checks for the READONCE context by identity
395
+ context = context == Store .READONCE_CHECKSUM ? IOContext .READONCE : context ;
396
+ // we only use the mmap to open inputs. Everything else is managed by the NIOFSDirectory otherwise
397
+ // we might run into trouble with files that are pendingDelete in one directory but still
398
+ // listed in listAll() from the other. We on the other hand don't want to list files from both dirs
399
+ // and intersect for perf reasons.
400
+ return delegate .openInput (name , context );
401
+ } else {
402
+ return super .openInput (name , context );
403
+ }
404
+ }
405
+
406
+ @ Override
407
+ public void close () throws IOException {
408
+ IOUtils .close (super ::close , delegate );
409
+ }
410
+
411
+ private static String getExtension (String name ) {
412
+ // Unlike FileSwitchDirectory#getExtension, we treat `tmp` as a normal file extension, which can have its own rules for mmaping.
413
+ final int lastDotIndex = name .lastIndexOf ('.' );
414
+ if (lastDotIndex == -1 ) {
415
+ return "" ;
416
+ } else {
417
+ return name .substring (lastDotIndex + 1 );
418
+ }
419
+ }
420
+
421
+ static boolean useDelegate (String name , IOContext ioContext ) {
422
+ if (ioContext == Store .READONCE_CHECKSUM ) {
423
+ // If we're just reading the footer for the checksum then mmap() isn't really necessary, and it's desperately inefficient
424
+ // if pre-loading is enabled on this file.
425
+ return false ;
426
+ }
427
+
428
+ final LuceneFilesExtensions extension = LuceneFilesExtensions .fromExtension (getExtension (name ));
429
+ if (extension == null || extension .shouldMmap () == false ) {
430
+ // Other files are either less performance-sensitive (e.g. stored field index, norms metadata)
431
+ // or are large and have a random access pattern and mmap leads to page cache trashing
432
+ // (e.g. stored fields and term vectors).
433
+ return false ;
434
+ }
435
+ return true ;
436
+ }
437
+ }
361
438
}
0 commit comments