Skip to content

Commit 0b1c91c

Browse files
authored
Fold BaseFuture into PlainActionFuture (#93152)
Following #93145 and #93146, a `PlainActionFuture` is now just a `BaseFuture` plus a few utility `actionGet(...)` methods. The only (non-test) subclass of `BaseFuture` that doesn't inherit these methods is `ListenableFuture`, and there's no problem with adding them there, so this commit makes `ListenableFuture` inherit from `PlainActionFuture` and folds the now-unnecessary `BaseFuture` into `PlainActionFuture`.
1 parent 226db04 commit 0b1c91c

File tree

6 files changed

+369
-366
lines changed

6 files changed

+369
-366
lines changed

server/src/main/java/org/elasticsearch/action/support/ListenableActionFuture.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,29 @@
99
package org.elasticsearch.action.support;
1010

1111
import org.elasticsearch.action.ActionListener;
12+
import org.elasticsearch.common.util.concurrent.ListenableFuture;
1213

1314
import java.util.ArrayList;
1415
import java.util.List;
1516

1617
/**
17-
* A {@code Future} and {@link ActionListener} against which other {@link ActionListener}s can be registered later, to support
18-
* fanning-out a result to a dynamic collection of listeners.
18+
* An {@link ActionListener} which allows for the result to fan out to a (dynamic) collection of other listeners, added using {@link
19+
* #addListener}. Listeners added before completion are retained until completion; listeners added after completion are completed
20+
* immediately.
21+
*
22+
* Similar to {@link ListenableFuture}, with the following differences:
23+
*
24+
* <ul>
25+
* <li>This listener will silently ignore additional completions, whereas {@link ListenableFuture} must not be completed more than once.
26+
* <li>This listener completes the retained listeners on directly the completing thread, so you must use {@link ThreadedActionListener} if
27+
* dispatching is needed. In contrast, {@link ListenableFuture} allows to dispatch only the retained listeners, while immediately-completed
28+
* listeners are completed on the subscribing thread.
29+
* <li>This listener completes the retained listeners in the context of the completing thread, so you must remember to use {@link
30+
* ContextPreservingActionListener} to capture the thread context yourself if needed. In contrast, {@link ListenableFuture} allows for the
31+
* thread context to be captured at subscription time.
32+
* </ul>
1933
*/
34+
// The name {@link ListenableActionFuture} dates back a long way and could be improved - TODO find a better name
2035
public class ListenableActionFuture<T> extends PlainActionFuture<T> {
2136

2237
private Object listeners;

server/src/main/java/org/elasticsearch/action/support/PlainActionFuture.java

Lines changed: 324 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,20 +11,184 @@
1111
import org.elasticsearch.ElasticsearchException;
1212
import org.elasticsearch.action.ActionFuture;
1313
import org.elasticsearch.action.ActionListener;
14-
import org.elasticsearch.common.util.concurrent.BaseFuture;
14+
import org.elasticsearch.cluster.service.ClusterApplierService;
15+
import org.elasticsearch.cluster.service.MasterService;
1516
import org.elasticsearch.common.util.concurrent.FutureUtils;
1617
import org.elasticsearch.common.util.concurrent.UncategorizedExecutionException;
1718
import org.elasticsearch.core.CheckedConsumer;
19+
import org.elasticsearch.core.Nullable;
1820
import org.elasticsearch.core.TimeValue;
21+
import org.elasticsearch.threadpool.ThreadPool;
22+
import org.elasticsearch.transport.Transports;
1923

24+
import java.util.Objects;
25+
import java.util.concurrent.CancellationException;
26+
import java.util.concurrent.ExecutionException;
2027
import java.util.concurrent.TimeUnit;
28+
import java.util.concurrent.TimeoutException;
29+
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
2130

22-
public class PlainActionFuture<T> extends BaseFuture<T> implements ActionFuture<T>, ActionListener<T> {
31+
public class PlainActionFuture<T> implements ActionFuture<T>, ActionListener<T> {
2332

2433
public static <T> PlainActionFuture<T> newFuture() {
2534
return new PlainActionFuture<>();
2635
}
2736

37+
@Override
38+
public void onResponse(T result) {
39+
set(result);
40+
}
41+
42+
@Override
43+
public void onFailure(Exception e) {
44+
setException(e);
45+
}
46+
47+
private static final String BLOCKING_OP_REASON = "Blocking operation";
48+
49+
/**
50+
* Synchronization control.
51+
*/
52+
private final Sync<T> sync = new Sync<>();
53+
54+
/*
55+
* Improve the documentation of when InterruptedException is thrown. Our
56+
* behavior matches the JDK's, but the JDK's documentation is misleading.
57+
*/
58+
59+
/**
60+
* {@inheritDoc}
61+
* <p>
62+
* The default {@link PlainActionFuture} implementation throws {@code
63+
* InterruptedException} if the current thread is interrupted before or during
64+
* the call, even if the value is already available.
65+
*
66+
* @throws InterruptedException if the current thread was interrupted before
67+
* or during the call (optional but recommended).
68+
* @throws CancellationException {@inheritDoc}
69+
*/
70+
@Override
71+
public T get(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException, ExecutionException {
72+
assert timeout <= 0 || blockingAllowed();
73+
return sync.get(unit.toNanos(timeout));
74+
}
75+
76+
/*
77+
* Improve the documentation of when InterruptedException is thrown. Our
78+
* behavior matches the JDK's, but the JDK's documentation is misleading.
79+
*/
80+
81+
/**
82+
* {@inheritDoc}
83+
* <p>
84+
* The default {@link PlainActionFuture<Object>} implementation throws {@code
85+
* InterruptedException} if the current thread is interrupted before or during
86+
* the call, even if the value is already available.
87+
*
88+
* @throws InterruptedException if the current thread was interrupted before
89+
* or during the call (optional but recommended).
90+
* @throws CancellationException {@inheritDoc}
91+
*/
92+
@Override
93+
public T get() throws InterruptedException, ExecutionException {
94+
assert blockingAllowed();
95+
return sync.get();
96+
}
97+
98+
// protected so that it can be overridden in specific instances
99+
protected boolean blockingAllowed() {
100+
return Transports.assertNotTransportThread(BLOCKING_OP_REASON)
101+
&& ThreadPool.assertNotScheduleThread(BLOCKING_OP_REASON)
102+
&& ClusterApplierService.assertNotClusterStateUpdateThread(BLOCKING_OP_REASON)
103+
&& MasterService.assertNotMasterUpdateThread(BLOCKING_OP_REASON);
104+
}
105+
106+
@Override
107+
public boolean isDone() {
108+
return sync.isDone();
109+
}
110+
111+
@Override
112+
public boolean isCancelled() {
113+
return sync.isCancelled();
114+
}
115+
116+
@Override
117+
public boolean cancel(boolean mayInterruptIfRunning) {
118+
if (sync.cancel() == false) {
119+
return false;
120+
}
121+
done(false);
122+
if (mayInterruptIfRunning) {
123+
interruptTask();
124+
}
125+
return true;
126+
}
127+
128+
/**
129+
* Subclasses can override this method to implement interruption of the
130+
* future's computation. The method is invoked automatically by a successful
131+
* call to {@link #cancel(boolean) cancel(true)}.
132+
* <p>
133+
* The default implementation does nothing.
134+
*
135+
* @since 10.0
136+
*/
137+
protected void interruptTask() {}
138+
139+
/**
140+
* Subclasses should invoke this method to set the result of the computation
141+
* to {@code value}. This will set the state of the future to
142+
* {@link PlainActionFuture.Sync#COMPLETED} and call {@link #done(boolean)} if the
143+
* state was successfully changed.
144+
*
145+
* @param value the value that was the result of the task.
146+
* @return true if the state was successfully changed.
147+
*/
148+
protected boolean set(@Nullable T value) {
149+
boolean result = sync.set(value);
150+
if (result) {
151+
done(true);
152+
}
153+
return result;
154+
}
155+
156+
/**
157+
* Subclasses should invoke this method to set the result of the computation
158+
* to an error, {@code throwable}. This will set the state of the future to
159+
* {@link PlainActionFuture.Sync#COMPLETED} and call {@link #done(boolean)} if the
160+
* state was successfully changed.
161+
*
162+
* @param throwable the exception that the task failed with.
163+
* @return true if the state was successfully changed.
164+
* @throws Error if the throwable was an {@link Error}.
165+
*/
166+
protected boolean setException(Throwable throwable) {
167+
boolean result = sync.setException(Objects.requireNonNull(throwable));
168+
if (result) {
169+
done(false);
170+
}
171+
172+
// If it's an Error, we want to make sure it reaches the top of the
173+
// call stack, so we rethrow it.
174+
175+
// we want to notify the listeners we have with errors as well, as it breaks
176+
// how we work in ES in terms of using assertions
177+
// if (throwable instanceof Error) {
178+
// throw (Error) throwable;
179+
// }
180+
return result;
181+
}
182+
183+
/**
184+
* Called when the {@link PlainActionFuture<Object>} is completed. The {@code success} boolean indicates if the {@link
185+
* PlainActionFuture<Object>} was successfully completed (the value is {@code true}). In the cases the {@link PlainActionFuture<Object>}
186+
* was completed with an error or cancelled the value is {@code false}.
187+
*
188+
* @param success indicates if the {@link PlainActionFuture<Object>} was completed with success (true); in other cases it equals false
189+
*/
190+
protected void done(boolean success) {}
191+
28192
@Override
29193
public T actionGet() {
30194
try {
@@ -58,14 +222,165 @@ public T actionGet(long timeout, TimeUnit unit) {
58222
}
59223
}
60224

61-
@Override
62-
public void onResponse(T result) {
63-
set(result);
64-
}
225+
/**
226+
* <p>Following the contract of {@link AbstractQueuedSynchronizer} we create a
227+
* private subclass to hold the synchronizer. This synchronizer is used to
228+
* implement the blocking and waiting calls as well as to handle state changes
229+
* in a thread-safe manner. The current state of the future is held in the
230+
* Sync state, and the lock is released whenever the state changes to either
231+
* {@link #COMPLETED} or {@link #CANCELLED}.
232+
* <p>
233+
* To avoid races between threads doing release and acquire, we transition
234+
* to the final state in two steps. One thread will successfully CAS from
235+
* RUNNING to COMPLETING, that thread will then set the result of the
236+
* computation, and only then transition to COMPLETED or CANCELLED.
237+
* <p>
238+
* We don't use the integer argument passed between acquire methods so we
239+
* pass around a -1 everywhere.
240+
*/
241+
static final class Sync<V> extends AbstractQueuedSynchronizer {
242+
/* Valid states. */
243+
static final int RUNNING = 0;
244+
static final int COMPLETING = 1;
245+
static final int COMPLETED = 2;
246+
static final int CANCELLED = 4;
65247

66-
@Override
67-
public void onFailure(Exception e) {
68-
setException(e);
248+
private V value;
249+
private Throwable exception;
250+
251+
/*
252+
* Acquisition succeeds if the future is done, otherwise it fails.
253+
*/
254+
@Override
255+
protected int tryAcquireShared(int ignored) {
256+
if (isDone()) {
257+
return 1;
258+
}
259+
return -1;
260+
}
261+
262+
/*
263+
* We always allow a release to go through, this means the state has been
264+
* successfully changed and the result is available.
265+
*/
266+
@Override
267+
protected boolean tryReleaseShared(int finalState) {
268+
setState(finalState);
269+
return true;
270+
}
271+
272+
/**
273+
* Blocks until the task is complete or the timeout expires. Throws a
274+
* {@link TimeoutException} if the timer expires, otherwise behaves like
275+
* {@link #get()}.
276+
*/
277+
V get(long nanos) throws TimeoutException, CancellationException, ExecutionException, InterruptedException {
278+
279+
// Attempt to acquire the shared lock with a timeout.
280+
if (tryAcquireSharedNanos(-1, nanos) == false) {
281+
throw new TimeoutException("Timeout waiting for task.");
282+
}
283+
284+
return getValue();
285+
}
286+
287+
/**
288+
* Blocks until {@link #complete(Object, Throwable, int)} has been
289+
* successfully called. Throws a {@link CancellationException} if the task
290+
* was cancelled, or a {@link ExecutionException} if the task completed with
291+
* an error.
292+
*/
293+
V get() throws CancellationException, ExecutionException, InterruptedException {
294+
295+
// Acquire the shared lock allowing interruption.
296+
acquireSharedInterruptibly(-1);
297+
return getValue();
298+
}
299+
300+
/**
301+
* Implementation of the actual value retrieval. Will return the value
302+
* on success, an exception on failure, a cancellation on cancellation, or
303+
* an illegal state if the synchronizer is in an invalid state.
304+
*/
305+
private V getValue() throws CancellationException, ExecutionException {
306+
int state = getState();
307+
switch (state) {
308+
case COMPLETED:
309+
if (exception != null) {
310+
throw new ExecutionException(exception);
311+
} else {
312+
return value;
313+
}
314+
315+
case CANCELLED:
316+
throw new CancellationException("Task was cancelled.");
317+
318+
default:
319+
throw new IllegalStateException("Error, synchronizer in invalid state: " + state);
320+
}
321+
}
322+
323+
/**
324+
* Checks if the state is {@link #COMPLETED} or {@link #CANCELLED}.
325+
*/
326+
boolean isDone() {
327+
return (getState() & (COMPLETED | CANCELLED)) != 0;
328+
}
329+
330+
/**
331+
* Checks if the state is {@link #CANCELLED}.
332+
*/
333+
boolean isCancelled() {
334+
return getState() == CANCELLED;
335+
}
336+
337+
/**
338+
* Transition to the COMPLETED state and set the value.
339+
*/
340+
boolean set(@Nullable V v) {
341+
return complete(v, null, COMPLETED);
342+
}
343+
344+
/**
345+
* Transition to the COMPLETED state and set the exception.
346+
*/
347+
boolean setException(Throwable t) {
348+
return complete(null, t, COMPLETED);
349+
}
350+
351+
/**
352+
* Transition to the CANCELLED state.
353+
*/
354+
boolean cancel() {
355+
return complete(null, null, CANCELLED);
356+
}
357+
358+
/**
359+
* Implementation of completing a task. Either {@code v} or {@code t} will
360+
* be set but not both. The {@code finalState} is the state to change to
361+
* from {@link #RUNNING}. If the state is not in the RUNNING state we
362+
* return {@code false} after waiting for the state to be set to a valid
363+
* final state ({@link #COMPLETED} or {@link #CANCELLED}).
364+
*
365+
* @param v the value to set as the result of the computation.
366+
* @param t the exception to set as the result of the computation.
367+
* @param finalState the state to transition to.
368+
*/
369+
private boolean complete(@Nullable V v, @Nullable Throwable t, int finalState) {
370+
boolean doCompletion = compareAndSetState(RUNNING, COMPLETING);
371+
if (doCompletion) {
372+
// If this thread successfully transitioned to COMPLETING, set the value
373+
// and exception and then release to the final state.
374+
this.value = v;
375+
this.exception = t;
376+
releaseShared(finalState);
377+
} else if (getState() == COMPLETING) {
378+
// If some other thread is currently completing the future, block until
379+
// they are done so we can guarantee completion.
380+
acquireShared(-1);
381+
}
382+
return doCompletion;
383+
}
69384
}
70385

71386
private static RuntimeException unwrapEsException(ElasticsearchException esEx) {

0 commit comments

Comments
 (0)