Skip to content

Commit b423b7b

Browse files
onFailure() with TaskCancelledException when cancellation is detected
1 parent 8d6f7cc commit b423b7b

File tree

2 files changed

+43
-4
lines changed

2 files changed

+43
-4
lines changed

server/src/main/java/org/elasticsearch/common/util/CancellableSingleObjectCache.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,13 @@ protected boolean isFresh(Key currentKey, Key newKey) {
100100
return currentKey.equals(newKey);
101101
}
102102

103+
/**
104+
* Sets the currently cached item reference to {@code null}, which will result in a {@code refresh()} on the next {@code get()} call.
105+
*/
106+
protected void clearCurrentCachedItem() {
107+
this.currentCachedItemRef.set(null);
108+
}
109+
103110
/**
104111
* Start a retrieval for the value associated with the given {@code input}, and pass it to the given {@code listener}.
105112
* <p>
@@ -110,7 +117,8 @@ protected boolean isFresh(Key currentKey, Key newKey) {
110117
*
111118
* @param input The input to compute the desired value, converted to a {@link Key} to determine if the value that's currently
112119
* cached or pending is fresh enough.
113-
* @param isCancelled Returns {@code true} if the listener no longer requires the value being computed.
120+
* @param isCancelled Returns {@code true} if the listener no longer requires the value being computed. The listener is expected to be
121+
* completed as soon as possible when cancellation is detected.
114122
* @param listener The listener to notify when the desired value becomes available.
115123
*/
116124
public final void get(Input input, BooleanSupplier isCancelled, ActionListener<Value> listener) {
@@ -230,11 +238,15 @@ boolean addListener(ActionListener<Value> listener, BooleanSupplier isCancelled)
230238
ActionListener.completeWith(listener, future::actionResult);
231239
} else {
232240
// Refresh is still pending; it's not cancelled because there are still references.
233-
future.addListener(ContextPreservingActionListener.wrapPreservingContext(listener, threadContext));
241+
final var cancellableListener = ActionListener.notifyOnce(
242+
ContextPreservingActionListener.wrapPreservingContext(listener, threadContext)
243+
);
244+
future.addListener(cancellableListener);
234245
final AtomicBoolean released = new AtomicBoolean();
235246
cancellationChecks.add(() -> {
236247
if (released.get() == false && isCancelled.getAsBoolean() && released.compareAndSet(false, true)) {
237248
decRef();
249+
cancellableListener.onFailure(new TaskCancelledException("task cancelled"));
238250
}
239251
});
240252
}

server/src/test/java/org/elasticsearch/common/util/CancellableSingleObjectCacheTests.java

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -101,8 +101,35 @@ public void testListenerCompletedByRefreshEvenIfDiscarded() throws ExecutionExce
101101
testCache.completeNextRefresh("foo", 1);
102102
assertThat(future2.result(), equalTo(1));
103103

104-
// ... and the original listener is also completed successfully
105-
assertThat(future1.result(), sameInstance(future2.result()));
104+
// We expect the first listener to have been completed with a cancellation exception when detected in the ensureNotCancelled() call.
105+
assertTrue(future1.isDone());
106+
expectThrows(ExecutionException.class, TaskCancelledException.class, future1::result);
107+
}
108+
109+
public void testBothListenersReceiveTaskCancelledExceptionWhenBothSupersededAndNewTasksAreCancelled() {
110+
final TestCache testCache = new TestCache();
111+
112+
// This computation is superseded and then cancelled.
113+
final AtomicBoolean isCancelled = new AtomicBoolean();
114+
final TestFuture future1 = new TestFuture();
115+
testCache.get("foo", isCancelled::get, future1);
116+
testCache.assertPendingRefreshes(1);
117+
118+
// A second get() call that supersedes the original refresh and starts another one, but will be cancelled as well.
119+
final TestFuture future2 = new TestFuture();
120+
testCache.get("bar", isCancelled::get, future2);
121+
testCache.assertPendingRefreshes(2);
122+
123+
testCache.assertNextRefreshCancelled();
124+
assertFalse(future1.isDone());
125+
testCache.assertPendingRefreshes(1);
126+
assertFalse(future2.isDone());
127+
128+
isCancelled.set(true);
129+
// This next refresh should also fail with a cancellation exception.
130+
testCache.completeNextRefresh("bar", 1);
131+
expectThrows(ExecutionException.class, TaskCancelledException.class, future1::result);
132+
expectThrows(ExecutionException.class, TaskCancelledException.class, future2::result);
106133
}
107134

108135
public void testListenerCompletedWithCancellationExceptionIfRefreshCancelled() throws ExecutionException {

0 commit comments

Comments
 (0)