1919import org .elasticsearch .action .search .SearchResponse ;
2020import org .elasticsearch .action .search .TransportSearchAction ;
2121import org .elasticsearch .action .support .PlainActionFuture ;
22+ import org .elasticsearch .action .support .SubscribableListener ;
2223import org .elasticsearch .client .internal .node .NodeClient ;
2324import org .elasticsearch .common .settings .Settings ;
2425import org .elasticsearch .common .util .set .Sets ;
4445import java .util .concurrent .atomic .AtomicBoolean ;
4546import java .util .concurrent .atomic .AtomicInteger ;
4647import java .util .concurrent .atomic .AtomicLong ;
47- import java .util .concurrent .atomic .AtomicReference ;
4848import java .util .function .LongSupplier ;
4949
5050public class RestCancellableNodeClientTests extends ESTestCase {
@@ -79,7 +79,9 @@ public void testCompletedTasks() throws Exception {
7979 for (int j = 0 ; j < numTasks ; j ++) {
8080 PlainActionFuture <SearchResponse > actionFuture = new PlainActionFuture <>();
8181 RestCancellableNodeClient client = new RestCancellableNodeClient (testClient , channel );
82- threadPool .generic ().submit (() -> client .execute (TransportSearchAction .TYPE , new SearchRequest (), actionFuture ));
82+ futures .add (
83+ threadPool .generic ().submit (() -> client .execute (TransportSearchAction .TYPE , new SearchRequest (), actionFuture ))
84+ );
8385 futures .add (actionFuture );
8486 }
8587 }
@@ -150,7 +152,7 @@ public void testChannelAlreadyClosed() {
150152 assertEquals (totalSearches , testClient .cancelledTasks .size ());
151153 }
152154
153- public void testConcurrentExecuteAndClose () throws Exception {
155+ public void testConcurrentExecuteAndClose () {
154156 final var testClient = new TestClient (Settings .EMPTY , threadPool , true );
155157 int initialHttpChannels = RestCancellableNodeClient .getNumChannels ();
156158 int numTasks = randomIntBetween (1 , 30 );
@@ -254,7 +256,7 @@ public String getLocalNodeId() {
254256
255257 private class TestHttpChannel implements HttpChannel {
256258 private final AtomicBoolean open = new AtomicBoolean (true );
257- private final AtomicReference <ActionListener <Void >> closeListener = new AtomicReference <>();
259+ private final SubscribableListener <ActionListener <Void >> closeListener = new SubscribableListener <>();
258260 private final CountDownLatch closeLatch = new CountDownLatch (1 );
259261
260262 @ Override
@@ -273,8 +275,7 @@ public InetSocketAddress getRemoteAddress() {
273275 @ Override
274276 public void close () {
275277 assertTrue ("HttpChannel is already closed" , open .compareAndSet (true , false ));
276- ActionListener <Void > listener = closeListener .get ();
277- if (listener != null ) {
278+ closeListener .andThenAccept (listener -> {
278279 boolean failure = randomBoolean ();
279280 threadPool .generic ().submit (() -> {
280281 if (failure ) {
@@ -284,11 +285,10 @@ public void close() {
284285 }
285286 closeLatch .countDown ();
286287 });
287- }
288+ });
288289 }
289290
290291 private void awaitClose () throws InterruptedException {
291- assertNotNull ("must set closeListener before calling awaitClose" , closeListener .get ());
292292 close ();
293293 closeLatch .await ();
294294 }
@@ -304,9 +304,8 @@ public void addCloseListener(ActionListener<Void> listener) {
304304 if (open .get () == false ) {
305305 listener .onResponse (null );
306306 } else {
307- if (closeListener .compareAndSet (null , listener ) == false ) {
308- throw new AssertionError ("close listener already set, only one is allowed!" );
309- }
307+ assertFalse ("close listener already set, only one is allowed!" , closeListener .isDone ());
308+ closeListener .onResponse (ActionListener .assertOnce (listener ));
310309 }
311310 }
312311 }
0 commit comments