Skip to content

Commit 7f05ab9

Browse files
authored
Fix and test off-heap stats when using direct IO for accessing the raw vectors (#128615)
Fix and test off-heap stats when using direct IO for accessing the raw vectors. The direct IO reader is not using off-heap, so it returns an empty map to indicate that there is no off-heap requirements. I added some overloaded of tests with different directories to verify this. Note: For 9.1 we're still using reflection to access the internals of non-ES readers, but DirectIO is an ES reader so we can use our internal OffHeapStats interface (rather than reflection). This is all replaced when we eventually get Lucene 10.3.
1 parent 1b151ed commit 7f05ab9

File tree

7 files changed

+188
-34
lines changed

7 files changed

+188
-34
lines changed

docs/changelog/128615.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 128615
2+
summary: Fix and test off-heap stats when using direct IO for accessing the raw vectors
3+
area: Vector Search
4+
type: bug
5+
issues: []

server/src/main/java/org/elasticsearch/index/codec/vectors/es818/DirectIOLucene99FlatVectorsReader.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,16 +44,18 @@
4444
import org.apache.lucene.util.RamUsageEstimator;
4545
import org.apache.lucene.util.SuppressForbidden;
4646
import org.apache.lucene.util.hnsw.RandomVectorScorer;
47+
import org.elasticsearch.index.codec.vectors.reflect.OffHeapStats;
4748

4849
import java.io.IOException;
4950
import java.io.UncheckedIOException;
51+
import java.util.Map;
5052

5153
import static org.apache.lucene.codecs.lucene99.Lucene99HnswVectorsReader.readSimilarityFunction;
5254
import static org.apache.lucene.codecs.lucene99.Lucene99HnswVectorsReader.readVectorEncoding;
5355

5456
/** Copied from Lucene99FlatVectorsReader in Lucene 10.2, then modified to support DirectIOIndexInputSupplier */
5557
@SuppressForbidden(reason = "Copied from lucene")
56-
public class DirectIOLucene99FlatVectorsReader extends FlatVectorsReader {
58+
public class DirectIOLucene99FlatVectorsReader extends FlatVectorsReader implements OffHeapStats {
5759

5860
private static final boolean USE_DIRECT_IO = Boolean.parseBoolean(System.getProperty("vector.rescoring.directio", "true"));
5961

@@ -282,6 +284,11 @@ public void close() throws IOException {
282284
IOUtils.close(vectorData);
283285
}
284286

287+
@Override
288+
public Map<String, Long> getOffHeapByteSize(FieldInfo fieldInfo) {
289+
return Map.of(); // no off-heap
290+
}
291+
285292
private record FieldEntry(
286293
VectorSimilarityFunction similarityFunction,
287294
VectorEncoding vectorEncoding,

server/src/main/java/org/elasticsearch/index/codec/vectors/es818/MergeReaderWrapper.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,17 +11,21 @@
1111

1212
import org.apache.lucene.codecs.hnsw.FlatVectorsReader;
1313
import org.apache.lucene.index.ByteVectorValues;
14+
import org.apache.lucene.index.FieldInfo;
1415
import org.apache.lucene.index.FloatVectorValues;
1516
import org.apache.lucene.search.KnnCollector;
1617
import org.apache.lucene.util.Accountable;
1718
import org.apache.lucene.util.Bits;
1819
import org.apache.lucene.util.hnsw.RandomVectorScorer;
1920
import org.elasticsearch.core.IOUtils;
21+
import org.elasticsearch.index.codec.vectors.reflect.OffHeapByteSizeUtils;
22+
import org.elasticsearch.index.codec.vectors.reflect.OffHeapStats;
2023

2124
import java.io.IOException;
2225
import java.util.Collection;
26+
import java.util.Map;
2327

24-
class MergeReaderWrapper extends FlatVectorsReader {
28+
class MergeReaderWrapper extends FlatVectorsReader implements OffHeapStats {
2529

2630
private final FlatVectorsReader mainReader;
2731
private final FlatVectorsReader mergeReader;
@@ -86,4 +90,9 @@ public void search(String field, byte[] target, KnnCollector knnCollector, Bits
8690
public void close() throws IOException {
8791
IOUtils.close(mainReader, mergeReader);
8892
}
93+
94+
@Override
95+
public Map<String, Long> getOffHeapByteSize(FieldInfo fieldInfo) {
96+
return OffHeapByteSizeUtils.getOffHeapByteSize(mainReader, fieldInfo);
97+
}
8998
}

server/src/main/java/org/elasticsearch/index/codec/vectors/reflect/OffHeapByteSizeUtils.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import org.apache.lucene.codecs.lucene99.Lucene99HnswVectorsReader;
2020
import org.apache.lucene.codecs.lucene99.Lucene99ScalarQuantizedVectorsReader;
2121
import org.apache.lucene.index.FieldInfo;
22-
import org.elasticsearch.index.codec.vectors.es818.DirectIOLucene99FlatVectorsReader;
2322

2423
import java.util.Map;
2524
import java.util.stream.Collectors;
@@ -52,9 +51,6 @@ public static Map<String, Long> getOffHeapByteSize(KnnVectorsReader reader, Fiel
5251
case Lucene99FlatVectorsReader flatVectorsReader -> {
5352
return OffHeapReflectionUtils.getOffHeapByteSizeF99FLT(flatVectorsReader, fieldInfo);
5453
}
55-
case DirectIOLucene99FlatVectorsReader flatVectorsReader -> {
56-
return OffHeapReflectionUtils.getOffHeapByteSizeF99FLT(flatVectorsReader, fieldInfo);
57-
}
5854
case Lucene95HnswVectorsReader lucene95HnswVectorsReader -> {
5955
return OffHeapReflectionUtils.getOffHeapByteSizeL95HNSW(lucene95HnswVectorsReader, fieldInfo);
6056
}

server/src/main/java/org/elasticsearch/index/codec/vectors/reflect/OffHeapReflectionUtils.java

Lines changed: 0 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import org.apache.lucene.index.FieldInfo;
2222
import org.apache.lucene.index.VectorEncoding;
2323
import org.elasticsearch.core.SuppressForbidden;
24-
import org.elasticsearch.index.codec.vectors.es818.DirectIOLucene99FlatVectorsReader;
2524

2625
import java.lang.invoke.MethodHandle;
2726
import java.lang.invoke.MethodHandles;
@@ -47,15 +46,12 @@ private OffHeapReflectionUtils() {}
4746
private static final VarHandle RAW_VECTORS_READER_HNDL_SQ;
4847
private static final MethodHandle GET_FIELD_ENTRY_HANDLE_L99FLT;
4948
private static final MethodHandle VECTOR_DATA_LENGTH_HANDLE_L99FLT;
50-
private static final MethodHandle GET_FIELD_ENTRY_HANDLE_DIOL99FLT;
51-
private static final MethodHandle VECTOR_DATA_LENGTH_HANDLE_DIOL99FLT;
5249
private static final MethodHandle GET_FIELD_ENTRY_HANDLE_L99HNSW;
5350
private static final MethodHandle GET_VECTOR_INDEX_LENGTH_HANDLE_L99HNSW;
5451
private static final VarHandle FLAT_VECTORS_READER_HNDL_L99HNSW;
5552

5653
static final Class<?> L99_SQ_VR_CLS = Lucene99ScalarQuantizedVectorsReader.class;
5754
static final Class<?> L99_FLT_VR_CLS = Lucene99FlatVectorsReader.class;
58-
static final Class<?> DIOL99_FLT_VR_CLS = DirectIOLucene99FlatVectorsReader.class;
5955
static final Class<?> L99_HNSW_VR_CLS = Lucene99HnswVectorsReader.class;
6056

6157
// old codecs
@@ -100,12 +96,6 @@ private OffHeapReflectionUtils() {}
10096
mt = methodType(cls, String.class, VectorEncoding.class);
10197
GET_FIELD_ENTRY_HANDLE_L99FLT = lookup.findVirtual(L99_FLT_VR_CLS, "getFieldEntry", mt);
10298
VECTOR_DATA_LENGTH_HANDLE_L99FLT = lookup.findVirtual(cls, "vectorDataLength", methodType(long.class));
103-
// DirectIOLucene99FlatVectorsReader
104-
cls = Class.forName("org.elasticsearch.index.codec.vectors.es818.DirectIOLucene99FlatVectorsReader$FieldEntry");
105-
lookup = MethodHandles.privateLookupIn(DIOL99_FLT_VR_CLS, MethodHandles.lookup());
106-
mt = methodType(cls, String.class, VectorEncoding.class);
107-
GET_FIELD_ENTRY_HANDLE_DIOL99FLT = lookup.findVirtual(DIOL99_FLT_VR_CLS, "getFieldEntry", mt);
108-
VECTOR_DATA_LENGTH_HANDLE_DIOL99FLT = lookup.findVirtual(cls, "vectorDataLength", methodType(long.class));
10999
// Lucene99HnswVectorsReader
110100
cls = Class.forName("org.apache.lucene.codecs.lucene99.Lucene99HnswVectorsReader$FieldEntry");
111101
lookup = MethodHandles.privateLookupIn(L99_HNSW_VR_CLS, MethodHandles.lookup());
@@ -182,18 +172,6 @@ static Map<String, Long> getOffHeapByteSizeF99FLT(Lucene99FlatVectorsReader read
182172
throw new AssertionError("should not reach here");
183173
}
184174

185-
@SuppressForbidden(reason = "static type is not accessible")
186-
static Map<String, Long> getOffHeapByteSizeF99FLT(DirectIOLucene99FlatVectorsReader reader, FieldInfo fieldInfo) {
187-
try {
188-
var entry = GET_FIELD_ENTRY_HANDLE_DIOL99FLT.invoke(reader, fieldInfo.name, fieldInfo.getVectorEncoding());
189-
long len = (long) VECTOR_DATA_LENGTH_HANDLE_DIOL99FLT.invoke(entry);
190-
return Map.of(FLAT_VECTOR_DATA_EXTENSION, len);
191-
} catch (Throwable t) {
192-
handleThrowable(t);
193-
}
194-
throw new AssertionError("should not reach here");
195-
}
196-
197175
@SuppressForbidden(reason = "static type is not accessible")
198176
static Map<String, Long> getOffHeapByteSizeL99HNSW(Lucene99HnswVectorsReader reader, FieldInfo fieldInfo) {
199177
try {

server/src/test/java/org/elasticsearch/index/codec/vectors/es818/ES818BinaryQuantizedVectorsFormatTests.java

Lines changed: 82 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,21 +35,37 @@
3535
import org.apache.lucene.index.KnnVectorValues;
3636
import org.apache.lucene.index.LeafReader;
3737
import org.apache.lucene.index.VectorSimilarityFunction;
38+
import org.apache.lucene.misc.store.DirectIODirectory;
3839
import org.apache.lucene.search.IndexSearcher;
3940
import org.apache.lucene.search.KnnFloatVectorQuery;
4041
import org.apache.lucene.search.Query;
4142
import org.apache.lucene.search.TopDocs;
4243
import org.apache.lucene.search.TotalHits;
4344
import org.apache.lucene.store.Directory;
45+
import org.apache.lucene.store.FSDirectory;
46+
import org.apache.lucene.store.IOContext;
47+
import org.apache.lucene.store.IndexOutput;
48+
import org.apache.lucene.store.MMapDirectory;
4449
import org.apache.lucene.tests.index.BaseKnnVectorsFormatTestCase;
50+
import org.apache.lucene.tests.store.MockDirectoryWrapper;
4551
import org.apache.lucene.tests.util.TestUtil;
4652
import org.elasticsearch.common.logging.LogConfigurator;
53+
import org.elasticsearch.common.settings.Settings;
54+
import org.elasticsearch.index.IndexModule;
55+
import org.elasticsearch.index.IndexSettings;
4756
import org.elasticsearch.index.codec.vectors.BQVectorUtils;
4857
import org.elasticsearch.index.codec.vectors.OptimizedScalarQuantizer;
4958
import org.elasticsearch.index.codec.vectors.reflect.OffHeapByteSizeUtils;
59+
import org.elasticsearch.index.shard.ShardId;
60+
import org.elasticsearch.index.shard.ShardPath;
61+
import org.elasticsearch.index.store.FsDirectoryFactory;
62+
import org.elasticsearch.test.IndexSettingsModule;
5063

5164
import java.io.IOException;
65+
import java.nio.file.Files;
66+
import java.nio.file.Path;
5267
import java.util.Locale;
68+
import java.util.OptionalLong;
5369

5470
import static java.lang.String.format;
5571
import static org.apache.lucene.index.VectorSimilarityFunction.DOT_PRODUCT;
@@ -183,8 +199,28 @@ public void testQuantizedVectorsWriteAndRead() throws IOException {
183199
}
184200

185201
public void testSimpleOffHeapSize() throws IOException {
202+
try (Directory dir = newDirectory()) {
203+
testSimpleOffHeapSizeImpl(dir, newIndexWriterConfig(), true);
204+
}
205+
}
206+
207+
public void testSimpleOffHeapSizeFSDir() throws IOException {
208+
checkDirectIOSupported();
209+
var config = newIndexWriterConfig().setUseCompoundFile(false); // avoid compound files to allow directIO
210+
try (Directory dir = newFSDirectory()) {
211+
testSimpleOffHeapSizeImpl(dir, config, false);
212+
}
213+
}
214+
215+
public void testSimpleOffHeapSizeMMapDir() throws IOException {
216+
try (Directory dir = newMMapDirectory()) {
217+
testSimpleOffHeapSizeImpl(dir, newIndexWriterConfig(), true);
218+
}
219+
}
220+
221+
public void testSimpleOffHeapSizeImpl(Directory dir, IndexWriterConfig config, boolean expectVecOffHeap) throws IOException {
186222
float[] vector = randomVector(random().nextInt(12, 500));
187-
try (Directory dir = newDirectory(); IndexWriter w = new IndexWriter(dir, newIndexWriterConfig())) {
223+
try (IndexWriter w = new IndexWriter(dir, config)) {
188224
Document doc = new Document();
189225
doc.add(new KnnFloatVectorField("f", vector, DOT_PRODUCT));
190226
w.addDocument(doc);
@@ -198,11 +234,54 @@ public void testSimpleOffHeapSize() throws IOException {
198234
}
199235
var fieldInfo = r.getFieldInfos().fieldInfo("f");
200236
var offHeap = OffHeapByteSizeUtils.getOffHeapByteSize(knnVectorsReader, fieldInfo);
201-
assertEquals(2, offHeap.size());
202-
assertEquals(vector.length * Float.BYTES, (long) offHeap.get("vec"));
237+
assertEquals(expectVecOffHeap ? 2 : 1, offHeap.size());
203238
assertTrue(offHeap.get("veb") > 0L);
239+
if (expectVecOffHeap) {
240+
assertEquals(vector.length * Float.BYTES, (long) offHeap.get("vec"));
241+
}
204242
}
205243
}
206244
}
207245
}
246+
247+
static Directory newMMapDirectory() throws IOException {
248+
Directory dir = new MMapDirectory(createTempDir("ES818BinaryQuantizedVectorsFormatTests"));
249+
if (random().nextBoolean()) {
250+
dir = new MockDirectoryWrapper(random(), dir);
251+
}
252+
return dir;
253+
}
254+
255+
private Directory newFSDirectory() throws IOException {
256+
Settings settings = Settings.builder()
257+
.put(IndexModule.INDEX_STORE_TYPE_SETTING.getKey(), IndexModule.Type.HYBRIDFS.name().toLowerCase(Locale.ROOT))
258+
.build();
259+
IndexSettings idxSettings = IndexSettingsModule.newIndexSettings("foo", settings);
260+
Path tempDir = createTempDir().resolve(idxSettings.getUUID()).resolve("0");
261+
Files.createDirectories(tempDir);
262+
ShardPath path = new ShardPath(false, tempDir, tempDir, new ShardId(idxSettings.getIndex(), 0));
263+
Directory dir = (new FsDirectoryFactory()).newDirectory(idxSettings, path);
264+
if (random().nextBoolean()) {
265+
dir = new MockDirectoryWrapper(random(), dir);
266+
}
267+
return dir;
268+
}
269+
270+
static void checkDirectIOSupported() {
271+
Path path = createTempDir("directIOProbe");
272+
try (Directory dir = open(path); IndexOutput out = dir.createOutput("out", IOContext.DEFAULT)) {
273+
out.writeString("test");
274+
} catch (IOException e) {
275+
assumeNoException("test requires a filesystem that supports Direct IO", e);
276+
}
277+
}
278+
279+
static DirectIODirectory open(Path path) throws IOException {
280+
return new DirectIODirectory(FSDirectory.open(path)) {
281+
@Override
282+
protected boolean useDirectIO(String name, IOContext context, OptionalLong fileLength) {
283+
return true;
284+
}
285+
};
286+
}
208287
}

server/src/test/java/org/elasticsearch/index/codec/vectors/es818/ES818HnswBinaryQuantizedVectorsFormatTests.java

Lines changed: 83 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,20 +32,37 @@
3232
import org.apache.lucene.index.FloatVectorValues;
3333
import org.apache.lucene.index.IndexReader;
3434
import org.apache.lucene.index.IndexWriter;
35+
import org.apache.lucene.index.IndexWriterConfig;
3536
import org.apache.lucene.index.KnnVectorValues;
3637
import org.apache.lucene.index.LeafReader;
3738
import org.apache.lucene.index.VectorSimilarityFunction;
39+
import org.apache.lucene.misc.store.DirectIODirectory;
3840
import org.apache.lucene.search.TopDocs;
3941
import org.apache.lucene.store.Directory;
42+
import org.apache.lucene.store.FSDirectory;
43+
import org.apache.lucene.store.IOContext;
44+
import org.apache.lucene.store.IndexOutput;
45+
import org.apache.lucene.store.MMapDirectory;
4046
import org.apache.lucene.tests.index.BaseKnnVectorsFormatTestCase;
47+
import org.apache.lucene.tests.store.MockDirectoryWrapper;
4148
import org.apache.lucene.tests.util.TestUtil;
4249
import org.apache.lucene.util.SameThreadExecutorService;
4350
import org.elasticsearch.common.logging.LogConfigurator;
51+
import org.elasticsearch.common.settings.Settings;
52+
import org.elasticsearch.index.IndexModule;
53+
import org.elasticsearch.index.IndexSettings;
4454
import org.elasticsearch.index.codec.vectors.reflect.OffHeapByteSizeUtils;
55+
import org.elasticsearch.index.shard.ShardId;
56+
import org.elasticsearch.index.shard.ShardPath;
57+
import org.elasticsearch.index.store.FsDirectoryFactory;
58+
import org.elasticsearch.test.IndexSettingsModule;
4559

4660
import java.io.IOException;
61+
import java.nio.file.Files;
62+
import java.nio.file.Path;
4763
import java.util.Arrays;
4864
import java.util.Locale;
65+
import java.util.OptionalLong;
4966

5067
import static java.lang.String.format;
5168
import static org.apache.lucene.index.VectorSimilarityFunction.DOT_PRODUCT;
@@ -134,8 +151,28 @@ public void testVectorSimilarityFuncs() {
134151
}
135152

136153
public void testSimpleOffHeapSize() throws IOException {
154+
try (Directory dir = newDirectory()) {
155+
testSimpleOffHeapSizeImpl(dir, newIndexWriterConfig(), true);
156+
}
157+
}
158+
159+
public void testSimpleOffHeapSizeFSDir() throws IOException {
160+
checkDirectIOSupported();
161+
var config = newIndexWriterConfig().setUseCompoundFile(false); // avoid compound files to allow directIO
162+
try (Directory dir = newFSDirectory()) {
163+
testSimpleOffHeapSizeImpl(dir, config, false);
164+
}
165+
}
166+
167+
public void testSimpleOffHeapSizeMMapDir() throws IOException {
168+
try (Directory dir = newMMapDirectory()) {
169+
testSimpleOffHeapSizeImpl(dir, newIndexWriterConfig(), true);
170+
}
171+
}
172+
173+
public void testSimpleOffHeapSizeImpl(Directory dir, IndexWriterConfig config, boolean expectVecOffHeap) throws IOException {
137174
float[] vector = randomVector(random().nextInt(12, 500));
138-
try (Directory dir = newDirectory(); IndexWriter w = new IndexWriter(dir, newIndexWriterConfig())) {
175+
try (IndexWriter w = new IndexWriter(dir, config)) {
139176
Document doc = new Document();
140177
doc.add(new KnnFloatVectorField("f", vector, DOT_PRODUCT));
141178
w.addDocument(doc);
@@ -149,12 +186,55 @@ public void testSimpleOffHeapSize() throws IOException {
149186
}
150187
var fieldInfo = r.getFieldInfos().fieldInfo("f");
151188
var offHeap = OffHeapByteSizeUtils.getOffHeapByteSize(knnVectorsReader, fieldInfo);
152-
assertEquals(3, offHeap.size());
153-
assertEquals(vector.length * Float.BYTES, (long) offHeap.get("vec"));
189+
assertEquals(expectVecOffHeap ? 3 : 2, offHeap.size());
154190
assertEquals(1L, (long) offHeap.get("vex"));
155191
assertTrue(offHeap.get("veb") > 0L);
192+
if (expectVecOffHeap) {
193+
assertEquals(vector.length * Float.BYTES, (long) offHeap.get("vec"));
194+
}
156195
}
157196
}
158197
}
159198
}
199+
200+
static Directory newMMapDirectory() throws IOException {
201+
Directory dir = new MMapDirectory(createTempDir("ES818BinaryQuantizedVectorsFormatTests"));
202+
if (random().nextBoolean()) {
203+
dir = new MockDirectoryWrapper(random(), dir);
204+
}
205+
return dir;
206+
}
207+
208+
private Directory newFSDirectory() throws IOException {
209+
Settings settings = Settings.builder()
210+
.put(IndexModule.INDEX_STORE_TYPE_SETTING.getKey(), IndexModule.Type.HYBRIDFS.name().toLowerCase(Locale.ROOT))
211+
.build();
212+
IndexSettings idxSettings = IndexSettingsModule.newIndexSettings("foo", settings);
213+
Path tempDir = createTempDir().resolve(idxSettings.getUUID()).resolve("0");
214+
Files.createDirectories(tempDir);
215+
ShardPath path = new ShardPath(false, tempDir, tempDir, new ShardId(idxSettings.getIndex(), 0));
216+
Directory dir = (new FsDirectoryFactory()).newDirectory(idxSettings, path);
217+
if (random().nextBoolean()) {
218+
dir = new MockDirectoryWrapper(random(), dir);
219+
}
220+
return dir;
221+
}
222+
223+
static void checkDirectIOSupported() {
224+
Path path = createTempDir("directIOProbe");
225+
try (Directory dir = open(path); IndexOutput out = dir.createOutput("out", IOContext.DEFAULT)) {
226+
out.writeString("test");
227+
} catch (IOException e) {
228+
assumeNoException("test requires a filesystem that supports Direct IO", e);
229+
}
230+
}
231+
232+
static DirectIODirectory open(Path path) throws IOException {
233+
return new DirectIODirectory(FSDirectory.open(path)) {
234+
@Override
235+
protected boolean useDirectIO(String name, IOContext context, OptionalLong fileLength) {
236+
return true;
237+
}
238+
};
239+
}
160240
}

0 commit comments

Comments
 (0)