Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
f111798
Absolutely garbage async io POC, it reads correctly but is slower tha…
benwtrent Sep 12, 2025
aa69974
iter
benwtrent Sep 15, 2025
e36d5a4
iter
benwtrent Sep 15, 2025
9f43918
iter
benwtrent Sep 16, 2025
8ccc268
Update docs/changelog/134803.yaml
benwtrent Sep 16, 2025
2ce6dd2
repeatably failing test
benwtrent Sep 18, 2025
74793df
Merge remote-tracking branch 'upstream/main' into exp/async-direct-io
benwtrent Sep 18, 2025
9ed9b4c
fixing bug
benwtrent Sep 18, 2025
c19b9b1
iter
benwtrent Sep 18, 2025
ca6db4a
Delete docs/changelog/134803.yaml
benwtrent Sep 18, 2025
6a5086e
iter
benwtrent Sep 18, 2025
87909c0
Merge branch 'exp/async-direct-io' of github.com:benwtrent/elasticsea…
benwtrent Sep 18, 2025
d1feca4
iter
benwtrent Sep 19, 2025
431fa36
Merge remote-tracking branch 'upstream/main' into exp/async-direct-io
benwtrent Sep 19, 2025
bfd8656
iter
benwtrent Sep 19, 2025
f5c6d7e
prep for merge
benwtrent Sep 24, 2025
ed19e82
iter
benwtrent Sep 24, 2025
89b430d
Merge remote-tracking branch 'upstream/main' into exp/async-direct-io
benwtrent Sep 24, 2025
6a72edc
fix after merge
benwtrent Sep 24, 2025
ad3b64c
Merge branch 'main' into exp/async-direct-io
benwtrent Sep 24, 2025
91a5bd0
fixing compilation
benwtrent Sep 24, 2025
d4fa576
Merge remote-tracking branch 'upstream/main' into exp/async-direct-io
benwtrent Sep 24, 2025
38a153b
iter
benwtrent Sep 24, 2025
04607d2
fixing forbidden
benwtrent Sep 24, 2025
ad17181
addressing PR comments, cleaning up code
benwtrent Sep 25, 2025
d7d36b7
Merge branch 'main' into exp/async-direct-io
benwtrent Sep 25, 2025
0ebdf43
iter
benwtrent Sep 26, 2025
6633f68
Merge branch 'exp/async-direct-io' of github.com:benwtrent/elasticsea…
benwtrent Sep 26, 2025
dd8abc9
Merge branch 'main' into exp/async-direct-io
benwtrent Sep 26, 2025
1ae5dd3
Merge remote-tracking branch 'upstream/main' into exp/async-direct-io
benwtrent Sep 29, 2025
441aa24
Merge branch 'exp/async-direct-io' of github.com:benwtrent/elasticsea…
benwtrent Sep 29, 2025
7b6a822
Merge branch 'main' into exp/async-direct-io
benwtrent Oct 2, 2025
b8afc2b
Merge remote-tracking branch 'upstream/main' into exp/async-direct-io
benwtrent Oct 6, 2025
cdfb99d
Adding a setting and addressing PR comments
benwtrent Oct 6, 2025
7636387
fixing setting
benwtrent Oct 6, 2025
cb5bd7b
Merge branch 'main' into exp/async-direct-io
benwtrent Oct 6, 2025
d7e20bb
Merge branch 'main' into exp/async-direct-io
benwtrent Oct 7, 2025
3e98e35
Merge branch 'main' into exp/async-direct-io
benwtrent Oct 7, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/134803.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 134803
summary: "[DRAFT] Adding asynchronous fetching for DirectIO directory"
area: "Vector Search, Search"
type: enhancement
issues: []
2 changes: 1 addition & 1 deletion qa/vector/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ tasks.register("checkVec", JavaExec) {
jvmArgs '--add-modules=jdk.incubator.vector', '--enable-native-access=ALL-UNNAMED'
}
if (System.getenv("DO_PROFILING") != null) {
jvmArgs '-XX:StartFlightRecording=dumponexit=true,maxsize=250M,filename=knn.jfr,settings=profile.jfc'
jvmArgs '-agentpath:/Users/benjamintrent/.local/bin/async-profiler-4.0-macos/lib/libasyncProfiler.dylib=start,jstackdepth=4096,interval=500us,event=cpu,file=knn-search-cpu.jfr'
}
if (buildParams.getIsRuntimeJavaHomeSet()) {
executable = "${buildParams.runtimeJavaHome.get()}/bin/java" + (OS.current() == OS.WINDOWS ? '.exe' : '')
Expand Down
1 change: 1 addition & 0 deletions qa/vector/src/main/java/module-info.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,5 @@
requires org.elasticsearch.logging;
requires java.management;
requires jdk.management;
requires org.apache.lucene.misc;
}
50 changes: 48 additions & 2 deletions qa/vector/src/main/java/org/elasticsearch/test/knn/KnnIndexer.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.VectorEncoding;
import org.apache.lucene.index.VectorSimilarityFunction;
import org.apache.lucene.misc.store.DirectIODirectory;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.IOContext;
Expand All @@ -42,6 +43,9 @@
import org.apache.lucene.util.PrintStreamInfoStream;
import org.elasticsearch.common.io.Channels;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.index.codec.vectors.es818.DirectIOIndexInputSupplier;
import org.elasticsearch.index.store.AsyncDirectIOIndexInput;
import org.elasticsearch.index.store.FsDirectoryFactory;
import org.elasticsearch.index.store.LuceneFilesExtensions;
import org.elasticsearch.index.store.Store;

Expand All @@ -55,6 +59,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.OptionalLong;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -233,7 +238,7 @@ public boolean isEnabled(String component) {
/**
 * Opens the index directory located at {@code indexPath}.
 * <p>
 * The directory returned by {@code FSDirectory.open} is handed back unchanged unless it is an
 * {@code MMapDirectory}, in which case it is wrapped in a hybrid directory.
 *
 * @param indexPath filesystem location of the index
 * @return the directory through which the index is accessed
 * @throws IOException if opening or wrapping the directory fails
 */
static Directory getDirectory(Path indexPath) throws IOException {
Directory dir = FSDirectory.open(indexPath);
if (dir instanceof MMapDirectory mmapDir) {
// NOTE(review): two consecutive returns — the second one can never execute. They look like the
// before/after lines of one change (local HybridDirectory vs. FsDirectoryFactory.HybridDirectory);
// confirm which wrapper is intended and drop the other.
return new HybridDirectory(mmapDir);
return new FsDirectoryFactory.HybridDirectory(NativeFSLockFactory.INSTANCE, mmapDir);
}
return dir;
}
Expand Down Expand Up @@ -377,12 +382,40 @@ synchronized void next(byte[] dest) throws IOException {
}

// Copy of Elastic's HybridDirectory which extends NIOFSDirectory and uses MMapDirectory for certain files.
static final class HybridDirectory extends NIOFSDirectory {
static final class HybridDirectory extends NIOFSDirectory implements DirectIOIndexInputSupplier {
private final MMapDirectory delegate;
private final DirectIODirectory directIODelegate;

/**
 * Creates the hybrid directory over the path of {@code delegate} and tries to set up a direct-IO
 * view of the same files.
 * <p>
 * The file store's block size is looked up first; a failure there propagates to the caller. Any
 * exception raised while building the direct-IO directory is logged and leaves
 * {@code directIODelegate} as {@code null}.
 *
 * @param delegate the memory-mapped directory whose path and files are shared
 * @throws IOException if the super constructor or the block-size lookup fails
 */
HybridDirectory(MMapDirectory delegate) throws IOException {
    super(delegate.getDirectory(), NativeFSLockFactory.INSTANCE);
    this.delegate = delegate;
    final int blockSize = Math.toIntExact(Files.getFileStore(delegate.getDirectory()).getBlockSize());
    // stays null when direct IO cannot be initialised
    DirectIODirectory directIO = null;
    try {
        // use 8kB buffer (two pages) to guarantee it can load all of an un-page-aligned 1024-dim float vector
        directIO = new DirectIODirectory(delegate, 8192, DirectIODirectory.DEFAULT_MIN_BYTES_DIRECT) {
            @Override
            protected boolean useDirectIO(String name, IOContext context, OptionalLong fileLength) {
                // every file opened through this directory goes the direct-IO route
                return true;
            }

            @Override
            public IndexInput openInput(String name, IOContext context) throws IOException {
                ensureOpen();
                if (useDirectIO(name, context, OptionalLong.of(fileLength(name))) == false) {
                    return in.openInput(name, context);
                }
                return new AsyncDirectIOIndexInput(getDirectory().resolve(name), blockSize, 8192, 128);
            }
        };
    } catch (Exception e) {
        // directio not supported
        logger.warn("Could not initialize DirectIO access", e);
    }
    final boolean directIOAvailable = directIO != null;
    logger.info("HybridDirectory: using DirectIO={} blockSize={}", directIOAvailable, blockSize);
    this.directIODelegate = directIO;
}

@Override
Expand Down Expand Up @@ -434,5 +467,18 @@ static boolean useDelegate(String name, IOContext ioContext) {
}
return true;
}

/**
 * Opens {@code name} through the direct-IO delegate when one was initialised; otherwise logs a
 * warning and falls back to the regular {@code openInput}.
 *
 * @param name    file to open
 * @param context IO context passed through to whichever directory serves the request
 * @return an input over the requested file
 * @throws IOException if the file cannot be opened
 */
@Override
public IndexInput openInputDirect(String name, IOContext context) throws IOException {
    if (directIODelegate != null) {
        // we need to do these checks on the outer directory since the inner doesn't know about pending deletes
        ensureOpen();
        ensureCanRead(name);
        logger.info("Opening {} with direct IO", name);
        return directIODelegate.openInput(name, context);
    }
    logger.warn("DirectIODirectory not initialized, falling back to normal openInput");
    return openInput(name, context);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public abstract class AbstractFlatVectorsFormat extends FlatVectorsFormat {
reason = "TODO Deprecate any lenient usage of Boolean#parseBoolean https://github.com/elastic/elasticsearch/issues/128993"
)
// Decides whether direct IO is used, based on the "vector.rescoring.directio" system property.
// An unset property is treated as "false"; parsing follows Boolean.parseBoolean, so only a
// case-insensitive "true" enables it. The value must come from the property rather than a
// hard-coded constant so that the feature stays opt-in.
private static boolean getUseDirectIO() {
    return Boolean.parseBoolean(System.getProperty("vector.rescoring.directio", "false"));
}

protected AbstractFlatVectorsFormat(String name) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.index.codec.vectors;

import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.IntsRef;

import java.io.IOException;

/**
 * Vector values that can hand out a {@link VectorReScorer} for a given query vector.
 */
public interface RescorableVectorValues {

    /**
     * Creates a rescorer for {@code target}.
     *
     * @param target the query vector
     * @return a rescorer bound to {@code target}
     * @throws IOException if the rescorer cannot be created
     */
    VectorReScorer rescorer(float[] target) throws IOException;

    /**
     * Exposes an iterator and a way to obtain a {@link Bulk} scorer restricted to a set of matching docs.
     */
    interface VectorReScorer {

        /** @return the iterator associated with this rescorer */
        DocIdSetIterator iterator();

        /**
         * @param matchingDocs the documents to restrict scoring to
         * @return a bulk scorer over {@code matchingDocs}
         * @throws IOException if the bulk scorer cannot be created
         */
        Bulk bulk(DocIdSetIterator matchingDocs) throws IOException;

        /**
         * Scores documents in batches, writing results into a {@link DocAndFloatFeatureBuffer}.
         */
        interface Bulk {

            /**
             * Fills {@code buffer} with the next batch of documents and their scores.
             *
             * @param nextCount requested batch size
             * @param liveDocs  live-docs filter; NOTE(review): presumably {@code null} means "all live" — confirm against implementations
             * @param buffer    destination for doc ids and scores
             * @return a float summarising the batch; NOTE(review): presumably the maximum score — confirm against implementations
             * @throws IOException if reading vectors fails
             */
            float nextDocsAndScores(int nextCount, Bits liveDocs, DocAndFloatFeatureBuffer buffer) throws IOException;
        }
    }

    /**
     * Parallel arrays of doc ids and float features; only the first {@link #size} entries are meaningful.
     */
    final class DocAndFloatFeatureBuffer {

        private static final float[] EMPTY_FLOATS = new float[0];

        /** Doc ids; starts out as the shared empty array. */
        public int[] docs = IntsRef.EMPTY_INTS;
        /** Feature value for the doc id at the same index. */
        public float[] features = EMPTY_FLOATS;
        /** Number of valid entries in {@link #docs} and {@link #features}. */
        public int size;

        public DocAndFloatFeatureBuffer() {}

        /**
         * Makes sure {@link #docs} can hold at least {@code minSize} entries. When it has to grow, both arrays
         * are replaced and previous contents are not carried over.
         *
         * @param minSize minimum required capacity
         */
        public void growNoCopy(int minSize) {
            if (docs.length >= minSize) {
                return;
            }
            docs = ArrayUtil.growNoCopy(docs, minSize);
            features = new float[docs.length];
        }

        /**
         * Compacts the buffer in place so that only entries whose doc id is set in {@code liveDocs} remain,
         * then updates {@link #size}.
         *
         * @param liveDocs filter consulted for every buffered doc id
         */
        public void apply(Bits liveDocs) {
            int kept = 0;
            for (int read = 0; read < size; read++) {
                final int doc = docs[read];
                if (liveDocs.get(doc) == false) {
                    continue;
                }
                docs[kept] = doc;
                features[kept] = features[read];
                kept++;
            }
            size = kept;
        }
    }

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@
import org.apache.lucene.codecs.KnnVectorsWriter;
import org.apache.lucene.codecs.hnsw.FlatVectorScorerUtil;
import org.apache.lucene.codecs.hnsw.FlatVectorsFormat;
import org.apache.lucene.codecs.lucene99.Lucene99FlatVectorsFormat;
import org.apache.lucene.index.SegmentReadState;
import org.apache.lucene.index.SegmentWriteState;
import org.elasticsearch.index.codec.vectors.OptimizedScalarQuantizer;
import org.elasticsearch.index.codec.vectors.es818.DirectIOLucene99FlatVectorsFormat;

import java.io.IOException;

Expand Down Expand Up @@ -56,7 +56,7 @@ public class ES920DiskBBQVectorsFormat extends KnnVectorsFormat {
public static final int VERSION_START = 0;
public static final int VERSION_CURRENT = VERSION_START;

private static final FlatVectorsFormat rawVectorFormat = new Lucene99FlatVectorsFormat(
private static final FlatVectorsFormat rawVectorFormat = new DirectIOLucene99FlatVectorsFormat(
FlatVectorScorerUtil.getLucene99FlatVectorsScorer()
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.codecs.hnsw.FlatVectorsReader;
import org.apache.lucene.codecs.hnsw.FlatVectorsScorer;
import org.apache.lucene.codecs.lucene95.HasIndexSlice;
import org.apache.lucene.codecs.lucene95.OffHeapByteVectorValues;
import org.apache.lucene.codecs.lucene95.OffHeapFloatVectorValues;
import org.apache.lucene.codecs.lucene95.OrdToDocDISIReaderConfiguration;
Expand All @@ -35,6 +36,9 @@
import org.apache.lucene.index.VectorEncoding;
import org.apache.lucene.index.VectorSimilarityFunction;
import org.apache.lucene.internal.hppc.IntObjectHashMap;
import org.apache.lucene.search.ConjunctionUtils;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.VectorScorer;
import org.apache.lucene.store.ChecksumIndexInput;
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.store.IOContext;
Expand All @@ -44,10 +48,12 @@
import org.apache.lucene.util.RamUsageEstimator;
import org.apache.lucene.util.SuppressForbidden;
import org.apache.lucene.util.hnsw.RandomVectorScorer;
import org.elasticsearch.index.codec.vectors.RescorableVectorValues;
import org.elasticsearch.index.codec.vectors.reflect.OffHeapStats;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.Map;

import static org.apache.lucene.codecs.lucene99.Lucene99HnswVectorsReader.readSimilarityFunction;
Expand Down Expand Up @@ -203,15 +209,18 @@ private FieldEntry getFieldEntry(String field, VectorEncoding expectedEncoding)
/**
 * Returns the float vector values stored for {@code field}.
 * <p>
 * The field entry is resolved with {@code VectorEncoding.FLOAT32} as the expected encoding, and the values
 * are loaded from {@code vectorData} through {@code OffHeapFloatVectorValues.load}.
 *
 * @param field name of the vector field
 * @return the vector values for the field
 * @throws IOException if loading the values fails
 */
@Override
public FloatVectorValues getFloatVectorValues(String field) throws IOException {
final FieldEntry fieldEntry = getFieldEntry(field, VectorEncoding.FLOAT32);
// NOTE(review): two return expressions follow and the first is never closed — they look like the
// before/after lines of one change (plain load vs. the load wrapped in RescorerOffHeapVectorValues
// together with the field's similarity function). Confirm which form is intended.
return OffHeapFloatVectorValues.load(
fieldEntry.similarityFunction,
vectorScorer,
fieldEntry.ordToDoc,
fieldEntry.vectorEncoding,
fieldEntry.dimension,
fieldEntry.vectorDataOffset,
fieldEntry.vectorDataLength,
vectorData
return new RescorerOffHeapVectorValues(
OffHeapFloatVectorValues.load(
fieldEntry.similarityFunction,
vectorScorer,
fieldEntry.ordToDoc,
fieldEntry.vectorEncoding,
fieldEntry.dimension,
fieldEntry.vectorDataOffset,
fieldEntry.vectorDataLength,
vectorData
),
fieldEntry.similarityFunction
);
}

Expand Down Expand Up @@ -347,4 +356,116 @@ static FieldEntry create(IndexInput input, FieldInfo info) throws IOException {
return new FieldEntry(similarityFunction, vectorEncoding, vectorDataOffset, vectorDataLength, dimension, size, ordToDoc, info);
}
}

static class RescorerOffHeapVectorValues extends FloatVectorValues implements RescorableVectorValues {

VectorSimilarityFunction similarityFunction;
FloatVectorValues inner;
IndexInput inputSlice;

RescorerOffHeapVectorValues(FloatVectorValues inner, VectorSimilarityFunction similarityFunction) {
this.inner = inner;
if (inner instanceof HasIndexSlice slice) {
this.inputSlice = slice.getSlice();
} else {
this.inputSlice = null;
}
this.inputSlice = inputSlice;
this.similarityFunction = similarityFunction;
}

@Override
public float[] vectorValue(int ord) throws IOException {
return inner.vectorValue(ord);
}

@Override
public int dimension() {
return inner.dimension();
}

@Override
public int size() {
return inner.size();
}

@Override
public RescorerOffHeapVectorValues copy() throws IOException {
return new RescorerOffHeapVectorValues(inner.copy(), similarityFunction);
}

@Override
public VectorReScorer rescorer(float[] target) throws IOException {
DocIndexIterator indexIterator = inner.iterator();
return new VectorReScorer() {
@Override
public DocIdSetIterator iterator() {
return indexIterator;
}

@Override
public Bulk bulk(DocIdSetIterator matchingDocs) throws IOException {
DocIdSetIterator conjunctionScorer = ConjunctionUtils.intersectIterators(List.of(matchingDocs, indexIterator));
if (conjunctionScorer.docID() == -1) {
conjunctionScorer.nextDoc();
}
long byteSize = (long) dimension() * Float.BYTES;
return (nextCount, liveDocs, buffer) -> {
buffer.growNoCopy(nextCount);
int size = 0;
for (int doc = conjunctionScorer.docID(); doc != DocIdSetIterator.NO_MORE_DOCS && size < nextCount; doc =
conjunctionScorer.nextDoc()) {
if (liveDocs == null || liveDocs.get(doc)) {
buffer.docs[size++] = indexIterator.index();
}
}
int bulkSize = 32;
int loopBound = size - (size % bulkSize);
int i = 0;
float maxScore = Float.NEGATIVE_INFINITY;
for (; i < loopBound; i += bulkSize) {
for (int j = 0; j < bulkSize; j++) {
long ord = buffer.docs[i + j];
inputSlice.prefetch(ord * byteSize, byteSize);
}
for (int j = 0; j < bulkSize; j++) {
float[] vector = inner.vectorValue(buffer.docs[i + j]);
assert inputSlice.getFilePointer() == (buffer.docs[i + j] + 1l) * byteSize
: "file pointer mismatch at doc="
+ buffer.docs[i + j]
+ " fp="
+ inputSlice.getFilePointer()
+ " expected="
+ ((buffer.docs[i + j] + 1l) * byteSize);
buffer.features[i + j] = similarityFunction.compare(vector, target);
if (buffer.features[i + j] > maxScore) {
maxScore = buffer.features[i + j];
}
buffer.docs[i + j] = inner.ordToDoc(buffer.docs[i + j]);
}
}
for (int j = i; j < size; j++) {
long ord = buffer.docs[j];
inputSlice.prefetch(ord * byteSize, byteSize);
}
for (; i < size; i++) {
float[] vector = inner.vectorValue(buffer.docs[i]);
buffer.features[i] = similarityFunction.compare(vector, target);
if (buffer.features[i] > maxScore) {
maxScore = buffer.features[i];
}
buffer.docs[i] = inner.ordToDoc(buffer.docs[i]);
}
buffer.size = size;
return maxScore;
};
}
};
}

@Override
public VectorScorer scorer(float[] target) throws IOException {
return inner.scorer(target);
}
}
}
Loading