Skip to content

Commit 0925808

Browse files
committed
Extract utility
1 parent 3896224 commit 0925808

File tree

3 files changed

+32
-12
lines changed

3 files changed

+32
-12
lines changed

libs/core/src/main/java/org/elasticsearch/core/Predicates.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99

1010
package org.elasticsearch.core;
1111

12+
import java.util.concurrent.atomic.AtomicBoolean;
13+
import java.util.function.BooleanSupplier;
1214
import java.util.function.Predicate;
1315

1416
/**
@@ -90,4 +92,22 @@ public static <T> Predicate<T> always() {
9092
public static <T> Predicate<T> never() {
9193
return (Predicate<T>) NEVER;
9294
}
95+
96+
private static class OnceTrue extends AtomicBoolean implements BooleanSupplier {
97+
OnceTrue() {
98+
super(true);
99+
}
100+
101+
@Override
102+
public boolean getAsBoolean() {
103+
return getAndSet(false);
104+
}
105+
}
106+
107+
/**
108+
* @return a {@link BooleanSupplier} which supplies {@code true} the first time it is called, and {@code false} subsequently.
109+
*/
110+
public static BooleanSupplier once() {
111+
return new OnceTrue();
112+
}
93113
}

server/src/main/java/org/elasticsearch/action/support/local/TransportLocalClusterStateAction.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.elasticsearch.cluster.service.ClusterService;
2424
import org.elasticsearch.common.util.concurrent.EsExecutors;
2525
import org.elasticsearch.core.FixForMultiProject;
26+
import org.elasticsearch.core.Predicates;
2627
import org.elasticsearch.core.TimeValue;
2728
import org.elasticsearch.node.NodeClosedException;
2829
import org.elasticsearch.tasks.CancellableTask;
@@ -31,7 +32,6 @@
3132
import org.elasticsearch.tasks.TaskManager;
3233

3334
import java.util.concurrent.Executor;
34-
import java.util.concurrent.atomic.AtomicBoolean;
3535

3636
import static org.elasticsearch.common.Strings.format;
3737

@@ -108,10 +108,10 @@ private void waitForClusterUnblock(
108108
);
109109
// We track whether we already notified the listener or started executing the action, to avoid invoking the listener twice.
110110
// Because of that second part, we can not use ActionListener#notifyOnce.
111-
final var waitComplete = new AtomicBoolean(false);
111+
final var waitComplete = Predicates.once();
112112
if (task instanceof CancellableTask cancellableTask) {
113113
cancellableTask.addListener(() -> {
114-
if (waitComplete.compareAndSet(false, true) == false) {
114+
if (waitComplete.getAsBoolean() == false) {
115115
return;
116116
}
117117
listener.onFailure(new TaskCancelledException("Task was cancelled"));
@@ -121,7 +121,7 @@ private void waitForClusterUnblock(
121121
observer.waitForNextChange(new ClusterStateObserver.Listener() {
122122
@Override
123123
public void onNewClusterState(ClusterState state) {
124-
if (waitComplete.compareAndSet(false, true) == false) {
124+
if (waitComplete.getAsBoolean() == false) {
125125
return;
126126
}
127127
logger.trace("retrying with cluster state version [{}]", state.version());
@@ -130,15 +130,15 @@ public void onNewClusterState(ClusterState state) {
130130

131131
@Override
132132
public void onClusterServiceClose() {
133-
if (waitComplete.compareAndSet(false, true) == false) {
133+
if (waitComplete.getAsBoolean() == false) {
134134
return;
135135
}
136136
listener.onFailure(new NodeClosedException(clusterService.localNode()));
137137
}
138138

139139
@Override
140140
public void onTimeout(TimeValue timeout) {
141-
if (waitComplete.compareAndSet(false, true) == false) {
141+
if (waitComplete.getAsBoolean() == false) {
142142
return;
143143
}
144144
logger.debug(

server/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.elasticsearch.common.io.stream.Writeable;
3131
import org.elasticsearch.common.util.concurrent.EsExecutors;
3232
import org.elasticsearch.core.FixForMultiProject;
33+
import org.elasticsearch.core.Predicates;
3334
import org.elasticsearch.core.TimeValue;
3435
import org.elasticsearch.discovery.MasterNotDiscoveredException;
3536
import org.elasticsearch.gateway.GatewayService;
@@ -47,7 +48,6 @@
4748

4849
import java.util.Optional;
4950
import java.util.concurrent.Executor;
50-
import java.util.concurrent.atomic.AtomicBoolean;
5151
import java.util.function.Predicate;
5252

5353
import static org.elasticsearch.core.Strings.format;
@@ -307,10 +307,10 @@ private void retry(long currentStateVersion, final Throwable failure, final Pred
307307
}
308308
// We track whether we already notified the listener or started executing the action, to avoid invoking the listener twice.
309309
// Because of that second part, we can not use ActionListener#notifyOnce.
310-
final var waitComplete = new AtomicBoolean(false);
310+
final var waitComplete = Predicates.once();
311311
if (task instanceof CancellableTask cancellableTask) {
312312
cancellableTask.addListener(() -> {
313-
if (waitComplete.compareAndSet(false, true) == false) {
313+
if (waitComplete.getAsBoolean() == false) {
314314
return;
315315
}
316316
listener.onFailure(new TaskCancelledException("Task was cancelled"));
@@ -320,7 +320,7 @@ private void retry(long currentStateVersion, final Throwable failure, final Pred
320320
observer.waitForNextChange(new ClusterStateObserver.Listener() {
321321
@Override
322322
public void onNewClusterState(ClusterState state) {
323-
if (waitComplete.compareAndSet(false, true) == false) {
323+
if (waitComplete.getAsBoolean() == false) {
324324
return;
325325
}
326326
logger.trace("retrying with cluster state version [{}]", state.version());
@@ -329,15 +329,15 @@ public void onNewClusterState(ClusterState state) {
329329

330330
@Override
331331
public void onClusterServiceClose() {
332-
if (waitComplete.compareAndSet(false, true) == false) {
332+
if (waitComplete.getAsBoolean() == false) {
333333
return;
334334
}
335335
listener.onFailure(new NodeClosedException(clusterService.localNode()));
336336
}
337337

338338
@Override
339339
public void onTimeout(TimeValue timeout) {
340-
if (waitComplete.compareAndSet(false, true) == false) {
340+
if (waitComplete.getAsBoolean() == false) {
341341
return;
342342
}
343343
logger.debug(() -> format("timed out while retrying [%s] after failure (timeout [%s])", actionName, timeout), failure);

0 commit comments

Comments
 (0)