Skip to content

Commit e842eeb

Browse files
committed
Avoid using direct I/O during vector merges
The vector file isn't reopened during merge operations, so relying on `IOContext` to disable direct I/O during merges is ineffective. This change updates the strategy to explicitly open the vector file twice when direct I/O is enabled: once for default reads and once for direct I/O. We then switch to the appropriate index input in `getMergeInstance`, following the same approach used by other formats.
1 parent c373c79 commit e842eeb

File tree

2 files changed

+62
-23
lines changed

2 files changed

+62
-23
lines changed

server/src/main/java/org/elasticsearch/index/codec/vectors/es818/DirectIOLucene99FlatVectorsReader.java

Lines changed: 57 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@
4545
import org.apache.lucene.util.hnsw.RandomVectorScorer;
4646

4747
import java.io.IOException;
48-
import java.io.UncheckedIOException;
4948

5049
import static org.apache.lucene.codecs.lucene99.Lucene99HnswVectorsReader.readSimilarityFunction;
5150
import static org.apache.lucene.codecs.lucene99.Lucene99HnswVectorsReader.readVectorEncoding;
@@ -58,31 +57,69 @@ public class DirectIOLucene99FlatVectorsReader extends FlatVectorsReader {
5857

5958
private static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(DirectIOLucene99FlatVectorsReader.class);
6059

61-
private final IntObjectHashMap<FieldEntry> fields = new IntObjectHashMap<>();
60+
private final IntObjectHashMap<FieldEntry> fields;
6261
private final IndexInput vectorData;
62+
private final IndexInput vectorDataDirect;
63+
private final IndexInput vectorDataMerge;
6364
private final FieldInfos fieldInfos;
65+
private final boolean isClone;
6466

6567
public DirectIOLucene99FlatVectorsReader(SegmentReadState state, FlatVectorsScorer scorer) throws IOException {
6668
super(scorer);
69+
this.fields = new IntObjectHashMap<>();
6770
int versionMeta = readMetadata(state);
6871
this.fieldInfos = state.fieldInfos;
6972
boolean success = false;
7073
try {
71-
vectorData = openDataInput(
74+
vectorDataDirect = openDataInput(
7275
state,
7376
versionMeta,
7477
DirectIOLucene99FlatVectorsFormat.VECTOR_DATA_EXTENSION,
7578
DirectIOLucene99FlatVectorsFormat.VECTOR_DATA_CODEC_NAME,
76-
// Flat formats are used to randomly access vectors from their node ID that is stored
77-
// in the HNSW graph.
78-
state.context.withReadAdvice(ReadAdvice.RANDOM)
79+
state.context,
80+
true
7981
);
8082
success = true;
8183
} finally {
8284
if (success == false) {
8385
IOUtils.closeWhileHandlingException(this);
8486
}
8587
}
88+
89+
success = false;
90+
try {
91+
vectorDataMerge = openDataInput(
92+
state,
93+
versionMeta,
94+
DirectIOLucene99FlatVectorsFormat.VECTOR_DATA_EXTENSION,
95+
DirectIOLucene99FlatVectorsFormat.VECTOR_DATA_CODEC_NAME,
96+
// We use sequential access since this input is only used for merging
97+
USE_DIRECT_IO ? state.context.withReadAdvice(ReadAdvice.SEQUENTIAL) : state.context,
98+
false
99+
);
100+
success = true;
101+
} finally {
102+
if (success == false) {
103+
IOUtils.closeWhileHandlingException(this);
104+
}
105+
}
106+
this.vectorData = vectorDataDirect;
107+
this.isClone = false;
108+
}
109+
110+
/**
111+
* Returns a {@link DirectIOLucene99FlatVectorsReader} that switches the raw vector data to use
112+
* the provided {@link IndexInput}.
113+
* This is useful for merges, since we want to switch from direct I/O to sequential reads.
114+
*/
115+
private DirectIOLucene99FlatVectorsReader(DirectIOLucene99FlatVectorsReader clone, IndexInput vectorData) {
116+
super(clone.vectorScorer);
117+
this.fields = clone.fields;
118+
this.vectorData = vectorData;
119+
this.vectorDataDirect = clone.vectorDataDirect;
120+
this.vectorDataMerge = clone.vectorDataMerge;
121+
this.fieldInfos = clone.fieldInfos;
122+
this.isClone = true;
86123
}
87124

88125
private int readMetadata(SegmentReadState state) throws IOException {
@@ -118,11 +155,13 @@ private static IndexInput openDataInput(
118155
int versionMeta,
119156
String fileExtension,
120157
String codecName,
121-
IOContext context
158+
IOContext context,
159+
boolean useDirectIO
122160
) throws IOException {
123161
String fileName = IndexFileNames.segmentFileName(state.segmentInfo.name, state.segmentSuffix, fileExtension);
124162
// use direct IO for accessing raw vector data for searches
125-
IndexInput in = USE_DIRECT_IO
163+
IndexInput in = useDirectIO
164+
&& USE_DIRECT_IO
126165
&& context.context() == IOContext.Context.DEFAULT
127166
&& state.directory instanceof DirectIOIndexInputSupplier did
128167
? did.openInputDirect(fileName, context)
@@ -153,6 +192,11 @@ private static IndexInput openDataInput(
153192
}
154193
}
155194

195+
// only for tests
196+
IndexInput getRawVectorsInput() {
197+
return vectorData;
198+
}
199+
156200
private void readFields(ChecksumIndexInput meta, FieldInfos infos) throws IOException {
157201
for (int fieldNumber = meta.readInt(); fieldNumber != -1; fieldNumber = meta.readInt()) {
158202
FieldInfo info = infos.fieldInfo(fieldNumber);
@@ -176,13 +220,8 @@ public void checkIntegrity() throws IOException {
176220

177221
@Override
178222
public FlatVectorsReader getMergeInstance() {
179-
try {
180-
// Update the read advice since vectors are guaranteed to be accessed sequentially for merge
181-
this.vectorData.updateReadAdvice(ReadAdvice.SEQUENTIAL);
182-
return this;
183-
} catch (IOException exception) {
184-
throw new UncheckedIOException(exception);
185-
}
223+
// for merges we use sequential access instead of direct IO
224+
return new DirectIOLucene99FlatVectorsReader(this, vectorDataMerge);
186225
}
187226

188227
private FieldEntry getFieldEntry(String field, VectorEncoding expectedEncoding) {
@@ -267,16 +306,11 @@ public RandomVectorScorer getRandomVectorScorer(String field, byte[] target) thr
267306
);
268307
}
269308

270-
@Override
271-
public void finishMerge() throws IOException {
272-
// This makes sure that the access pattern hint is reverted back since HNSW implementation
273-
// needs it
274-
this.vectorData.updateReadAdvice(ReadAdvice.RANDOM);
275-
}
276-
277309
@Override
278310
public void close() throws IOException {
279-
IOUtils.close(vectorData);
311+
if (isClone == false) {
312+
IOUtils.close(vectorDataMerge, vectorDataDirect);
313+
}
280314
}
281315

282316
private record FieldEntry(

server/src/main/java/org/elasticsearch/index/codec/vectors/es818/ES818BinaryQuantizedVectorsReader.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,11 @@ public class ES818BinaryQuantizedVectorsReader extends FlatVectorsReader impleme
119119
}
120120
}
121121

122+
// only for tests
123+
FlatVectorsReader getRawVectorsReader() {
124+
return rawVectorsReader;
125+
}
126+
122127
private void readFields(ChecksumIndexInput meta, FieldInfos infos) throws IOException {
123128
for (int fieldNumber = meta.readInt(); fieldNumber != -1; fieldNumber = meta.readInt()) {
124129
FieldInfo info = infos.fieldInfo(fieldNumber);

0 commit comments

Comments
 (0)