Skip to content

Commit 7aee580

Browse files
authored
[Remote Vector Index Build] Add metric collection (opensearch-project#2615)
Signed-off-by: owenhalpert <ohalpert@gmail.com>
1 parent 8ef9562 commit 7aee580

File tree

14 files changed

+549
-64
lines changed

14 files changed

+549
-64
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
1010
* [Remote Vector Index Build] Move client to separate module [#2603](https://github.com/opensearch-project/k-NN/pull/2603)
1111
* Add filter function to KNNQueryBuilder with unit tests and integration tests [#2599](https://github.com/opensearch-project/k-NN/pull/2599)
1212
* [Lucene On Faiss] Add a new mode, memory-optimized-search enable user to run vector search on FAISS index under memory constrained environment. [#2630](https://github.com/opensearch-project/k-NN/pull/2630)
13+
* [Remote Vector Index Build] Add metric collection for remote build process [#2615](https://github.com/opensearch-project/k-NN/pull/2615)
1314
### Enhancements
1415
### Bug Fixes
1516
* Fixing bug to prevent NullPointerException while doing PUT mappings [#2556](https://github.com/opensearch-project/k-NN/issues/2556)

src/main/java/org/opensearch/knn/index/codec/nativeindex/NativeIndexBuildStrategyFactory.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import org.apache.lucene.index.FieldInfo;
1010
import org.opensearch.index.IndexSettings;
1111
import org.opensearch.knn.common.featureflags.KNNFeatureFlags;
12+
import org.opensearch.knn.index.codec.nativeindex.model.BuildIndexParams;
1213
import org.opensearch.knn.index.codec.nativeindex.remote.RemoteIndexBuildStrategy;
1314
import org.opensearch.knn.index.engine.KNNEngine;
1415
import org.opensearch.knn.index.engine.KNNLibraryIndexingContext;
@@ -42,17 +43,19 @@ public NativeIndexBuildStrategyFactory(Supplier<RepositoriesService> repositorie
4243
}
4344

4445
/**
45-
* @param fieldInfo Field related attributes/info
46-
* @param totalLiveDocs Number of documents with the vector field. This values comes from {@link org.opensearch.knn.index.codec.KNN990Codec.NativeEngines990KnnVectorsWriter#flush}
47-
* and {@link org.opensearch.knn.index.codec.KNN990Codec.NativeEngines990KnnVectorsWriter#mergeOneField}
48-
* @param knnVectorValues An instance of {@link KNNVectorValues} which is used to evaluate the size threshold KNN_REMOTE_VECTOR_BUILD_THRESHOLD
49-
* @return The {@link NativeIndexBuildStrategy} to be used. Intended to be used by {@link NativeIndexWriter}
46+
* @param fieldInfo Field related attributes/info
47+
* @param totalLiveDocs Number of documents with the vector field. This values comes from {@link org.opensearch.knn.index.codec.KNN990Codec.NativeEngines990KnnVectorsWriter#flush}
48+
* and {@link org.opensearch.knn.index.codec.KNN990Codec.NativeEngines990KnnVectorsWriter#mergeOneField}
49+
* @param knnVectorValues An instance of {@link KNNVectorValues} which is used to evaluate the size threshold KNN_REMOTE_VECTOR_BUILD_THRESHOLD
50+
* @param indexInfo An instance of {@link BuildIndexParams} containing relevant index info
51+
* @return The {@link NativeIndexBuildStrategy} to be used. Intended to be used by {@link NativeIndexWriter}
5052
* @throws IOException
5153
*/
5254
public NativeIndexBuildStrategy getBuildStrategy(
5355
final FieldInfo fieldInfo,
5456
final int totalLiveDocs,
55-
final KNNVectorValues<?> knnVectorValues
57+
final KNNVectorValues<?> knnVectorValues,
58+
BuildIndexParams indexInfo
5659
) throws IOException {
5760
final KNNEngine knnEngine = extractKNNEngine(fieldInfo);
5861
boolean isTemplate = fieldInfo.attributes().containsKey(MODEL_ID);

src/main/java/org/opensearch/knn/index/codec/nativeindex/NativeIndexWriter.java

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ public static NativeIndexWriter getWriter(
102102
* @throws IOException
103103
*/
104104
public void flushIndex(final Supplier<KNNVectorValues<?>> knnVectorValuesSupplier, int totalLiveDocs) throws IOException {
105-
buildAndWriteIndex(knnVectorValuesSupplier, totalLiveDocs);
105+
buildAndWriteIndex(knnVectorValuesSupplier, totalLiveDocs, true);
106106
recordRefreshStats();
107107
}
108108

@@ -122,11 +122,12 @@ public void mergeIndex(final Supplier<KNNVectorValues<?>> knnVectorValuesSupplie
122122

123123
long bytesPerVector = knnVectorValues.bytesPerVector();
124124
startMergeStats(totalLiveDocs, bytesPerVector);
125-
buildAndWriteIndex(knnVectorValuesSupplier, totalLiveDocs);
125+
buildAndWriteIndex(knnVectorValuesSupplier, totalLiveDocs, false);
126126
endMergeStats(totalLiveDocs, bytesPerVector);
127127
}
128128

129-
private void buildAndWriteIndex(final Supplier<KNNVectorValues<?>> knnVectorValuesSupplier, int totalLiveDocs) throws IOException {
129+
private void buildAndWriteIndex(final Supplier<KNNVectorValues<?>> knnVectorValuesSupplier, int totalLiveDocs, boolean isFlush)
130+
throws IOException {
130131
if (totalLiveDocs == 0) {
131132
log.debug("No live docs for field {}", fieldInfo.name);
132133
return;
@@ -146,12 +147,14 @@ private void buildAndWriteIndex(final Supplier<KNNVectorValues<?>> knnVectorValu
146147
indexOutputWithBuffer,
147148
knnEngine,
148149
knnVectorValuesSupplier,
149-
totalLiveDocs
150+
totalLiveDocs,
151+
isFlush
150152
);
151153
NativeIndexBuildStrategy indexBuilder = indexBuilderFactory.getBuildStrategy(
152154
fieldInfo,
153155
totalLiveDocs,
154-
knnVectorValuesSupplier.get()
156+
knnVectorValuesSupplier.get(),
157+
nativeIndexParams
155158
);
156159
indexBuilder.buildAndWriteIndex(nativeIndexParams);
157160
CodecUtil.writeFooter(output);
@@ -166,7 +169,8 @@ private BuildIndexParams indexParams(
166169
IndexOutputWithBuffer indexOutputWithBuffer,
167170
KNNEngine knnEngine,
168171
Supplier<KNNVectorValues<?>> knnVectorValuesSupplier,
169-
int totalLiveDocs
172+
int totalLiveDocs,
173+
boolean isFlush
170174
) throws IOException {
171175
final Map<String, Object> parameters;
172176
VectorDataType vectorDataType;
@@ -192,6 +196,7 @@ private BuildIndexParams indexParams(
192196
.knnVectorValuesSupplier(knnVectorValuesSupplier)
193197
.totalLiveDocs(totalLiveDocs)
194198
.segmentWriteState(state)
199+
.isFlush(isFlush)
195200
.build();
196201
}
197202

src/main/java/org/opensearch/knn/index/codec/nativeindex/model/BuildIndexParams.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,4 +36,5 @@ public class BuildIndexParams {
3636
Supplier<KNNVectorValues<?>> knnVectorValuesSupplier;
3737
int totalLiveDocs;
3838
SegmentWriteState segmentWriteState;
39+
boolean isFlush;
3940
}
Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.knn.index.codec.nativeindex.remote;
7+
8+
import lombok.extern.log4j.Log4j2;
9+
import org.opensearch.common.StopWatch;
10+
import org.opensearch.knn.index.codec.nativeindex.model.BuildIndexParams;
11+
import org.opensearch.knn.index.vectorvalues.KNNVectorValues;
12+
13+
import java.io.IOException;
14+
15+
import static org.opensearch.knn.index.codec.util.KNNCodecUtil.initializeVectorValues;
16+
import static org.opensearch.knn.plugin.stats.KNNRemoteIndexBuildValue.BUILD_REQUEST_FAILURE_COUNT;
17+
import static org.opensearch.knn.plugin.stats.KNNRemoteIndexBuildValue.BUILD_REQUEST_SUCCESS_COUNT;
18+
import static org.opensearch.knn.plugin.stats.KNNRemoteIndexBuildValue.INDEX_BUILD_FAILURE_COUNT;
19+
import static org.opensearch.knn.plugin.stats.KNNRemoteIndexBuildValue.INDEX_BUILD_SUCCESS_COUNT;
20+
import static org.opensearch.knn.plugin.stats.KNNRemoteIndexBuildValue.READ_FAILURE_COUNT;
21+
import static org.opensearch.knn.plugin.stats.KNNRemoteIndexBuildValue.READ_SUCCESS_COUNT;
22+
import static org.opensearch.knn.plugin.stats.KNNRemoteIndexBuildValue.READ_TIME;
23+
import static org.opensearch.knn.plugin.stats.KNNRemoteIndexBuildValue.REMOTE_INDEX_BUILD_CURRENT_FLUSH_OPERATIONS;
24+
import static org.opensearch.knn.plugin.stats.KNNRemoteIndexBuildValue.REMOTE_INDEX_BUILD_CURRENT_FLUSH_SIZE;
25+
import static org.opensearch.knn.plugin.stats.KNNRemoteIndexBuildValue.REMOTE_INDEX_BUILD_CURRENT_MERGE_OPERATIONS;
26+
import static org.opensearch.knn.plugin.stats.KNNRemoteIndexBuildValue.REMOTE_INDEX_BUILD_CURRENT_MERGE_SIZE;
27+
import static org.opensearch.knn.plugin.stats.KNNRemoteIndexBuildValue.REMOTE_INDEX_BUILD_FLUSH_TIME;
28+
import static org.opensearch.knn.plugin.stats.KNNRemoteIndexBuildValue.REMOTE_INDEX_BUILD_MERGE_TIME;
29+
import static org.opensearch.knn.plugin.stats.KNNRemoteIndexBuildValue.WAITING_TIME;
30+
import static org.opensearch.knn.plugin.stats.KNNRemoteIndexBuildValue.WRITE_FAILURE_COUNT;
31+
import static org.opensearch.knn.plugin.stats.KNNRemoteIndexBuildValue.WRITE_SUCCESS_COUNT;
32+
import static org.opensearch.knn.plugin.stats.KNNRemoteIndexBuildValue.WRITE_TIME;
33+
34+
/**
35+
* Class to handle all metric collection for the remote index build.
36+
* Each phase has its own StopWatch and `start` and `end` methods.
37+
*/
38+
@Log4j2
39+
public class RemoteIndexBuildMetrics {
40+
private final StopWatch overallStopWatch;
41+
private final StopWatch writeStopWatch;
42+
private final StopWatch buildRequestStopWatch;
43+
private final StopWatch waiterStopWatch;
44+
private final StopWatch readStopWatch;
45+
private long size;
46+
private boolean isFlush;
47+
private String fieldName;
48+
49+
public RemoteIndexBuildMetrics() {
50+
this.overallStopWatch = new StopWatch();
51+
this.writeStopWatch = new StopWatch();
52+
this.buildRequestStopWatch = new StopWatch();
53+
this.waiterStopWatch = new StopWatch();
54+
this.readStopWatch = new StopWatch();
55+
}
56+
57+
/**
58+
* Helper method to collect remote index build metrics on start
59+
*/
60+
public void startRemoteIndexBuildMetrics(BuildIndexParams indexInfo) throws IOException {
61+
KNNVectorValues<?> knnVectorValues = indexInfo.getKnnVectorValuesSupplier().get();
62+
initializeVectorValues(knnVectorValues);
63+
this.size = (long) indexInfo.getTotalLiveDocs() * knnVectorValues.bytesPerVector();
64+
this.isFlush = indexInfo.isFlush();
65+
this.fieldName = indexInfo.getFieldName();
66+
overallStopWatch.start();
67+
if (isFlush) {
68+
REMOTE_INDEX_BUILD_CURRENT_FLUSH_OPERATIONS.increment();
69+
REMOTE_INDEX_BUILD_CURRENT_FLUSH_SIZE.incrementBy(size);
70+
} else {
71+
REMOTE_INDEX_BUILD_CURRENT_MERGE_OPERATIONS.increment();
72+
REMOTE_INDEX_BUILD_CURRENT_MERGE_SIZE.incrementBy(size);
73+
}
74+
}
75+
76+
// Repository read phase metric helpers
77+
public void startRepositoryWriteMetrics() {
78+
writeStopWatch.start();
79+
}
80+
81+
public void endRepositoryWriteMetrics(boolean success) {
82+
long time_in_millis = writeStopWatch.stop().totalTime().millis();
83+
if (success) {
84+
WRITE_SUCCESS_COUNT.increment();
85+
WRITE_TIME.incrementBy(time_in_millis);
86+
log.debug("Repository write took {} ms for vector field [{}]", time_in_millis, fieldName);
87+
} else {
88+
WRITE_FAILURE_COUNT.increment();
89+
}
90+
}
91+
92+
// Build request phase metric helpers
93+
public void startBuildRequestMetrics() {
94+
buildRequestStopWatch.start();
95+
}
96+
97+
public void endBuildRequestMetrics(boolean success) {
98+
long time_in_millis = buildRequestStopWatch.stop().totalTime().millis();
99+
if (success) {
100+
BUILD_REQUEST_SUCCESS_COUNT.increment();
101+
log.debug("Submit vector build took {} ms for vector field [{}]", time_in_millis, fieldName);
102+
} else {
103+
BUILD_REQUEST_FAILURE_COUNT.increment();
104+
}
105+
}
106+
107+
// Await index build phase metric helpers
108+
public void startWaitingMetrics() {
109+
waiterStopWatch.start();
110+
}
111+
112+
public void endWaitingMetrics() {
113+
long time_in_millis = waiterStopWatch.stop().totalTime().millis();
114+
WAITING_TIME.incrementBy(time_in_millis);
115+
log.debug("Await vector build took {} ms for vector field [{}]", time_in_millis, fieldName);
116+
}
117+
118+
// Repository read phase metric helpers
119+
public void startRepositoryReadMetrics() {
120+
readStopWatch.start();
121+
}
122+
123+
public void endRepositoryReadMetrics(boolean success) {
124+
long time_in_millis = readStopWatch.stop().totalTime().millis();
125+
if (success) {
126+
READ_SUCCESS_COUNT.increment();
127+
READ_TIME.incrementBy(time_in_millis);
128+
log.debug("Repository read took {} ms for vector field [{}]", time_in_millis, fieldName);
129+
} else {
130+
READ_FAILURE_COUNT.increment();
131+
}
132+
}
133+
134+
/**
135+
* Helper method to collect overall remote index build metrics
136+
*/
137+
public void endRemoteIndexBuildMetrics(boolean wasSuccessful) {
138+
long time_in_millis = overallStopWatch.stop().totalTime().millis();
139+
if (wasSuccessful) {
140+
INDEX_BUILD_SUCCESS_COUNT.increment();
141+
log.debug("Remote index build succeeded after {} ms for vector field [{}]", time_in_millis, fieldName);
142+
} else {
143+
INDEX_BUILD_FAILURE_COUNT.increment();
144+
log.warn("Remote index build failed after {} ms for vector field [{}]", time_in_millis, fieldName);
145+
}
146+
if (isFlush) {
147+
REMOTE_INDEX_BUILD_CURRENT_FLUSH_OPERATIONS.decrement();
148+
REMOTE_INDEX_BUILD_CURRENT_FLUSH_SIZE.decrementBy(size);
149+
REMOTE_INDEX_BUILD_FLUSH_TIME.incrementBy(time_in_millis);
150+
} else {
151+
REMOTE_INDEX_BUILD_CURRENT_MERGE_OPERATIONS.decrement();
152+
REMOTE_INDEX_BUILD_CURRENT_MERGE_SIZE.decrementBy(size);
153+
REMOTE_INDEX_BUILD_MERGE_TIME.incrementBy(time_in_millis);
154+
}
155+
}
156+
}

0 commit comments

Comments
 (0)