Skip to content

Commit ca0eb27

Browse files
committed
Add DfsPhase metrics
1 parent 9b353f6 commit ca0eb27

File tree

9 files changed

+213
-6
lines changed

9 files changed

+213
-6
lines changed

server/src/internalClusterTest/java/org/elasticsearch/monitor/metrics/IndicesMetricsIT.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,9 @@ protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
6868
static final String STANDARD_INDEX_COUNT = "es.indices.standard.total";
6969
static final String STANDARD_BYTES_SIZE = "es.indices.standard.size";
7070
static final String STANDARD_DOCS_COUNT = "es.indices.standard.docs.total";
71+
static final String STANDARD_DFS_COUNT = "es.indices.standard.dfs.total";
72+
static final String STANDARD_DFS_TIME = "es.indices.standard.dfs.time";
73+
static final String STANDARD_DFS_FAILURE = "es.indices.standard.dfs.failure.total";
7174
static final String STANDARD_QUERY_COUNT = "es.indices.standard.query.total";
7275
static final String STANDARD_QUERY_TIME = "es.indices.standard.query.time";
7376
static final String STANDARD_QUERY_FAILURE = "es.indices.standard.query.failure.total";
@@ -245,6 +248,10 @@ public void testIndicesMetrics() {
245248
telemetry,
246249
1,
247250
Map.of(
251+
STANDARD_DFS_COUNT,
252+
equalTo(search1.getDfsCount()),
253+
STANDARD_DFS_TIME,
254+
equalTo(search1.getDfsTimeInMillis()),
248255
STANDARD_QUERY_COUNT,
249256
equalTo(numStandardIndices),
250257
STANDARD_QUERY_TIME,
@@ -266,6 +273,7 @@ public void testIndicesMetrics() {
266273
)
267274
);
268275

276+
269277
client(searchNode).prepareSearch("time*").setPreference(preference).setSize(100).get().decRef();
270278
var search2 = indicesService.stats(CommonStatsFlags.ALL, false).getSearch().getTotal();
271279
collectThenAssertMetrics(
@@ -334,6 +342,8 @@ public void testIndicesMetrics() {
334342
telemetry,
335343
4,
336344
Map.of(
345+
STANDARD_DFS_FAILURE,
346+
equalTo(0L),
337347
STANDARD_QUERY_FAILURE,
338348
equalTo(0L),
339349
STANDARD_FETCH_FAILURE,

server/src/internalClusterTest/java/org/elasticsearch/search/stats/SearchStatsIT.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,8 @@ public void testSimpleStats() throws Exception {
130130

131131
IndicesStatsResponse indicesStats = indicesAdmin().prepareStats().get();
132132
logger.debug("###### indices search stats: {}", indicesStats.getTotal().getSearch());
133+
assertThat(indicesStats.getTotal().getSearch().getTotal().getDfsCount(),equalTo(0L));
134+
assertThat(indicesStats.getTotal().getSearch().getTotal().getDfsTimeInMillis(), equalTo(0L));
133135
assertThat(indicesStats.getTotal().getSearch().getTotal().getQueryCount(), greaterThan(0L));
134136
assertThat(indicesStats.getTotal().getSearch().getTotal().getQueryTimeInMillis(), greaterThan(0L));
135137
assertThat(indicesStats.getTotal().getSearch().getTotal().getFetchCount(), greaterThan(0L));

server/src/main/java/org/elasticsearch/index/search/stats/SearchStats.java

Lines changed: 71 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,10 @@ public class SearchStats implements Writeable, ToXContentFragment {
3030

3131
public static class Stats implements Writeable, ToXContentFragment {
3232

33+
private long dfsCount;
34+
private long dfsTimeInMillis;
35+
private long dfsCurrent;
36+
3337
private long queryCount;
3438
private long queryTimeInMillis;
3539
private long queryCurrent;
@@ -46,6 +50,7 @@ public static class Stats implements Writeable, ToXContentFragment {
4650
private long suggestTimeInMillis;
4751
private long suggestCurrent;
4852

53+
private long dfsFailure;
4954
private long queryFailure;
5055
private long fetchFailure;
5156

@@ -54,6 +59,10 @@ private Stats() {
5459
}
5560

5661
public Stats(
62+
long dfsCount,
63+
long dfsTimeInMillis,
64+
long dfsCurrent,
65+
long dfsFailure,
5766
long queryCount,
5867
long queryTimeInMillis,
5968
long queryCurrent,
@@ -69,6 +78,11 @@ public Stats(
6978
long suggestTimeInMillis,
7079
long suggestCurrent
7180
) {
81+
this.dfsCount = dfsCount;
82+
this.dfsTimeInMillis = dfsTimeInMillis;
83+
this.dfsCurrent = dfsCurrent;
84+
this.dfsFailure = dfsFailure;
85+
7286
this.queryCount = queryCount;
7387
this.queryTimeInMillis = queryTimeInMillis;
7488
this.queryCurrent = queryCurrent;
@@ -89,6 +103,10 @@ public Stats(
89103
}
90104

91105
private Stats(StreamInput in) throws IOException {
106+
dfsCount = in.readVLong();
107+
dfsTimeInMillis = in.readVLong();
108+
dfsCurrent = in.readVLong();
109+
92110
queryCount = in.readVLong();
93111
queryTimeInMillis = in.readVLong();
94112
queryCurrent = in.readVLong();
@@ -113,6 +131,10 @@ private Stats(StreamInput in) throws IOException {
113131

114132
@Override
115133
public void writeTo(StreamOutput out) throws IOException {
134+
out.writeVLong(dfsCount);
135+
out.writeVLong(dfsTimeInMillis);
136+
out.writeVLong(dfsCurrent);
137+
116138
out.writeVLong(queryCount);
117139
out.writeVLong(queryTimeInMillis);
118140
out.writeVLong(queryCurrent);
@@ -130,12 +152,18 @@ public void writeTo(StreamOutput out) throws IOException {
130152
out.writeVLong(suggestCurrent);
131153

132154
if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_16_0)) {
155+
out.writeVLong(dfsFailure);
133156
out.writeVLong(queryFailure);
134157
out.writeVLong(fetchFailure);
135158
}
136159
}
137160

138161
public void add(Stats stats) {
162+
dfsCount += stats.dfsCount;
163+
dfsTimeInMillis += stats.dfsTimeInMillis;
164+
dfsCurrent += stats.dfsCurrent;
165+
dfsFailure += stats.dfsFailure;
166+
139167
queryCount += stats.queryCount;
140168
queryTimeInMillis += stats.queryTimeInMillis;
141169
queryCurrent += stats.queryCurrent;
@@ -156,6 +184,10 @@ public void add(Stats stats) {
156184
}
157185

158186
public void addForClosingShard(Stats stats) {
187+
dfsCount += stats.dfsCount;
188+
dfsTimeInMillis += stats.dfsTimeInMillis;
189+
dfsFailure += stats.dfsFailure;
190+
159191
queryCount += stats.queryCount;
160192
queryTimeInMillis += stats.queryTimeInMillis;
161193
queryFailure += stats.queryFailure;
@@ -173,6 +205,26 @@ public void addForClosingShard(Stats stats) {
173205
suggestTimeInMillis += stats.suggestTimeInMillis;
174206
}
175207

208+
public long getDfsCount() {
209+
return dfsCount;
210+
}
211+
212+
public TimeValue getDfsTime() {
213+
return new TimeValue(dfsTimeInMillis);
214+
}
215+
216+
public long getDfsTimeInMillis() {
217+
return dfsTimeInMillis;
218+
}
219+
220+
public long getDfsCurrent() {
221+
return dfsCurrent;
222+
}
223+
224+
public long getDfsFailure() {
225+
return dfsFailure;
226+
}
227+
176228
public long getQueryCount() {
177229
return queryCount;
178230
}
@@ -251,6 +303,11 @@ public static Stats readStats(StreamInput in) throws IOException {
251303

252304
@Override
253305
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
306+
builder.field(Fields.DFS_TOTAL, dfsCount);
307+
builder.humanReadableField(Fields.DFS_TIME_IN_MILLIS, Fields.DFS_TIME, getDfsTime());
308+
builder.field(Fields.DFS_CURRENT, dfsCurrent);
309+
builder.field(Fields.DFS_FAILURE, dfsFailure);
310+
254311
builder.field(Fields.QUERY_TOTAL, queryCount);
255312
builder.humanReadableField(Fields.QUERY_TIME_IN_MILLIS, Fields.QUERY_TIME, getQueryTime());
256313
builder.field(Fields.QUERY_CURRENT, queryCurrent);
@@ -277,7 +334,11 @@ public boolean equals(Object o) {
277334
if (this == o) return true;
278335
if (o == null || getClass() != o.getClass()) return false;
279336
Stats that = (Stats) o;
280-
return queryCount == that.queryCount
337+
return dfsCount == that.dfsCount
338+
&& dfsTimeInMillis == that.dfsTimeInMillis
339+
&& dfsCurrent == that.dfsCurrent
340+
&& dfsFailure == that.dfsFailure
341+
&& queryCount == that.queryCount
281342
&& queryTimeInMillis == that.queryTimeInMillis
282343
&& queryCurrent == that.queryCurrent
283344
&& queryFailure == that.queryFailure
@@ -296,6 +357,10 @@ public boolean equals(Object o) {
296357
@Override
297358
public int hashCode() {
298359
return Objects.hash(
360+
dfsCount,
361+
dfsTimeInMillis,
362+
dfsCurrent,
363+
dfsFailure,
299364
queryCount,
300365
queryTimeInMillis,
301366
queryCurrent,
@@ -409,6 +474,11 @@ static final class Fields {
409474
static final String SEARCH = "search";
410475
static final String OPEN_CONTEXTS = "open_contexts";
411476
static final String GROUPS = "groups";
477+
static final String DFS_TOTAL = "dfs_total";
478+
static final String DFS_TIME = "dfs_time";
479+
static final String DFS_TIME_IN_MILLIS = "dfs_time_in_millis";
480+
static final String DFS_CURRENT = "dfs_current";
481+
static final String DFS_FAILURE = "dfs_failure";
412482
static final String QUERY_TOTAL = "query_total";
413483
static final String QUERY_TIME = "query_time";
414484
static final String QUERY_TIME_IN_MILLIS = "query_time_in_millis";

server/src/main/java/org/elasticsearch/index/search/stats/ShardSearchStats.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,27 @@ public void onFetchPhase(SearchContext searchContext, long tookInNanos) {
107107
});
108108
}
109109

110+
@Override
111+
public void onPreDfsPhase(SearchContext searchContext) {
112+
computeStats(searchContext, statsHolder -> statsHolder.dfsCurrent.inc());
113+
}
114+
115+
@Override
116+
public void onDfsPhase(SearchContext searchContext, long tookInNanos) {
117+
computeStats(searchContext, statsHolder -> {
118+
statsHolder.dfsMetric.inc(tookInNanos);
119+
statsHolder.dfsCurrent.dec();
120+
});
121+
}
122+
123+
@Override
124+
public void onFailedDfsPhase(SearchContext searchContext) {
125+
computeStats(searchContext, statsHolder -> {
126+
statsHolder.dfsCurrent.dec();
127+
statsHolder.dfsFailure.inc();
128+
});
129+
}
130+
110131
private void computeStats(SearchContext searchContext, Consumer<StatsHolder> consumer) {
111132
consumer.accept(totalStats);
112133
var groupStats = searchContext.groupStats();
@@ -154,6 +175,7 @@ public void onFreeScrollContext(ReaderContext readerContext) {
154175
}
155176

156177
static final class StatsHolder {
178+
final MeanMetric dfsMetric = new MeanMetric();
157179
final MeanMetric queryMetric = new MeanMetric();
158180
final MeanMetric fetchMetric = new MeanMetric();
159181
/* We store scroll statistics in microseconds because with nanoseconds we run the risk of overflowing the total stats if there are
@@ -165,16 +187,22 @@ static final class StatsHolder {
165187
*/
166188
final MeanMetric scrollMetric = new MeanMetric();
167189
final MeanMetric suggestMetric = new MeanMetric();
190+
final CounterMetric dfsCurrent = new CounterMetric();
168191
final CounterMetric queryCurrent = new CounterMetric();
169192
final CounterMetric fetchCurrent = new CounterMetric();
170193
final CounterMetric scrollCurrent = new CounterMetric();
171194
final CounterMetric suggestCurrent = new CounterMetric();
172195

196+
final CounterMetric dfsFailure = new CounterMetric();
173197
final CounterMetric queryFailure = new CounterMetric();
174198
final CounterMetric fetchFailure = new CounterMetric();
175199

176200
SearchStats.Stats stats() {
177201
return new SearchStats.Stats(
202+
dfsMetric.count(),
203+
TimeUnit.NANOSECONDS.toMillis(dfsMetric.sum()),
204+
dfsCurrent.count(),
205+
dfsFailure.count(),
178206
queryMetric.count(),
179207
TimeUnit.NANOSECONDS.toMillis(queryMetric.sum()),
180208
queryCurrent.count(),

server/src/main/java/org/elasticsearch/index/shard/SearchOperationListener.java

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,28 @@ default void onFailedFetchPhase(SearchContext searchContext) {}
6565
*/
6666
default void onFetchPhase(SearchContext searchContext, long tookInNanos) {}
6767

68+
/**
69+
* Executed before the DFS phase is executed
70+
* @param searchContext the current search context
71+
*/
72+
default void onPreDfsPhase(SearchContext searchContext) {}
73+
74+
/**
75+
* Executed after the query DFS successfully finished.
76+
* Note: this is not invoked if the DFS phase execution failed.
77+
* @param searchContext the current search context
78+
* @param tookInNanos the number of nanoseconds the query execution took
79+
*
80+
* @see #onFailedQueryPhase(SearchContext)
81+
*/
82+
default void onDfsPhase(SearchContext searchContext, long tookInNanos) {}
83+
84+
/**
85+
* Executed if a dfs phased failed.
86+
* @param searchContext the current search context
87+
*/
88+
default void onFailedDfsPhase(SearchContext searchContext) {}
89+
6890
/**
6991
* Executed when a new reader context was created
7092
* @param readerContext the created context
@@ -182,6 +204,39 @@ public void onFetchPhase(SearchContext searchContext, long tookInNanos) {
182204
}
183205
}
184206

207+
@Override
208+
public void onPreDfsPhase(SearchContext searchContext) {
209+
for (SearchOperationListener listener : listeners) {
210+
try {
211+
listener.onPreDfsPhase(searchContext);
212+
} catch (Exception e) {
213+
logger.warn(() -> "onPreDfsPhase listener [" + listener + "] failed", e);
214+
}
215+
}
216+
}
217+
218+
@Override
219+
public void onFailedDfsPhase(SearchContext searchContext) {
220+
for (SearchOperationListener listener : listeners) {
221+
try {
222+
listener.onFailedDfsPhase(searchContext);
223+
} catch (Exception e) {
224+
logger.warn(() -> "onFailedDfsPhase listener [" + listener + "] failed", e);
225+
}
226+
}
227+
}
228+
229+
@Override
230+
public void onDfsPhase(SearchContext searchContext, long tookInNanos) {
231+
for (SearchOperationListener listener : listeners) {
232+
try {
233+
listener.onDfsPhase(searchContext, tookInNanos);
234+
} catch (Exception e) {
235+
logger.warn(() -> "onDfsPhase listener [" + listener + "] failed", e);
236+
}
237+
}
238+
}
239+
185240
@Override
186241
public void onNewReaderContext(ReaderContext readerContext) {
187242
for (SearchOperationListener listener : listeners) {

server/src/main/java/org/elasticsearch/monitor/metrics/IndicesMetrics.java

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ public IndicesMetrics(MeterRegistry meterRegistry, IndicesService indicesService
5353
}
5454

5555
private static List<AutoCloseable> registerAsyncMetrics(MeterRegistry registry, IndicesStatsCache cache) {
56-
final int TOTAL_METRICS = 52;
56+
final int TOTAL_METRICS = 64;
5757
List<AutoCloseable> metrics = new ArrayList<>(TOTAL_METRICS);
5858
for (IndexMode indexMode : IndexMode.values()) {
5959
String name = indexMode.getName();
@@ -131,6 +131,30 @@ private static List<AutoCloseable> registerAsyncMetrics(MeterRegistry registry,
131131
diffGauge(() -> cache.getOrRefresh().get(indexMode).search.getFetchFailure())
132132
)
133133
);
134+
metrics.add(
135+
registry.registerLongGauge(
136+
"es.indices." + name + ".dfs.total",
137+
"current fetches of " + name + " indices",
138+
"unit",
139+
diffGauge(() -> cache.getOrRefresh().get(indexMode).search.getDfsCount())
140+
)
141+
);
142+
metrics.add(
143+
registry.registerLongGauge(
144+
"es.indices." + name + ".dfs.time",
145+
"current fetch time of " + name + " indices",
146+
"ms",
147+
diffGauge(() -> cache.getOrRefresh().get(indexMode).search.getDfsTimeInMillis())
148+
)
149+
);
150+
metrics.add(
151+
registry.registerLongGauge(
152+
"es.indices." + name + ".dfs.failure.total",
153+
"current fetch failures of " + name + " indices",
154+
"unit",
155+
diffGauge(() -> cache.getOrRefresh().get(indexMode).search.getDfsFailure())
156+
)
157+
);
134158
// indexing
135159
metrics.add(
136160
registry.registerLongGauge(

0 commit comments

Comments
 (0)