5353import jdk .internal .net .http .HttpConnection .HttpPublisher ;
5454import jdk .internal .net .http .common .FlowTube ;
5555import jdk .internal .net .http .common .FlowTube .TubeSubscriber ;
56+ import jdk .internal .net .http .common .HeaderDecoder ;
5657import jdk .internal .net .http .common .HttpHeadersBuilder ;
5758import jdk .internal .net .http .common .Log ;
5859import jdk .internal .net .http .common .Logger ;
5960import jdk .internal .net .http .common .MinimalFuture ;
6061import jdk .internal .net .http .common .SequentialScheduler ;
6162import jdk .internal .net .http .common .Utils ;
63+ import jdk .internal .net .http .common .ValidatingHeadersConsumer ;
6264import jdk .internal .net .http .frame .ContinuationFrame ;
6365import jdk .internal .net .http .frame .DataFrame ;
6466import jdk .internal .net .http .frame .ErrorFrame ;
@@ -279,6 +281,7 @@ private record PushContinuationState(HeaderDecoder pushContDecoder, PushPromiseF
279281 final ConnectionWindowUpdateSender windowUpdater ;
280282 private volatile Throwable cause ;
281283 private volatile Supplier <ByteBuffer > initial ;
284+ private volatile Stream <?> initialStream ;
282285
283286 static final int DEFAULT_FRAME_SIZE = 16 * 1024 ;
284287
@@ -332,6 +335,7 @@ private Http2Connection(HttpConnection connection,
332335
333336 Stream <?> initialStream = createStream (exchange );
334337 boolean opened = initialStream .registerStream (1 , true );
338+ this .initialStream = initialStream ;
335339 if (debug .on () && !opened ) {
336340 debug .log ("Initial stream was cancelled - but connection is maintained: " +
337341 "reset frame will need to be sent later" );
@@ -765,7 +769,7 @@ void processFrame(Http2Frame frame) throws IOException {
765769 if (frame instanceof HeaderFrame ) {
766770 // always decode the headers as they may affect
767771 // connection-level HPACK decoding state
768- DecodingCallback decoder = new ValidatingHeadersConsumer ();
772+ DecodingCallback decoder = new ValidatingHeadersConsumer ():: onDecoded ;
769773 try {
770774 decodeHeaders ((HeaderFrame ) frame , decoder );
771775 } catch (UncheckedIOException e ) {
@@ -863,7 +867,7 @@ private <T> void handlePushPromise(Stream<T> parent, PushPromiseFrame pp)
863867 // decoding state
864868 assert pushContinuationState == null ;
865869 HeaderDecoder decoder = new HeaderDecoder ();
866- decodeHeaders (pp , decoder );
870+ decodeHeaders (pp , decoder :: onDecoded );
867871 int promisedStreamid = pp .getPromisedStream ();
868872 if (pp .endHeaders ()) {
869873 completePushPromise (promisedStreamid , parent , decoder .headers ());
@@ -875,7 +879,7 @@ private <T> void handlePushPromise(Stream<T> parent, PushPromiseFrame pp)
875879 private <T > void handlePushContinuation (Stream <T > parent , ContinuationFrame cf )
876880 throws IOException {
877881 var pcs = pushContinuationState ;
878- decodeHeaders (cf , pcs .pushContDecoder );
882+ decodeHeaders (cf , pcs .pushContDecoder :: onDecoded );
879883 // if all continuations are sent, set pushWithContinuation to null
880884 if (cf .endHeaders ()) {
881885 completePushPromise (pcs .pushContFrame .getPromisedStream (), parent ,
@@ -1122,6 +1126,21 @@ private void sendConnectionPreface() throws IOException {
11221126 subscriber .onNext (List .of (EMPTY_TRIGGER ));
11231127 }
11241128
1129+ /**
1130+ * Called to get the initial stream after a connection upgrade.
1131+ * If the stream was cancelled, it might no longer be in the
1132+ * stream map. Therefore - we use the initialStream field
1133+ * instead, and reset it to null after returning it.
1134+ * @param <T> the response type
1135+ * @return the initial stream created during the upgrade.
1136+ */
1137+ @ SuppressWarnings ("unchecked" )
1138+ <T > Stream <T > getInitialStream () {
1139+ var s = (Stream <T >) initialStream ;
1140+ initialStream = null ;
1141+ return s ;
1142+ }
1143+
11251144 /**
11261145 * Returns an existing Stream with given id, or null if doesn't exist
11271146 */
@@ -1439,76 +1458,6 @@ final String dbgString() {
14391458 + connection .getConnectionFlow () + ")" ;
14401459 }
14411460
1442- static class HeaderDecoder extends ValidatingHeadersConsumer {
1443-
1444- HttpHeadersBuilder headersBuilder ;
1445-
1446- HeaderDecoder () {
1447- this .headersBuilder = new HttpHeadersBuilder ();
1448- }
1449-
1450- @ Override
1451- public void onDecoded (CharSequence name , CharSequence value ) {
1452- String n = name .toString ();
1453- String v = value .toString ();
1454- super .onDecoded (n , v );
1455- headersBuilder .addHeader (n , v );
1456- }
1457-
1458- HttpHeaders headers () {
1459- return headersBuilder .build ();
1460- }
1461- }
1462-
1463- /*
1464- * Checks RFC 7540 rules (relaxed) compliance regarding pseudo-headers.
1465- */
1466- static class ValidatingHeadersConsumer implements DecodingCallback {
1467-
1468- private static final Set <String > PSEUDO_HEADERS =
1469- Set .of (":authority" , ":method" , ":path" , ":scheme" , ":status" );
1470-
1471- /** Used to check that if there are pseudo-headers, they go first */
1472- private boolean pseudoHeadersEnded ;
1473-
1474- /**
1475- * Called when END_HEADERS was received. This consumer may be invoked
1476- * again after reset() is called, but for a whole new set of headers.
1477- */
1478- void reset () {
1479- pseudoHeadersEnded = false ;
1480- }
1481-
1482- @ Override
1483- public void onDecoded (CharSequence name , CharSequence value )
1484- throws UncheckedIOException
1485- {
1486- String n = name .toString ();
1487- if (n .startsWith (":" )) {
1488- if (pseudoHeadersEnded ) {
1489- throw newException ("Unexpected pseudo-header '%s'" , n );
1490- } else if (!PSEUDO_HEADERS .contains (n )) {
1491- throw newException ("Unknown pseudo-header '%s'" , n );
1492- }
1493- } else {
1494- pseudoHeadersEnded = true ;
1495- if (!Utils .isValidName (n )) {
1496- throw newException ("Bad header name '%s'" , n );
1497- }
1498- }
1499- String v = value .toString ();
1500- if (!Utils .isValidValue (v )) {
1501- throw newException ("Bad header value '%s'" , v );
1502- }
1503- }
1504-
1505- private UncheckedIOException newException (String message , String header )
1506- {
1507- return new UncheckedIOException (
1508- new IOException (String .format (message , header )));
1509- }
1510- }
1511-
15121461 static final class ConnectionWindowUpdateSender extends WindowUpdateSender {
15131462
15141463 final int initialWindowSize ;
0 commit comments