Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -9,32 +9,58 @@

package org.elasticsearch.index.reindex;

import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.reindex.BulkIndexByScrollResponseMatcher;
import org.elasticsearch.reindex.ReindexPlugin;
import org.elasticsearch.reindex.TransportReindexAction;
import org.elasticsearch.rest.root.MainRestPlugin;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.telemetry.Measurement;
import org.elasticsearch.telemetry.TestTelemetryPlugin;
import org.elasticsearch.test.ESIntegTestCase;

import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;

import static org.elasticsearch.index.query.QueryBuilders.termQuery;
import static org.elasticsearch.reindex.DeleteByQueryMetrics.DELETE_BY_QUERY_TIME_HISTOGRAM;
import static org.elasticsearch.reindex.ReindexMetrics.ATTRIBUTE_NAME_ERROR_TYPE;
import static org.elasticsearch.reindex.ReindexMetrics.ATTRIBUTE_NAME_SOURCE;
import static org.elasticsearch.reindex.ReindexMetrics.ATTRIBUTE_VALUE_SOURCE_LOCAL;
import static org.elasticsearch.reindex.ReindexMetrics.ATTRIBUTE_VALUE_SOURCE_REMOTE;
import static org.elasticsearch.reindex.ReindexMetrics.REINDEX_COMPLETION_COUNTER;
import static org.elasticsearch.reindex.ReindexMetrics.REINDEX_TIME_HISTOGRAM;
import static org.elasticsearch.reindex.UpdateByQueryMetrics.UPDATE_BY_QUERY_TIME_HISTOGRAM;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;

@ESIntegTestCase.ClusterScope(numDataNodes = 0, numClientNodes = 0, scope = ESIntegTestCase.Scope.TEST)
public class ReindexPluginMetricsIT extends ESIntegTestCase {
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(ReindexPlugin.class, TestTelemetryPlugin.class);
return Arrays.asList(ReindexPlugin.class, TestTelemetryPlugin.class, MainRestPlugin.class);
}

@Override
protected boolean addMockHttpTransport() {
return false;
}

@Override
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal, otherSettings))
.put(TransportReindexAction.REMOTE_CLUSTER_WHITELIST.getKey(), "*:*")
.build();
}

protected ReindexRequestBuilder reindex() {
Expand All @@ -53,6 +79,62 @@ public static BulkIndexByScrollResponseMatcher matcher() {
return new BulkIndexByScrollResponseMatcher();
}

public void testReindexFromRemoteMetrics() throws Exception {
final String dataNodeName = internalCluster().startNode();

InetSocketAddress remoteAddress = randomFrom(cluster().httpAddresses());
RemoteInfo remote = new RemoteInfo(
"http",
remoteAddress.getHostString(),
remoteAddress.getPort(),
null,
new BytesArray("{\"match_all\":{}}"),
null,
null,
Map.of(),
RemoteInfo.DEFAULT_SOCKET_TIMEOUT,
RemoteInfo.DEFAULT_CONNECT_TIMEOUT
);

final TestTelemetryPlugin testTelemetryPlugin = internalCluster().getInstance(PluginsService.class, dataNodeName)
.filterPlugins(TestTelemetryPlugin.class)
.findFirst()
.orElseThrow();

var expectedException = assertThrows(
"Source index not created yet, should throw not found exception",
ElasticsearchStatusException.class,
() -> reindex().source("source").setRemoteInfo(remote).destination("dest").get()
);

// assert failure metrics
assertBusy(() -> {
testTelemetryPlugin.collect();
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM).size(), equalTo(1));
List<Measurement> completions = testTelemetryPlugin.getLongCounterMeasurement(REINDEX_COMPLETION_COUNTER);
assertThat(completions.size(), equalTo(1));
assertThat(completions.getFirst().attributes().get(ATTRIBUTE_NAME_ERROR_TYPE), equalTo(expectedException.status().name()));
assertThat(completions.getFirst().attributes().get(ATTRIBUTE_NAME_SOURCE), equalTo(ATTRIBUTE_VALUE_SOURCE_REMOTE));
});

// now create the source index
indexRandom(true, prepareIndex("source").setId("1").setSource("foo", "a"));
assertHitCount(prepareSearch("source").setSize(0), 1);

reindex().source("source").setRemoteInfo(remote).destination("dest").get();

// assert success metrics
assertBusy(() -> {
testTelemetryPlugin.collect();
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM).size(), equalTo(2));
List<Measurement> completions = testTelemetryPlugin.getLongCounterMeasurement(REINDEX_COMPLETION_COUNTER);
assertThat(completions.size(), equalTo(2));
assertNull(completions.get(1).attributes().get(ATTRIBUTE_NAME_ERROR_TYPE));
assertThat(completions.get(1).attributes().get(ATTRIBUTE_NAME_SOURCE), equalTo(ATTRIBUTE_VALUE_SOURCE_REMOTE));
});

}

public void testReindexMetrics() throws Exception {
final String dataNodeName = internalCluster().startNode();

Expand All @@ -75,33 +157,67 @@ public void testReindexMetrics() throws Exception {
// Use assertBusy to wait for all threads to complete so we get deterministic results
assertBusy(() -> {
testTelemetryPlugin.collect();
List<Measurement> measurements = testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM);
assertThat(measurements.size(), equalTo(1));
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM).size(), equalTo(1));
assertThat(testTelemetryPlugin.getLongCounterMeasurement(REINDEX_COMPLETION_COUNTER).size(), equalTo(1));
});

// Now none of them
createIndex("none");
reindex().source("source").destination("none").filter(termQuery("foo", "no_match")).get();
assertBusy(() -> {
testTelemetryPlugin.collect();
List<Measurement> measurements = testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM);
assertThat(measurements.size(), equalTo(2));
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM).size(), equalTo(2));
assertThat(testTelemetryPlugin.getLongCounterMeasurement(REINDEX_COMPLETION_COUNTER).size(), equalTo(2));
});

// Now half of them
reindex().source("source").destination("dest_half").filter(termQuery("foo", "a")).get();
assertBusy(() -> {
testTelemetryPlugin.collect();
List<Measurement> measurements = testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM);
assertThat(measurements.size(), equalTo(3));
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM).size(), equalTo(3));
assertThat(testTelemetryPlugin.getLongCounterMeasurement(REINDEX_COMPLETION_COUNTER).size(), equalTo(3));
});

// Limit with maxDocs
reindex().source("source").destination("dest_size_one").maxDocs(1).get();
assertBusy(() -> {
testTelemetryPlugin.collect();
List<Measurement> measurements = testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM);
assertThat(measurements.size(), equalTo(4));
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM).size(), equalTo(4));
assertThat(testTelemetryPlugin.getLongCounterMeasurement(REINDEX_COMPLETION_COUNTER).size(), equalTo(4));

// asset all metric attributes are correct
testTelemetryPlugin.getLongCounterMeasurement(REINDEX_COMPLETION_COUNTER).forEach(m -> {
assertNull(m.attributes().get(ATTRIBUTE_NAME_ERROR_TYPE));
assertThat(m.attributes().get(ATTRIBUTE_NAME_SOURCE), equalTo(ATTRIBUTE_VALUE_SOURCE_LOCAL));
});
});
}

public void testReindexMetricsWithBulkFailure() throws Exception {
final String dataNodeName = internalCluster().startNode();
final TestTelemetryPlugin testTelemetryPlugin = internalCluster().getInstance(PluginsService.class, dataNodeName)
.filterPlugins(TestTelemetryPlugin.class)
.findFirst()
.orElseThrow();

// source and destination have conflicting mappings to cause bulk failures
indexRandom(true, prepareIndex("source").setId("2").setSource("test", "words words"));
indexRandom(true, prepareIndex("dest").setId("1").setSource("test", 10));

var response = reindex().source("source").destination("dest").get();
assertThat(response.getBulkFailures().size(), greaterThanOrEqualTo(1));

assertBusy(() -> {
testTelemetryPlugin.collect();
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM).size(), equalTo(1));
assertThat(testTelemetryPlugin.getLongCounterMeasurement(REINDEX_COMPLETION_COUNTER).size(), equalTo(1));
assertThat(
testTelemetryPlugin.getLongCounterMeasurement(REINDEX_COMPLETION_COUNTER)
.getFirst()
.attributes()
.get(ATTRIBUTE_NAME_ERROR_TYPE),
equalTo("org.elasticsearch.index.mapper.DocumentParsingException")
);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,25 +9,75 @@

package org.elasticsearch.reindex;

import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.telemetry.metric.LongCounter;
import org.elasticsearch.telemetry.metric.LongHistogram;
import org.elasticsearch.telemetry.metric.MeterRegistry;

import java.util.HashMap;
import java.util.Map;

public class ReindexMetrics {

public static final String REINDEX_TIME_HISTOGRAM = "es.reindex.duration.histogram";
public static final String REINDEX_COMPLETION_COUNTER = "es.reindex.completion.total";

// refers to https://opentelemetry.io/docs/specs/semconv/registry/attributes/error/#error-type
public static final String ATTRIBUTE_NAME_ERROR_TYPE = "error.type";

public static final String ATTRIBUTE_NAME_SOURCE = "reindex.source";
public static final String ATTRIBUTE_VALUE_SOURCE_LOCAL = "local";
public static final String ATTRIBUTE_VALUE_SOURCE_REMOTE = "remote";

private final LongHistogram reindexTimeSecsHistogram;
private final LongCounter reindexCompletionCounter;

public ReindexMetrics(MeterRegistry meterRegistry) {
this(meterRegistry.registerLongHistogram(REINDEX_TIME_HISTOGRAM, "Time to reindex by search", "seconds"));
this.reindexTimeSecsHistogram = meterRegistry.registerLongHistogram(REINDEX_TIME_HISTOGRAM, "Time to reindex by search", "seconds");
this.reindexCompletionCounter = meterRegistry.registerLongCounter(
REINDEX_COMPLETION_COUNTER,
"Number of completed reindex operations",
"unit"
);
}

private ReindexMetrics(LongHistogram reindexTimeSecsHistogram) {
this.reindexTimeSecsHistogram = reindexTimeSecsHistogram;
}
public long recordTookTime(long tookTime, boolean remote) {
Map<String, Object> attributes = getAttributes(remote);

public long recordTookTime(long tookTime) {
reindexTimeSecsHistogram.record(tookTime);
reindexTimeSecsHistogram.record(tookTime, attributes);
return tookTime;
}

public void recordSuccess(boolean remote) {
Map<String, Object> attributes = getAttributes(remote);
// attribute ATTRIBUTE_ERROR_TYPE being absent indicates success
assert attributes.get(ATTRIBUTE_NAME_ERROR_TYPE) == null : "error.type attribute must not be present for successes";

reindexCompletionCounter.incrementBy(1, attributes);
}

public void recordFailure(boolean remote, Throwable e) {
Map<String, Object> attributes = getAttributes(remote);
// best effort to extract useful error type if possible
String errorType;
if (e instanceof ElasticsearchStatusException ese) {
errorType = ese.status().name();
} else {
errorType = e.getClass().getTypeName();
}
attributes.put(ATTRIBUTE_NAME_ERROR_TYPE, errorType);

// attribute ATTRIBUTE_ERROR_TYPE being present indicates failure
// https://opentelemetry.io/docs/specs/semconv/general/recording-errors/#recording-errors-on-metrics
assert attributes.get(ATTRIBUTE_NAME_ERROR_TYPE) != null : "error.type attribute must be present for failures";

reindexCompletionCounter.incrementBy(1, attributes);
}

private Map<String, Object> getAttributes(boolean remote) {
Map<String, Object> attributes = new HashMap<>();
attributes.put(ATTRIBUTE_NAME_SOURCE, remote ? ATTRIBUTE_VALUE_SOURCE_REMOTE : ATTRIBUTE_VALUE_SOURCE_LOCAL);

return attributes;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
Expand Down Expand Up @@ -135,18 +136,63 @@ public void execute(BulkByScrollTask task, ReindexRequest request, Client bulkCl
projectResolver.getProjectState(clusterService.state()),
reindexSslConfig,
request,
ActionListener.runAfter(listener, () -> {
long elapsedTime = TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - startTime);
if (reindexMetrics != null) {
reindexMetrics.recordTookTime(elapsedTime);
}
})
wrapWithMetrics(listener, reindexMetrics, startTime, request.getRemoteInfo() != null)
);
searchAction.start();
}
);
}

// Visible for testing
static ActionListener<BulkByScrollResponse> wrapWithMetrics(
ActionListener<BulkByScrollResponse> listener,
@Nullable ReindexMetrics metrics,
long startTime,
boolean isRemote
) {
if (metrics == null) {
return listener;
}

// add completion metrics
var withCompletionMetrics = new ActionListener<BulkByScrollResponse>() {
@Override
public void onResponse(BulkByScrollResponse bulkByScrollResponse) {
var searchExceptionSample = Optional.ofNullable(bulkByScrollResponse.getSearchFailures())
.stream()
.flatMap(List::stream)
.map(ScrollableHitSource.SearchFailure::getReason)
.findFirst();
var bulkExceptionSample = Optional.ofNullable(bulkByScrollResponse.getBulkFailures())
.stream()
.flatMap(List::stream)
.map(BulkItemResponse.Failure::getCause)
.findFirst();
if (searchExceptionSample.isPresent() || bulkExceptionSample.isPresent()) {
// record only the first sample error in metric
Throwable e = searchExceptionSample.orElseGet(bulkExceptionSample::get);
metrics.recordFailure(isRemote, e);
listener.onResponse(bulkByScrollResponse);
} else {
metrics.recordSuccess(isRemote);
listener.onResponse(bulkByScrollResponse);
}
}

@Override
public void onFailure(Exception e) {
metrics.recordFailure(isRemote, e);
listener.onFailure(e);
}
};

// add duration metric
return ActionListener.runAfter(withCompletionMetrics, () -> {
long elapsedTime = TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - startTime);
metrics.recordTookTime(elapsedTime, isRemote);
});
}

/**
* Build the {@link RestClient} used for reindexing from remote clusters.
* @param remoteInfo connection information for the remote cluster
Expand Down
Loading