Skip to content

Commit 0a98bf8

Browse files
authored
Include clusterApplyListener in long cluster apply warnings (#120087)
Relates: ES-10249
1 parent fc0a1e1 commit 0a98bf8

File tree

3 files changed

+95
-10
lines changed

3 files changed

+95
-10
lines changed

docs/changelog/120087.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 120087
2+
summary: Include `clusterApplyListener` in long cluster apply warnings
3+
area: Cluster Coordination
4+
type: enhancement
5+
issues: []

server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java

Lines changed: 33 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,31 @@ public void run() {
158158
}
159159
}
160160

161+
private record TimedListener(ActionListener<Void> listener, Recorder recorder) implements ActionListener<Void> {
162+
163+
@Override
164+
public void onResponse(Void response) {
165+
try (Releasable ignored = recorder.record("listener.onResponse")) {
166+
listener.onResponse(null);
167+
} catch (Exception e) {
168+
assert false : e;
169+
logger.error("exception thrown by listener.onResponse", e);
170+
}
171+
}
172+
173+
@Override
174+
public void onFailure(Exception e) {
175+
assert e != null;
176+
try (Releasable ignored = recorder.record("listener.onFailure")) {
177+
listener.onFailure(e);
178+
} catch (Exception inner) {
179+
e.addSuppressed(inner);
180+
assert false : e;
181+
logger.error(() -> "exception thrown by listener.onFailure", e);
182+
}
183+
}
184+
}
185+
161186
@Override
162187
protected synchronized void doStop() {
163188
for (Map.Entry<TimeoutClusterStateListener, NotifyTimeout> onGoingTimeout : timeoutClusterStateListeners.entrySet()) {
@@ -394,12 +419,14 @@ private void runTask(String source, Function<ClusterState, ClusterState> updateF
394419

395420
final long startTimeMillis = threadPool.relativeTimeInMillis();
396421
final Recorder stopWatch = new Recorder(threadPool, slowTaskThreadDumpTimeout);
422+
final TimedListener timedListener = new TimedListener(clusterApplyListener, stopWatch);
397423
final ClusterState newClusterState;
398424
try {
399425
try (Releasable ignored = stopWatch.record("running task [" + source + ']')) {
400426
newClusterState = updateFunction.apply(previousClusterState);
401427
}
402428
} catch (Exception e) {
429+
timedListener.onFailure(e);
403430
TimeValue executionTime = getTimeSince(startTimeMillis);
404431
logger.trace(
405432
() -> format(
@@ -412,15 +439,14 @@ private void runTask(String source, Function<ClusterState, ClusterState> updateF
412439
e
413440
);
414441
warnAboutSlowTaskIfNeeded(executionTime, source, stopWatch);
415-
clusterApplyListener.onFailure(e);
416442
return;
417443
}
418444

419445
if (previousClusterState == newClusterState) {
446+
timedListener.onResponse(null);
420447
TimeValue executionTime = getTimeSince(startTimeMillis);
421448
logger.debug("processing [{}]: took [{}] no change in cluster state", source, executionTime);
422449
warnAboutSlowTaskIfNeeded(executionTime, source, stopWatch);
423-
clusterApplyListener.onResponse(null);
424450
} else {
425451
if (logger.isTraceEnabled()) {
426452
logger.debug("cluster state updated, version [{}], source [{}]\n{}", newClusterState.version(), source, newClusterState);
@@ -430,6 +456,7 @@ private void runTask(String source, Function<ClusterState, ClusterState> updateF
430456
try {
431457
setIsApplyingClusterState();
432458
applyChanges(previousClusterState, newClusterState, source, stopWatch);
459+
timedListener.onResponse(null);
433460
TimeValue executionTime = getTimeSince(startTimeMillis);
434461
logger.debug(
435462
"processing [{}]: took [{}] done applying updated cluster state (version: {}, uuid: {})",
@@ -439,8 +466,11 @@ private void runTask(String source, Function<ClusterState, ClusterState> updateF
439466
newClusterState.stateUUID()
440467
);
441468
warnAboutSlowTaskIfNeeded(executionTime, source, stopWatch);
442-
clusterApplyListener.onResponse(null);
443469
} catch (Exception e) {
470+
// failing to apply a cluster state with an exception indicates a bug in validation or in one of the appliers; if we
471+
// continue we will retry with the same cluster state but that might not help.
472+
assert applicationMayFail();
473+
timedListener.onFailure(e);
444474
TimeValue executionTime = getTimeSince(startTimeMillis);
445475
if (logger.isTraceEnabled()) {
446476
logger.warn(() -> format("""
@@ -460,10 +490,6 @@ private void runTask(String source, Function<ClusterState, ClusterState> updateF
460490
e
461491
);
462492
}
463-
// failing to apply a cluster state with an exception indicates a bug in validation or in one of the appliers; if we
464-
// continue we will retry with the same cluster state but that might not help.
465-
assert applicationMayFail();
466-
clusterApplyListener.onFailure(e);
467493
} finally {
468494
clearIsApplyingClusterState();
469495
}

server/src/test/java/org/elasticsearch/cluster/service/ClusterApplierServiceTests.java

Lines changed: 57 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,12 @@ public long relativeTimeInMillis() {
6666
assertThat(Thread.currentThread().getName(), containsString(ClusterApplierService.CLUSTER_UPDATE_THREAD_NAME));
6767
return currentTimeMillis;
6868
}
69+
70+
@Override
71+
public long rawRelativeTimeInMillis() {
72+
assertThat(Thread.currentThread().getName(), containsString(ClusterApplierService.CLUSTER_UPDATE_THREAD_NAME));
73+
return currentTimeMillis;
74+
}
6975
};
7076
clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
7177
allowClusterStateApplicationFailure = false;
@@ -207,15 +213,33 @@ public void testLongClusterStateUpdateLogging() throws Exception {
207213
);
208214
mockLog.addExpectation(
209215
new MockLog.SeenEventExpectation(
210-
"test4",
216+
"test3",
211217
ClusterApplierService.class.getCanonicalName(),
212218
Level.WARN,
213219
"*cluster state applier task [test3] took [34s] which is above the warn threshold of [*]: "
214220
+ "[running task [test3]] took [*"
215221
)
216222
);
223+
mockLog.addExpectation(
224+
new MockLog.SeenEventExpectation(
225+
"test4",
226+
ClusterApplierService.class.getCanonicalName(),
227+
Level.WARN,
228+
"*cluster state applier task [test4] took [36s] which is above the warn threshold of [*]: "
229+
+ "[running task [test4]] took [*"
230+
)
231+
);
232+
mockLog.addExpectation(
233+
new MockLog.SeenEventExpectation(
234+
"test5",
235+
ClusterApplierService.class.getCanonicalName(),
236+
Level.WARN,
237+
"*cluster state applier task [test5] took [38s] which is above the warn threshold of [*]: "
238+
+ "[running task [test5]] took [*"
239+
)
240+
);
217241

218-
final CountDownLatch latch = new CountDownLatch(4);
242+
final CountDownLatch latch = new CountDownLatch(6);
219243
final CountDownLatch processedFirstTask = new CountDownLatch(1);
220244
currentTimeMillis = randomLongBetween(0L, Long.MAX_VALUE / 2);
221245
clusterApplierService.runOnApplierThread(
@@ -266,9 +290,39 @@ public void onFailure(Exception e) {
266290
}
267291
}
268292
);
293+
clusterApplierService.runOnApplierThread("test4", Priority.HIGH, currentState -> {
294+
// do nothing (testing that onResponse is included in timing)
295+
}, new ActionListener<>() {
296+
297+
@Override
298+
public void onResponse(Void unused) {
299+
advanceTime(TimeValue.timeValueSeconds(36).millis());
300+
latch.countDown();
301+
}
302+
303+
@Override
304+
public void onFailure(Exception e) {
305+
fail();
306+
}
307+
});
308+
clusterApplierService.runOnApplierThread("test5", Priority.HIGH, currentState -> {
309+
throw new IllegalArgumentException("Testing that onFailure is included in timing");
310+
}, new ActionListener<>() {
311+
312+
@Override
313+
public void onResponse(Void unused) {
314+
fail();
315+
}
316+
317+
@Override
318+
public void onFailure(Exception e) {
319+
advanceTime(TimeValue.timeValueSeconds(38).millis());
320+
latch.countDown();
321+
}
322+
});
269323
// Additional update task to make sure all previous logging made it to the loggerName
270324
// We don't check logging for this one since there is no guarantee that it will occur before our check
271-
clusterApplierService.runOnApplierThread("test4", Priority.HIGH, currentState -> {}, new ActionListener<>() {
325+
clusterApplierService.runOnApplierThread("test6", Priority.HIGH, currentState -> {}, new ActionListener<>() {
272326
@Override
273327
public void onResponse(Void ignored) {
274328
latch.countDown();

0 commit comments

Comments
 (0)