Skip to content

Commit 2ef1b2d

Browse files
committed
Correct reindex timing metrics
1 parent 20d9166 commit 2ef1b2d

File tree

2 files changed

+70
-19
lines changed

2 files changed

+70
-19
lines changed

modules/reindex/src/main/java/org/elasticsearch/reindex/Reindexer.java

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -156,18 +156,13 @@ public void initTask(BulkByScrollTask task, ReindexRequest request, ActionListen
156156
}
157157

158158
public void execute(BulkByScrollTask task, ReindexRequest request, Client bulkClient, ActionListener<BulkByScrollResponse> listener) {
159-
// todo(szy/sam): handle sliced and non-sliced
160-
// todo(szy/sam): bug, we send System::nanoTime across JVMs
161-
final long startTime = System.nanoTime();
162-
163159
// todo: move relocations to BulkByPaginatedSearchParallelizationHelper rather than having it in Reindexer, makes it generic
164160
// for update-by-query and delete-by-query
165161
final ActionListener<BulkByScrollResponse> responseListener = wrapWithMetrics(
166162
listenerWithRelocations(task, request, listener),
167163
reindexMetrics,
168164
task,
169-
request,
170-
startTime
165+
request
171166
);
172167

173168
final boolean isRemote = request.getRemoteInfo() != null;
@@ -401,8 +396,18 @@ static ActionListener<BulkByScrollResponse> wrapWithMetrics(
401396
ActionListener<BulkByScrollResponse> listener,
402397
@Nullable ReindexMetrics metrics,
403398
BulkByScrollTask task,
404-
ReindexRequest request,
405-
long startTime
399+
ReindexRequest request
400+
) {
401+
return wrapWithMetrics(listener, metrics, task, request, System::currentTimeMillis);
402+
}
403+
404+
/** Overload accepting a clock supplier for testability. Visible for testing*/
405+
static ActionListener<BulkByScrollResponse> wrapWithMetrics(
406+
final ActionListener<BulkByScrollResponse> listener,
407+
final @Nullable ReindexMetrics metrics,
408+
final BulkByScrollTask task,
409+
final ReindexRequest request,
410+
final LongSupplier currentTimeMillisSupplier
406411
) {
407412
if (metrics == null) {
408413
return listener;
@@ -414,11 +419,13 @@ static ActionListener<BulkByScrollResponse> wrapWithMetrics(
414419
}
415420
final boolean isRemote = request.getRemoteInfo() != null;
416421
final ReindexMetrics.SlicingMode slicingMode = ReindexMetrics.resolveSlicingMode(request);
417-
// todo(szy): add relocation metrics
418422
return new ActionListener<>() {
419423
private void recordDuration() {
420-
long elapsedTime = TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - startTime);
421-
metrics.recordTookTime(elapsedTime, isRemote, slicingMode);
424+
// handles relocations
425+
long elapsedTimeSeconds = TimeUnit.MILLISECONDS.toSeconds(
426+
currentTimeMillisSupplier.getAsLong() - task.relocationOrigin().originalStartTimeMillis()
427+
);
428+
metrics.recordTookTime(elapsedTimeSeconds, isRemote, slicingMode);
422429
}
423430

424431
@Override

modules/reindex/src/test/java/org/elasticsearch/reindex/ReindexerTests.java

Lines changed: 52 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@
7070
import org.elasticsearch.transport.TransportResponseHandler;
7171
import org.elasticsearch.transport.TransportService;
7272
import org.elasticsearch.watcher.ResourceWatcherService;
73+
import org.mockito.ArgumentCaptor;
7374

7475
import java.io.IOException;
7576
import java.io.OutputStream;
@@ -82,6 +83,7 @@
8283
import java.util.Optional;
8384
import java.util.Set;
8485
import java.util.concurrent.ExecutorService;
86+
import java.util.concurrent.TimeUnit;
8587
import java.util.concurrent.atomic.AtomicInteger;
8688
import java.util.function.BiPredicate;
8789
import java.util.function.Consumer;
@@ -114,7 +116,7 @@ public void testWrapWithMetricsSuccess() {
114116
ReindexMetrics metrics = mock();
115117
ActionListener<BulkByScrollResponse> listener = spy(ActionListener.noop());
116118
BulkByScrollTask task = createNonSlicedWorkerTask();
117-
var wrapped = Reindexer.wrapWithMetrics(listener, metrics, task, reindexRequest(), randomNonNegativeLong());
119+
var wrapped = Reindexer.wrapWithMetrics(listener, metrics, task, reindexRequest());
118120

119121
BulkByScrollResponse response = reindexResponseWithBulkAndSearchFailures(null, null);
120122
wrapped.onResponse(response);
@@ -129,7 +131,7 @@ public void testWrapWithMetricsFailure() {
129131
ReindexMetrics metrics = mock();
130132
ActionListener<BulkByScrollResponse> listener = spy(ActionListener.noop());
131133
BulkByScrollTask task = createNonSlicedWorkerTask();
132-
var wrapped = Reindexer.wrapWithMetrics(listener, metrics, task, reindexRequest(), randomNonNegativeLong());
134+
var wrapped = Reindexer.wrapWithMetrics(listener, metrics, task, reindexRequest());
133135

134136
Exception exception = new Exception("random failure");
135137
wrapped.onFailure(exception);
@@ -144,7 +146,7 @@ public void testWrapWithMetricsBulkFailure() {
144146
ReindexMetrics metrics = mock();
145147
ActionListener<BulkByScrollResponse> listener = spy(ActionListener.noop());
146148
BulkByScrollTask task = createNonSlicedWorkerTask();
147-
var wrapped = Reindexer.wrapWithMetrics(listener, metrics, task, reindexRequest(), randomNonNegativeLong());
149+
var wrapped = Reindexer.wrapWithMetrics(listener, metrics, task, reindexRequest());
148150

149151
Exception exception = new Exception("random failure");
150152
Exception anotherException = new Exception("another failure");
@@ -164,7 +166,7 @@ public void testWrapWithMetricsSearchFailure() {
164166
ReindexMetrics metrics = mock();
165167
ActionListener<BulkByScrollResponse> listener = spy(ActionListener.noop());
166168
BulkByScrollTask task = createNonSlicedWorkerTask();
167-
var wrapped = Reindexer.wrapWithMetrics(listener, metrics, task, reindexRequest(), randomNonNegativeLong());
169+
var wrapped = Reindexer.wrapWithMetrics(listener, metrics, task, reindexRequest());
168170

169171
Exception exception = new Exception("random failure");
170172
Exception anotherException = new Exception("another failure");
@@ -185,7 +187,7 @@ public void testWrapWithMetricsSkipsSliceWorker() {
185187
ActionListener<BulkByScrollResponse> listener = spy(ActionListener.noop());
186188
BulkByScrollTask task = createSliceWorkerTask();
187189

188-
var wrapped = Reindexer.wrapWithMetrics(listener, metrics, task, reindexRequest(), randomNonNegativeLong());
190+
var wrapped = Reindexer.wrapWithMetrics(listener, metrics, task, reindexRequest());
189191

190192
assertSame(listener, wrapped);
191193
verifyNoMoreInteractions(metrics);
@@ -196,7 +198,7 @@ public void testWrapWithMetricsWrapsLeader() {
196198
ActionListener<BulkByScrollResponse> listener = spy(ActionListener.noop());
197199
BulkByScrollTask task = createLeaderTask();
198200

199-
var wrapped = Reindexer.wrapWithMetrics(listener, metrics, task, reindexRequest(), randomNonNegativeLong());
201+
var wrapped = Reindexer.wrapWithMetrics(listener, metrics, task, reindexRequest());
200202

201203
assertNotSame(listener, wrapped);
202204

@@ -212,7 +214,7 @@ public void testWrapWithMetricsSkipsMetricsWhenRelocating() {
212214
ActionListener<BulkByScrollResponse> listener = spy(ActionListener.noop());
213215
BulkByScrollTask task = createNonSlicedWorkerTask();
214216

215-
var wrapped = Reindexer.wrapWithMetrics(listener, metrics, task, reindexRequest(), randomNonNegativeLong());
217+
var wrapped = Reindexer.wrapWithMetrics(listener, metrics, task, reindexRequest());
216218

217219
BulkByScrollResponse response = reindexResponseWithResumeInfo();
218220
wrapped.onResponse(response);
@@ -221,6 +223,44 @@ public void testWrapWithMetricsSkipsMetricsWhenRelocating() {
221223
verifyNoMoreInteractions(metrics);
222224
}
223225

226+
public void testWrapWithMetricsRecordsDurationFromRelocationOrigin() {
227+
final long taskStartTimeMillis = TimeUnit.SECONDS.toMillis(randomLongBetween(0, 100));
228+
final long currentTimeMillis = taskStartTimeMillis + TimeUnit.SECONDS.toMillis(randomIntBetween(0, 100));
229+
final long expectedElapsedSeconds = TimeUnit.MILLISECONDS.toSeconds(currentTimeMillis - taskStartTimeMillis);
230+
231+
final ReindexMetrics metrics = mock();
232+
final ActionListener<BulkByScrollResponse> listener = spy(ActionListener.noop());
233+
final ResumeInfo.RelocationOrigin origin = new ResumeInfo.RelocationOrigin(randomTaskId(), taskStartTimeMillis);
234+
final BulkByScrollTask task = nonSlicedWorkerTaskWithOrigin(origin);
235+
236+
final var wrapped = Reindexer.wrapWithMetrics(listener, metrics, task, reindexRequest(), () -> currentTimeMillis);
237+
238+
final BulkByScrollResponse response = reindexResponseWithBulkAndSearchFailures(null, null);
239+
wrapped.onResponse(response);
240+
241+
final ArgumentCaptor<Long> captor = ArgumentCaptor.forClass(Long.class);
242+
verify(metrics).recordTookTime(captor.capture(), eq(false), any());
243+
assertThat(captor.getValue(), equalTo(expectedElapsedSeconds));
244+
}
245+
246+
public void testWrapWithMetricsRecordsDurationForNewTask() {
247+
final ReindexMetrics metrics = mock();
248+
final ActionListener<BulkByScrollResponse> listener = spy(ActionListener.noop());
249+
final BulkByScrollTask task = nonSlicedWorkerTaskWithOrigin(null);
250+
final long taskStartTime = task.getStartTime();
251+
final long currentTimeMillis = taskStartTime + TimeUnit.SECONDS.toMillis(randomIntBetween(0, 100));
252+
final long expectedElapsedSeconds = TimeUnit.MILLISECONDS.toSeconds(currentTimeMillis - taskStartTime);
253+
254+
final var wrapped = Reindexer.wrapWithMetrics(listener, metrics, task, reindexRequest(), () -> currentTimeMillis);
255+
256+
final BulkByScrollResponse response = reindexResponseWithBulkAndSearchFailures(null, null);
257+
wrapped.onResponse(response);
258+
259+
final ArgumentCaptor<Long> captor = ArgumentCaptor.forClass(Long.class);
260+
verify(metrics).recordTookTime(captor.capture(), eq(false), any());
261+
assertThat(captor.getValue(), equalTo(expectedElapsedSeconds));
262+
}
263+
224264
// listenerWithRelocations tests
225265

226266
public void testListenerWithRelocationsPassesThroughForWorkerWithLeaderParent() {
@@ -1233,6 +1273,10 @@ private BulkByScrollResponse reindexResponseWithResumeInfo() {
12331273
}
12341274

12351275
private static BulkByScrollTask createNonSlicedWorkerTask() {
1276+
return nonSlicedWorkerTaskWithOrigin(randomOrigin());
1277+
}
1278+
1279+
private static BulkByScrollTask nonSlicedWorkerTaskWithOrigin(ResumeInfo.RelocationOrigin origin) {
12361280
BulkByScrollTask task = new BulkByScrollTask(
12371281
randomTaskId(),
12381282
randomAlphaOfLength(10),
@@ -1241,7 +1285,7 @@ private static BulkByScrollTask createNonSlicedWorkerTask() {
12411285
TaskId.EMPTY_TASK_ID,
12421286
Map.of(),
12431287
randomBoolean(),
1244-
randomOrigin()
1288+
origin
12451289
);
12461290
task.setWorker(Float.POSITIVE_INFINITY, null);
12471291
return task;

0 commit comments

Comments
 (0)