4040import io .netty .handler .codec .http .HttpUtil ;
4141import io .netty .handler .codec .http .HttpVersion ;
4242
43+ import org .apache .http .ConnectionClosedException ;
4344import org .apache .http .HttpHost ;
4445import org .apache .lucene .util .SetOnce ;
4546import org .elasticsearch .ElasticsearchException ;
4849import org .elasticsearch .action .ActionListener ;
4950import org .elasticsearch .action .bulk .IncrementalBulkService ;
5051import org .elasticsearch .action .support .ActionTestUtils ;
52+ import org .elasticsearch .action .support .PlainActionFuture ;
5153import org .elasticsearch .action .support .SubscribableListener ;
5254import org .elasticsearch .client .Request ;
5355import org .elasticsearch .client .RestClient ;
100102import java .util .Collections ;
101103import java .util .List ;
102104import java .util .Set ;
105+ import java .util .concurrent .CancellationException ;
103106import java .util .concurrent .ConcurrentHashMap ;
104107import java .util .concurrent .CountDownLatch ;
105108import java .util .concurrent .TimeUnit ;
110113import static com .carrotsearch .randomizedtesting .RandomizedTest .getRandom ;
111114import static org .elasticsearch .http .HttpTransportSettings .SETTING_CORS_ALLOW_ORIGIN ;
112115import static org .elasticsearch .http .HttpTransportSettings .SETTING_CORS_ENABLED ;
116+ import static org .elasticsearch .http .HttpTransportSettings .SETTING_HTTP_SERVER_SHUTDOWN_GRACE_PERIOD ;
113117import static org .elasticsearch .rest .RestStatus .BAD_REQUEST ;
114118import static org .elasticsearch .rest .RestStatus .OK ;
115119import static org .elasticsearch .rest .RestStatus .UNAUTHORIZED ;
@@ -1039,8 +1043,16 @@ public void dispatchBadRequest(final RestChannel channel, final ThreadContext th
10391043 }
10401044 }
10411045
1042- public void testRespondAfterClose () throws Exception {
1043- final String url = "/thing" ;
1046+ public void testRespondAfterServiceCloseWithClientCancel () throws Exception {
1047+ runRespondAfterServiceCloseTest (true );
1048+ }
1049+
1050+ public void testRespondAfterServiceCloseWithServerCancel () throws Exception {
1051+ runRespondAfterServiceCloseTest (false );
1052+ }
1053+
1054+ private void runRespondAfterServiceCloseTest (boolean clientCancel ) throws Exception {
1055+ final String url = "/" + randomIdentifier ();
10441056 final CountDownLatch responseReleasedLatch = new CountDownLatch (1 );
10451057 final SubscribableListener <Void > transportClosedFuture = new SubscribableListener <>();
10461058 final CountDownLatch handlingRequestLatch = new CountDownLatch (1 );
@@ -1066,7 +1078,9 @@ public void dispatchBadRequest(final RestChannel channel, final ThreadContext th
10661078
10671079 try (
10681080 Netty4HttpServerTransport transport = new Netty4HttpServerTransport (
1069- Settings .EMPTY ,
1081+ clientCancel
1082+ ? Settings .EMPTY
1083+ : Settings .builder ().put (SETTING_HTTP_SERVER_SHUTDOWN_GRACE_PERIOD .getKey (), TimeValue .timeValueMillis (1 )).build (),
10701084 networkService ,
10711085 threadPool ,
10721086 xContentRegistry (),
@@ -1082,11 +1096,24 @@ public void dispatchBadRequest(final RestChannel channel, final ThreadContext th
10821096 transport .start ();
10831097 final var address = randomFrom (transport .boundAddress ().boundAddresses ()).address ();
10841098 try (var client = RestClient .builder (new HttpHost (address .getAddress (), address .getPort ())).build ()) {
1085- client .performRequestAsync (new Request ("GET" , url ), ActionTestUtils .wrapAsRestResponseListener (ActionListener .noop ()));
1099+ final var responseExceptionFuture = new PlainActionFuture <Exception >();
1100+ final var cancellable = client .performRequestAsync (
1101+ new Request ("GET" , url ),
1102+ ActionTestUtils .wrapAsRestResponseListener (ActionTestUtils .assertNoSuccessListener (responseExceptionFuture ::onResponse ))
1103+ );
10861104 safeAwait (handlingRequestLatch );
1105+ if (clientCancel ) {
1106+ threadPool .generic ().execute (cancellable ::cancel );
1107+ }
10871108 transport .close ();
10881109 transportClosedFuture .onResponse (null );
10891110 safeAwait (responseReleasedLatch );
1111+ final var responseException = safeGet (responseExceptionFuture );
1112+ if (clientCancel ) {
1113+ assertThat (responseException , instanceOf (CancellationException .class ));
1114+ } else {
1115+ assertThat (responseException , instanceOf (ConnectionClosedException .class ));
1116+ }
10901117 }
10911118 }
10921119 }
0 commit comments