Skip to content

Commit 3d2d765

Browse files
authored
Use a copy of Elasticsearch's hybrid directory in qa:vector:checkVec utility (#134344)
1 parent e5c91ca commit 3d2d765

File tree

2 files changed

+80
-4
lines changed

2 files changed

+80
-4
lines changed

qa/vector/src/main/java/org/elasticsearch/test/knn/KnnIndexer.java

Lines changed: 79 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,18 @@
3232
import org.apache.lucene.index.MergePolicy;
3333
import org.apache.lucene.index.VectorEncoding;
3434
import org.apache.lucene.index.VectorSimilarityFunction;
35+
import org.apache.lucene.store.Directory;
3536
import org.apache.lucene.store.FSDirectory;
37+
import org.apache.lucene.store.IOContext;
38+
import org.apache.lucene.store.IndexInput;
39+
import org.apache.lucene.store.MMapDirectory;
40+
import org.apache.lucene.store.NIOFSDirectory;
41+
import org.apache.lucene.store.NativeFSLockFactory;
3642
import org.apache.lucene.util.PrintStreamInfoStream;
3743
import org.elasticsearch.common.io.Channels;
44+
import org.elasticsearch.core.IOUtils;
45+
import org.elasticsearch.index.store.LuceneFilesExtensions;
46+
import org.elasticsearch.index.store.Store;
3847

3948
import java.io.IOException;
4049
import java.io.UncheckedIOException;
@@ -124,7 +133,7 @@ public boolean isEnabled(String component) {
124133

125134
long start = System.nanoTime();
126135
AtomicInteger numDocsIndexed = new AtomicInteger();
127-
try (FSDirectory dir = FSDirectory.open(indexPath); IndexWriter iw = new IndexWriter(dir, iwc);) {
136+
try (Directory dir = getDirectory(indexPath); IndexWriter iw = new IndexWriter(dir, iwc)) {
128137
for (Path docsPath : this.docsPath) {
129138
int dim = this.dim;
130139
try (FileChannel in = FileChannel.open(docsPath)) {
@@ -212,7 +221,7 @@ public boolean isEnabled(String component) {
212221
iwc.setCodec(codec);
213222
logger.debug("KnnIndexer: forceMerge in {}", indexPath);
214223
long startNS = System.nanoTime();
215-
try (IndexWriter iw = new IndexWriter(FSDirectory.open(indexPath), iwc)) {
224+
try (IndexWriter iw = new IndexWriter(getDirectory(indexPath), iwc)) {
216225
iw.forceMerge(1);
217226
}
218227
long endNS = System.nanoTime();
@@ -221,6 +230,14 @@ public boolean isEnabled(String component) {
221230
results.forceMergeTimeMS = TimeUnit.NANOSECONDS.toMillis(elapsedNSec);
222231
}
223232

233+
static Directory getDirectory(Path indexPath) throws IOException {
234+
Directory dir = FSDirectory.open(indexPath);
235+
if (dir instanceof MMapDirectory mmapDir) {
236+
return new HybridDirectory(mmapDir);
237+
}
238+
return dir;
239+
}
240+
224241
static class IndexerThread extends Thread {
225242
private final IndexWriter iw;
226243
private final AtomicInteger numDocsIndexed;
@@ -358,4 +375,64 @@ synchronized void next(byte[] dest) throws IOException {
358375
bytes.get(dest);
359376
}
360377
}
378+
379+
// Copy of Elastic's HybridDirectory which extends NIOFSDirectory and uses MMapDirectory for certain files.
380+
static final class HybridDirectory extends NIOFSDirectory {
381+
private final MMapDirectory delegate;
382+
383+
HybridDirectory(MMapDirectory delegate) throws IOException {
384+
super(delegate.getDirectory(), NativeFSLockFactory.INSTANCE);
385+
this.delegate = delegate;
386+
}
387+
388+
@Override
389+
public IndexInput openInput(String name, IOContext context) throws IOException {
390+
if (useDelegate(name, context)) {
391+
// we need to do these checks on the outer directory since the inner doesn't know about pending deletes
392+
ensureOpen();
393+
ensureCanRead(name);
394+
// we switch the context here since mmap checks for the READONCE context by identity
395+
context = context == Store.READONCE_CHECKSUM ? IOContext.READONCE : context;
396+
// we only use the mmap to open inputs. Everything else is managed by the NIOFSDirectory otherwise
397+
// we might run into trouble with files that are pendingDelete in one directory but still
398+
// listed in listAll() from the other. We on the other hand don't want to list files from both dirs
399+
// and intersect for perf reasons.
400+
return delegate.openInput(name, context);
401+
} else {
402+
return super.openInput(name, context);
403+
}
404+
}
405+
406+
@Override
407+
public void close() throws IOException {
408+
IOUtils.close(super::close, delegate);
409+
}
410+
411+
private static String getExtension(String name) {
412+
// Unlike FileSwitchDirectory#getExtension, we treat `tmp` as a normal file extension, which can have its own rules for mmaping.
413+
final int lastDotIndex = name.lastIndexOf('.');
414+
if (lastDotIndex == -1) {
415+
return "";
416+
} else {
417+
return name.substring(lastDotIndex + 1);
418+
}
419+
}
420+
421+
static boolean useDelegate(String name, IOContext ioContext) {
422+
if (ioContext == Store.READONCE_CHECKSUM) {
423+
// If we're just reading the footer for the checksum then mmap() isn't really necessary, and it's desperately inefficient
424+
// if pre-loading is enabled on this file.
425+
return false;
426+
}
427+
428+
final LuceneFilesExtensions extension = LuceneFilesExtensions.fromExtension(getExtension(name));
429+
if (extension == null || extension.shouldMmap() == false) {
430+
// Other files are either less performance-sensitive (e.g. stored field index, norms metadata)
431+
// or are large and have a random access pattern and mmap leads to page cache trashing
432+
// (e.g. stored fields and term vectors).
433+
return false;
434+
}
435+
return true;
436+
}
437+
}
361438
}

qa/vector/src/main/java/org/elasticsearch/test/knn/KnnSearcher.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,6 @@
5252
import org.apache.lucene.search.Weight;
5353
import org.apache.lucene.store.Directory;
5454
import org.apache.lucene.store.FSDirectory;
55-
import org.apache.lucene.store.MMapDirectory;
5655
import org.apache.lucene.util.BitSet;
5756
import org.apache.lucene.util.BitSetIterator;
5857
import org.apache.lucene.util.FixedBitSet;
@@ -178,7 +177,7 @@ void runSearch(KnnIndexTester.Results finalResults, boolean earlyTermination) th
178177
);
179178
KnnIndexer.VectorReader targetReader = KnnIndexer.VectorReader.create(input, dim, vectorEncoding, offsetByteSize);
180179
long startNS;
181-
try (MMapDirectory dir = new MMapDirectory(indexPath)) {
180+
try (Directory dir = KnnIndexer.getDirectory(indexPath)) {
182181
try (DirectoryReader reader = DirectoryReader.open(dir)) {
183182
IndexSearcher searcher = searchThreads > 1 ? new IndexSearcher(reader, executorService) : new IndexSearcher(reader);
184183
byte[] targetBytes = new byte[dim];

0 commit comments

Comments
 (0)