Skip to content

Commit 32374db

Browse files
authored
Metrics for Reindexing (#111845)
This PR adds metrics for the Reindexing plugin, to measure the end-to-end time taken by a reindex request, update-by-query request and delete-by-query request.
1 parent d832e6e commit 32374db

File tree

14 files changed

+507
-16
lines changed

14 files changed

+507
-16
lines changed
Lines changed: 216 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,216 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
9+
package org.elasticsearch.index.reindex;
10+
11+
import org.elasticsearch.index.query.QueryBuilders;
12+
import org.elasticsearch.plugins.Plugin;
13+
import org.elasticsearch.plugins.PluginsService;
14+
import org.elasticsearch.reindex.BulkIndexByScrollResponseMatcher;
15+
import org.elasticsearch.reindex.ReindexPlugin;
16+
import org.elasticsearch.search.sort.SortOrder;
17+
import org.elasticsearch.telemetry.Measurement;
18+
import org.elasticsearch.telemetry.TestTelemetryPlugin;
19+
import org.elasticsearch.test.ESIntegTestCase;
20+
21+
import java.util.Arrays;
22+
import java.util.Collection;
23+
import java.util.List;
24+
25+
import static org.elasticsearch.index.query.QueryBuilders.termQuery;
26+
import static org.elasticsearch.reindex.DeleteByQueryMetrics.DELETE_BY_QUERY_TIME_HISTOGRAM;
27+
import static org.elasticsearch.reindex.ReindexMetrics.REINDEX_TIME_HISTOGRAM;
28+
import static org.elasticsearch.reindex.UpdateByQueryMetrics.UPDATE_BY_QUERY_TIME_HISTOGRAM;
29+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
30+
import static org.hamcrest.Matchers.equalTo;
31+
32+
@ESIntegTestCase.ClusterScope(numDataNodes = 0, numClientNodes = 0, scope = ESIntegTestCase.Scope.TEST)
33+
public class ReindexPluginMetricsIT extends ESIntegTestCase {
34+
@Override
35+
protected Collection<Class<? extends Plugin>> nodePlugins() {
36+
return Arrays.asList(ReindexPlugin.class, TestTelemetryPlugin.class);
37+
}
38+
39+
protected ReindexRequestBuilder reindex() {
40+
return new ReindexRequestBuilder(client());
41+
}
42+
43+
protected UpdateByQueryRequestBuilder updateByQuery() {
44+
return new UpdateByQueryRequestBuilder(client());
45+
}
46+
47+
protected DeleteByQueryRequestBuilder deleteByQuery() {
48+
return new DeleteByQueryRequestBuilder(client());
49+
}
50+
51+
public static BulkIndexByScrollResponseMatcher matcher() {
52+
return new BulkIndexByScrollResponseMatcher();
53+
}
54+
55+
public void testReindexMetrics() throws Exception {
56+
final String dataNodeName = internalCluster().startNode();
57+
58+
indexRandom(
59+
true,
60+
prepareIndex("source").setId("1").setSource("foo", "a"),
61+
prepareIndex("source").setId("2").setSource("foo", "a"),
62+
prepareIndex("source").setId("3").setSource("foo", "b"),
63+
prepareIndex("source").setId("4").setSource("foo", "c")
64+
);
65+
assertHitCount(prepareSearch("source").setSize(0), 4);
66+
67+
final TestTelemetryPlugin testTelemetryPlugin = internalCluster().getInstance(PluginsService.class, dataNodeName)
68+
.filterPlugins(TestTelemetryPlugin.class)
69+
.findFirst()
70+
.orElseThrow();
71+
72+
// Copy all the docs
73+
reindex().source("source").destination("dest").get();
74+
// Use assertBusy to wait for all threads to complete so we get deterministic results
75+
assertBusy(() -> {
76+
testTelemetryPlugin.collect();
77+
List<Measurement> measurements = testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM);
78+
assertThat(measurements.size(), equalTo(1));
79+
});
80+
81+
// Now none of them
82+
createIndex("none");
83+
reindex().source("source").destination("none").filter(termQuery("foo", "no_match")).get();
84+
assertBusy(() -> {
85+
testTelemetryPlugin.collect();
86+
List<Measurement> measurements = testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM);
87+
assertThat(measurements.size(), equalTo(2));
88+
});
89+
90+
// Now half of them
91+
reindex().source("source").destination("dest_half").filter(termQuery("foo", "a")).get();
92+
assertBusy(() -> {
93+
testTelemetryPlugin.collect();
94+
List<Measurement> measurements = testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM);
95+
assertThat(measurements.size(), equalTo(3));
96+
});
97+
98+
// Limit with maxDocs
99+
reindex().source("source").destination("dest_size_one").maxDocs(1).get();
100+
assertBusy(() -> {
101+
testTelemetryPlugin.collect();
102+
List<Measurement> measurements = testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM);
103+
assertThat(measurements.size(), equalTo(4));
104+
});
105+
}
106+
107+
public void testDeleteByQueryMetrics() throws Exception {
108+
final String dataNodeName = internalCluster().startNode();
109+
110+
indexRandom(
111+
true,
112+
prepareIndex("test").setId("1").setSource("foo", "a"),
113+
prepareIndex("test").setId("2").setSource("foo", "a"),
114+
prepareIndex("test").setId("3").setSource("foo", "b"),
115+
prepareIndex("test").setId("4").setSource("foo", "c"),
116+
prepareIndex("test").setId("5").setSource("foo", "d"),
117+
prepareIndex("test").setId("6").setSource("foo", "e"),
118+
prepareIndex("test").setId("7").setSource("foo", "f")
119+
);
120+
121+
assertHitCount(prepareSearch("test").setSize(0), 7);
122+
123+
final TestTelemetryPlugin testTelemetryPlugin = internalCluster().getInstance(PluginsService.class, dataNodeName)
124+
.filterPlugins(TestTelemetryPlugin.class)
125+
.findFirst()
126+
.orElseThrow();
127+
128+
// Deletes two docs that matches "foo:a"
129+
deleteByQuery().source("test").filter(termQuery("foo", "a")).refresh(true).get();
130+
assertBusy(() -> {
131+
testTelemetryPlugin.collect();
132+
List<Measurement> measurements = testTelemetryPlugin.getLongHistogramMeasurement(DELETE_BY_QUERY_TIME_HISTOGRAM);
133+
assertThat(measurements.size(), equalTo(1));
134+
});
135+
136+
// Deletes the two first docs with limit by size
137+
DeleteByQueryRequestBuilder request = deleteByQuery().source("test").filter(QueryBuilders.matchAllQuery()).size(2).refresh(true);
138+
request.source().addSort("foo.keyword", SortOrder.ASC);
139+
request.get();
140+
assertBusy(() -> {
141+
testTelemetryPlugin.collect();
142+
List<Measurement> measurements = testTelemetryPlugin.getLongHistogramMeasurement(DELETE_BY_QUERY_TIME_HISTOGRAM);
143+
assertThat(measurements.size(), equalTo(2));
144+
});
145+
146+
// Deletes but match no docs
147+
deleteByQuery().source("test").filter(termQuery("foo", "no_match")).refresh(true).get();
148+
assertBusy(() -> {
149+
testTelemetryPlugin.collect();
150+
List<Measurement> measurements = testTelemetryPlugin.getLongHistogramMeasurement(DELETE_BY_QUERY_TIME_HISTOGRAM);
151+
assertThat(measurements.size(), equalTo(3));
152+
});
153+
154+
// Deletes all remaining docs
155+
deleteByQuery().source("test").filter(QueryBuilders.matchAllQuery()).refresh(true).get();
156+
assertBusy(() -> {
157+
testTelemetryPlugin.collect();
158+
List<Measurement> measurements = testTelemetryPlugin.getLongHistogramMeasurement(DELETE_BY_QUERY_TIME_HISTOGRAM);
159+
assertThat(measurements.size(), equalTo(4));
160+
});
161+
}
162+
163+
public void testUpdateByQueryMetrics() throws Exception {
164+
final String dataNodeName = internalCluster().startNode();
165+
166+
indexRandom(
167+
true,
168+
prepareIndex("test").setId("1").setSource("foo", "a"),
169+
prepareIndex("test").setId("2").setSource("foo", "a"),
170+
prepareIndex("test").setId("3").setSource("foo", "b"),
171+
prepareIndex("test").setId("4").setSource("foo", "c")
172+
);
173+
assertHitCount(prepareSearch("test").setSize(0), 4);
174+
assertEquals(1, client().prepareGet("test", "1").get().getVersion());
175+
assertEquals(1, client().prepareGet("test", "4").get().getVersion());
176+
177+
final TestTelemetryPlugin testTelemetryPlugin = internalCluster().getInstance(PluginsService.class, dataNodeName)
178+
.filterPlugins(TestTelemetryPlugin.class)
179+
.findFirst()
180+
.orElseThrow();
181+
182+
// Reindex all the docs
183+
updateByQuery().source("test").refresh(true).get();
184+
assertBusy(() -> {
185+
testTelemetryPlugin.collect();
186+
List<Measurement> measurements = testTelemetryPlugin.getLongHistogramMeasurement(UPDATE_BY_QUERY_TIME_HISTOGRAM);
187+
assertThat(measurements.size(), equalTo(1));
188+
});
189+
190+
// Now none of them
191+
updateByQuery().source("test").filter(termQuery("foo", "no_match")).refresh(true).get();
192+
assertBusy(() -> {
193+
testTelemetryPlugin.collect();
194+
List<Measurement> measurements = testTelemetryPlugin.getLongHistogramMeasurement(UPDATE_BY_QUERY_TIME_HISTOGRAM);
195+
assertThat(measurements.size(), equalTo(2));
196+
});
197+
198+
// Now half of them
199+
updateByQuery().source("test").filter(termQuery("foo", "a")).refresh(true).get();
200+
assertBusy(() -> {
201+
testTelemetryPlugin.collect();
202+
List<Measurement> measurements = testTelemetryPlugin.getLongHistogramMeasurement(UPDATE_BY_QUERY_TIME_HISTOGRAM);
203+
assertThat(measurements.size(), equalTo(3));
204+
});
205+
206+
// Limit with size
207+
UpdateByQueryRequestBuilder request = updateByQuery().source("test").size(3).refresh(true);
208+
request.source().addSort("foo.keyword", SortOrder.ASC);
209+
request.get();
210+
assertBusy(() -> {
211+
testTelemetryPlugin.collect();
212+
List<Measurement> measurements = testTelemetryPlugin.getLongHistogramMeasurement(UPDATE_BY_QUERY_TIME_HISTOGRAM);
213+
assertThat(measurements.size(), equalTo(4));
214+
});
215+
}
216+
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
9+
package org.elasticsearch.reindex;
10+
11+
import org.elasticsearch.telemetry.metric.LongHistogram;
12+
import org.elasticsearch.telemetry.metric.MeterRegistry;
13+
14+
public class DeleteByQueryMetrics {
15+
public static final String DELETE_BY_QUERY_TIME_HISTOGRAM = "es.delete_by_query.duration.histogram";
16+
17+
private final LongHistogram deleteByQueryTimeSecsHistogram;
18+
19+
public DeleteByQueryMetrics(MeterRegistry meterRegistry) {
20+
this(
21+
meterRegistry.registerLongHistogram(DELETE_BY_QUERY_TIME_HISTOGRAM, "Time taken to execute Delete by Query request", "seconds")
22+
);
23+
}
24+
25+
private DeleteByQueryMetrics(LongHistogram deleteByQueryTimeSecsHistogram) {
26+
this.deleteByQueryTimeSecsHistogram = deleteByQueryTimeSecsHistogram;
27+
}
28+
29+
public long recordTookTime(long tookTime) {
30+
deleteByQueryTimeSecsHistogram.record(tookTime);
31+
return tookTime;
32+
}
33+
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
9+
package org.elasticsearch.reindex;
10+
11+
import org.elasticsearch.telemetry.metric.LongHistogram;
12+
import org.elasticsearch.telemetry.metric.MeterRegistry;
13+
14+
public class ReindexMetrics {
15+
16+
public static final String REINDEX_TIME_HISTOGRAM = "es.reindex.duration.histogram";
17+
18+
private final LongHistogram reindexTimeSecsHistogram;
19+
20+
public ReindexMetrics(MeterRegistry meterRegistry) {
21+
this(meterRegistry.registerLongHistogram(REINDEX_TIME_HISTOGRAM, "Time to reindex by search", "millis"));
22+
}
23+
24+
private ReindexMetrics(LongHistogram reindexTimeSecsHistogram) {
25+
this.reindexTimeSecsHistogram = reindexTimeSecsHistogram;
26+
}
27+
28+
public long recordTookTime(long tookTime) {
29+
reindexTimeSecsHistogram.record(tookTime);
30+
return tookTime;
31+
}
32+
}

modules/reindex/src/main/java/org/elasticsearch/reindex/ReindexPlugin.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@
3434
import java.util.ArrayList;
3535
import java.util.Arrays;
3636
import java.util.Collection;
37-
import java.util.Collections;
3837
import java.util.List;
3938
import java.util.function.Predicate;
4039
import java.util.function.Supplier;
@@ -85,8 +84,11 @@ public List<RestHandler> getRestHandlers(
8584

8685
@Override
8786
public Collection<?> createComponents(PluginServices services) {
88-
return Collections.singletonList(
89-
new ReindexSslConfig(services.environment().settings(), services.environment(), services.resourceWatcherService())
87+
return List.of(
88+
new ReindexSslConfig(services.environment().settings(), services.environment(), services.resourceWatcherService()),
89+
new ReindexMetrics(services.telemetryProvider().getMeterRegistry()),
90+
new UpdateByQueryMetrics(services.telemetryProvider().getMeterRegistry()),
91+
new DeleteByQueryMetrics(services.telemetryProvider().getMeterRegistry())
9092
);
9193
}
9294

modules/reindex/src/main/java/org/elasticsearch/reindex/Reindexer.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.elasticsearch.common.lucene.uid.Versions;
3838
import org.elasticsearch.common.settings.Settings;
3939
import org.elasticsearch.common.xcontent.XContentHelper;
40+
import org.elasticsearch.core.Nullable;
4041
import org.elasticsearch.index.IndexMode;
4142
import org.elasticsearch.index.IndexSettings;
4243
import org.elasticsearch.index.VersionType;
@@ -65,6 +66,7 @@
6566
import java.util.ArrayList;
6667
import java.util.List;
6768
import java.util.Map;
69+
import java.util.concurrent.TimeUnit;
6870
import java.util.concurrent.atomic.AtomicInteger;
6971
import java.util.function.BiFunction;
7072
import java.util.function.LongSupplier;
@@ -82,26 +84,31 @@ public class Reindexer {
8284
private final ThreadPool threadPool;
8385
private final ScriptService scriptService;
8486
private final ReindexSslConfig reindexSslConfig;
87+
private final ReindexMetrics reindexMetrics;
8588

8689
Reindexer(
8790
ClusterService clusterService,
8891
Client client,
8992
ThreadPool threadPool,
9093
ScriptService scriptService,
91-
ReindexSslConfig reindexSslConfig
94+
ReindexSslConfig reindexSslConfig,
95+
@Nullable ReindexMetrics reindexMetrics
9296
) {
9397
this.clusterService = clusterService;
9498
this.client = client;
9599
this.threadPool = threadPool;
96100
this.scriptService = scriptService;
97101
this.reindexSslConfig = reindexSslConfig;
102+
this.reindexMetrics = reindexMetrics;
98103
}
99104

100105
public void initTask(BulkByScrollTask task, ReindexRequest request, ActionListener<Void> listener) {
101106
BulkByScrollParallelizationHelper.initTaskState(task, request, client, listener);
102107
}
103108

104109
public void execute(BulkByScrollTask task, ReindexRequest request, Client bulkClient, ActionListener<BulkByScrollResponse> listener) {
110+
long startTime = System.nanoTime();
111+
105112
BulkByScrollParallelizationHelper.executeSlicedAction(
106113
task,
107114
request,
@@ -122,7 +129,12 @@ public void execute(BulkByScrollTask task, ReindexRequest request, Client bulkCl
122129
clusterService.state(),
123130
reindexSslConfig,
124131
request,
125-
listener
132+
ActionListener.runAfter(listener, () -> {
133+
long elapsedTime = TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - startTime);
134+
if (reindexMetrics != null) {
135+
reindexMetrics.recordTookTime(elapsedTime);
136+
}
137+
})
126138
);
127139
searchAction.start();
128140
}

0 commit comments

Comments
 (0)