Skip to content

Commit 978f34c

Browse files
committed
Use attributes and add more tests
1 parent 70ed46d commit 978f34c

File tree

5 files changed

+244
-112
lines changed

5 files changed

+244
-112
lines changed

modules/reindex/src/internalClusterTest/java/org/elasticsearch/index/reindex/ReindexPluginMetricsIT.java

Lines changed: 100 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -9,28 +9,35 @@
99

1010
package org.elasticsearch.index.reindex;
1111

12+
import org.elasticsearch.ElasticsearchStatusException;
13+
import org.elasticsearch.common.bytes.BytesArray;
14+
import org.elasticsearch.common.settings.Settings;
1215
import org.elasticsearch.index.query.QueryBuilders;
1316
import org.elasticsearch.plugins.Plugin;
1417
import org.elasticsearch.plugins.PluginsService;
1518
import org.elasticsearch.reindex.BulkIndexByScrollResponseMatcher;
1619
import org.elasticsearch.reindex.ReindexPlugin;
20+
import org.elasticsearch.reindex.TransportReindexAction;
21+
import org.elasticsearch.rest.root.MainRestPlugin;
1722
import org.elasticsearch.search.sort.SortOrder;
1823
import org.elasticsearch.telemetry.Measurement;
1924
import org.elasticsearch.telemetry.TestTelemetryPlugin;
2025
import org.elasticsearch.test.ESIntegTestCase;
2126

27+
import java.net.InetSocketAddress;
2228
import java.util.Arrays;
2329
import java.util.Collection;
2430
import java.util.List;
31+
import java.util.Map;
2532

2633
import static org.elasticsearch.index.query.QueryBuilders.termQuery;
2734
import static org.elasticsearch.reindex.DeleteByQueryMetrics.DELETE_BY_QUERY_TIME_HISTOGRAM;
28-
import static org.elasticsearch.reindex.ReindexMetrics.REINDEX_FAILURE_HISTOGRAM;
29-
import static org.elasticsearch.reindex.ReindexMetrics.REINDEX_FAILURE_HISTOGRAM_REMOTE;
30-
import static org.elasticsearch.reindex.ReindexMetrics.REINDEX_SUCCESS_HISTOGRAM;
31-
import static org.elasticsearch.reindex.ReindexMetrics.REINDEX_SUCCESS_HISTOGRAM_REMOTE;
35+
import static org.elasticsearch.reindex.ReindexMetrics.ATTRIBUTE_NAME_ERROR_TYPE;
36+
import static org.elasticsearch.reindex.ReindexMetrics.ATTRIBUTE_NAME_SOURCE;
37+
import static org.elasticsearch.reindex.ReindexMetrics.ATTRIBUTE_VALUE_SOURCE_LOCAL;
38+
import static org.elasticsearch.reindex.ReindexMetrics.ATTRIBUTE_VALUE_SOURCE_REMOTE;
39+
import static org.elasticsearch.reindex.ReindexMetrics.REINDEX_COMPLETION_HISTOGRAM;
3240
import static org.elasticsearch.reindex.ReindexMetrics.REINDEX_TIME_HISTOGRAM;
33-
import static org.elasticsearch.reindex.ReindexMetrics.REINDEX_TIME_HISTOGRAM_REMOTE;
3441
import static org.elasticsearch.reindex.UpdateByQueryMetrics.UPDATE_BY_QUERY_TIME_HISTOGRAM;
3542
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
3643
import static org.hamcrest.Matchers.equalTo;
@@ -40,7 +47,20 @@
4047
public class ReindexPluginMetricsIT extends ESIntegTestCase {
4148
@Override
4249
protected Collection<Class<? extends Plugin>> nodePlugins() {
43-
return Arrays.asList(ReindexPlugin.class, TestTelemetryPlugin.class);
50+
return Arrays.asList(ReindexPlugin.class, TestTelemetryPlugin.class, MainRestPlugin.class);
51+
}
52+
53+
@Override
54+
protected boolean addMockHttpTransport() {
55+
return false;
56+
}
57+
58+
@Override
59+
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
60+
return Settings.builder()
61+
.put(super.nodeSettings(nodeOrdinal, otherSettings))
62+
.put(TransportReindexAction.REMOTE_CLUSTER_WHITELIST.getKey(), "*:*")
63+
.build();
4464
}
4565

4666
protected ReindexRequestBuilder reindex() {
@@ -59,6 +79,62 @@ public static BulkIndexByScrollResponseMatcher matcher() {
5979
return new BulkIndexByScrollResponseMatcher();
6080
}
6181

82+
public void testReindexFromRemoteMetrics() throws Exception {
83+
final String dataNodeName = internalCluster().startNode();
84+
85+
InetSocketAddress remoteAddress = randomFrom(cluster().httpAddresses());
86+
RemoteInfo remote = new RemoteInfo(
87+
"http",
88+
remoteAddress.getHostString(),
89+
remoteAddress.getPort(),
90+
null,
91+
new BytesArray("{\"match_all\":{}}"),
92+
null,
93+
null,
94+
Map.of(),
95+
RemoteInfo.DEFAULT_SOCKET_TIMEOUT,
96+
RemoteInfo.DEFAULT_CONNECT_TIMEOUT
97+
);
98+
99+
final TestTelemetryPlugin testTelemetryPlugin = internalCluster().getInstance(PluginsService.class, dataNodeName)
100+
.filterPlugins(TestTelemetryPlugin.class)
101+
.findFirst()
102+
.orElseThrow();
103+
104+
var expectedException = assertThrows(
105+
"Source index not created yet, should throw not found exception",
106+
ElasticsearchStatusException.class,
107+
() -> reindex().source("source").setRemoteInfo(remote).destination("dest").get()
108+
);
109+
110+
// assert failure metrics
111+
assertBusy(() -> {
112+
testTelemetryPlugin.collect();
113+
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM).size(), equalTo(1));
114+
List<Measurement> completions = testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_COMPLETION_HISTOGRAM);
115+
assertThat(completions.size(), equalTo(1));
116+
assertThat(completions.getFirst().attributes().get(ATTRIBUTE_NAME_ERROR_TYPE), equalTo(expectedException.status().name()));
117+
assertThat(completions.getFirst().attributes().get(ATTRIBUTE_NAME_SOURCE), equalTo(ATTRIBUTE_VALUE_SOURCE_REMOTE));
118+
});
119+
120+
// now create the source index
121+
indexRandom(true, prepareIndex("source").setId("1").setSource("foo", "a"));
122+
assertHitCount(prepareSearch("source").setSize(0), 1);
123+
124+
reindex().source("source").setRemoteInfo(remote).destination("dest").get();
125+
126+
// assert success metrics
127+
assertBusy(() -> {
128+
testTelemetryPlugin.collect();
129+
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM).size(), equalTo(2));
130+
List<Measurement> completions = testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_COMPLETION_HISTOGRAM);
131+
assertThat(completions.size(), equalTo(2));
132+
assertNull(completions.get(1).attributes().get(ATTRIBUTE_NAME_ERROR_TYPE));
133+
assertThat(completions.get(1).attributes().get(ATTRIBUTE_NAME_SOURCE), equalTo(ATTRIBUTE_VALUE_SOURCE_REMOTE));
134+
});
135+
136+
}
137+
62138
public void testReindexMetrics() throws Exception {
63139
final String dataNodeName = internalCluster().startNode();
64140

@@ -82,11 +158,7 @@ public void testReindexMetrics() throws Exception {
82158
assertBusy(() -> {
83159
testTelemetryPlugin.collect();
84160
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM).size(), equalTo(1));
85-
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM_REMOTE).size(), equalTo(0));
86-
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_SUCCESS_HISTOGRAM).size(), equalTo(1));
87-
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_SUCCESS_HISTOGRAM_REMOTE).size(), equalTo(0));
88-
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_FAILURE_HISTOGRAM).size(), equalTo(0));
89-
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_FAILURE_HISTOGRAM_REMOTE).size(), equalTo(0));
161+
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_COMPLETION_HISTOGRAM).size(), equalTo(1));
90162
});
91163

92164
// Now none of them
@@ -95,35 +167,29 @@ public void testReindexMetrics() throws Exception {
95167
assertBusy(() -> {
96168
testTelemetryPlugin.collect();
97169
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM).size(), equalTo(2));
98-
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM_REMOTE).size(), equalTo(0));
99-
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_SUCCESS_HISTOGRAM).size(), equalTo(2));
100-
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_SUCCESS_HISTOGRAM_REMOTE).size(), equalTo(0));
101-
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_FAILURE_HISTOGRAM).size(), equalTo(0));
102-
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_FAILURE_HISTOGRAM_REMOTE).size(), equalTo(0));
170+
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_COMPLETION_HISTOGRAM).size(), equalTo(2));
103171
});
104172

105173
// Now half of them
106174
reindex().source("source").destination("dest_half").filter(termQuery("foo", "a")).get();
107175
assertBusy(() -> {
108176
testTelemetryPlugin.collect();
109177
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM).size(), equalTo(3));
110-
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM_REMOTE).size(), equalTo(0));
111-
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_SUCCESS_HISTOGRAM).size(), equalTo(3));
112-
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_SUCCESS_HISTOGRAM_REMOTE).size(), equalTo(0));
113-
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_FAILURE_HISTOGRAM).size(), equalTo(0));
114-
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_FAILURE_HISTOGRAM_REMOTE).size(), equalTo(0));
178+
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_COMPLETION_HISTOGRAM).size(), equalTo(3));
115179
});
116180

117181
// Limit with maxDocs
118182
reindex().source("source").destination("dest_size_one").maxDocs(1).get();
119183
assertBusy(() -> {
120184
testTelemetryPlugin.collect();
121185
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM).size(), equalTo(4));
122-
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM_REMOTE).size(), equalTo(0));
123-
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_SUCCESS_HISTOGRAM).size(), equalTo(4));
124-
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_SUCCESS_HISTOGRAM_REMOTE).size(), equalTo(0));
125-
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_FAILURE_HISTOGRAM).size(), equalTo(0));
126-
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_FAILURE_HISTOGRAM_REMOTE).size(), equalTo(0));
186+
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_COMPLETION_HISTOGRAM).size(), equalTo(4));
187+
188+
// assert all metric attributes are correct
189+
testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_COMPLETION_HISTOGRAM).forEach(m -> {
190+
assertNull(m.attributes().get(ATTRIBUTE_NAME_ERROR_TYPE));
191+
assertThat(m.attributes().get(ATTRIBUTE_NAME_SOURCE), equalTo(ATTRIBUTE_VALUE_SOURCE_LOCAL));
192+
});
127193
});
128194
}
129195

@@ -144,11 +210,14 @@ public void testReindexMetricsWithBulkFailure() throws Exception {
144210
assertBusy(() -> {
145211
testTelemetryPlugin.collect();
146212
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM).size(), equalTo(1));
147-
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM_REMOTE).size(), equalTo(0));
148-
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_SUCCESS_HISTOGRAM).size(), equalTo(0));
149-
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_SUCCESS_HISTOGRAM_REMOTE).size(), equalTo(0));
150-
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_FAILURE_HISTOGRAM).size(), equalTo(1));
151-
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_FAILURE_HISTOGRAM_REMOTE).size(), equalTo(0));
213+
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_COMPLETION_HISTOGRAM).size(), equalTo(1));
214+
assertThat(
215+
testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_COMPLETION_HISTOGRAM)
216+
.getFirst()
217+
.attributes()
218+
.get(ATTRIBUTE_NAME_ERROR_TYPE),
219+
equalTo("org.elasticsearch.index.mapper.DocumentParsingException")
220+
);
152221
});
153222
}
154223

modules/reindex/src/main/java/org/elasticsearch/reindex/ReindexMetrics.java

Lines changed: 46 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -9,72 +9,74 @@
99

1010
package org.elasticsearch.reindex;
1111

12+
import org.elasticsearch.ElasticsearchStatusException;
1213
import org.elasticsearch.telemetry.metric.LongHistogram;
1314
import org.elasticsearch.telemetry.metric.MeterRegistry;
1415

16+
import java.util.HashMap;
17+
import java.util.Map;
18+
1519
public class ReindexMetrics {
1620

1721
public static final String REINDEX_TIME_HISTOGRAM = "es.reindex.duration.histogram";
18-
// metrics for remote reindex should be a subset of the all metrics
19-
public static final String REINDEX_TIME_HISTOGRAM_REMOTE = "es.reindex.duration.histogram.remote";
20-
public static final String REINDEX_SUCCESS_HISTOGRAM = "es.reindex.completion.success";
21-
public static final String REINDEX_SUCCESS_HISTOGRAM_REMOTE = "es.reindex.completion.success.remote";
22-
public static final String REINDEX_FAILURE_HISTOGRAM = "es.reindex.completion.failure";
23-
public static final String REINDEX_FAILURE_HISTOGRAM_REMOTE = "es.reindex.completion.failure.remote";
22+
public static final String REINDEX_COMPLETION_HISTOGRAM = "es.reindex.completion.histogram";
23+
24+
// refers to https://opentelemetry.io/docs/specs/semconv/registry/attributes/error/#error-type
25+
public static final String ATTRIBUTE_NAME_ERROR_TYPE = "error.type";
26+
27+
public static final String ATTRIBUTE_NAME_SOURCE = "reindex.source";
28+
public static final String ATTRIBUTE_VALUE_SOURCE_LOCAL = "local";
29+
public static final String ATTRIBUTE_VALUE_SOURCE_REMOTE = "remote";
2430

2531
private final LongHistogram reindexTimeSecsHistogram;
26-
private final LongHistogram reindexTimeSecsHistogramRemote;
27-
private final LongHistogram reindexSuccessHistogram;
28-
private final LongHistogram reindexSuccessHistogramRemote;
29-
private final LongHistogram reindexFailureHistogram;
30-
private final LongHistogram reindexFailureHistogramRemote;
32+
private final LongHistogram reindexCompletionHistogram;
3133

3234
public ReindexMetrics(MeterRegistry meterRegistry) {
3335
this.reindexTimeSecsHistogram = meterRegistry.registerLongHistogram(REINDEX_TIME_HISTOGRAM, "Time to reindex by search", "seconds");
34-
this.reindexTimeSecsHistogramRemote = meterRegistry.registerLongHistogram(
35-
REINDEX_TIME_HISTOGRAM_REMOTE,
36-
"Time to reindex by search from remote cluster",
37-
"seconds"
38-
);
39-
40-
this.reindexSuccessHistogram = meterRegistry.registerLongHistogram(
41-
REINDEX_SUCCESS_HISTOGRAM,
42-
"Number of successful reindex",
43-
"unit"
44-
);
45-
this.reindexSuccessHistogramRemote = meterRegistry.registerLongHistogram(
46-
REINDEX_SUCCESS_HISTOGRAM_REMOTE,
47-
"Number of successful reindex from remote cluster",
48-
"unit"
49-
);
50-
51-
this.reindexFailureHistogram = meterRegistry.registerLongHistogram(REINDEX_FAILURE_HISTOGRAM, "Number of failed reindex", "unit");
52-
this.reindexFailureHistogramRemote = meterRegistry.registerLongHistogram(
53-
REINDEX_FAILURE_HISTOGRAM_REMOTE,
54-
"Number of failed reindex from remote cluster",
36+
this.reindexCompletionHistogram = meterRegistry.registerLongHistogram(
37+
REINDEX_COMPLETION_HISTOGRAM,
38+
"Number of completed reindex operations",
5539
"unit"
5640
);
5741
}
5842

5943
public long recordTookTime(long tookTime, boolean remote) {
60-
reindexTimeSecsHistogram.record(tookTime);
61-
if (remote) {
62-
reindexTimeSecsHistogramRemote.record(tookTime);
63-
}
44+
Map<String, Object> attributes = getAttributes(remote);
45+
46+
reindexTimeSecsHistogram.record(tookTime, attributes);
6447
return tookTime;
6548
}
6649

6750
public void recordSuccess(boolean remote) {
68-
reindexSuccessHistogram.record(1L);
69-
if (remote) {
70-
reindexSuccessHistogramRemote.record(1L);
71-
}
51+
Map<String, Object> attributes = getAttributes(remote);
52+
// attribute ATTRIBUTE_NAME_ERROR_TYPE being absent indicates success
53+
assert attributes.get(ATTRIBUTE_NAME_ERROR_TYPE) == null : "error.type attribute must not be present for successes";
54+
55+
reindexCompletionHistogram.record(1L, attributes);
7256
}
7357

74-
public void recordFailure(boolean remote) {
75-
reindexFailureHistogram.record(1L);
76-
if (remote) {
77-
reindexFailureHistogramRemote.record(1L);
58+
public void recordFailure(boolean remote, Throwable e) {
59+
Map<String, Object> attributes = getAttributes(remote);
60+
// best effort to extract useful error type if possible
61+
String errorType;
62+
if (e instanceof ElasticsearchStatusException ese) {
63+
errorType = ese.status().name();
64+
} else {
65+
errorType = e.getClass().getTypeName();
7866
}
67+
attributes.put(ATTRIBUTE_NAME_ERROR_TYPE, errorType);
68+
69+
// attribute ATTRIBUTE_NAME_ERROR_TYPE being present indicates failure
70+
// https://opentelemetry.io/docs/specs/semconv/general/recording-errors/#recording-errors-on-metrics
71+
assert attributes.get(ATTRIBUTE_NAME_ERROR_TYPE) != null : "error.type attribute must be present for failures";
72+
73+
reindexCompletionHistogram.record(1L, attributes);
74+
}
75+
76+
private Map<String, Object> getAttributes(boolean remote) {
77+
Map<String, Object> attributes = new HashMap<>();
78+
attributes.put(ATTRIBUTE_NAME_SOURCE, remote ? "remote" : "local");
79+
80+
return attributes;
7981
}
8082
}

modules/reindex/src/main/java/org/elasticsearch/reindex/Reindexer.java

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@
6969
import java.util.ArrayList;
7070
import java.util.List;
7171
import java.util.Map;
72+
import java.util.Optional;
7273
import java.util.concurrent.TimeUnit;
7374
import java.util.concurrent.atomic.AtomicInteger;
7475
import java.util.function.BiFunction;
@@ -152,12 +153,25 @@ static ActionListener<BulkByScrollResponse> wrapWithMetrics(
152153
if (metrics == null) {
153154
return listener;
154155
}
156+
157+
// add completion metrics
155158
var withCompletionMetrics = new ActionListener<BulkByScrollResponse>() {
156159
@Override
157160
public void onResponse(BulkByScrollResponse bulkByScrollResponse) {
158-
if ((bulkByScrollResponse.getBulkFailures() != null && bulkByScrollResponse.getBulkFailures().isEmpty() == false)
159-
|| (bulkByScrollResponse.getSearchFailures() != null && bulkByScrollResponse.getSearchFailures().isEmpty() == false)) {
160-
metrics.recordFailure(isRemote);
161+
var searchExceptionSample = Optional.ofNullable(bulkByScrollResponse.getSearchFailures())
162+
.stream()
163+
.flatMap(List::stream)
164+
.map(ScrollableHitSource.SearchFailure::getReason)
165+
.findFirst();
166+
var bulkExceptionSample = Optional.ofNullable(bulkByScrollResponse.getBulkFailures())
167+
.stream()
168+
.flatMap(List::stream)
169+
.map(BulkItemResponse.Failure::getCause)
170+
.findFirst();
171+
if (searchExceptionSample.isPresent() || bulkExceptionSample.isPresent()) {
172+
// record only the first sample error in metric
173+
Throwable e = searchExceptionSample.orElseGet(bulkExceptionSample::get);
174+
metrics.recordFailure(isRemote, e);
161175
listener.onResponse(bulkByScrollResponse);
162176
} else {
163177
metrics.recordSuccess(isRemote);
@@ -167,11 +181,12 @@ public void onResponse(BulkByScrollResponse bulkByScrollResponse) {
167181

168182
@Override
169183
public void onFailure(Exception e) {
170-
metrics.recordFailure(isRemote);
184+
metrics.recordFailure(isRemote, e);
171185
listener.onFailure(e);
172186
}
173187
};
174188

189+
// add duration metric
175190
return ActionListener.runAfter(withCompletionMetrics, () -> {
176191
long elapsedTime = TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - startTime);
177192
metrics.recordTookTime(elapsedTime, isRemote);

0 commit comments

Comments
 (0)