Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,16 @@

import static org.elasticsearch.index.query.QueryBuilders.termQuery;
import static org.elasticsearch.reindex.DeleteByQueryMetrics.DELETE_BY_QUERY_TIME_HISTOGRAM;
import static org.elasticsearch.reindex.ReindexMetrics.REINDEX_FAILURE_HISTOGRAM;
import static org.elasticsearch.reindex.ReindexMetrics.REINDEX_FAILURE_HISTOGRAM_REMOTE;
import static org.elasticsearch.reindex.ReindexMetrics.REINDEX_SUCCESS_HISTOGRAM;
import static org.elasticsearch.reindex.ReindexMetrics.REINDEX_SUCCESS_HISTOGRAM_REMOTE;
import static org.elasticsearch.reindex.ReindexMetrics.REINDEX_TIME_HISTOGRAM;
import static org.elasticsearch.reindex.ReindexMetrics.REINDEX_TIME_HISTOGRAM_REMOTE;
import static org.elasticsearch.reindex.UpdateByQueryMetrics.UPDATE_BY_QUERY_TIME_HISTOGRAM;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;

@ESIntegTestCase.ClusterScope(numDataNodes = 0, numClientNodes = 0, scope = ESIntegTestCase.Scope.TEST)
public class ReindexPluginMetricsIT extends ESIntegTestCase {
Expand Down Expand Up @@ -75,33 +81,74 @@ public void testReindexMetrics() throws Exception {
// Use assertBusy to wait for all threads to complete so we get deterministic results
assertBusy(() -> {
testTelemetryPlugin.collect();
List<Measurement> measurements = testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM);
assertThat(measurements.size(), equalTo(1));
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM).size(), equalTo(1));
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM_REMOTE).size(), equalTo(0));
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_SUCCESS_HISTOGRAM).size(), equalTo(1));
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_SUCCESS_HISTOGRAM_REMOTE).size(), equalTo(0));
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_FAILURE_HISTOGRAM).size(), equalTo(0));
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_FAILURE_HISTOGRAM_REMOTE).size(), equalTo(0));
});

// Now none of them
createIndex("none");
reindex().source("source").destination("none").filter(termQuery("foo", "no_match")).get();
assertBusy(() -> {
testTelemetryPlugin.collect();
List<Measurement> measurements = testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM);
assertThat(measurements.size(), equalTo(2));
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM).size(), equalTo(2));
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM_REMOTE).size(), equalTo(0));
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_SUCCESS_HISTOGRAM).size(), equalTo(2));
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_SUCCESS_HISTOGRAM_REMOTE).size(), equalTo(0));
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_FAILURE_HISTOGRAM).size(), equalTo(0));
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_FAILURE_HISTOGRAM_REMOTE).size(), equalTo(0));
});

// Now half of them
reindex().source("source").destination("dest_half").filter(termQuery("foo", "a")).get();
assertBusy(() -> {
testTelemetryPlugin.collect();
List<Measurement> measurements = testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM);
assertThat(measurements.size(), equalTo(3));
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM).size(), equalTo(3));
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM_REMOTE).size(), equalTo(0));
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_SUCCESS_HISTOGRAM).size(), equalTo(3));
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_SUCCESS_HISTOGRAM_REMOTE).size(), equalTo(0));
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_FAILURE_HISTOGRAM).size(), equalTo(0));
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_FAILURE_HISTOGRAM_REMOTE).size(), equalTo(0));
});

// Limit with maxDocs
reindex().source("source").destination("dest_size_one").maxDocs(1).get();
assertBusy(() -> {
testTelemetryPlugin.collect();
List<Measurement> measurements = testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM);
assertThat(measurements.size(), equalTo(4));
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM).size(), equalTo(4));
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM_REMOTE).size(), equalTo(0));
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_SUCCESS_HISTOGRAM).size(), equalTo(4));
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_SUCCESS_HISTOGRAM_REMOTE).size(), equalTo(0));
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_FAILURE_HISTOGRAM).size(), equalTo(0));
assertThat(testTelemetryPlugin.getLongHistogramMeasurement(REINDEX_FAILURE_HISTOGRAM_REMOTE).size(), equalTo(0));
});
}

/**
 * Runs a reindex whose bulk requests fail and checks which reindex histograms received a measurement:
 * the overall duration and failure histograms get one each, while the success histogram and all
 * remote-only histograms stay empty.
 */
public void testReindexMetricsWithBulkFailure() throws Exception {
    final String nodeName = internalCluster().startNode();
    final PluginsService pluginsService = internalCluster().getInstance(PluginsService.class, nodeName);
    final TestTelemetryPlugin telemetry = pluginsService.filterPlugins(TestTelemetryPlugin.class).findFirst().orElseThrow();

    // "test" is text in the source document but numeric in the destination document, so the
    // conflicting mappings make the bulk indexing into "dest" fail
    indexRandom(true, prepareIndex("source").setId("2").setSource("test", "words words"));
    indexRandom(true, prepareIndex("dest").setId("1").setSource("test", 10));

    var reindexResponse = reindex().source("source").destination("dest").get();
    assertThat(reindexResponse.getBulkFailures().size(), greaterThanOrEqualTo(1));

    assertBusy(() -> {
        telemetry.collect();

        // duration: recorded once overall, never for remote
        assertThat(telemetry.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM).size(), equalTo(1));
        assertThat(telemetry.getLongHistogramMeasurement(REINDEX_TIME_HISTOGRAM_REMOTE).size(), equalTo(0));

        // success: nothing recorded
        assertThat(telemetry.getLongHistogramMeasurement(REINDEX_SUCCESS_HISTOGRAM).size(), equalTo(0));
        assertThat(telemetry.getLongHistogramMeasurement(REINDEX_SUCCESS_HISTOGRAM_REMOTE).size(), equalTo(0));

        // failure: recorded once overall, never for remote
        assertThat(telemetry.getLongHistogramMeasurement(REINDEX_FAILURE_HISTOGRAM).size(), equalTo(1));
        assertThat(telemetry.getLongHistogramMeasurement(REINDEX_FAILURE_HISTOGRAM_REMOTE).size(), equalTo(0));
    });
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,66 @@
public class ReindexMetrics {

public static final String REINDEX_TIME_HISTOGRAM = "es.reindex.duration.histogram";
// Every measurement recorded in a remote-reindex metric is also recorded in the matching overall metric,
// so the remote metrics are a subset of the overall ones
public static final String REINDEX_TIME_HISTOGRAM_REMOTE = "es.reindex.duration.histogram.remote";
public static final String REINDEX_SUCCESS_HISTOGRAM = "es.reindex.completion.success";
public static final String REINDEX_SUCCESS_HISTOGRAM_REMOTE = "es.reindex.completion.success.remote";
public static final String REINDEX_FAILURE_HISTOGRAM = "es.reindex.completion.failure";
public static final String REINDEX_FAILURE_HISTOGRAM_REMOTE = "es.reindex.completion.failure.remote";

private final LongHistogram reindexTimeSecsHistogram;
private final LongHistogram reindexTimeSecsHistogramRemote;
private final LongHistogram reindexSuccessHistogram;
private final LongHistogram reindexSuccessHistogramRemote;
private final LongHistogram reindexFailureHistogram;
private final LongHistogram reindexFailureHistogramRemote;

/**
 * Creates the reindex metrics by registering long histograms on the given registry: duration, success
 * and failure, each in an overall variant and a remote-only variant.
 *
 * @param meterRegistry registry on which the histograms are registered
 */
public ReindexMetrics(MeterRegistry meterRegistry) {
// NOTE(review): the next two lines (delegating call and closing brace) look like the pre-change
// constructor body left in by the diff view — confirm against the merged file
this(meterRegistry.registerLongHistogram(REINDEX_TIME_HISTOGRAM, "Time to reindex by search", "seconds"));
}
// Duration histograms (unit "seconds"): overall, then remote-only
this.reindexTimeSecsHistogram = meterRegistry.registerLongHistogram(REINDEX_TIME_HISTOGRAM, "Time to reindex by search", "seconds");
this.reindexTimeSecsHistogramRemote = meterRegistry.registerLongHistogram(
REINDEX_TIME_HISTOGRAM_REMOTE,
"Time to reindex by search from remote cluster",
"seconds"
);

// Success histograms: overall, then remote-only
this.reindexSuccessHistogram = meterRegistry.registerLongHistogram(
REINDEX_SUCCESS_HISTOGRAM,
"Number of successful reindex",
"unit"
);
this.reindexSuccessHistogramRemote = meterRegistry.registerLongHistogram(
REINDEX_SUCCESS_HISTOGRAM_REMOTE,
"Number of successful reindex from remote cluster",
"unit"
);

// NOTE(review): the private constructor header and assignment below also look like pre-change
// lines from the diff view — confirm against the merged file
private ReindexMetrics(LongHistogram reindexTimeSecsHistogram) {
this.reindexTimeSecsHistogram = reindexTimeSecsHistogram;
// Failure histograms: overall, then remote-only
this.reindexFailureHistogram = meterRegistry.registerLongHistogram(REINDEX_FAILURE_HISTOGRAM, "Number of failed reindex", "unit");
this.reindexFailureHistogramRemote = meterRegistry.registerLongHistogram(
REINDEX_FAILURE_HISTOGRAM_REMOTE,
"Number of failed reindex from remote cluster",
"unit"
);
}

public long recordTookTime(long tookTime) {
/**
 * Records how long a reindex took.
 * <p>
 * The value always goes into the overall duration histogram; when {@code remote} is {@code true} it is
 * also recorded in the remote-only duration histogram.
 *
 * @param tookTime the duration to record
 * @param remote   whether the reindex read from a remote cluster
 * @return {@code tookTime}, unchanged
 */
public long recordTookTime(long tookTime, boolean remote) {
    reindexTimeSecsHistogram.record(tookTime);
    if (remote == false) {
        return tookTime;
    }
    reindexTimeSecsHistogramRemote.record(tookTime);
    return tookTime;
}

/**
 * Records one successful reindex.
 * <p>
 * A value of {@code 1} is always recorded in the overall success histogram; when {@code remote} is
 * {@code true} it is also recorded in the remote-only success histogram.
 *
 * @param remote whether the reindex read from a remote cluster
 */
public void recordSuccess(boolean remote) {
    reindexSuccessHistogram.record(1L);
    if (remote == false) {
        return;
    }
    reindexSuccessHistogramRemote.record(1L);
}

/**
 * Records one failed reindex.
 * <p>
 * A value of {@code 1} is always recorded in the overall failure histogram; when {@code remote} is
 * {@code true} it is also recorded in the remote-only failure histogram.
 *
 * @param remote whether the reindex read from a remote cluster
 */
public void recordFailure(boolean remote) {
    reindexFailureHistogram.record(1L);
    if (remote == false) {
        return;
    }
    reindexFailureHistogramRemote.record(1L);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -135,18 +135,49 @@ public void execute(BulkByScrollTask task, ReindexRequest request, Client bulkCl
projectResolver.getProjectState(clusterService.state()),
reindexSslConfig,
request,
ActionListener.runAfter(listener, () -> {
long elapsedTime = TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - startTime);
if (reindexMetrics != null) {
reindexMetrics.recordTookTime(elapsedTime);
}
})
wrapWithMetrics(listener, reindexMetrics, startTime, request.getRemoteInfo() != null)
);
searchAction.start();
}
);
}

/**
 * Decorates {@code listener} so that completing it also records reindex metrics. Visible for testing.
 * <p>
 * A response carrying at least one bulk or search failure, and any call to {@code onFailure}, is recorded
 * as a failure; every other response is recorded as a success. The outcome is recorded before the wrapped
 * listener is notified. The elapsed time since {@code startTime}, in whole seconds, is recorded through
 * {@link ActionListener#runAfter}.
 *
 * @param listener  the listener to notify with the reindex result
 * @param metrics   the metrics to record into; when {@code null}, {@code listener} is returned as is
 * @param startTime the {@link System#nanoTime()} reading taken when the reindex started
 * @param isRemote  whether the reindex reads from a remote cluster
 * @return the decorated listener, or {@code listener} itself when {@code metrics} is {@code null}
 */
static ActionListener<BulkByScrollResponse> wrapWithMetrics(
    ActionListener<BulkByScrollResponse> listener,
    @Nullable ReindexMetrics metrics,
    long startTime,
    boolean isRemote
) {
    if (metrics == null) {
        return listener;
    }

    final ActionListener<BulkByScrollResponse> outcomeRecorder = new ActionListener<>() {
        @Override
        public void onResponse(BulkByScrollResponse response) {
            final boolean hasBulkFailures = response.getBulkFailures() != null && response.getBulkFailures().isEmpty() == false;
            // search failures are only inspected when there are no bulk failures
            final boolean hasFailures = hasBulkFailures
                || (response.getSearchFailures() != null && response.getSearchFailures().isEmpty() == false);
            if (hasFailures) {
                metrics.recordFailure(isRemote);
            } else {
                metrics.recordSuccess(isRemote);
            }
            listener.onResponse(response);
        }

        @Override
        public void onFailure(Exception e) {
            metrics.recordFailure(isRemote);
            listener.onFailure(e);
        }
    };

    final Runnable recordElapsedSeconds = () -> metrics.recordTookTime(
        TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - startTime),
        isRemote
    );
    return ActionListener.runAfter(outcomeRecorder, recordElapsedSeconds);
}

/**
* Build the {@link RestClient} used for reindexing from remote clusters.
* @param remoteInfo connection information for the remote cluster
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,87 @@

import java.util.List;

import static org.elasticsearch.reindex.ReindexMetrics.REINDEX_FAILURE_HISTOGRAM;
import static org.elasticsearch.reindex.ReindexMetrics.REINDEX_FAILURE_HISTOGRAM_REMOTE;
import static org.elasticsearch.reindex.ReindexMetrics.REINDEX_SUCCESS_HISTOGRAM;
import static org.elasticsearch.reindex.ReindexMetrics.REINDEX_SUCCESS_HISTOGRAM_REMOTE;
import static org.elasticsearch.reindex.ReindexMetrics.REINDEX_TIME_HISTOGRAM;
import static org.elasticsearch.reindex.ReindexMetrics.REINDEX_TIME_HISTOGRAM_REMOTE;

public class ReindexMetricsTests extends ESTestCase {

private RecordingMeterRegistry recordingMeterRegistry;
private RecordingMeterRegistry registry;
private ReindexMetrics metrics;

/**
 * Test fixture set-up: creates a new {@code RecordingMeterRegistry} and a {@code ReindexMetrics}
 * constructed from it.
 */
@Before
public void createMetrics() {
// NOTE(review): the two assignments using recordingMeterRegistry look like the pre-change lines kept
// by the diff view; the two using registry look like their replacements — confirm against the merged file
recordingMeterRegistry = new RecordingMeterRegistry();
metrics = new ReindexMetrics(recordingMeterRegistry);
registry = new RecordingMeterRegistry();
metrics = new ReindexMetrics(registry);
}

/**
 * Checks that {@code recordTookTime} records every duration in the overall duration histogram, and that
 * a duration recorded with {@code remote == true} is also recorded in the remote duration histogram.
 */
public void testRecordTookTime() {
// NOTE(review): the six lines below (int secondsTaken ... measurements.get(0)) look like the pre-change
// test body kept by the diff view; they redeclare names used further down — confirm against the merged file
int secondsTaken = randomIntBetween(1, 50);
metrics.recordTookTime(secondsTaken);
List<Measurement> measurements = recordingMeterRegistry.getRecorder()
.getMeasurements(InstrumentType.LONG_HISTOGRAM, REINDEX_TIME_HISTOGRAM);
assertEquals(measurements.size(), 1);
assertEquals(measurements.get(0).getLong(), secondsTaken);
long secondsTaken = randomLongBetween(1, Long.MAX_VALUE);

// first metric: a non-remote recording, checked against the overall histogram
metrics.recordTookTime(secondsTaken, false);

List<Measurement> measurements = registry.getRecorder().getMeasurements(InstrumentType.LONG_HISTOGRAM, REINDEX_TIME_HISTOGRAM);
assertEquals(1, measurements.size());
assertEquals(secondsTaken, measurements.getFirst().getLong());

// second metric: a remote recording, which must appear in both the overall and the remote histogram
long remoteSecondsTaken = randomLongBetween(1, Long.MAX_VALUE);
metrics.recordTookTime(remoteSecondsTaken, true);

// overall histogram now holds both values, in recording order
measurements = registry.getRecorder().getMeasurements(InstrumentType.LONG_HISTOGRAM, REINDEX_TIME_HISTOGRAM);
assertEquals(2, measurements.size());
assertEquals(secondsTaken, measurements.getFirst().getLong());
assertEquals(remoteSecondsTaken, measurements.getLast().getLong());
// remote histogram holds only the remote value
List<Measurement> measurementsRemote = registry.getRecorder()
.getMeasurements(InstrumentType.LONG_HISTOGRAM, REINDEX_TIME_HISTOGRAM_REMOTE);
assertEquals(1, measurementsRemote.size());
assertEquals(remoteSecondsTaken, measurementsRemote.getFirst().getLong());
}

/**
 * Checks that {@code recordSuccess} records a {@code 1} in the overall success histogram on every call,
 * and additionally in the remote success histogram when called with {@code remote == true}.
 */
public void testRecordSuccess() {
    // first metric: a non-remote success, checked against the overall histogram
    metrics.recordSuccess(false);

    List<Measurement> measurements = registry.getRecorder().getMeasurements(InstrumentType.LONG_HISTOGRAM, REINDEX_SUCCESS_HISTOGRAM);
    assertEquals(1, measurements.size());
    assertEquals(1, measurements.getFirst().getLong());

    // second metric: a remote success, which must appear in both the overall and the remote histogram
    metrics.recordSuccess(true);

    measurements = registry.getRecorder().getMeasurements(InstrumentType.LONG_HISTOGRAM, REINDEX_SUCCESS_HISTOGRAM);
    assertEquals(2, measurements.size());
    assertEquals(1, measurements.getFirst().getLong());
    assertEquals(1, measurements.getLast().getLong());
    List<Measurement> measurementsRemote = registry.getRecorder()
        .getMeasurements(InstrumentType.LONG_HISTOGRAM, REINDEX_SUCCESS_HISTOGRAM_REMOTE);
    assertEquals(1, measurementsRemote.size());
    // assert on the remote list; the value recorded in the remote histogram was previously never checked
    assertEquals(1, measurementsRemote.getFirst().getLong());
}

/**
 * Checks that {@code recordFailure} records a {@code 1} in the overall failure histogram on every call,
 * and additionally in the remote failure histogram when called with {@code remote == true}.
 */
public void testRecordFailure() {
    // first metric: a non-remote failure, checked against the overall histogram
    metrics.recordFailure(false);

    List<Measurement> measurements = registry.getRecorder().getMeasurements(InstrumentType.LONG_HISTOGRAM, REINDEX_FAILURE_HISTOGRAM);
    assertEquals(1, measurements.size());
    assertEquals(1, measurements.getFirst().getLong());

    // second metric: a remote failure, which must appear in both the overall and the remote histogram
    metrics.recordFailure(true);

    measurements = registry.getRecorder().getMeasurements(InstrumentType.LONG_HISTOGRAM, REINDEX_FAILURE_HISTOGRAM);
    assertEquals(2, measurements.size());
    assertEquals(1, measurements.getFirst().getLong());
    assertEquals(1, measurements.getLast().getLong());
    List<Measurement> measurementsRemote = registry.getRecorder()
        .getMeasurements(InstrumentType.LONG_HISTOGRAM, REINDEX_FAILURE_HISTOGRAM_REMOTE);
    assertEquals(1, measurementsRemote.size());
    // assert on the remote list; the value recorded in the remote histogram was previously never checked
    assertEquals(1, measurementsRemote.getFirst().getLong());
}
}
Loading