1212from ..packages .hyperframe .frame import (
1313 FRAMES , DataFrame , HeadersFrame , PushPromiseFrame , RstStreamFrame ,
1414 SettingsFrame , Frame , WindowUpdateFrame , GoAwayFrame , PingFrame ,
15- BlockedFrame
15+ BlockedFrame , FRAME_MAX_LEN , FRAME_MAX_ALLOWED_LEN
1616)
1717from ..packages .hpack .hpack_compat import Encoder , Decoder
1818from .stream import Stream
@@ -113,9 +113,13 @@ def __init_state(self):
113113 # Streams are stored in a dictionary keyed off their stream IDs. We
114114 # also save the most recent one for easy access without having to walk
115115 # the dictionary.
116+ # Finally, we add a set of all streams that we or the remote party
117+ # forcefully closed with RST_STREAM, to avoid encountering issues where
118+ # frames were already in flight before the RST was processed.
116119 self .streams = {}
117120 self .recent_stream = None
118121 self .next_stream_id = 1
122+ self .reset_streams = set ()
119123
120124 # Header encoding/decoding is at the connection scope, so we embed a
121125 # header encoder and a decoder. These get passed to child stream
@@ -126,6 +130,7 @@ def __init_state(self):
126130 # Values for the settings used on an HTTP/2 connection.
127131 self ._settings = {
128132 SettingsFrame .INITIAL_WINDOW_SIZE : 65535 ,
133+ SettingsFrame .SETTINGS_MAX_FRAME_SIZE : FRAME_MAX_LEN ,
129134 }
130135
131136 # The socket used to send data.
@@ -250,16 +255,17 @@ def _send_preamble(self):
250255 # The server will also send an initial settings frame, so get it.
251256 self ._recv_cb ()
252257
253- def close (self ):
258+ def close (self , error_code = None ):
254259 """
255260 Close the connection to the server.
256261
262+ :param error_code: (optional) The error code to reset all streams with.
257263 :returns: Nothing.
258264 """
259265 # Close all streams
260266 for stream in list (self .streams .values ()):
261267 log .debug ("Close stream %d" % stream .stream_id )
262- stream .close ()
268+ stream .close (error_code )
263269
264270 # Send GoAway frame to the server
265271 try :
@@ -391,10 +397,14 @@ def receive_frame(self, frame):
391397 if 'ACK' not in frame .flags :
392398 self ._update_settings (frame )
393399
394- # Need to return an ack.
395- f = SettingsFrame (0 )
396- f .flags .add ('ACK' )
397- self ._send_cb (f )
400+ # When the setting containing the max frame size value is out
401+ # of range, the spec dictates to tear down the connection.
402+ # Therefore we make sure the socket is still alive before
403+ # returning the ack.
404+ if self ._sock is not None :
405+ f = SettingsFrame (0 )
406+ f .flags .add ('ACK' )
407+ self ._send_cb (f )
398408 elif frame .type == GoAwayFrame .type :
399409 # If we get GoAway with error code zero, we are doing a graceful
400410 # shutdown and all is well. Otherwise, throw an exception.
@@ -404,13 +414,19 @@ def receive_frame(self, frame):
404414 # code registry otherwise use the frame's additional data.
405415 if frame .error_code != 0 :
406416 try :
407- name , number , description = errors .get_data (frame .error_code )
417+ name , number , description = errors .get_data (
418+ frame .error_code
419+ )
408420 except ValueError :
409- error_string = ("Encountered error code %d, extra data %s" %
410- (frame .error_code , frame .additional_data ))
421+ error_string = (
422+ "Encountered error code %d, extra data %s" %
423+ (frame .error_code , frame .additional_data )
424+ )
411425 else :
412- error_string = ("Encountered error %s %s: %s" %
413- (name , number , description ))
426+ error_string = (
427+ "Encountered error %s %s: %s" %
428+ (name , number , description )
429+ )
414430
415431 raise ConnectionError (error_string )
416432
@@ -452,6 +468,25 @@ def _update_settings(self, frame):
452468
453469 self ._settings [SettingsFrame .INITIAL_WINDOW_SIZE ] = newsize
454470
471+ if SettingsFrame .SETTINGS_MAX_FRAME_SIZE in frame .settings :
472+ new_size = frame .settings [SettingsFrame .SETTINGS_MAX_FRAME_SIZE ]
473+ if FRAME_MAX_LEN <= new_size <= FRAME_MAX_ALLOWED_LEN :
474+ self ._settings [SettingsFrame .SETTINGS_MAX_FRAME_SIZE ] = (
475+ new_size
476+ )
477+ else :
478+ log .warning (
479+ "Frame size %d is outside of allowed range" ,
480+ new_size
481+ )
482+
483+ # Tear the connection down with error code PROTOCOL_ERROR
484+ self .close (1 )
485+ error_string = (
486+ "Advertised frame size %d is outside of range" % (new_size )
487+ )
488+ raise ConnectionError (error_string )
489+
455490 def _new_stream (self , stream_id = None , local_closed = False ):
456491 """
457492 Returns a new stream object for this connection.
@@ -472,12 +507,18 @@ def _close_stream(self, stream_id, error_code=None):
472507 """
473508 Called by a stream when it would like to be 'closed'.
474509 """
475- if error_code is not None :
476- f = RstStreamFrame (stream_id )
477- f .error_code = error_code
478- self ._send_cb (f )
479-
480- del self .streams [stream_id ]
510+ # Graceful shutdown of streams involves not emitting an error code
511+ # at all.
512+ if error_code :
513+ self ._send_rst_frame (stream_id , error_code )
514+ else :
515+ # Just delete the stream.
516+ try :
517+ del self .streams [stream_id ]
518+ except KeyError as e : # pragma: no cover
519+ log .warn (
520+ "Stream with id %d does not exist: %s" ,
521+ stream_id , e )
481522
482523 def _send_cb (self , frame , tolerate_peer_gone = False ):
483524 """
@@ -498,6 +539,14 @@ def _send_cb(self, frame, tolerate_peer_gone=False):
498539
499540 data = frame .serialize ()
500541
542+ max_frame_size = self ._settings [SettingsFrame .SETTINGS_MAX_FRAME_SIZE ]
543+ if frame .body_len > max_frame_size :
544+ raise ValueError (
545+ "Frame size %d exceeds maximum frame size setting %d" %
546+ (frame .body_len ,
547+ self ._settings [SettingsFrame .SETTINGS_MAX_FRAME_SIZE ])
548+ )
549+
501550 log .info (
502551 "Sending frame %s on stream %d" ,
503552 frame .__class__ .__name__ ,
@@ -507,7 +556,8 @@ def _send_cb(self, frame, tolerate_peer_gone=False):
507556 try :
508557 self ._sock .send (data )
509558 except socket .error as e :
510- if not tolerate_peer_gone or e .errno not in (errno .EPIPE , errno .ECONNRESET ):
559+ if (not tolerate_peer_gone or
560+ e .errno not in (errno .EPIPE , errno .ECONNRESET )):
511561 raise
512562
513563 def _adjust_receive_window (self , frame_len ):
@@ -538,6 +588,15 @@ def _consume_single_frame(self):
538588 # Parse the header. We can use the returned memoryview directly here.
539589 frame , length = Frame .parse_frame_header (header )
540590
591+ if (length > FRAME_MAX_LEN ):
592+ log .warning (
593+ "Frame size exceeded on stream %d (received: %d, max: %d)" ,
594+ frame .stream_id ,
595+ length ,
596+ FRAME_MAX_LEN
597+ )
598+ self ._send_rst_frame (frame .stream_id , 6 ) # 6 = FRAME_SIZE_ERROR
599+
541600 # Read the remaining data from the socket.
542601 data = self ._recv_payload (length )
543602 self ._consume_frame_payload (frame , data )
@@ -598,9 +657,14 @@ def _consume_frame_payload(self, frame, data):
598657 # the ENABLE_PUSH setting is 0, but the spec leaves the client
599658 # action undefined when they do it anyway. So we just refuse
600659 # the stream and go about our business.
601- f = RstStreamFrame (frame .promised_stream_id )
602- f .error_code = 7 # REFUSED_STREAM
603- self ._send_cb (f )
660+ self ._send_rst_frame (frame .promised_stream_id , 7 )
661+
662+ # If this frame was received on a stream that has been reset, drop it.
663+ if frame .stream_id in self .reset_streams :
664+ log .info (
665+ "Stream %s has been reset, dropping frame." , frame .stream_id
666+ )
667+ return
604668
605669 # Work out to whom this frame should go.
606670 if frame .stream_id != 0 :
@@ -609,12 +673,15 @@ def _consume_frame_payload(self, frame, data):
609673 except KeyError :
610674 # If we receive an unexpected stream identifier then we
611675 # cancel the stream with an error of type PROTOCOL_ERROR
612- f = RstStreamFrame (frame .stream_id )
613- f .error_code = 1 # PROTOCOL_ERROR
614- self ._send_cb (f )
676+ self ._send_rst_frame (frame .stream_id , 1 )
615677 log .warning (
616678 "Unexpected stream identifier %d" % (frame .stream_id )
617679 )
680+
681+ # If this is a RST_STREAM frame, we may get more than one (because
682+ # of frames in flight). Keep track.
683+ if frame .type == RstStreamFrame .type :
684+ self .reset_streams .add (frame .stream_id )
618685 else :
619686 self .receive_frame (frame )
620687
@@ -640,6 +707,24 @@ def _recv_cb(self):
640707 except ConnectionResetError :
641708 break
642709
710+ def _send_rst_frame (self , stream_id , error_code ):
711+ """
712+ Send reset stream frame with error code and remove stream from map.
713+ """
714+ f = RstStreamFrame (stream_id )
715+ f .error_code = error_code
716+ self ._send_cb (f )
717+
718+ try :
719+ del self .streams [stream_id ]
720+ except KeyError as e : # pragma: no cover
721+ log .warn (
722+ "Stream with id %d does not exist: %s" ,
723+ stream_id , e )
724+
725+ # Keep track of the fact that we reset this stream in case there are
726+ # other frames in flight.
727+ self .reset_streams .add (stream_id )
643728
644729 # The following two methods are the implementation of the context manager
645730 # protocol.
0 commit comments