40
40
import java .util .concurrent .ConcurrentHashMap ;
41
41
import java .util .concurrent .ConcurrentLinkedDeque ;
42
42
import java .util .concurrent .ConcurrentLinkedQueue ;
43
+ import java .util .concurrent .atomic .AtomicBoolean ;
43
44
import java .util .concurrent .atomic .AtomicInteger ;
44
45
45
46
import javax .net .ssl .SSLHandshakeException ;
@@ -556,7 +557,7 @@ public final void onTimeout(final Timeout timeout) throws HttpException, IOExcep
556
557
for (final Iterator <Map .Entry <Integer , H2Stream >> it = streamMap .entrySet ().iterator (); it .hasNext (); ) {
557
558
final Map .Entry <Integer , H2Stream > entry = it .next ();
558
559
final H2Stream stream = entry .getValue ();
559
- stream .reset (new H2StreamResetException (H2Error .NO_ERROR , "Timeout due to inactivity (" + timeout + ")" ));
560
+ stream .fail (new H2StreamResetException (H2Error .NO_ERROR , "Timeout due to inactivity (" + timeout + ")" ));
560
561
}
561
562
streamMap .clear ();
562
563
}
@@ -676,7 +677,7 @@ public final void onException(final Exception cause) {
676
677
for (final Iterator <Map .Entry <Integer , H2Stream >> it = streamMap .entrySet ().iterator (); it .hasNext (); ) {
677
678
final Map .Entry <Integer , H2Stream > entry = it .next ();
678
679
final H2Stream stream = entry .getValue ();
679
- stream .reset (cause );
680
+ stream .fail (cause );
680
681
}
681
682
streamMap .clear ();
682
683
if (!(cause instanceof ConnectionClosedException )) {
@@ -873,9 +874,8 @@ private void consumeFrame(final RawFrame frame) throws HttpException, IOExceptio
873
874
throw new H2ConnectionException (H2Error .FRAME_SIZE_ERROR , "Invalid RST_STREAM frame payload" );
874
875
}
875
876
final int errorCode = payload .getInt ();
876
- stream .reset (new H2StreamResetException (errorCode , "Stream reset (" + errorCode + ")" ));
877
+ stream .fail (new H2StreamResetException (errorCode , "Stream reset (" + errorCode + ")" ));
877
878
streamMap .remove (streamId );
878
- stream .releaseResources ();
879
879
requestSessionOutput ();
880
880
}
881
881
}
@@ -1013,7 +1013,7 @@ private void consumeFrame(final RawFrame frame) throws HttpException, IOExceptio
1013
1013
for (final Iterator <Map .Entry <Integer , H2Stream >> it = streamMap .entrySet ().iterator (); it .hasNext (); ) {
1014
1014
final Map .Entry <Integer , H2Stream > entry = it .next ();
1015
1015
final H2Stream stream = entry .getValue ();
1016
- stream .reset (new H2StreamResetException (errorCode , "Connection terminated by the peer (" + errorCode + ")" ));
1016
+ stream .fail (new H2StreamResetException (errorCode , "Connection terminated by the peer (" + errorCode + ")" ));
1017
1017
}
1018
1018
streamMap .clear ();
1019
1019
connState = ConnectionHandshake .SHUTDOWN ;
@@ -1599,6 +1599,7 @@ static class H2Stream {
1599
1599
private final H2StreamChannelImpl channel ;
1600
1600
private final H2StreamHandler handler ;
1601
1601
private final boolean remoteInitiated ;
1602
+ private final AtomicBoolean released ;
1602
1603
1603
1604
private H2Stream (
1604
1605
final H2StreamChannelImpl channel ,
@@ -1607,6 +1608,7 @@ private H2Stream(
1607
1608
this .channel = channel ;
1608
1609
this .handler = handler ;
1609
1610
this .remoteInitiated = remoteInitiated ;
1611
+ this .released = new AtomicBoolean ();
1610
1612
}
1611
1613
1612
1614
int getId () {
@@ -1688,15 +1690,21 @@ void produceInputCapacityUpdate() throws IOException {
1688
1690
handler .updateInputCapacity ();
1689
1691
}
1690
1692
1691
- void reset (final Exception cause ) {
1693
+ void fail (final Exception cause ) {
1692
1694
channel .setRemoteEndStream ();
1693
1695
channel .setLocalEndStream ();
1694
- handler .failed (cause );
1696
+ if (released .compareAndSet (false , true )) {
1697
+ handler .failed (cause );
1698
+ handler .releaseResources ();
1699
+ }
1695
1700
}
1696
1701
1697
1702
void localReset (final Exception cause , final int code ) throws IOException {
1698
1703
channel .localReset (code );
1699
- handler .failed (cause );
1704
+ if (released .compareAndSet (false , true )) {
1705
+ handler .failed (cause );
1706
+ handler .releaseResources ();
1707
+ }
1700
1708
}
1701
1709
1702
1710
void localReset (final Exception cause , final H2Error error ) throws IOException {
@@ -1716,17 +1724,22 @@ HandlerFactory<AsyncPushConsumer> getPushHandlerFactory() {
1716
1724
}
1717
1725
1718
1726
void cancel () {
1719
- reset (new RequestNotExecutedException ());
1727
+ fail (new RequestNotExecutedException ());
1720
1728
}
1721
1729
1722
1730
boolean abort () {
1723
1731
final boolean cancelled = channel .cancel ();
1724
- handler .failed (new RequestNotExecutedException ());
1732
+ if (released .compareAndSet (false , true )) {
1733
+ handler .failed (new RequestNotExecutedException ());
1734
+ handler .releaseResources ();
1735
+ }
1725
1736
return cancelled ;
1726
1737
}
1727
1738
1728
1739
void releaseResources () {
1729
- handler .releaseResources ();
1740
+ if (released .compareAndSet (false , true )) {
1741
+ handler .releaseResources ();
1742
+ }
1730
1743
}
1731
1744
1732
1745
void appendState (final StringBuilder buf ) {
0 commit comments