Skip to content

Commit a40e758

Browse files
christianaistleitnertolbertam
authored andcommitted
Fix uncaught exception during graceful channel shutdown
after exceeding max orphan ids patch by Christian Aistleitner; reviewed by Andy Tolbert, and Bret McGuire for #1938
1 parent 8444c79 commit a40e758

File tree

2 files changed

+64
-3
lines changed

2 files changed

+64
-3
lines changed

core/src/main/java/com/datastax/oss/driver/internal/core/channel/InFlightHandler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -199,14 +199,14 @@ private void startGracefulShutdown(ChannelHandlerContext ctx) {
199199
LOG.debug("[{}] No pending queries, completing graceful shutdown now", logPrefix);
200200
ctx.channel().close();
201201
} else {
202-
// remove heartbeat handler from pipeline if present.
202+
// Remove heartbeat handler from pipeline if present.
203203
ChannelHandler heartbeatHandler = ctx.pipeline().get(ChannelFactory.HEARTBEAT_HANDLER_NAME);
204204
if (heartbeatHandler != null) {
205205
ctx.pipeline().remove(heartbeatHandler);
206206
}
207207
LOG.debug("[{}] There are pending queries, delaying graceful shutdown", logPrefix);
208208
closingGracefully = true;
209-
closeStartedFuture.setSuccess();
209+
closeStartedFuture.trySuccess();
210210
}
211211
}
212212

core/src/test/java/com/datastax/oss/driver/internal/core/channel/InFlightHandlerTest.java

Lines changed: 62 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,9 @@
3939
import io.netty.channel.ChannelFuture;
4040
import io.netty.channel.ChannelPromise;
4141
import java.net.InetSocketAddress;
42+
import java.util.ArrayList;
4243
import java.util.Collections;
44+
import java.util.List;
4345
import java.util.concurrent.TimeUnit;
4446
import org.junit.Before;
4547
import org.junit.Test;
@@ -256,7 +258,7 @@ public void should_refuse_new_writes_during_graceful_close() {
256258
}
257259

258260
@Test
259-
public void should_close_gracefully_if_orphan_ids_above_max_and_pending_requests() {
261+
public void should_close_gracefully_if_orphan_ids_above_max_and_pending_request() {
260262
// Given
261263
addToPipeline();
262264
// Generate n orphan ids by writing and cancelling the requests:
@@ -311,6 +313,65 @@ public void should_close_gracefully_if_orphan_ids_above_max_and_pending_requests
311313
assertThat(channel.closeFuture()).isSuccess();
312314
}
313315

316+
@Test
317+
public void should_close_gracefully_if_orphan_ids_above_max_and_multiple_pending_requests() {
318+
// Given
319+
addToPipeline();
320+
// Generate n orphan ids by writing and cancelling the requests.
321+
for (int i = 0; i < MAX_ORPHAN_IDS; i++) {
322+
when(streamIds.acquire()).thenReturn(i);
323+
MockResponseCallback responseCallback = new MockResponseCallback();
324+
channel
325+
.writeAndFlush(
326+
new DriverChannel.RequestMessage(QUERY, false, Frame.NO_PAYLOAD, responseCallback))
327+
.awaitUninterruptibly();
328+
channel.writeAndFlush(responseCallback).awaitUninterruptibly();
329+
}
330+
// Generate 3 additional requests that are pending and not cancelled.
331+
List<MockResponseCallback> pendingResponseCallbacks = new ArrayList<>();
332+
for (int i = 0; i < 3; i++) {
333+
when(streamIds.acquire()).thenReturn(MAX_ORPHAN_IDS + i);
334+
MockResponseCallback responseCallback = new MockResponseCallback();
335+
channel
336+
.writeAndFlush(
337+
new DriverChannel.RequestMessage(QUERY, false, Frame.NO_PAYLOAD, responseCallback))
338+
.awaitUninterruptibly();
339+
pendingResponseCallbacks.add(responseCallback);
340+
}
341+
342+
// When
343+
// Generate the n+1th orphan id that makes us go above the threshold by canceling one if the
344+
// pending requests.
345+
channel.writeAndFlush(pendingResponseCallbacks.remove(0)).awaitUninterruptibly();
346+
347+
// Then
348+
// Channel should be closing gracefully but there's no way to observe that from the outside
349+
// besides writing another request and check that it's rejected.
350+
assertThat(channel.closeFuture()).isNotDone();
351+
ChannelFuture otherWriteFuture =
352+
channel.writeAndFlush(
353+
new DriverChannel.RequestMessage(
354+
QUERY, false, Frame.NO_PAYLOAD, new MockResponseCallback()));
355+
assertThat(otherWriteFuture).isFailed();
356+
assertThat(otherWriteFuture.cause())
357+
.isInstanceOf(IllegalStateException.class)
358+
.hasMessage("Channel is closing");
359+
360+
// When
361+
// Cancel the remaining pending requests causing the n+ith orphan ids above the threshold.
362+
for (MockResponseCallback pendingResponseCallback : pendingResponseCallbacks) {
363+
ChannelFuture future = channel.writeAndFlush(pendingResponseCallback).awaitUninterruptibly();
364+
365+
// Then
366+
// The future should succeed even though the channel has started closing gracefully.
367+
assertThat(future).isSuccess();
368+
}
369+
370+
// Then
371+
// The graceful shutdown completes.
372+
assertThat(channel.closeFuture()).isSuccess();
373+
}
374+
314375
@Test
315376
public void should_close_immediately_if_orphan_ids_above_max_and_no_pending_requests() {
316377
// Given

0 commit comments

Comments
 (0)