Skip to content

Commit e47b044

Browse files
authored
Adding asynchronous fetching for DirectIO directory (#134803)
One significant cost of DirectIO is simply waiting for bytes to be read in a path dedicate for compute. This change adds "prefetch" capabilities to DirectIO by allowing to prefetch particular file positions. For simplicity, I have it always prefetch a DirectIO page. Initially I did a bunch of work to allow prefetching multiple pages (e.g. more than 8192 bytes), but this greatly complicated the implementation. I think this can be added as a follow up. Here are some benchmarks for vectors. Note, the recall difference indicates I am doing something wrong right now. I am thinking I have a couple off-by-one errors and I am still investigating. Opening as a draft until I can figure out this weird bug (and of course, remove all my extraneous changes used for testing this thing)...This is labeled as 9.2, but I would be very surprise if it actually lands there. This PR: ``` index_name index_type visit_percentage(%) latency(ms) net_cpu_time(ms) avg_cpu_count QPS recall visited filter_selectivity ------------------------------ ---------- ------------------- ----------- ---------------- ------------- ------ ------ -------- ------------------ cohere-wikipedia-docs-768d.vec ivf 1.00 5.57 0.00 0.00 179.53 0.92 87397.37 1.00 ``` Baseline DirectIO: ``` index_name index_type visit_percentage(%) latency(ms) net_cpu_time(ms) avg_cpu_count QPS recall visited filter_selectivity ------------------------------ ---------- ------------------- ----------- ---------------- ------------- ------ ------ -------- ------------------ cohere-wikipedia-docs-768d.vec ivf 1.00 8.12 0.00 0.00 123.15 0.94 87397.37 1.00 ``` Baseline MMAP (when many floating points can still just reside in memory): ``` index_name index_type visit_percentage(%) latency(ms) net_cpu_time(ms) avg_cpu_count QPS recall visited filter_selectivity ------------------------------ ---------- ------------------- ----------- ---------------- ------------- ------ ------ -------- ------------------ cohere-wikipedia-docs-768d.vec ivf 1.00 3.58 0.00 0.00 279.33 0.94 87397.37 1.00 ```
1 parent 9dff96f commit e47b044

File tree

7 files changed

+977
-79
lines changed

7 files changed

+977
-79
lines changed

qa/vector/src/main/java/module-info.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,5 +18,6 @@
1818
requires org.elasticsearch.logging;
1919
requires java.management;
2020
requires jdk.management;
21+
requires org.apache.lucene.misc;
2122
requires org.elasticsearch.gpu;
2223
}

qa/vector/src/main/java/org/elasticsearch/test/knn/KnnIndexer.java

Lines changed: 2 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -34,16 +34,11 @@
3434
import org.apache.lucene.index.VectorSimilarityFunction;
3535
import org.apache.lucene.store.Directory;
3636
import org.apache.lucene.store.FSDirectory;
37-
import org.apache.lucene.store.IOContext;
38-
import org.apache.lucene.store.IndexInput;
3937
import org.apache.lucene.store.MMapDirectory;
40-
import org.apache.lucene.store.NIOFSDirectory;
4138
import org.apache.lucene.store.NativeFSLockFactory;
4239
import org.apache.lucene.util.PrintStreamInfoStream;
4340
import org.elasticsearch.common.io.Channels;
44-
import org.elasticsearch.core.IOUtils;
45-
import org.elasticsearch.index.store.LuceneFilesExtensions;
46-
import org.elasticsearch.index.store.Store;
41+
import org.elasticsearch.index.store.FsDirectoryFactory;
4742

4843
import java.io.IOException;
4944
import java.io.UncheckedIOException;
@@ -233,7 +228,7 @@ public boolean isEnabled(String component) {
233228
static Directory getDirectory(Path indexPath) throws IOException {
234229
Directory dir = FSDirectory.open(indexPath);
235230
if (dir instanceof MMapDirectory mmapDir) {
236-
return new HybridDirectory(mmapDir);
231+
return new FsDirectoryFactory.HybridDirectory(NativeFSLockFactory.INSTANCE, mmapDir, 64);
237232
}
238233
return dir;
239234
}
@@ -375,64 +370,4 @@ synchronized void next(byte[] dest) throws IOException {
375370
bytes.get(dest);
376371
}
377372
}
378-
379-
// Copy of Elastic's HybridDirectory which extends NIOFSDirectory and uses MMapDirectory for certain files.
380-
static final class HybridDirectory extends NIOFSDirectory {
381-
private final MMapDirectory delegate;
382-
383-
HybridDirectory(MMapDirectory delegate) throws IOException {
384-
super(delegate.getDirectory(), NativeFSLockFactory.INSTANCE);
385-
this.delegate = delegate;
386-
}
387-
388-
@Override
389-
public IndexInput openInput(String name, IOContext context) throws IOException {
390-
if (useDelegate(name, context)) {
391-
// we need to do these checks on the outer directory since the inner doesn't know about pending deletes
392-
ensureOpen();
393-
ensureCanRead(name);
394-
// we switch the context here since mmap checks for the READONCE context by identity
395-
context = context == Store.READONCE_CHECKSUM ? IOContext.READONCE : context;
396-
// we only use the mmap to open inputs. Everything else is managed by the NIOFSDirectory otherwise
397-
// we might run into trouble with files that are pendingDelete in one directory but still
398-
// listed in listAll() from the other. We on the other hand don't want to list files from both dirs
399-
// and intersect for perf reasons.
400-
return delegate.openInput(name, context);
401-
} else {
402-
return super.openInput(name, context);
403-
}
404-
}
405-
406-
@Override
407-
public void close() throws IOException {
408-
IOUtils.close(super::close, delegate);
409-
}
410-
411-
private static String getExtension(String name) {
412-
// Unlike FileSwitchDirectory#getExtension, we treat `tmp` as a normal file extension, which can have its own rules for mmaping.
413-
final int lastDotIndex = name.lastIndexOf('.');
414-
if (lastDotIndex == -1) {
415-
return "";
416-
} else {
417-
return name.substring(lastDotIndex + 1);
418-
}
419-
}
420-
421-
static boolean useDelegate(String name, IOContext ioContext) {
422-
if (ioContext == Store.READONCE_CHECKSUM) {
423-
// If we're just reading the footer for the checksum then mmap() isn't really necessary, and it's desperately inefficient
424-
// if pre-loading is enabled on this file.
425-
return false;
426-
}
427-
428-
final LuceneFilesExtensions extension = LuceneFilesExtensions.fromExtension(getExtension(name));
429-
if (extension == null || extension.shouldMmap() == false) {
430-
// Other files are either less performance-sensitive (e.g. stored field index, norms metadata)
431-
// or are large and have a random access pattern and mmap leads to page cache trashing
432-
// (e.g. stored fields and term vectors).
433-
return false;
434-
}
435-
return true;
436-
}
437-
}
438373
}

server/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
179179
IndexModule.INDEX_RECOVERY_TYPE_SETTING,
180180
IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING,
181181
FsDirectoryFactory.INDEX_LOCK_FACTOR_SETTING,
182+
FsDirectoryFactory.ASYNC_PREFETCH_LIMIT,
182183
EngineConfig.INDEX_CODEC_SETTING,
183184
IndexMetadata.SETTING_WAIT_FOR_ACTIVE_SHARDS,
184185
IndexSettings.DEFAULT_PIPELINE,

0 commit comments

Comments
 (0)