Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
73e3cc0
Add a direct IO option to rescore_vector for bbq_hnsw
thecoop Jul 9, 2025
fb9c1df
Merge remote-tracking branch 'upstream/lucene_snapshot' into direct-i…
thecoop Jul 10, 2025
318a564
Merge branch 'lucene_snapshot' into direct-io-index-option
thecoop Jul 14, 2025
799daaa
Merge remote-tracking branch 'upstream/lucene_snapshot' into direct-i…
thecoop Jul 14, 2025
3776a01
Use a separate option
thecoop Jul 14, 2025
31acb01
Rename option, add basic tests
thecoop Jul 14, 2025
1369293
Add more test
thecoop Jul 18, 2025
c35ca82
Merge branch 'lucene_snapshot' into direct-io-index-option
thecoop Aug 12, 2025
a5a23e5
[CI] Auto commit changes from spotless
Aug 12, 2025
a59d736
Merge branch 'lucene_snapshot' into direct-io-index-option
thecoop Aug 19, 2025
9b02a48
Change to a mapper feature rather than search feature
thecoop Aug 19, 2025
5b8d499
Create new formats for direct IO access
thecoop Aug 19, 2025
5749e57
Update reference
thecoop Aug 19, 2025
7f7372a
Merge branch 'lucene_snapshot' into direct-io-index-option
thecoop Aug 19, 2025
6215f03
Check setting in tests
thecoop Aug 19, 2025
490e505
Merge branch 'lucene_snapshot' into direct-io-index-option
thecoop Aug 27, 2025
7d33963
Merge branch 'lucene_snapshot' into direct-io-index-option
thecoop Aug 27, 2025
7c4b8af
Remove JVM option
thecoop Aug 27, 2025
15f7f4c
Merge branch 'lucene_snapshot' into direct-io-index-option
thecoop Aug 28, 2025
0d0b484
Merge remote-tracking branch 'upstream/lucene_snapshot_10_3' into dir…
thecoop Sep 22, 2025
cc3a5ff
Add direct IO to diskBBQ
thecoop Sep 23, 2025
222ccc1
Merge remote-tracking branch 'upstream/lucene_snapshot_10_3' into dir…
thecoop Sep 23, 2025
7d1edd3
Remove previous implementation versions
thecoop Sep 23, 2025
67fb702
Update implementations
thecoop Sep 23, 2025
6f92506
Update docs/changelog/130893.yaml
thecoop Oct 2, 2025
0f020c7
[CI] Update transport version definitions
Oct 2, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ static Codec createCodec(CmdLineArgs args) {
if (args.indexType() == IndexType.FLAT) {
format = new ES818BinaryQuantizedVectorsFormat();
} else {
format = new ES818HnswBinaryQuantizedVectorsFormat(args.hnswM(), args.hnswEfConstruction(), 1, null);
format = new ES818HnswBinaryQuantizedVectorsFormat(args.hnswM(), args.hnswEfConstruction(), 1, false, null);
}
} else if (args.quantizeBits() < 32) {
if (args.indexType() == IndexType.FLAT) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ public FlatVectorsWriter fieldsWriter(SegmentWriteState state) throws IOExceptio
}

/**
 * Reports whether files of the segment being opened should be read with direct IO.
 *
 * @param state the read state of the segment; only its {@code directory} is inspected
 * @return the result of {@link FsDirectoryFactory#isHybridFs} for the segment's directory
 */
static boolean shouldUseDirectIO(SegmentReadState state) {
    // Direct IO is requested per format instance (constructor argument) rather than through the
    // "vector.rescoring.directio" system property, so no JVM-wide flag is asserted here: doing so
    // would fail under -ea whenever the option is enabled without the property being set.
    return FsDirectoryFactory.isHybridFs(state.directory);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,6 @@
*/
public class ES818BinaryQuantizedVectorsFormat extends FlatVectorsFormat {

public static final boolean USE_DIRECT_IO = Boolean.parseBoolean(System.getProperty("vector.rescoring.directio", "false"));

public static final String BINARIZED_VECTOR_COMPONENT = "BVEC";
public static final String NAME = "ES818BinaryQuantizedVectorsFormat";

Expand All @@ -100,17 +98,24 @@ public class ES818BinaryQuantizedVectorsFormat extends FlatVectorsFormat {
static final String VECTOR_DATA_EXTENSION = "veb";
static final int DIRECT_MONOTONIC_BLOCK_SHIFT = 16;

private static final FlatVectorsFormat rawVectorFormat = USE_DIRECT_IO
? new DirectIOLucene99FlatVectorsFormat(FlatVectorScorerUtil.getLucene99FlatVectorsScorer())
: new Lucene99FlatVectorsFormat(FlatVectorScorerUtil.getLucene99FlatVectorsScorer());

private static final ES818BinaryFlatVectorsScorer scorer = new ES818BinaryFlatVectorsScorer(
FlatVectorScorerUtil.getLucene99FlatVectorsScorer()
);

private final FlatVectorsFormat rawVectorFormat;

/** Creates a format that does not use direct IO to access raw vectors. */
public ES818BinaryQuantizedVectorsFormat() {
    this(false);
}

/**
 * Creates a format, selecting the flat format used for the raw vectors.
 *
 * @param useDirectIO {@code true} to back raw vectors with {@link DirectIOLucene99FlatVectorsFormat},
 *                    {@code false} to back them with {@link Lucene99FlatVectorsFormat}
 */
public ES818BinaryQuantizedVectorsFormat(boolean useDirectIO) {
    super(NAME);
    if (useDirectIO) {
        rawVectorFormat = new DirectIOLucene99FlatVectorsFormat(FlatVectorScorerUtil.getLucene99FlatVectorsScorer());
    } else {
        rawVectorFormat = new Lucene99FlatVectorsFormat(FlatVectorScorerUtil.getLucene99FlatVectorsScorer());
    }
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,14 @@ public class ES818HnswBinaryQuantizedVectorsFormat extends KnnVectorsFormat {
private final int beamWidth;

/** The format for storing, reading, merging vectors on disk */
private static final FlatVectorsFormat flatVectorsFormat = new ES818BinaryQuantizedVectorsFormat();
private final FlatVectorsFormat flatVectorsFormat;

private final int numMergeWorkers;
private final TaskExecutor mergeExec;

/** Constructs a format using default graph construction parameters, with direct IO for raw vectors disabled. */
public ES818HnswBinaryQuantizedVectorsFormat() {
    // A constructor may contain only one explicit this(...) call; the former four-argument
    // delegation is dropped in favour of the overload that carries the useDirectIO flag.
    this(DEFAULT_MAX_CONN, DEFAULT_BEAM_WIDTH, DEFAULT_NUM_MERGE_WORKER, false, null);
}

/**
Expand All @@ -79,7 +79,18 @@ public ES818HnswBinaryQuantizedVectorsFormat() {
* @param beamWidth the size of the queue maintained during graph construction.
*/
public ES818HnswBinaryQuantizedVectorsFormat(int maxConn, int beamWidth) {
    // Single delegation only (the stale four-argument call is removed). Direct IO stays off here;
    // callers that want it use the overload taking a useDirectIO flag.
    this(maxConn, beamWidth, DEFAULT_NUM_MERGE_WORKER, false, null);
}

/**
 * Constructs a format from explicit graph construction parameters, using
 * {@code DEFAULT_NUM_MERGE_WORKER} merge workers and no merge executor.
 *
 * @param maxConn the maximum number of connections to a node in the HNSW graph
 * @param beamWidth the size of the queue maintained during graph construction.
 * @param useDirectIO whether direct IO should be used to access raw vectors
 */
public ES818HnswBinaryQuantizedVectorsFormat(int maxConn, int beamWidth, boolean useDirectIO) {
    this(
        maxConn,
        beamWidth,
        DEFAULT_NUM_MERGE_WORKER,
        useDirectIO,
        null
    );
}

/**
Expand All @@ -92,7 +103,13 @@ public ES818HnswBinaryQuantizedVectorsFormat(int maxConn, int beamWidth) {
* @param mergeExec the {@link ExecutorService} that will be used by ALL vector writers that are
* generated by this format to do the merge
*/
public ES818HnswBinaryQuantizedVectorsFormat(int maxConn, int beamWidth, int numMergeWorkers, ExecutorService mergeExec) {
public ES818HnswBinaryQuantizedVectorsFormat(
int maxConn,
int beamWidth,
int numMergeWorkers,
boolean useDirectIO,
ExecutorService mergeExec
) {
super(NAME);
if (maxConn <= 0 || maxConn > MAXIMUM_MAX_CONN) {
throw new IllegalArgumentException(
Expand All @@ -110,6 +127,9 @@ public ES818HnswBinaryQuantizedVectorsFormat(int maxConn, int beamWidth, int num
throw new IllegalArgumentException("No executor service is needed as we'll use single thread to merge");
}
this.numMergeWorkers = numMergeWorkers;

flatVectorsFormat = new ES818BinaryQuantizedVectorsFormat(useDirectIO);

if (mergeExec != null) {
this.mergeExec = new TaskExecutor(mergeExec);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ private DenseVectorIndexOptions defaultIndexOptions(boolean defaultInt8Hnsw, boo
return new BBQHnswIndexOptions(
Lucene99HnswVectorsFormat.DEFAULT_MAX_CONN,
Lucene99HnswVectorsFormat.DEFAULT_BEAM_WIDTH,
new RescoreVector(DEFAULT_OVERSAMPLE)
null
);
} else if (defaultInt8Hnsw) {
return new Int8HnswIndexOptions(
Expand Down Expand Up @@ -1632,9 +1632,6 @@ public DenseVectorIndexOptions parseIndexOptions(String fieldName, Map<String, ?
RescoreVector rescoreVector = null;
if (hasRescoreIndexVersion(indexVersion)) {
rescoreVector = RescoreVector.fromIndexOptions(indexOptionsMap, indexVersion);
if (rescoreVector == null && defaultOversampleForBBQ(indexVersion)) {
rescoreVector = new RescoreVector(DEFAULT_OVERSAMPLE);
}
}
MappingParser.checkNoRemainingFields(fieldName, indexOptionsMap);
return new BBQHnswIndexOptions(m, efConstruction, rescoreVector);
Expand All @@ -1656,9 +1653,6 @@ public DenseVectorIndexOptions parseIndexOptions(String fieldName, Map<String, ?
RescoreVector rescoreVector = null;
if (hasRescoreIndexVersion(indexVersion)) {
rescoreVector = RescoreVector.fromIndexOptions(indexOptionsMap, indexVersion);
if (rescoreVector == null && defaultOversampleForBBQ(indexVersion)) {
rescoreVector = new RescoreVector(DEFAULT_OVERSAMPLE);
}
}
MappingParser.checkNoRemainingFields(fieldName, indexOptionsMap);
return new BBQFlatIndexOptions(rescoreVector);
Expand Down Expand Up @@ -1693,9 +1687,6 @@ public DenseVectorIndexOptions parseIndexOptions(String fieldName, Map<String, ?
}
}
RescoreVector rescoreVector = RescoreVector.fromIndexOptions(indexOptionsMap, indexVersion);
if (rescoreVector == null) {
rescoreVector = new RescoreVector(DEFAULT_OVERSAMPLE);
}
Object nProbeNode = indexOptionsMap.remove("default_n_probe");
int nProbe = -1;
if (nProbeNode != null) {
Expand Down Expand Up @@ -2183,7 +2174,8 @@ public BBQHnswIndexOptions(int m, int efConstruction, RescoreVector rescoreVecto
/**
 * Builds the HNSW binary-quantized vectors format for this field.
 *
 * @param elementType the element type of the field; asserted to be {@code FLOAT}
 * @return a format configured with this option's {@code m}, {@code efConstruction} and direct IO choice
 */
@Override
KnnVectorsFormat getVectorsFormat(ElementType elementType) {
    assert elementType == ElementType.FLOAT;
    // useDirectIO is a nullable Boolean: a missing rescore_vector or an unset flag both mean "no direct IO".
    // The previous unconditional return of the two-argument format is removed so the flag is actually honoured.
    boolean directIO = rescoreVector != null && Boolean.TRUE.equals(rescoreVector.useDirectIO);
    return new ES818HnswBinaryQuantizedVectorsFormat(m, efConstruction, directIO);
}

@Override
Expand Down Expand Up @@ -2342,36 +2334,46 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
}
}

public record RescoreVector(float oversample) implements ToXContentObject {
public record RescoreVector(Float oversample, Boolean useDirectIO) implements ToXContentObject {
static final String NAME = "rescore_vector";
static final String OVERSAMPLE = "oversample";
static final String DIRECT_IO = "direct_io";

static RescoreVector fromIndexOptions(Map<String, ?> indexOptionsMap, IndexVersion indexVersion) {
Object rescoreVectorNode = indexOptionsMap.remove(NAME);
if (rescoreVectorNode == null) {
return null;
}
Map<String, Object> mappedNode = XContentMapValues.nodeMapValue(rescoreVectorNode, NAME);

Float oversampleValue = null;
Object oversampleNode = mappedNode.get(OVERSAMPLE);
if (oversampleNode == null) {
throw new IllegalArgumentException("Invalid rescore_vector value. Missing required field " + OVERSAMPLE);
}
float oversampleValue = (float) XContentMapValues.nodeDoubleValue(oversampleNode);
if (oversampleValue == 0 && allowsZeroRescore(indexVersion) == false) {
throw new IllegalArgumentException("oversample must be greater than 1");
}
if (oversampleValue < 1 && oversampleValue != 0) {
throw new IllegalArgumentException("oversample must be greater than 1 or exactly 0");
} else if (oversampleValue > 10) {
throw new IllegalArgumentException("oversample must be less than or equal to 10");
if (oversampleNode != null) {
oversampleValue = (float) XContentMapValues.nodeDoubleValue(oversampleNode);
if (oversampleValue == 0 && allowsZeroRescore(indexVersion) == false) {
throw new IllegalArgumentException("oversample must be greater than 1");
}
if (oversampleValue < 1 && oversampleValue != 0) {
throw new IllegalArgumentException("oversample must be greater than 1 or exactly 0");
} else if (oversampleValue > 10) {
throw new IllegalArgumentException("oversample must be less than or equal to 10");
}
}
return new RescoreVector(oversampleValue);

Boolean directIO = (Boolean) mappedNode.get(DIRECT_IO);

return new RescoreVector(oversampleValue, directIO);
}

/**
 * Writes this record as a {@code rescore_vector} object, emitting only the fields that are set.
 *
 * @param builder the builder to write to
 * @param params  serialization parameters (not used here)
 * @return the same {@code builder}, for chaining
 * @throws IOException if the builder fails to write
 */
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
    builder.startObject(NAME);
    // Each field is written at most once and only when non-null; the former unconditional
    // oversample write duplicated the field (or wrote a null) now that it is optional.
    if (oversample != null) {
        builder.field(OVERSAMPLE, oversample);
    }
    if (useDirectIO != null) {
        builder.field(DIRECT_IO, useDirectIO);
    }
    builder.endObject();
    return builder;
}
Expand Down Expand Up @@ -2710,6 +2712,10 @@ && isNotUnitVector(squaredMagnitude)) {
&& quantizedIndexOptions.rescoreVector != null) {
oversample = quantizedIndexOptions.rescoreVector.oversample;
}
if (oversample == null) {
oversample = DEFAULT_OVERSAMPLE;
}

boolean rescore = needsRescore(oversample);
if (rescore) {
// Will get k * oversample for rescoring, and get the top k
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import org.apache.lucene.index.SoftDeletesRetentionMergePolicy;
import org.apache.lucene.index.Term;
import org.apache.lucene.index.VectorSimilarityFunction;
import org.apache.lucene.misc.store.DirectIODirectory;
import org.apache.lucene.search.FieldExistsQuery;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.KnnFloatVectorQuery;
Expand All @@ -52,32 +51,19 @@
import org.apache.lucene.search.join.DiversifyingChildrenFloatKnnVectorQuery;
import org.apache.lucene.search.join.QueryBitSetProducer;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.MMapDirectory;
import org.apache.lucene.tests.index.BaseKnnVectorsFormatTestCase;
import org.apache.lucene.tests.store.MockDirectoryWrapper;
import org.apache.lucene.tests.util.TestUtil;
import org.elasticsearch.common.logging.LogConfigurator;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexModule;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.codec.vectors.BQVectorUtils;
import org.elasticsearch.index.codec.vectors.OptimizedScalarQuantizer;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardPath;
import org.elasticsearch.index.store.FsDirectoryFactory;
import org.elasticsearch.test.IndexSettingsModule;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.OptionalLong;

import static java.lang.String.format;
import static org.apache.lucene.index.VectorSimilarityFunction.DOT_PRODUCT;
Expand Down Expand Up @@ -268,14 +254,6 @@ public void testSimpleOffHeapSize() throws IOException {
}
}

/**
 * Runs the shared off-heap size check against the directory produced by {@code newFSDirectory()}.
 * The test is skipped by {@code checkDirectIOSupported()} when its assumptions do not hold.
 */
public void testSimpleOffHeapSizeFSDir() throws IOException {
    checkDirectIOSupported();
    // avoid compound files to allow directIO
    var writerConfig = newIndexWriterConfig().setUseCompoundFile(false);
    try (Directory fsDirectory = newFSDirectory()) {
        testSimpleOffHeapSizeImpl(fsDirectory, writerConfig, false);
    }
}

public void testSimpleOffHeapSizeMMapDir() throws IOException {
try (Directory dir = newMMapDirectory()) {
testSimpleOffHeapSizeImpl(dir, newIndexWriterConfig(), true);
Expand Down Expand Up @@ -315,39 +293,4 @@ static Directory newMMapDirectory() throws IOException {
}
return dir;
}

/**
 * Creates a directory through {@link FsDirectoryFactory} for an index configured with the
 * {@code HYBRIDFS} store type, rooted in a fresh temporary shard path. The result is randomly
 * wrapped in a {@link MockDirectoryWrapper}.
 */
private Directory newFSDirectory() throws IOException {
    String storeType = IndexModule.Type.HYBRIDFS.name().toLowerCase(Locale.ROOT);
    Settings hybridFsSettings = Settings.builder().put(IndexModule.INDEX_STORE_TYPE_SETTING.getKey(), storeType).build();
    IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("foo", hybridFsSettings);

    // <temp>/<index uuid>/0 is used as both the data path and the state path of the shard
    Path shardDir = createTempDir().resolve(indexSettings.getUUID()).resolve("0");
    Files.createDirectories(shardDir);
    ShardId shardId = new ShardId(indexSettings.getIndex(), 0);
    ShardPath shardPath = new ShardPath(false, shardDir, shardDir, shardId);

    Directory directory = new FsDirectoryFactory().newDirectory(indexSettings, shardPath);
    return random().nextBoolean() ? new MockDirectoryWrapper(random(), directory) : directory;
}

/**
 * Skips the calling test unless direct IO is enabled via
 * {@code ES818BinaryQuantizedVectorsFormat.USE_DIRECT_IO} and a small probe file can be written
 * through a directory that forces direct IO for every file.
 */
static void checkDirectIOSupported() {
    assumeTrue("Direct IO is not enabled", ES818BinaryQuantizedVectorsFormat.USE_DIRECT_IO);

    Path probePath = createTempDir("directIOProbe");
    try (Directory probeDir = open(probePath); IndexOutput probeOut = probeDir.createOutput("out", IOContext.DEFAULT)) {
        probeOut.writeString("test");
    } catch (IOException probeFailure) {
        // an IOException while probing is treated as "unsupported", not as a test failure
        assumeNoException("test requires a filesystem that supports Direct IO", probeFailure);
    }
}

/**
 * Opens {@code path} as a {@link DirectIODirectory} whose {@code useDirectIO} hook always
 * returns {@code true}, regardless of file name, context or length.
 */
static DirectIODirectory open(Path path) throws IOException {
    FSDirectory delegate = FSDirectory.open(path);
    return new DirectIODirectory(delegate) {
        @Override
        protected boolean useDirectIO(String name, IOContext context, OptionalLong fileLength) {
            return true;
        }
    };
}
}
Loading