99
1010package org .elasticsearch .monitor .metrics ;
1111
12+ import org .elasticsearch .action .admin .indices .stats .CommonStatsFlags ;
13+ import org .elasticsearch .cluster .metadata .IndexMetadata ;
1214import org .elasticsearch .common .settings .Setting ;
1315import org .elasticsearch .common .settings .Settings ;
1416import org .elasticsearch .core .TimeValue ;
17+ import org .elasticsearch .index .mapper .OnScriptError ;
18+ import org .elasticsearch .index .query .RangeQueryBuilder ;
19+ import org .elasticsearch .indices .IndicesService ;
1520import org .elasticsearch .plugins .Plugin ;
1621import org .elasticsearch .plugins .PluginsService ;
22+ import org .elasticsearch .plugins .ScriptPlugin ;
23+ import org .elasticsearch .script .LongFieldScript ;
24+ import org .elasticsearch .script .ScriptContext ;
25+ import org .elasticsearch .script .ScriptEngine ;
26+ import org .elasticsearch .search .lookup .SearchLookup ;
1727import org .elasticsearch .telemetry .Measurement ;
1828import org .elasticsearch .telemetry .TestTelemetryPlugin ;
1929import org .elasticsearch .test .ESIntegTestCase ;
30+ import org .elasticsearch .xcontent .XContentParser ;
31+ import org .elasticsearch .xcontent .json .JsonXContent ;
2032import org .hamcrest .Matcher ;
2133
34+ import java .io .IOException ;
2235import java .util .Collection ;
2336import java .util .List ;
2437import java .util .Map ;
38+ import java .util .Set ;
2539
2640import static org .elasticsearch .index .mapper .DateFieldMapper .DEFAULT_DATE_TIME_FORMATTER ;
2741import static org .hamcrest .Matchers .equalTo ;
2842import static org .hamcrest .Matchers .greaterThan ;
43+ import static org .hamcrest .Matchers .greaterThanOrEqualTo ;
2944import static org .hamcrest .Matchers .hasSize ;
3045
3146@ ESIntegTestCase .ClusterScope (scope = ESIntegTestCase .Scope .TEST , numDataNodes = 0 , numClientNodes = 0 )
@@ -42,7 +57,7 @@ public List<Setting<?>> getSettings() {
4257
4358 @ Override
4459 protected Collection <Class <? extends Plugin >> nodePlugins () {
45- return List .of (TestTelemetryPlugin .class , TestAPMInternalSettings .class );
60+ return List .of (TestTelemetryPlugin .class , TestAPMInternalSettings .class , FailingFieldPlugin . class );
4661 }
4762
4863 @ Override
@@ -54,27 +69,57 @@ protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
5469 }
5570
5671 static final String STANDARD_INDEX_COUNT = "es.indices.standard.total" ;
72+ static final String STANDARD_BYTES_SIZE = "es.indices.standard.size" ;
5773 static final String STANDARD_DOCS_COUNT = "es.indices.standard.docs.total" ;
58- static final String STANDARD_BYTES_SIZE = "es.indices.standard.bytes.total" ;
74+ static final String STANDARD_QUERY_COUNT = "es.indices.standard.query.total" ;
75+ static final String STANDARD_QUERY_TIME = "es.indices.standard.query.time" ;
76+ static final String STANDARD_QUERY_FAILURE = "es.indices.standard.query.failure.total" ;
77+ static final String STANDARD_FETCH_COUNT = "es.indices.standard.fetch.total" ;
78+ static final String STANDARD_FETCH_TIME = "es.indices.standard.fetch.time" ;
79+ static final String STANDARD_FETCH_FAILURE = "es.indices.standard.fetch.failure.total" ;
80+ static final String STANDARD_INDEXING_COUNT = "es.indices.standard.indexing.total" ;
81+ static final String STANDARD_INDEXING_TIME = "es.indices.standard.indexing.time" ;
82+ static final String STANDARD_INDEXING_FAILURE = "es.indices.standard.indexing.failure.total" ;
5983
6084 static final String TIME_SERIES_INDEX_COUNT = "es.indices.time_series.total" ;
85+ static final String TIME_SERIES_BYTES_SIZE = "es.indices.time_series.size" ;
6186 static final String TIME_SERIES_DOCS_COUNT = "es.indices.time_series.docs.total" ;
62- static final String TIME_SERIES_BYTES_SIZE = "es.indices.time_series.bytes.total" ;
87+ static final String TIME_SERIES_QUERY_COUNT = "es.indices.time_series.query.total" ;
88+ static final String TIME_SERIES_QUERY_TIME = "es.indices.time_series.query.time" ;
89+ static final String TIME_SERIES_QUERY_FAILURE = "es.indices.time_series.query.failure.total" ;
90+ static final String TIME_SERIES_FETCH_COUNT = "es.indices.time_series.fetch.total" ;
91+ static final String TIME_SERIES_FETCH_TIME = "es.indices.time_series.fetch.time" ;
92+ static final String TIME_SERIES_FETCH_FAILURE = "es.indices.time_series.fetch.failure.total" ;
93+ static final String TIME_SERIES_INDEXING_COUNT = "es.indices.time_series.indexing.total" ;
94+ static final String TIME_SERIES_INDEXING_TIME = "es.indices.time_series.indexing.time" ;
95+ static final String TIME_SERIES_INDEXING_FAILURE = "es.indices.time_series.indexing.failure.total" ;
6396
6497 static final String LOGSDB_INDEX_COUNT = "es.indices.logsdb.total" ;
98+ static final String LOGSDB_BYTES_SIZE = "es.indices.logsdb.size" ;
6599 static final String LOGSDB_DOCS_COUNT = "es.indices.logsdb.docs.total" ;
66- static final String LOGSDB_BYTES_SIZE = "es.indices.logsdb.bytes.total" ;
100+ static final String LOGSDB_QUERY_COUNT = "es.indices.logsdb.query.total" ;
101+ static final String LOGSDB_QUERY_TIME = "es.indices.logsdb.query.time" ;
102+ static final String LOGSDB_QUERY_FAILURE = "es.indices.logsdb.query.failure.total" ;
103+ static final String LOGSDB_FETCH_COUNT = "es.indices.logsdb.fetch.total" ;
104+ static final String LOGSDB_FETCH_TIME = "es.indices.logsdb.fetch.time" ;
105+ static final String LOGSDB_FETCH_FAILURE = "es.indices.logsdb.fetch.failure.total" ;
106+ static final String LOGSDB_INDEXING_COUNT = "es.indices.logsdb.indexing.total" ;
107+ static final String LOGSDB_INDEXING_TIME = "es.indices.logsdb.indexing.time" ;
108+ static final String LOGSDB_INDEXING_FAILURE = "es.indices.logsdb.indexing.failure.total" ;
67109
68- public void testIndicesMetrics () {
110+ public void testIndicesMetrics () throws Exception {
69111 String node = internalCluster ().startNode ();
70112 ensureStableCluster (1 );
71113 final TestTelemetryPlugin telemetry = internalCluster ().getInstance (PluginsService .class , node )
72114 .filterPlugins (TestTelemetryPlugin .class )
73115 .findFirst ()
74116 .orElseThrow ();
117+ final IndicesService indicesService = internalCluster ().getInstance (IndicesService .class , node );
118+ var indexing0 = indicesService .stats (CommonStatsFlags .ALL , false ).getIndexing ().getTotal ();
75119 telemetry .resetMeter ();
76120 long numStandardIndices = randomIntBetween (1 , 5 );
77121 long numStandardDocs = populateStandardIndices (numStandardIndices );
122+ var indexing1 = indicesService .stats (CommonStatsFlags .ALL , false ).getIndexing ().getTotal ();
78123 collectThenAssertMetrics (
79124 telemetry ,
80125 1 ,
@@ -104,6 +149,7 @@ public void testIndicesMetrics() {
104149
105150 long numTimeSeriesIndices = randomIntBetween (1 , 5 );
106151 long numTimeSeriesDocs = populateTimeSeriesIndices (numTimeSeriesIndices );
152+ var indexing2 = indicesService .stats (CommonStatsFlags .ALL , false ).getIndexing ().getTotal ();
107153 collectThenAssertMetrics (
108154 telemetry ,
109155 2 ,
@@ -133,6 +179,7 @@ public void testIndicesMetrics() {
133179
134180 long numLogsdbIndices = randomIntBetween (1 , 5 );
135181 long numLogsdbDocs = populateLogsdbIndices (numLogsdbIndices );
182+ var indexing3 = indicesService .stats (CommonStatsFlags .ALL , false ).getIndexing ().getTotal ();
136183 collectThenAssertMetrics (
137184 telemetry ,
138185 3 ,
@@ -159,6 +206,142 @@ public void testIndicesMetrics() {
159206 greaterThan (0L )
160207 )
161208 );
209+ // indexing stats
210+ collectThenAssertMetrics (
211+ telemetry ,
212+ 4 ,
213+ Map .of (
214+ STANDARD_INDEXING_COUNT ,
215+ equalTo (numStandardDocs ),
216+ STANDARD_INDEXING_TIME ,
217+ greaterThanOrEqualTo (0L ),
218+ STANDARD_INDEXING_FAILURE ,
219+ equalTo (indexing1 .getIndexFailedCount () - indexing0 .getIndexCount ()),
220+
221+ TIME_SERIES_INDEXING_COUNT ,
222+ equalTo (numTimeSeriesDocs ),
223+ TIME_SERIES_INDEXING_TIME ,
224+ greaterThanOrEqualTo (0L ),
225+ TIME_SERIES_INDEXING_FAILURE ,
226+ equalTo (indexing2 .getIndexFailedCount () - indexing1 .getIndexFailedCount ()),
227+
228+ LOGSDB_INDEXING_COUNT ,
229+ equalTo (numLogsdbDocs ),
230+ LOGSDB_INDEXING_TIME ,
231+ greaterThanOrEqualTo (0L ),
232+ LOGSDB_INDEXING_FAILURE ,
233+ equalTo (indexing3 .getIndexFailedCount () - indexing2 .getIndexFailedCount ())
234+ )
235+ );
236+ telemetry .resetMeter ();
237+
238+ // search and fetch
239+ client ().prepareSearch ("standard*" ).setSize (100 ).get ().decRef ();
240+ var nodeStats1 = indicesService .stats (CommonStatsFlags .ALL , false ).getSearch ().getTotal ();
241+ collectThenAssertMetrics (
242+ telemetry ,
243+ 1 ,
244+ Map .of (
245+ STANDARD_QUERY_COUNT ,
246+ equalTo (numStandardIndices ),
247+ STANDARD_QUERY_TIME ,
248+ equalTo (nodeStats1 .getQueryTimeInMillis ()),
249+ STANDARD_FETCH_COUNT ,
250+ equalTo (nodeStats1 .getFetchCount ()),
251+ STANDARD_FETCH_TIME ,
252+ equalTo (nodeStats1 .getFetchTimeInMillis ()),
253+
254+ TIME_SERIES_QUERY_COUNT ,
255+ equalTo (0L ),
256+ TIME_SERIES_QUERY_TIME ,
257+ equalTo (0L ),
258+
259+ LOGSDB_QUERY_COUNT ,
260+ equalTo (0L ),
261+ LOGSDB_QUERY_TIME ,
262+ equalTo (0L )
263+ )
264+ );
265+
266+ client ().prepareSearch ("time*" ).setSize (100 ).get ().decRef ();
267+ var nodeStats2 = indicesService .stats (CommonStatsFlags .ALL , false ).getSearch ().getTotal ();
268+ collectThenAssertMetrics (
269+ telemetry ,
270+ 2 ,
271+ Map .of (
272+ STANDARD_QUERY_COUNT ,
273+ equalTo (numStandardIndices ),
274+ STANDARD_QUERY_TIME ,
275+ equalTo (nodeStats1 .getQueryTimeInMillis ()),
276+
277+ TIME_SERIES_QUERY_COUNT ,
278+ equalTo (numTimeSeriesIndices ),
279+ TIME_SERIES_QUERY_TIME ,
280+ equalTo (nodeStats2 .getQueryTimeInMillis () - nodeStats1 .getQueryTimeInMillis ()),
281+ TIME_SERIES_FETCH_COUNT ,
282+ equalTo (nodeStats2 .getFetchCount () - nodeStats1 .getFetchCount ()),
283+ TIME_SERIES_FETCH_TIME ,
284+ equalTo (nodeStats2 .getFetchTimeInMillis () - nodeStats1 .getFetchTimeInMillis ()),
285+
286+ LOGSDB_QUERY_COUNT ,
287+ equalTo (0L ),
288+ LOGSDB_QUERY_TIME ,
289+ equalTo (0L )
290+ )
291+ );
292+ client ().prepareSearch ("logs*" ).setSize (100 ).get ().decRef ();
293+ var nodeStats3 = indicesService .stats (CommonStatsFlags .ALL , false ).getSearch ().getTotal ();
294+ collectThenAssertMetrics (
295+ telemetry ,
296+ 3 ,
297+ Map .of (
298+ STANDARD_QUERY_COUNT ,
299+ equalTo (numStandardIndices ),
300+ STANDARD_QUERY_TIME ,
301+ equalTo (nodeStats1 .getQueryTimeInMillis ()),
302+
303+ TIME_SERIES_QUERY_COUNT ,
304+ equalTo (numTimeSeriesIndices ),
305+ TIME_SERIES_QUERY_TIME ,
306+ equalTo (nodeStats2 .getQueryTimeInMillis () - nodeStats1 .getQueryTimeInMillis ()),
307+
308+ LOGSDB_QUERY_COUNT ,
309+ equalTo (numLogsdbIndices ),
310+ LOGSDB_QUERY_TIME ,
311+ equalTo (nodeStats3 .getQueryTimeInMillis () - nodeStats2 .getQueryTimeInMillis ()),
312+ LOGSDB_FETCH_COUNT ,
313+ equalTo (nodeStats3 .getFetchCount () - nodeStats2 .getFetchCount ()),
314+ LOGSDB_FETCH_TIME ,
315+ equalTo (nodeStats3 .getFetchTimeInMillis () - nodeStats2 .getFetchTimeInMillis ())
316+ )
317+ );
318+ // search failures
319+ expectThrows (Exception .class , () -> { client ().prepareSearch ("logs*" ).setRuntimeMappings (parseMapping ("""
320+ {
321+ "fail_me": {
322+ "type": "long",
323+ "script": {"source": "<>", "lang": "failing_field"}
324+ }
325+ }
326+ """ )).setQuery (new RangeQueryBuilder ("fail_me" ).gte (0 )).setAllowPartialSearchResults (true ).get (); });
327+ collectThenAssertMetrics (
328+ telemetry ,
329+ 4 ,
330+ Map .of (
331+ STANDARD_QUERY_FAILURE ,
332+ equalTo (0L ),
333+ STANDARD_FETCH_FAILURE ,
334+ equalTo (0L ),
335+ TIME_SERIES_QUERY_FAILURE ,
336+ equalTo (0L ),
337+ TIME_SERIES_FETCH_FAILURE ,
338+ equalTo (0L ),
339+ LOGSDB_QUERY_FAILURE ,
340+ equalTo (numLogsdbIndices ),
341+ LOGSDB_FETCH_FAILURE ,
342+ equalTo (0L )
343+ )
344+ );
162345 }
163346
164347 void collectThenAssertMetrics (TestTelemetryPlugin telemetry , int times , Map <String , Matcher <Long >> matchers ) {
@@ -175,7 +358,7 @@ int populateStandardIndices(long numIndices) {
175358 int totalDocs = 0 ;
176359 for (int i = 0 ; i < numIndices ; i ++) {
177360 String indexName = "standard-" + i ;
178- createIndex (indexName );
361+ createIndex (indexName , Settings . builder (). put ( IndexMetadata . SETTING_NUMBER_OF_SHARDS , 1 ). build () );
179362 int numDocs = between (1 , 5 );
180363 for (int d = 0 ; d < numDocs ; d ++) {
181364 indexDoc (indexName , Integer .toString (d ), "f" , Integer .toString (d ));
@@ -190,7 +373,11 @@ int populateTimeSeriesIndices(long numIndices) {
190373 int totalDocs = 0 ;
191374 for (int i = 0 ; i < numIndices ; i ++) {
192375 String indexName = "time_series-" + i ;
193- Settings settings = Settings .builder ().put ("mode" , "time_series" ).putList ("routing_path" , List .of ("host" )).build ();
376+ Settings settings = Settings .builder ()
377+ .put (IndexMetadata .SETTING_NUMBER_OF_SHARDS , 1 )
378+ .put ("mode" , "time_series" )
379+ .putList ("routing_path" , List .of ("host" ))
380+ .build ();
194381 client ().admin ()
195382 .indices ()
196383 .prepareCreate (indexName )
@@ -214,6 +401,7 @@ int populateTimeSeriesIndices(long numIndices) {
214401 }
215402 totalDocs += numDocs ;
216403 flush (indexName );
404+ refresh (indexName );
217405 }
218406 return totalDocs ;
219407 }
@@ -222,7 +410,7 @@ int populateLogsdbIndices(long numIndices) {
222410 int totalDocs = 0 ;
223411 for (int i = 0 ; i < numIndices ; i ++) {
224412 String indexName = "logsdb-" + i ;
225- Settings settings = Settings .builder ().put ("mode" , "logsdb" ).build ();
413+ Settings settings = Settings .builder ().put ("mode" , "logsdb" ).put ( IndexMetadata . SETTING_NUMBER_OF_SHARDS , 1 ). build ();
226414 client ().admin ()
227415 .indices ()
228416 .prepareCreate (indexName )
@@ -237,9 +425,75 @@ int populateLogsdbIndices(long numIndices) {
237425 .setSource ("@timestamp" , timestamp , "host.name" , randomFrom ("prod" , "qa" ), "cpu" , randomIntBetween (1 , 100 ))
238426 .get ();
239427 }
428+ int numFailures = between (0 , 2 );
429+ for (int d = 0 ; d < numFailures ; d ++) {
430+ expectThrows (Exception .class , () -> {
431+ client ().prepareIndex (indexName )
432+ .setSource (
433+ "@timestamp" ,
434+ "malformed-timestamp" ,
435+ "host.name" ,
436+ randomFrom ("prod" , "qa" ),
437+ "cpu" ,
438+ randomIntBetween (1 , 100 )
439+ )
440+ .get ();
441+ });
442+ }
240443 totalDocs += numDocs ;
241444 flush (indexName );
445+ refresh (indexName );
242446 }
243447 return totalDocs ;
244448 }
449+
450+ private Map <String , Object > parseMapping (String mapping ) throws IOException {
451+ try (XContentParser parser = createParser (JsonXContent .jsonXContent , mapping )) {
452+ return parser .map ();
453+ }
454+ }
455+
456+ public static class FailingFieldPlugin extends Plugin implements ScriptPlugin {
457+
458+ @ Override
459+ public ScriptEngine getScriptEngine (Settings settings , Collection <ScriptContext <?>> contexts ) {
460+ return new ScriptEngine () {
461+ @ Override
462+ public String getType () {
463+ return "failing_field" ;
464+ }
465+
466+ @ Override
467+ @ SuppressWarnings ("unchecked" )
468+ public <FactoryType > FactoryType compile (
469+ String name ,
470+ String code ,
471+ ScriptContext <FactoryType > context ,
472+ Map <String , String > params
473+ ) {
474+ return (FactoryType ) new LongFieldScript .Factory () {
475+ @ Override
476+ public LongFieldScript .LeafFactory newFactory (
477+ String fieldName ,
478+ Map <String , Object > params ,
479+ SearchLookup searchLookup ,
480+ OnScriptError onScriptError
481+ ) {
482+ return ctx -> new LongFieldScript (fieldName , params , searchLookup , onScriptError , ctx ) {
483+ @ Override
484+ public void execute () {
485+ throw new IllegalStateException ("Accessing failing field" );
486+ }
487+ };
488+ }
489+ };
490+ }
491+
492+ @ Override
493+ public Set <ScriptContext <?>> getSupportedContexts () {
494+ return Set .of (LongFieldScript .CONTEXT );
495+ }
496+ };
497+ }
498+ }
245499}
0 commit comments