3030import io .grpc .InternalLogId ;
3131import io .grpc .Metadata ;
3232import io .grpc .Status ;
33- import io .grpc .Status .Code ;
3433import io .grpc .internal .AbstractServerStream ;
3534import io .grpc .internal .GrpcUtil ;
3635import io .grpc .internal .SerializingExecutor ;
4342import java .util .Collections ;
4443import java .util .HashMap ;
4544import java .util .Map ;
46- import java .util .concurrent .CountDownLatch ;
47- import java .util .concurrent .TimeUnit ;
4845import java .util .function .Supplier ;
4946import java .util .logging .Logger ;
5047import javax .annotation .Nullable ;
@@ -58,12 +55,15 @@ final class ServletServerStream extends AbstractServerStream {
5855
5956 private final ServletTransportState transportState ;
6057 private final Sink sink = new Sink ();
61- private final AsyncContext asyncCtx ;
6258 private final HttpServletResponse resp ;
6359 private final Attributes attributes ;
6460 private final String authority ;
6561 private final InternalLogId logId ;
6662 private final AsyncServletOutputStreamWriter writer ;
63+ /**
64+ * If the async servlet operation has been completed.
65+ */
66+ volatile boolean asyncCompleted = false ;
6767
6868 ServletServerStream (
6969 AsyncContext asyncCtx ,
@@ -78,7 +78,6 @@ final class ServletServerStream extends AbstractServerStream {
7878 this .attributes = attributes ;
7979 this .authority = authority ;
8080 this .logId = logId ;
81- this .asyncCtx = asyncCtx ;
8281 this .resp = (HttpServletResponse ) asyncCtx .getResponse ();
8382 this .writer = new AsyncServletOutputStreamWriter (
8483 asyncCtx , transportState , logId );
@@ -269,6 +268,12 @@ public void writeFrame(@Nullable WritableBuffer frame, boolean flush, int numMes
269268
270269 @ Override
271270 public void writeTrailers (Metadata trailers , boolean headersSent , Status status ) {
271+ if (asyncCompleted ) {
272+ if (logger .isLoggable (FINE )) {
273+ logger .log (FINE , "[{0}] ignore writeTrailers as already completed" , new Object []{logId });
274+ }
275+ return ;
276+ }
272277 if (logger .isLoggable (FINE )) {
273278 logger .log (
274279 FINE ,
@@ -292,24 +297,16 @@ public void writeTrailers(Metadata trailers, boolean headersSent, Status status)
292297
293298 @ Override
294299 public void cancel (Status status ) {
295- if (resp .isCommitted () && Code .DEADLINE_EXCEEDED == status .getCode ()) {
296- return ; // let the servlet timeout, the container will sent RST_STREAM automatically
297- }
298300 transportState .runOnTransportThread (() -> transportState .transportReportStatus (status ));
299- // There is no way to RST_STREAM with CANCEL code, so write trailers instead
300- close (Status .CANCELLED .withDescription ("Servlet stream cancelled" )
301- .withCause (status .asRuntimeException ()),
302- new Metadata ());
303- CountDownLatch countDownLatch = new CountDownLatch (1 );
304- transportState .runOnTransportThread (() -> {
305- asyncCtx .complete ();
306- countDownLatch .countDown ();
307- });
308- try {
309- countDownLatch .await (5 , TimeUnit .SECONDS );
310- } catch (InterruptedException e ) {
311- Thread .currentThread ().interrupt ();
301+ if (asyncCompleted ) {
302+ if (logger .isLoggable (FINE )) {
303+ logger .log (FINE , "[{0}] ignore cancel as already completed" , new Object []{logId });
304+ }
305+ return ;
312306 }
307+ // There is no way to RST_STREAM with CANCEL code, so write trailers instead
308+ close (status , new Metadata ());
309+ // close() calls writeTrailers(), which calls AsyncContext.complete()
313310 }
314311 }
315312
0 commit comments