@@ -263,6 +263,8 @@ void markPrefaceSent() {
263263 private final Decoder hpackIn ;
264264 final SettingsFrame clientSettings ;
265265 private volatile SettingsFrame serverSettings ;
266+ private record PushContinuationState (HeaderDecoder pushContDecoder , PushPromiseFrame pushContFrame ) {}
267+ private volatile PushContinuationState pushContinuationState ;
266268 private final String key ; // for HttpClientImpl.connections map
267269 private final FramesDecoder framesDecoder ;
268270 private final FramesEncoder framesEncoder = new FramesEncoder ();
@@ -773,8 +775,8 @@ void processFrame(Http2Frame frame) throws IOException {
773775 }
774776
775777 if (!(frame instanceof ResetFrame )) {
776- if (frame instanceof DataFrame ) {
777- dropDataFrame (( DataFrame ) frame );
778+ if (frame instanceof DataFrame df ) {
779+ dropDataFrame (df );
778780 }
779781 if (isServerInitiatedStream (streamid )) {
780782 if (streamid < nextPushStream ) {
@@ -791,26 +793,44 @@ void processFrame(Http2Frame frame) throws IOException {
791793 }
792794 return ;
793795 }
794- if ( frame instanceof PushPromiseFrame ) {
795- PushPromiseFrame pp = ( PushPromiseFrame ) frame ;
796- try {
797- handlePushPromise ( stream , pp );
798- } catch ( UncheckedIOException e ) {
799- protocolError ( ResetFrame . PROTOCOL_ERROR , e . getMessage ());
800- return ;
801- }
802- } else if ( frame instanceof HeaderFrame ) {
803- // decode headers (or continuation)
804- try {
805- decodeHeaders (( HeaderFrame ) frame , stream . rspHeadersConsumer ());
806- } catch ( UncheckedIOException e ) {
807- debug . log ( "Error decoding headers: " + e . getMessage (), e ) ;
808- protocolError (ResetFrame .PROTOCOL_ERROR , e . getMessage () );
796+
797+ // While push frame is not null, the only acceptable frame on this
798+ // stream is a Continuation frame
799+ if ( pushContinuationState != null ) {
800+ if ( frame instanceof ContinuationFrame cf ) {
801+ try {
802+ handlePushContinuation ( stream , cf ) ;
803+ } catch ( UncheckedIOException e ) {
804+ debug . log ( "Error handling Push Promise with Continuation: " + e . getMessage (), e );
805+ protocolError ( ErrorFrame . PROTOCOL_ERROR , e . getMessage ());
806+ return ;
807+ }
808+ } else {
809+ pushContinuationState = null ;
810+ protocolError (ErrorFrame .PROTOCOL_ERROR , "Expected a Continuation frame but received " + frame );
809811 return ;
810812 }
811- stream .incoming (frame );
812813 } else {
813- stream .incoming (frame );
814+ if (frame instanceof PushPromiseFrame pp ) {
815+ try {
816+ handlePushPromise (stream , pp );
817+ } catch (UncheckedIOException e ) {
818+ protocolError (ErrorFrame .PROTOCOL_ERROR , e .getMessage ());
819+ return ;
820+ }
821+ } else if (frame instanceof HeaderFrame hf ) {
822+ // decode headers
823+ try {
824+ decodeHeaders (hf , stream .rspHeadersConsumer ());
825+ } catch (UncheckedIOException e ) {
826+ debug .log ("Error decoding headers: " + e .getMessage (), e );
827+ protocolError (ErrorFrame .PROTOCOL_ERROR , e .getMessage ());
828+ return ;
829+ }
830+ stream .incoming (frame );
831+ } else {
832+ stream .incoming (frame );
833+ }
814834 }
815835 }
816836 }
@@ -841,11 +861,34 @@ private <T> void handlePushPromise(Stream<T> parent, PushPromiseFrame pp)
841861 {
842862 // always decode the headers as they may affect connection-level HPACK
843863 // decoding state
864+ assert pushContinuationState == null ;
844865 HeaderDecoder decoder = new HeaderDecoder ();
845866 decodeHeaders (pp , decoder );
867+ int promisedStreamid = pp .getPromisedStream ();
868+ if (pp .endHeaders ()) {
869+ completePushPromise (promisedStreamid , parent , decoder .headers ());
870+ } else {
871+ pushContinuationState = new PushContinuationState (decoder , pp );
872+ }
873+ }
874+
875+ private <T > void handlePushContinuation (Stream <T > parent , ContinuationFrame cf )
876+ throws IOException {
877+ var pcs = pushContinuationState ;
878+ decodeHeaders (cf , pcs .pushContDecoder );
879+ // if all continuations are sent, set pushWithContinuation to null
880+ if (cf .endHeaders ()) {
881+ completePushPromise (pcs .pushContFrame .getPromisedStream (), parent ,
882+ pcs .pushContDecoder .headers ());
883+ pushContinuationState = null ;
884+ }
885+ }
846886
887+ private <T > void completePushPromise (int promisedStreamid , Stream <T > parent , HttpHeaders headers )
888+ throws IOException {
889+ // Perhaps the following checks could be moved to handlePushPromise()
890+ // to reset the PushPromise stream earlier?
847891 HttpRequestImpl parentReq = parent .request ;
848- int promisedStreamid = pp .getPromisedStream ();
849892 if (promisedStreamid != nextPushStream ) {
850893 resetStream (promisedStreamid , ResetFrame .PROTOCOL_ERROR );
851894 return ;
@@ -856,7 +899,6 @@ private <T> void handlePushPromise(Stream<T> parent, PushPromiseFrame pp)
856899 nextPushStream += 2 ;
857900 }
858901
859- HttpHeaders headers = decoder .headers ();
860902 HttpRequestImpl pushReq = HttpRequestImpl .createPushRequest (parentReq , headers );
861903 Exchange <T > pushExch = new Exchange <>(pushReq , parent .exchange .multi );
862904 Stream .PushedStream <T > pushStream = createPushStream (parent , pushExch );
0 commit comments