Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.lucene.index.SegmentWriteState;
import org.apache.lucene.search.TaskExecutor;
import org.apache.lucene.util.hnsw.HnswGraph;
import org.elasticsearch.index.store.FsDirectoryFactory;

import java.io.IOException;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -124,7 +125,9 @@ public KnnVectorsWriter fieldsWriter(SegmentWriteState state) throws IOException

@Override
public KnnVectorsReader fieldsReader(SegmentReadState state) throws IOException {
return new Lucene99HnswVectorsReader(state, flatVectorsFormat.fieldsReader(state));
// mark the flat vectors format as using direct io to HybridDirectory
var flatVectorState = new SegmentReadState(state, state.segmentSuffix + FsDirectoryFactory.DIRECT_IO_KEY);
return new Lucene99HnswVectorsReader(state, flatVectorsFormat.fieldsReader(flatVectorState));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.Lock;
import org.apache.lucene.store.LockFactory;
import org.apache.lucene.store.MMapDirectory;
import org.apache.lucene.store.NIOFSDirectory;
Expand All @@ -31,12 +33,15 @@
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.function.BiPredicate;

public class FsDirectoryFactory implements IndexStorePlugin.DirectoryFactory {

public static final String DIRECT_IO_KEY = "*DIRECTIO*";

public static final Setting<LockFactory> INDEX_LOCK_FACTOR_SETTING = new Setting<>("index.store.fs.fs_lock", "native", (s) -> {
return switch (s) {
case "native" -> NativeFSLockFactory.INSTANCE;
Expand Down Expand Up @@ -70,13 +75,13 @@ protected Directory newFSDirectory(Path location, LockFactory lockFactory, Index
if (primaryDirectory instanceof MMapDirectory mMapDirectory) {
return new HybridDirectory(lockFactory, setPreload(mMapDirectory, preLoadExtensions));
} else {
return primaryDirectory;
return new KeyRemovalDirectory(primaryDirectory);
}
case MMAPFS:
return setPreload(new MMapDirectory(location, lockFactory), preLoadExtensions);
return new KeyRemovalDirectory(setPreload(new MMapDirectory(location, lockFactory), preLoadExtensions));
case SIMPLEFS:
case NIOFS:
return new NIOFSDirectory(location, lockFactory);
return new KeyRemovalDirectory(new NIOFSDirectory(location, lockFactory));
default:
throw new AssertionError("unexpected built-in store type [" + type + "]");
}
Expand Down Expand Up @@ -119,17 +124,18 @@ static final class HybridDirectory extends NIOFSDirectory {

@Override
public IndexInput openInput(String name, IOContext context) throws IOException {
if (useDelegate(name, context)) {
String newName = useDelegate(name, context);
if (newName != null) {
// we need to do these checks on the outer directory since the inner doesn't know about pending deletes
ensureOpen();
ensureCanRead(name);
ensureCanRead(newName);
// we switch the context here since mmap checks for the READONCE context by identity
context = context == Store.READONCE_CHECKSUM ? IOContext.READONCE : context;
// we only use the mmap to open inputs. Everything else is managed by the NIOFSDirectory otherwise
// we might run into trouble with files that are pendingDelete in one directory but still
// listed in listAll() from the other. We on the other hand don't want to list files from both dirs
// and intersect for perf reasons.
return delegate.openInput(name, context);
return delegate.openInput(newName, context);
} else {
return super.openInput(name, context);
}
Expand All @@ -150,25 +156,101 @@ private static String getExtension(String name) {
}
}

static boolean useDelegate(String name, IOContext ioContext) {
static String useDelegate(String name, IOContext ioContext) {
if (name.contains(DIRECT_IO_KEY)) {
return name.replace(DIRECT_IO_KEY, "");
}

if (ioContext == Store.READONCE_CHECKSUM) {
// If we're just reading the footer for the checksum then mmap() isn't really necessary, and it's desperately inefficient
// if pre-loading is enabled on this file.
return false;
return null;
}

final LuceneFilesExtensions extension = LuceneFilesExtensions.fromExtension(getExtension(name));
if (extension == null || extension.shouldMmap() == false) {
// Other files are either less performance-sensitive (e.g. stored field index, norms metadata)
// or are large and have a random access pattern and mmap leads to page cache trashing
// (e.g. stored fields and term vectors).
return false;
return null;
}
return true;
return name;
}

MMapDirectory getDelegate() {
return delegate;
}
}

private static class KeyRemovalDirectory extends Directory {

private final Directory delegate;

private KeyRemovalDirectory(Directory delegate) {
this.delegate = delegate;
}

@Override
public String[] listAll() throws IOException {
return delegate.listAll();
}

@Override
public void deleteFile(String name) throws IOException {
assert name.contains(DIRECT_IO_KEY) == false;
delegate.deleteFile(name);
}

@Override
public long fileLength(String name) throws IOException {
return delegate.fileLength(name.replace(DIRECT_IO_KEY, ""));
}

@Override
public IndexOutput createOutput(String name, IOContext context) throws IOException {
return delegate.createOutput(name.replace(DIRECT_IO_KEY, ""), context);
}

@Override
public IndexOutput createTempOutput(String prefix, String suffix, IOContext context) throws IOException {
return delegate.createTempOutput(prefix, suffix, context);
}

@Override
public void sync(Collection<String> names) throws IOException {
assert names.stream().noneMatch(s -> s.contains(DIRECT_IO_KEY));
delegate.sync(names);
}

@Override
public void syncMetaData() throws IOException {
delegate.syncMetaData();
}

@Override
public void rename(String source, String dest) throws IOException {
assert source.contains(DIRECT_IO_KEY) == false;
delegate.rename(source, dest);
}

@Override
public IndexInput openInput(String name, IOContext context) throws IOException {
return delegate.openInput(name.replace(DIRECT_IO_KEY, ""), context);
}

@Override
public Lock obtainLock(String name) throws IOException {
return delegate.obtainLock(name.replace(DIRECT_IO_KEY, ""));
}

@Override
public void close() throws IOException {
delegate.close();
}

@Override
public Set<String> getPendingDeletions() throws IOException {
return delegate.getPendingDeletions();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,17 +54,17 @@ public void testPreload() throws IOException {
try (Directory directory = newDirectory(build)) {
assertTrue(FsDirectoryFactory.isHybridFs(directory));
FsDirectoryFactory.HybridDirectory hybridDirectory = (FsDirectoryFactory.HybridDirectory) FilterDirectory.unwrap(directory);
assertTrue(FsDirectoryFactory.HybridDirectory.useDelegate("foo.dvd", newIOContext(random())));
assertTrue(FsDirectoryFactory.HybridDirectory.useDelegate("foo.nvd", newIOContext(random())));
assertTrue(FsDirectoryFactory.HybridDirectory.useDelegate("foo.tim", newIOContext(random())));
assertTrue(FsDirectoryFactory.HybridDirectory.useDelegate("foo.tip", newIOContext(random())));
assertTrue(FsDirectoryFactory.HybridDirectory.useDelegate("foo.cfs", newIOContext(random())));
assertTrue(FsDirectoryFactory.HybridDirectory.useDelegate("foo.dim", newIOContext(random())));
assertTrue(FsDirectoryFactory.HybridDirectory.useDelegate("foo.kdd", newIOContext(random())));
assertTrue(FsDirectoryFactory.HybridDirectory.useDelegate("foo.kdi", newIOContext(random())));
assertFalse(FsDirectoryFactory.HybridDirectory.useDelegate("foo.kdi", Store.READONCE_CHECKSUM));
assertTrue(FsDirectoryFactory.HybridDirectory.useDelegate("foo.tmp", newIOContext(random())));
assertTrue(FsDirectoryFactory.HybridDirectory.useDelegate("foo.fdt__0.tmp", newIOContext(random())));
assertNotNull(FsDirectoryFactory.HybridDirectory.useDelegate("foo.dvd", newIOContext(random())));
assertNotNull(FsDirectoryFactory.HybridDirectory.useDelegate("foo.nvd", newIOContext(random())));
assertNotNull(FsDirectoryFactory.HybridDirectory.useDelegate("foo.tim", newIOContext(random())));
assertNotNull(FsDirectoryFactory.HybridDirectory.useDelegate("foo.tip", newIOContext(random())));
assertNotNull(FsDirectoryFactory.HybridDirectory.useDelegate("foo.cfs", newIOContext(random())));
assertNotNull(FsDirectoryFactory.HybridDirectory.useDelegate("foo.dim", newIOContext(random())));
assertNotNull(FsDirectoryFactory.HybridDirectory.useDelegate("foo.kdd", newIOContext(random())));
assertNotNull(FsDirectoryFactory.HybridDirectory.useDelegate("foo.kdi", newIOContext(random())));
assertNull(FsDirectoryFactory.HybridDirectory.useDelegate("foo.kdi", Store.READONCE_CHECKSUM));
assertNotNull(FsDirectoryFactory.HybridDirectory.useDelegate("foo.tmp", newIOContext(random())));
assertNotNull(FsDirectoryFactory.HybridDirectory.useDelegate("foo.fdt__0.tmp", newIOContext(random())));
MMapDirectory delegate = hybridDirectory.getDelegate();
assertThat(delegate, Matchers.instanceOf(MMapDirectory.class));
var func = fsDirectoryFactory.preLoadFuncMap.get(delegate);
Expand Down