Skip to content

Commit 46e6420

Browse files
authored
Add query fetch and indexing metrics per index mode (#113978) (#114140)
This change exposes query, fetch and indexing metrics for each index mode.
1 parent 318188d commit 46e6420

File tree

2 files changed

+348
-12
lines changed

2 files changed

+348
-12
lines changed

server/src/internalClusterTest/java/org/elasticsearch/monitor/metrics/IndicesMetricsIT.java

Lines changed: 262 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,23 +9,38 @@
99

1010
package org.elasticsearch.monitor.metrics;
1111

12+
import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags;
13+
import org.elasticsearch.cluster.metadata.IndexMetadata;
1214
import org.elasticsearch.common.settings.Setting;
1315
import org.elasticsearch.common.settings.Settings;
1416
import org.elasticsearch.core.TimeValue;
17+
import org.elasticsearch.index.mapper.OnScriptError;
18+
import org.elasticsearch.index.query.RangeQueryBuilder;
19+
import org.elasticsearch.indices.IndicesService;
1520
import org.elasticsearch.plugins.Plugin;
1621
import org.elasticsearch.plugins.PluginsService;
22+
import org.elasticsearch.plugins.ScriptPlugin;
23+
import org.elasticsearch.script.LongFieldScript;
24+
import org.elasticsearch.script.ScriptContext;
25+
import org.elasticsearch.script.ScriptEngine;
26+
import org.elasticsearch.search.lookup.SearchLookup;
1727
import org.elasticsearch.telemetry.Measurement;
1828
import org.elasticsearch.telemetry.TestTelemetryPlugin;
1929
import org.elasticsearch.test.ESIntegTestCase;
30+
import org.elasticsearch.xcontent.XContentParser;
31+
import org.elasticsearch.xcontent.json.JsonXContent;
2032
import org.hamcrest.Matcher;
2133

34+
import java.io.IOException;
2235
import java.util.Collection;
2336
import java.util.List;
2437
import java.util.Map;
38+
import java.util.Set;
2539

2640
import static org.elasticsearch.index.mapper.DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER;
2741
import static org.hamcrest.Matchers.equalTo;
2842
import static org.hamcrest.Matchers.greaterThan;
43+
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
2944
import static org.hamcrest.Matchers.hasSize;
3045

3146
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, numClientNodes = 0)
@@ -42,7 +57,7 @@ public List<Setting<?>> getSettings() {
4257

4358
@Override
4459
protected Collection<Class<? extends Plugin>> nodePlugins() {
45-
return List.of(TestTelemetryPlugin.class, TestAPMInternalSettings.class);
60+
return List.of(TestTelemetryPlugin.class, TestAPMInternalSettings.class, FailingFieldPlugin.class);
4661
}
4762

4863
@Override
@@ -54,27 +69,57 @@ protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
5469
}
5570

5671
static final String STANDARD_INDEX_COUNT = "es.indices.standard.total";
72+
static final String STANDARD_BYTES_SIZE = "es.indices.standard.size";
5773
static final String STANDARD_DOCS_COUNT = "es.indices.standard.docs.total";
58-
static final String STANDARD_BYTES_SIZE = "es.indices.standard.bytes.total";
74+
static final String STANDARD_QUERY_COUNT = "es.indices.standard.query.total";
75+
static final String STANDARD_QUERY_TIME = "es.indices.standard.query.time";
76+
static final String STANDARD_QUERY_FAILURE = "es.indices.standard.query.failure.total";
77+
static final String STANDARD_FETCH_COUNT = "es.indices.standard.fetch.total";
78+
static final String STANDARD_FETCH_TIME = "es.indices.standard.fetch.time";
79+
static final String STANDARD_FETCH_FAILURE = "es.indices.standard.fetch.failure.total";
80+
static final String STANDARD_INDEXING_COUNT = "es.indices.standard.indexing.total";
81+
static final String STANDARD_INDEXING_TIME = "es.indices.standard.indexing.time";
82+
static final String STANDARD_INDEXING_FAILURE = "es.indices.standard.indexing.failure.total";
5983

6084
static final String TIME_SERIES_INDEX_COUNT = "es.indices.time_series.total";
85+
static final String TIME_SERIES_BYTES_SIZE = "es.indices.time_series.size";
6186
static final String TIME_SERIES_DOCS_COUNT = "es.indices.time_series.docs.total";
62-
static final String TIME_SERIES_BYTES_SIZE = "es.indices.time_series.bytes.total";
87+
static final String TIME_SERIES_QUERY_COUNT = "es.indices.time_series.query.total";
88+
static final String TIME_SERIES_QUERY_TIME = "es.indices.time_series.query.time";
89+
static final String TIME_SERIES_QUERY_FAILURE = "es.indices.time_series.query.failure.total";
90+
static final String TIME_SERIES_FETCH_COUNT = "es.indices.time_series.fetch.total";
91+
static final String TIME_SERIES_FETCH_TIME = "es.indices.time_series.fetch.time";
92+
static final String TIME_SERIES_FETCH_FAILURE = "es.indices.time_series.fetch.failure.total";
93+
static final String TIME_SERIES_INDEXING_COUNT = "es.indices.time_series.indexing.total";
94+
static final String TIME_SERIES_INDEXING_TIME = "es.indices.time_series.indexing.time";
95+
static final String TIME_SERIES_INDEXING_FAILURE = "es.indices.time_series.indexing.failure.total";
6396

6497
static final String LOGSDB_INDEX_COUNT = "es.indices.logsdb.total";
98+
static final String LOGSDB_BYTES_SIZE = "es.indices.logsdb.size";
6599
static final String LOGSDB_DOCS_COUNT = "es.indices.logsdb.docs.total";
66-
static final String LOGSDB_BYTES_SIZE = "es.indices.logsdb.bytes.total";
100+
static final String LOGSDB_QUERY_COUNT = "es.indices.logsdb.query.total";
101+
static final String LOGSDB_QUERY_TIME = "es.indices.logsdb.query.time";
102+
static final String LOGSDB_QUERY_FAILURE = "es.indices.logsdb.query.failure.total";
103+
static final String LOGSDB_FETCH_COUNT = "es.indices.logsdb.fetch.total";
104+
static final String LOGSDB_FETCH_TIME = "es.indices.logsdb.fetch.time";
105+
static final String LOGSDB_FETCH_FAILURE = "es.indices.logsdb.fetch.failure.total";
106+
static final String LOGSDB_INDEXING_COUNT = "es.indices.logsdb.indexing.total";
107+
static final String LOGSDB_INDEXING_TIME = "es.indices.logsdb.indexing.time";
108+
static final String LOGSDB_INDEXING_FAILURE = "es.indices.logsdb.indexing.failure.total";
67109

68-
public void testIndicesMetrics() {
110+
public void testIndicesMetrics() throws Exception {
69111
String node = internalCluster().startNode();
70112
ensureStableCluster(1);
71113
final TestTelemetryPlugin telemetry = internalCluster().getInstance(PluginsService.class, node)
72114
.filterPlugins(TestTelemetryPlugin.class)
73115
.findFirst()
74116
.orElseThrow();
117+
final IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node);
118+
var indexing0 = indicesService.stats(CommonStatsFlags.ALL, false).getIndexing().getTotal();
75119
telemetry.resetMeter();
76120
long numStandardIndices = randomIntBetween(1, 5);
77121
long numStandardDocs = populateStandardIndices(numStandardIndices);
122+
var indexing1 = indicesService.stats(CommonStatsFlags.ALL, false).getIndexing().getTotal();
78123
collectThenAssertMetrics(
79124
telemetry,
80125
1,
@@ -104,6 +149,7 @@ public void testIndicesMetrics() {
104149

105150
long numTimeSeriesIndices = randomIntBetween(1, 5);
106151
long numTimeSeriesDocs = populateTimeSeriesIndices(numTimeSeriesIndices);
152+
var indexing2 = indicesService.stats(CommonStatsFlags.ALL, false).getIndexing().getTotal();
107153
collectThenAssertMetrics(
108154
telemetry,
109155
2,
@@ -133,6 +179,7 @@ public void testIndicesMetrics() {
133179

134180
long numLogsdbIndices = randomIntBetween(1, 5);
135181
long numLogsdbDocs = populateLogsdbIndices(numLogsdbIndices);
182+
var indexing3 = indicesService.stats(CommonStatsFlags.ALL, false).getIndexing().getTotal();
136183
collectThenAssertMetrics(
137184
telemetry,
138185
3,
@@ -159,6 +206,142 @@ public void testIndicesMetrics() {
159206
greaterThan(0L)
160207
)
161208
);
209+
// indexing stats
210+
collectThenAssertMetrics(
211+
telemetry,
212+
4,
213+
Map.of(
214+
STANDARD_INDEXING_COUNT,
215+
equalTo(numStandardDocs),
216+
STANDARD_INDEXING_TIME,
217+
greaterThanOrEqualTo(0L),
218+
STANDARD_INDEXING_FAILURE,
219+
equalTo(indexing1.getIndexFailedCount() - indexing0.getIndexCount()),
220+
221+
TIME_SERIES_INDEXING_COUNT,
222+
equalTo(numTimeSeriesDocs),
223+
TIME_SERIES_INDEXING_TIME,
224+
greaterThanOrEqualTo(0L),
225+
TIME_SERIES_INDEXING_FAILURE,
226+
equalTo(indexing2.getIndexFailedCount() - indexing1.getIndexFailedCount()),
227+
228+
LOGSDB_INDEXING_COUNT,
229+
equalTo(numLogsdbDocs),
230+
LOGSDB_INDEXING_TIME,
231+
greaterThanOrEqualTo(0L),
232+
LOGSDB_INDEXING_FAILURE,
233+
equalTo(indexing3.getIndexFailedCount() - indexing2.getIndexFailedCount())
234+
)
235+
);
236+
telemetry.resetMeter();
237+
238+
// search and fetch
239+
client().prepareSearch("standard*").setSize(100).get().decRef();
240+
var nodeStats1 = indicesService.stats(CommonStatsFlags.ALL, false).getSearch().getTotal();
241+
collectThenAssertMetrics(
242+
telemetry,
243+
1,
244+
Map.of(
245+
STANDARD_QUERY_COUNT,
246+
equalTo(numStandardIndices),
247+
STANDARD_QUERY_TIME,
248+
equalTo(nodeStats1.getQueryTimeInMillis()),
249+
STANDARD_FETCH_COUNT,
250+
equalTo(nodeStats1.getFetchCount()),
251+
STANDARD_FETCH_TIME,
252+
equalTo(nodeStats1.getFetchTimeInMillis()),
253+
254+
TIME_SERIES_QUERY_COUNT,
255+
equalTo(0L),
256+
TIME_SERIES_QUERY_TIME,
257+
equalTo(0L),
258+
259+
LOGSDB_QUERY_COUNT,
260+
equalTo(0L),
261+
LOGSDB_QUERY_TIME,
262+
equalTo(0L)
263+
)
264+
);
265+
266+
client().prepareSearch("time*").setSize(100).get().decRef();
267+
var nodeStats2 = indicesService.stats(CommonStatsFlags.ALL, false).getSearch().getTotal();
268+
collectThenAssertMetrics(
269+
telemetry,
270+
2,
271+
Map.of(
272+
STANDARD_QUERY_COUNT,
273+
equalTo(numStandardIndices),
274+
STANDARD_QUERY_TIME,
275+
equalTo(nodeStats1.getQueryTimeInMillis()),
276+
277+
TIME_SERIES_QUERY_COUNT,
278+
equalTo(numTimeSeriesIndices),
279+
TIME_SERIES_QUERY_TIME,
280+
equalTo(nodeStats2.getQueryTimeInMillis() - nodeStats1.getQueryTimeInMillis()),
281+
TIME_SERIES_FETCH_COUNT,
282+
equalTo(nodeStats2.getFetchCount() - nodeStats1.getFetchCount()),
283+
TIME_SERIES_FETCH_TIME,
284+
equalTo(nodeStats2.getFetchTimeInMillis() - nodeStats1.getFetchTimeInMillis()),
285+
286+
LOGSDB_QUERY_COUNT,
287+
equalTo(0L),
288+
LOGSDB_QUERY_TIME,
289+
equalTo(0L)
290+
)
291+
);
292+
client().prepareSearch("logs*").setSize(100).get().decRef();
293+
var nodeStats3 = indicesService.stats(CommonStatsFlags.ALL, false).getSearch().getTotal();
294+
collectThenAssertMetrics(
295+
telemetry,
296+
3,
297+
Map.of(
298+
STANDARD_QUERY_COUNT,
299+
equalTo(numStandardIndices),
300+
STANDARD_QUERY_TIME,
301+
equalTo(nodeStats1.getQueryTimeInMillis()),
302+
303+
TIME_SERIES_QUERY_COUNT,
304+
equalTo(numTimeSeriesIndices),
305+
TIME_SERIES_QUERY_TIME,
306+
equalTo(nodeStats2.getQueryTimeInMillis() - nodeStats1.getQueryTimeInMillis()),
307+
308+
LOGSDB_QUERY_COUNT,
309+
equalTo(numLogsdbIndices),
310+
LOGSDB_QUERY_TIME,
311+
equalTo(nodeStats3.getQueryTimeInMillis() - nodeStats2.getQueryTimeInMillis()),
312+
LOGSDB_FETCH_COUNT,
313+
equalTo(nodeStats3.getFetchCount() - nodeStats2.getFetchCount()),
314+
LOGSDB_FETCH_TIME,
315+
equalTo(nodeStats3.getFetchTimeInMillis() - nodeStats2.getFetchTimeInMillis())
316+
)
317+
);
318+
// search failures
319+
expectThrows(Exception.class, () -> { client().prepareSearch("logs*").setRuntimeMappings(parseMapping("""
320+
{
321+
"fail_me": {
322+
"type": "long",
323+
"script": {"source": "<>", "lang": "failing_field"}
324+
}
325+
}
326+
""")).setQuery(new RangeQueryBuilder("fail_me").gte(0)).setAllowPartialSearchResults(true).get(); });
327+
collectThenAssertMetrics(
328+
telemetry,
329+
4,
330+
Map.of(
331+
STANDARD_QUERY_FAILURE,
332+
equalTo(0L),
333+
STANDARD_FETCH_FAILURE,
334+
equalTo(0L),
335+
TIME_SERIES_QUERY_FAILURE,
336+
equalTo(0L),
337+
TIME_SERIES_FETCH_FAILURE,
338+
equalTo(0L),
339+
LOGSDB_QUERY_FAILURE,
340+
equalTo(numLogsdbIndices),
341+
LOGSDB_FETCH_FAILURE,
342+
equalTo(0L)
343+
)
344+
);
162345
}
163346

164347
void collectThenAssertMetrics(TestTelemetryPlugin telemetry, int times, Map<String, Matcher<Long>> matchers) {
@@ -175,7 +358,7 @@ int populateStandardIndices(long numIndices) {
175358
int totalDocs = 0;
176359
for (int i = 0; i < numIndices; i++) {
177360
String indexName = "standard-" + i;
178-
createIndex(indexName);
361+
createIndex(indexName, Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).build());
179362
int numDocs = between(1, 5);
180363
for (int d = 0; d < numDocs; d++) {
181364
indexDoc(indexName, Integer.toString(d), "f", Integer.toString(d));
@@ -190,7 +373,11 @@ int populateTimeSeriesIndices(long numIndices) {
190373
int totalDocs = 0;
191374
for (int i = 0; i < numIndices; i++) {
192375
String indexName = "time_series-" + i;
193-
Settings settings = Settings.builder().put("mode", "time_series").putList("routing_path", List.of("host")).build();
376+
Settings settings = Settings.builder()
377+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
378+
.put("mode", "time_series")
379+
.putList("routing_path", List.of("host"))
380+
.build();
194381
client().admin()
195382
.indices()
196383
.prepareCreate(indexName)
@@ -214,6 +401,7 @@ int populateTimeSeriesIndices(long numIndices) {
214401
}
215402
totalDocs += numDocs;
216403
flush(indexName);
404+
refresh(indexName);
217405
}
218406
return totalDocs;
219407
}
@@ -222,7 +410,7 @@ int populateLogsdbIndices(long numIndices) {
222410
int totalDocs = 0;
223411
for (int i = 0; i < numIndices; i++) {
224412
String indexName = "logsdb-" + i;
225-
Settings settings = Settings.builder().put("mode", "logsdb").build();
413+
Settings settings = Settings.builder().put("mode", "logsdb").put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).build();
226414
client().admin()
227415
.indices()
228416
.prepareCreate(indexName)
@@ -237,9 +425,75 @@ int populateLogsdbIndices(long numIndices) {
237425
.setSource("@timestamp", timestamp, "host.name", randomFrom("prod", "qa"), "cpu", randomIntBetween(1, 100))
238426
.get();
239427
}
428+
int numFailures = between(0, 2);
429+
for (int d = 0; d < numFailures; d++) {
430+
expectThrows(Exception.class, () -> {
431+
client().prepareIndex(indexName)
432+
.setSource(
433+
"@timestamp",
434+
"malformed-timestamp",
435+
"host.name",
436+
randomFrom("prod", "qa"),
437+
"cpu",
438+
randomIntBetween(1, 100)
439+
)
440+
.get();
441+
});
442+
}
240443
totalDocs += numDocs;
241444
flush(indexName);
445+
refresh(indexName);
242446
}
243447
return totalDocs;
244448
}
449+
450+
private Map<String, Object> parseMapping(String mapping) throws IOException {
451+
try (XContentParser parser = createParser(JsonXContent.jsonXContent, mapping)) {
452+
return parser.map();
453+
}
454+
}
455+
456+
public static class FailingFieldPlugin extends Plugin implements ScriptPlugin {
457+
458+
@Override
459+
public ScriptEngine getScriptEngine(Settings settings, Collection<ScriptContext<?>> contexts) {
460+
return new ScriptEngine() {
461+
@Override
462+
public String getType() {
463+
return "failing_field";
464+
}
465+
466+
@Override
467+
@SuppressWarnings("unchecked")
468+
public <FactoryType> FactoryType compile(
469+
String name,
470+
String code,
471+
ScriptContext<FactoryType> context,
472+
Map<String, String> params
473+
) {
474+
return (FactoryType) new LongFieldScript.Factory() {
475+
@Override
476+
public LongFieldScript.LeafFactory newFactory(
477+
String fieldName,
478+
Map<String, Object> params,
479+
SearchLookup searchLookup,
480+
OnScriptError onScriptError
481+
) {
482+
return ctx -> new LongFieldScript(fieldName, params, searchLookup, onScriptError, ctx) {
483+
@Override
484+
public void execute() {
485+
throw new IllegalStateException("Accessing failing field");
486+
}
487+
};
488+
}
489+
};
490+
}
491+
492+
@Override
493+
public Set<ScriptContext<?>> getSupportedContexts() {
494+
return Set.of(LongFieldScript.CONTEXT);
495+
}
496+
};
497+
}
498+
}
245499
}

0 commit comments

Comments
 (0)