Route flat-vector file opens through a DIRECT_IO_KEY marker in the segment suffix.

ES818HnswBinaryQuantizedVectorsFormat#fieldsReader appends FsDirectoryFactory.DIRECT_IO_KEY to the segment
suffix passed to the flat vectors reader. HybridDirectory#useDelegate now returns the file name to open via
the mmap delegate (marker removed) or null, instead of a boolean. The other directories created by
FsDirectoryFactory are wrapped in KeyRemovalDirectory, which removes the marker from file names.

diff --git a/server/src/main/java/org/elasticsearch/index/codec/vectors/es818/ES818HnswBinaryQuantizedVectorsFormat.java b/server/src/main/java/org/elasticsearch/index/codec/vectors/es818/ES818HnswBinaryQuantizedVectorsFormat.java
index 56942017c3cef..62eb470d11be1 100644
--- a/server/src/main/java/org/elasticsearch/index/codec/vectors/es818/ES818HnswBinaryQuantizedVectorsFormat.java
+++ b/server/src/main/java/org/elasticsearch/index/codec/vectors/es818/ES818HnswBinaryQuantizedVectorsFormat.java
@@ -30,6 +30,7 @@
 import org.apache.lucene.index.SegmentWriteState;
 import org.apache.lucene.search.TaskExecutor;
 import org.apache.lucene.util.hnsw.HnswGraph;
+import org.elasticsearch.index.store.FsDirectoryFactory;
 
 import java.io.IOException;
 import java.util.concurrent.ExecutorService;
@@ -124,7 +125,10 @@ public KnnVectorsWriter fieldsWriter(SegmentWriteState state) throws IOException
 
     @Override
     public KnnVectorsReader fieldsReader(SegmentReadState state) throws IOException {
-        return new Lucene99HnswVectorsReader(state, flatVectorsFormat.fieldsReader(state));
+        // Append DIRECT_IO_KEY to the segment suffix given to the flat vectors reader. The directories built by
+        // FsDirectoryFactory look for this marker in the names passed to openInput and remove it.
+        var flatVectorState = new SegmentReadState(state, state.segmentSuffix + FsDirectoryFactory.DIRECT_IO_KEY);
+        return new Lucene99HnswVectorsReader(state, flatVectorsFormat.fieldsReader(flatVectorState));
     }
 
     @Override
diff --git a/server/src/main/java/org/elasticsearch/index/store/FsDirectoryFactory.java b/server/src/main/java/org/elasticsearch/index/store/FsDirectoryFactory.java
index bc94db13074db..a4ffb1e25078f 100644
--- a/server/src/main/java/org/elasticsearch/index/store/FsDirectoryFactory.java
+++ b/server/src/main/java/org/elasticsearch/index/store/FsDirectoryFactory.java
@@ -15,6 +15,8 @@
 import org.apache.lucene.store.FilterDirectory;
 import org.apache.lucene.store.IOContext;
 import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
+import org.apache.lucene.store.Lock;
 import org.apache.lucene.store.LockFactory;
 import org.apache.lucene.store.MMapDirectory;
 import org.apache.lucene.store.NIOFSDirectory;
@@ -31,12 +33,16 @@
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.util.Collection;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.function.BiPredicate;
 
 public class FsDirectoryFactory implements IndexStorePlugin.DirectoryFactory {
 
+    /** Marker that callers embed in a file name; the directories created by this factory remove it before opening the file. */
+    public static final String DIRECT_IO_KEY = "*DIRECTIO*";
+
     public static final Setting<LockFactory> INDEX_LOCK_FACTOR_SETTING = new Setting<>("index.store.fs.fs_lock", "native", (s) -> {
         return switch (s) {
             case "native" -> NativeFSLockFactory.INSTANCE;
@@ -70,13 +76,13 @@ protected Directory newFSDirectory(Path location, LockFactory lockFactory, Index
                 if (primaryDirectory instanceof MMapDirectory mMapDirectory) {
                     return new HybridDirectory(lockFactory, setPreload(mMapDirectory, preLoadExtensions));
                 } else {
-                    return primaryDirectory;
+                    return new KeyRemovalDirectory(primaryDirectory);
                 }
             case MMAPFS:
-                return setPreload(new MMapDirectory(location, lockFactory), preLoadExtensions);
+                return new KeyRemovalDirectory(setPreload(new MMapDirectory(location, lockFactory), preLoadExtensions));
             case SIMPLEFS:
             case NIOFS:
-                return new NIOFSDirectory(location, lockFactory);
+                return new KeyRemovalDirectory(new NIOFSDirectory(location, lockFactory));
             default:
                 throw new AssertionError("unexpected built-in store type [" + type + "]");
         }
@@ -119,17 +125,18 @@ static final class HybridDirectory extends NIOFSDirectory {
 
         @Override
         public IndexInput openInput(String name, IOContext context) throws IOException {
-            if (useDelegate(name, context)) {
+            String newName = useDelegate(name, context);
+            if (newName != null) {
                 // we need to do these checks on the outer directory since the inner doesn't know about pending deletes
                 ensureOpen();
-                ensureCanRead(name);
+                ensureCanRead(newName);
                 // we switch the context here since mmap checks for the READONCE context by identity
                 context = context == Store.READONCE_CHECKSUM ? IOContext.READONCE : context;
                 // we only use the mmap to open inputs. Everything else is managed by the NIOFSDirectory otherwise
                 // we might run into trouble with files that are pendingDelete in one directory but still
                 // listed in listAll() from the other. We on the other hand don't want to list files from both dirs
                 // and intersect for perf reasons.
-                return delegate.openInput(name, context);
+                return delegate.openInput(newName, context);
             } else {
                 return super.openInput(name, context);
             }
@@ -150,11 +157,19 @@ private static String getExtension(String name) {
             }
         }
 
-        static boolean useDelegate(String name, IOContext ioContext) {
+        /**
+         * Returns the file name to open through the mmap delegate, or {@code null} if this directory should read the file
+         * itself. A name containing {@code DIRECT_IO_KEY} is always delegated, with the marker removed.
+         */
+        static String useDelegate(String name, IOContext ioContext) {
+            if (name.contains(DIRECT_IO_KEY)) {
+                return name.replace(DIRECT_IO_KEY, "");
+            }
+
             if (ioContext == Store.READONCE_CHECKSUM) {
                 // If we're just reading the footer for the checksum then mmap() isn't really necessary, and it's desperately inefficient
                 // if pre-loading is enabled on this file.
-                return false;
+                return null;
             }
 
             final LuceneFilesExtensions extension = LuceneFilesExtensions.fromExtension(getExtension(name));
@@ -162,13 +177,90 @@ static boolean useDelegate(String name, IOContext ioContext) {
                 // Other files are either less performance-sensitive (e.g. stored field index, norms metadata)
                 // or are large and have a random access pattern and mmap leads to page cache trashing
                 // (e.g. stored fields and term vectors).
-                return false;
+                return null;
             }
-            return true;
+            return name;
         }
 
         MMapDirectory getDelegate() {
             return delegate;
         }
     }
+
+    /**
+     * Wraps a {@link Directory} and removes {@link #DIRECT_IO_KEY} from file names before delegating; operations that must
+     * never see the marker (delete, sync, rename) assert its absence instead.
+     */
+    private static final class KeyRemovalDirectory extends Directory {
+
+        private final Directory delegate;
+
+        private KeyRemovalDirectory(Directory delegate) {
+            this.delegate = delegate;
+        }
+
+        @Override
+        public String[] listAll() throws IOException {
+            return delegate.listAll();
+        }
+
+        @Override
+        public void deleteFile(String name) throws IOException {
+            assert name.contains(DIRECT_IO_KEY) == false;
+            delegate.deleteFile(name);
+        }
+
+        @Override
+        public long fileLength(String name) throws IOException {
+            return delegate.fileLength(name.replace(DIRECT_IO_KEY, ""));
+        }
+
+        @Override
+        public IndexOutput createOutput(String name, IOContext context) throws IOException {
+            return delegate.createOutput(name.replace(DIRECT_IO_KEY, ""), context);
+        }
+
+        @Override
+        public IndexOutput createTempOutput(String prefix, String suffix, IOContext context) throws IOException {
+            return delegate.createTempOutput(prefix, suffix, context);
+        }
+
+        @Override
+        public void sync(Collection<String> names) throws IOException {
+            assert names.stream().noneMatch(s -> s.contains(DIRECT_IO_KEY));
+            delegate.sync(names);
+        }
+
+        @Override
+        public void syncMetaData() throws IOException {
+            delegate.syncMetaData();
+        }
+
+        @Override
+        public void rename(String source, String dest) throws IOException {
+            assert source.contains(DIRECT_IO_KEY) == false;
+            assert dest.contains(DIRECT_IO_KEY) == false;
+            delegate.rename(source, dest);
+        }
+
+        @Override
+        public IndexInput openInput(String name, IOContext context) throws IOException {
+            return delegate.openInput(name.replace(DIRECT_IO_KEY, ""), context);
+        }
+
+        @Override
+        public Lock obtainLock(String name) throws IOException {
+            return delegate.obtainLock(name.replace(DIRECT_IO_KEY, ""));
+        }
+
+        @Override
+        public void close() throws IOException {
+            delegate.close();
+        }
+
+        @Override
+        public Set<String> getPendingDeletions() throws IOException {
+            return delegate.getPendingDeletions();
+        }
+    }
 }
diff --git a/server/src/test/java/org/elasticsearch/index/store/FsDirectoryFactoryTests.java b/server/src/test/java/org/elasticsearch/index/store/FsDirectoryFactoryTests.java
index b0a14515f2fbc..e4b5f408c5795 100644
--- a/server/src/test/java/org/elasticsearch/index/store/FsDirectoryFactoryTests.java
+++ b/server/src/test/java/org/elasticsearch/index/store/FsDirectoryFactoryTests.java
@@ -54,17 +54,17 @@ public void testPreload() throws IOException {
         try (Directory directory = newDirectory(build)) {
             assertTrue(FsDirectoryFactory.isHybridFs(directory));
             FsDirectoryFactory.HybridDirectory hybridDirectory = (FsDirectoryFactory.HybridDirectory) FilterDirectory.unwrap(directory);
-            assertTrue(FsDirectoryFactory.HybridDirectory.useDelegate("foo.dvd", newIOContext(random())));
-            assertTrue(FsDirectoryFactory.HybridDirectory.useDelegate("foo.nvd", newIOContext(random())));
-            assertTrue(FsDirectoryFactory.HybridDirectory.useDelegate("foo.tim", newIOContext(random())));
-            assertTrue(FsDirectoryFactory.HybridDirectory.useDelegate("foo.tip", newIOContext(random())));
-            assertTrue(FsDirectoryFactory.HybridDirectory.useDelegate("foo.cfs", newIOContext(random())));
-            assertTrue(FsDirectoryFactory.HybridDirectory.useDelegate("foo.dim", newIOContext(random())));
-            assertTrue(FsDirectoryFactory.HybridDirectory.useDelegate("foo.kdd", newIOContext(random())));
-            assertTrue(FsDirectoryFactory.HybridDirectory.useDelegate("foo.kdi", newIOContext(random())));
-            assertFalse(FsDirectoryFactory.HybridDirectory.useDelegate("foo.kdi", Store.READONCE_CHECKSUM));
-            assertTrue(FsDirectoryFactory.HybridDirectory.useDelegate("foo.tmp", newIOContext(random())));
-            assertTrue(FsDirectoryFactory.HybridDirectory.useDelegate("foo.fdt__0.tmp", newIOContext(random())));
+            assertNotNull(FsDirectoryFactory.HybridDirectory.useDelegate("foo.dvd", newIOContext(random())));
+            assertNotNull(FsDirectoryFactory.HybridDirectory.useDelegate("foo.nvd", newIOContext(random())));
+            assertNotNull(FsDirectoryFactory.HybridDirectory.useDelegate("foo.tim", newIOContext(random())));
+            assertNotNull(FsDirectoryFactory.HybridDirectory.useDelegate("foo.tip", newIOContext(random())));
+            assertNotNull(FsDirectoryFactory.HybridDirectory.useDelegate("foo.cfs", newIOContext(random())));
+            assertNotNull(FsDirectoryFactory.HybridDirectory.useDelegate("foo.dim", newIOContext(random())));
+            assertNotNull(FsDirectoryFactory.HybridDirectory.useDelegate("foo.kdd", newIOContext(random())));
+            assertNotNull(FsDirectoryFactory.HybridDirectory.useDelegate("foo.kdi", newIOContext(random())));
+            assertNull(FsDirectoryFactory.HybridDirectory.useDelegate("foo.kdi", Store.READONCE_CHECKSUM));
+            assertNotNull(FsDirectoryFactory.HybridDirectory.useDelegate("foo.tmp", newIOContext(random())));
+            assertNotNull(FsDirectoryFactory.HybridDirectory.useDelegate("foo.fdt__0.tmp", newIOContext(random())));
             MMapDirectory delegate = hybridDirectory.getDelegate();
             assertThat(delegate, Matchers.instanceOf(MMapDirectory.class));
             var func = fsDirectoryFactory.preLoadFuncMap.get(delegate);