3333import java .net .InetSocketAddress ;
3434import java .util .ArrayList ;
3535import java .util .Collections ;
36+ import java .util .HashSet ;
3637import java .util .List ;
3738import java .util .Set ;
3839import java .util .concurrent .CopyOnWriteArraySet ;
4344import java .util .concurrent .atomic .AtomicInteger ;
4445import java .util .concurrent .atomic .AtomicLong ;
4546import java .util .concurrent .atomic .AtomicReference ;
47+ import java .util .function .LongSupplier ;
4648
4749public class RestCancellableNodeClientTests extends ESTestCase {
4850
@@ -150,8 +152,42 @@ public void testChannelAlreadyClosed() {
150152 }
151153 }
152154
155+ public void testConcurrentExecuteAndClose () {
156+ final TestClient testClient = new TestClient (Settings .EMPTY , threadPool , true );
157+ int initialHttpChannels = RestCancellableNodeClient .getNumChannels ();
158+ int numTasks = randomIntBetween (1 , 30 );
159+ TestHttpChannel channel = new TestHttpChannel ();
160+ final CountDownLatch startLatch = new CountDownLatch (1 );
161+ final CountDownLatch doneLatch = new CountDownLatch (numTasks + 1 );
162+ final Set <TaskId > expectedTasks = new HashSet <>(numTasks );
163+ for (int j = 0 ; j < numTasks ; j ++) {
164+ RestCancellableNodeClient client = new RestCancellableNodeClient (testClient , channel );
165+ threadPool .generic ().execute (() -> {
166+ client .execute (SearchAction .INSTANCE , new SearchRequest (), ActionListener .wrap (ESTestCase ::fail ));
167+ startLatch .countDown ();
168+ doneLatch .countDown ();
169+ });
170+ expectedTasks .add (new TaskId (testClient .getLocalNodeId (), j ));
171+ }
172+ threadPool .generic ().execute (() -> {
173+ try {
174+ safeAwait (startLatch );
175+ channel .awaitClose ();
176+ } catch (InterruptedException e ) {
177+ Thread .currentThread ().interrupt ();
178+ throw new AssertionError (e );
179+ } finally {
180+ doneLatch .countDown ();
181+ }
182+ });
183+ safeAwait (doneLatch );
184+ assertEquals (initialHttpChannels , RestCancellableNodeClient .getNumChannels ());
185+ assertEquals (expectedTasks , testClient .cancelledTasks );
186+ }
187+
153188 private static class TestClient extends NodeClient {
154- private final AtomicLong counter = new AtomicLong (0 );
189+ private final LongSupplier searchTaskIdGenerator = new AtomicLong (0 )::getAndIncrement ;
190+ private final LongSupplier cancelTaskIdGenerator = new AtomicLong (1000 )::getAndIncrement ;
155191 private final Set <TaskId > cancelledTasks = new CopyOnWriteArraySet <>();
156192 private final AtomicInteger searchRequests = new AtomicInteger (0 );
157193 private final boolean timeout ;
@@ -171,7 +207,13 @@ public <Request extends ActionRequest, Response extends ActionResponse> Task exe
171207 case CancelTasksAction .NAME :
172208 CancelTasksRequest cancelTasksRequest = (CancelTasksRequest ) request ;
173209 assertTrue ("tried to cancel the same task more than once" , cancelledTasks .add (cancelTasksRequest .getTaskId ()));
174- Task task = request .createTask (counter .getAndIncrement (), "cancel_task" , action .name (), null , Collections .emptyMap ());
210+ Task task = request .createTask (
211+ cancelTaskIdGenerator .getAsLong (),
212+ "cancel_task" ,
213+ action .name (),
214+ null ,
215+ Collections .emptyMap ()
216+ );
175217 if (randomBoolean ()) {
176218 listener .onResponse (null );
177219 } else {
@@ -182,7 +224,13 @@ public <Request extends ActionRequest, Response extends ActionResponse> Task exe
182224 return task ;
183225 case SearchAction .NAME :
184226 searchRequests .incrementAndGet ();
185- Task searchTask = request .createTask (counter .getAndIncrement (), "search" , action .name (), null , Collections .emptyMap ());
227+ Task searchTask = request .createTask (
228+ searchTaskIdGenerator .getAsLong (),
229+ "search" ,
230+ action .name (),
231+ null ,
232+ Collections .emptyMap ()
233+ );
186234 if (timeout == false ) {
187235 if (rarely ()) {
188236 // make sure that search is sometimes also called from the same thread before the task is returned
@@ -193,7 +241,7 @@ public <Request extends ActionRequest, Response extends ActionResponse> Task exe
193241 }
194242 return searchTask ;
195243 default :
196- throw new UnsupportedOperationException ( );
244+ throw new AssertionError ( "unexpected action " + action . name () );
197245 }
198246
199247 }
@@ -224,9 +272,7 @@ public InetSocketAddress getRemoteAddress() {
224272
225273 @ Override
226274 public void close () {
227- if (open .compareAndSet (true , false ) == false ) {
228- throw new IllegalStateException ("channel already closed!" );
229- }
275+ assertTrue ("HttpChannel is already closed" , open .compareAndSet (true , false ));
230276 ActionListener <Void > listener = closeListener .get ();
231277 if (listener != null ) {
232278 boolean failure = randomBoolean ();
@@ -242,6 +288,7 @@ public void close() {
242288 }
243289
244290 private void awaitClose () throws InterruptedException {
291+ assertNotNull ("must set closeListener before calling awaitClose" , closeListener .get ());
245292 close ();
246293 closeLatch .await ();
247294 }
@@ -258,7 +305,7 @@ public void addCloseListener(ActionListener<Void> listener) {
258305 listener .onResponse (null );
259306 } else {
260307 if (closeListener .compareAndSet (null , listener ) == false ) {
261- throw new IllegalStateException ("close listener already set, only one is allowed!" );
308+ throw new AssertionError ("close listener already set, only one is allowed!" );
262309 }
263310 }
264311 }
0 commit comments