Skip to content
This repository was archived by the owner on Jan 13, 2021. It is now read-only.

Commit dbb6901

Browse files
committed
Merge pull request #150 from Lukasa/issue/146
Avoid issuing too many RST_STREAM frames
2 parents 1b5444b + ef9b7f2 commit dbb6901

File tree

4 files changed

+129
-4
lines changed

4 files changed

+129
-4
lines changed

hyper/http20/connection.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,9 +110,13 @@ def __init_state(self):
110110
# Streams are stored in a dictionary keyed off their stream IDs. We
111111
# also save the most recent one for easy access without having to walk
112112
# the dictionary.
113+
# Finally, we add a set of all streams that we or the remote party
114+
# forcefully closed with RST_STREAM, to avoid encountering issues where
115+
# frames were already in flight before the RST was processed.
113116
self.streams = {}
114117
self.recent_stream = None
115118
self.next_stream_id = 1
119+
self.reset_streams = set()
116120

117121
# Header encoding/decoding is at the connection scope, so we embed a
118122
# header encoder and a decoder. These get passed to child stream
@@ -652,6 +656,13 @@ def _consume_frame_payload(self, frame, data):
652656
# the stream and go about our business.
653657
self._send_rst_frame(frame.promised_stream_id, 7)
654658

659+
# If this frame was received on a stream that has been reset, drop it.
660+
if frame.stream_id in self.reset_streams:
661+
log.info(
662+
"Stream %s has been reset, dropping frame.", frame.stream_id
663+
)
664+
return
665+
655666
# Work out to whom this frame should go.
656667
if frame.stream_id != 0:
657668
try:
@@ -663,6 +674,11 @@ def _consume_frame_payload(self, frame, data):
663674
log.warning(
664675
"Unexpected stream identifier %d" % (frame.stream_id)
665676
)
677+
678+
# If this is a RST_STREAM frame, we may get more than one (because
679+
# of frames in flight). Keep track.
680+
if frame.type == RstStreamFrame.type:
681+
self.reset_streams.add(frame.stream_id)
666682
else:
667683
self.receive_frame(frame)
668684

@@ -703,6 +719,10 @@ def _send_rst_frame(self, stream_id, error_code):
703719
"Stream with id %d does not exist: %s",
704720
stream_id, e)
705721

722+
# Keep track of the fact that we reset this stream in case there are
723+
# other frames in flight.
724+
self.reset_streams.add(stream_id)
725+
706726
# The following two methods are the implementation of the context manager
707727
# protocol.
708728
def __enter__(self):

hyper/http20/exceptions.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,3 +40,10 @@ class ProtocolError(HTTP20Error):
4040
The remote party violated the HTTP/2 protocol.
4141
"""
4242
pass
43+
44+
45+
class StreamResetError(HTTP20Error):
46+
"""
47+
A stream was forcefully reset by the remote party.
48+
"""
49+
pass

hyper/http20/stream.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,9 @@
1616
from ..common.headers import HTTPHeaderMap
1717
from ..packages.hyperframe.frame import (
1818
FRAME_MAX_LEN, FRAMES, HeadersFrame, DataFrame, PushPromiseFrame,
19-
WindowUpdateFrame, ContinuationFrame, BlockedFrame
19+
WindowUpdateFrame, ContinuationFrame, BlockedFrame, RstStreamFrame
2020
)
21-
from .exceptions import ProtocolError
21+
from .exceptions import ProtocolError, StreamResetError
2222
from .util import h2_safe_headers
2323
import collections
2424
import logging
@@ -233,6 +233,9 @@ def receive_frame(self, frame):
233233
w = WindowUpdateFrame(self.stream_id)
234234
w.window_increment = increment
235235
self._data_cb(w, True)
236+
elif frame.type == RstStreamFrame.type:
237+
self.close(0)
238+
raise StreamResetError("Stream forcefully closed.")
236239
elif frame.type in FRAMES:
237240
# This frame isn't valid at this point.
238241
raise ValueError("Unexpected frame %s." % frame)

test/test_integration.py

Lines changed: 97 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,14 @@
1515
from hyper.contrib import HTTP20Adapter
1616
from hyper.packages.hyperframe.frame import (
1717
Frame, SettingsFrame, WindowUpdateFrame, DataFrame, HeadersFrame,
18-
GoAwayFrame,
18+
GoAwayFrame, RstStreamFrame
1919
)
2020
from hyper.packages.hpack.hpack import Encoder
2121
from hyper.packages.hpack.huffman import HuffmanEncoder
2222
from hyper.packages.hpack.huffman_constants import (
2323
REQUEST_CODES, REQUEST_CODES_LENGTH
2424
)
25-
from hyper.http20.exceptions import ConnectionError
25+
from hyper.http20.exceptions import ConnectionError, StreamResetError
2626
from server import SocketLevelTest
2727

2828
# Turn off certificate verification for the tests.
@@ -506,6 +506,101 @@ def socket_handler(listener):
506506

507507
self.tear_down()
508508

509+
def test_resetting_stream_with_frames_in_flight(self):
510+
"""
511+
Hyper emits only one RST_STREAM frame, despite the other frames in
512+
flight.
513+
"""
514+
self.set_up()
515+
516+
recv_event = threading.Event()
517+
518+
def socket_handler(listener):
519+
sock = listener.accept()[0]
520+
521+
# We get two messages for the connection open and then a HEADERS
522+
# frame.
523+
receive_preamble(sock)
524+
sock.recv(65535)
525+
526+
# Now, send the headers for the response. This response has no
527+
# body.
528+
f = build_headers_frame(
529+
[(':status', '204'), ('content-length', '0')]
530+
)
531+
f.flags.add('END_STREAM')
532+
f.stream_id = 1
533+
sock.send(f.serialize())
534+
535+
# Wait for the message from the main thread.
536+
recv_event.wait()
537+
sock.close()
538+
539+
self._start_server(socket_handler)
540+
conn = self.get_connection()
541+
stream_id = conn.request('GET', '/')
542+
543+
# Now, trigger the RST_STREAM frame by closing the stream.
544+
conn._send_rst_frame(stream_id, 0)
545+
546+
# Now, eat the Headers frame. This should not cause an exception.
547+
conn._recv_cb()
548+
549+
# However, attempting to get the response should.
550+
with pytest.raises(KeyError):
551+
conn.get_response(stream_id)
552+
553+
# Awesome, we're done now.
554+
recv_event.set()
555+
556+
self.tear_down()
557+
558+
def test_stream_can_be_reset_multiple_times(self):
559+
"""
560+
Confirm that hyper gracefully handles receiving multiple RST_STREAM
561+
frames.
562+
"""
563+
self.set_up()
564+
565+
recv_event = threading.Event()
566+
567+
def socket_handler(listener):
568+
sock = listener.accept()[0]
569+
570+
# We get two messages for the connection open and then a HEADERS
571+
# frame.
572+
receive_preamble(sock)
573+
sock.recv(65535)
574+
575+
# Now, send two RST_STREAM frames.
576+
for _ in range(0, 2):
577+
f = RstStreamFrame(1)
578+
sock.send(f.serialize())
579+
580+
# Wait for the message from the main thread.
581+
recv_event.wait()
582+
sock.close()
583+
584+
self._start_server(socket_handler)
585+
conn = self.get_connection()
586+
conn.request('GET', '/')
587+
588+
# Now, eat the RstStream frames. The first one throws a
589+
# StreamResetError.
590+
with pytest.raises(StreamResetError):
591+
conn._consume_single_frame()
592+
593+
# The next should throw no exception.
594+
conn._consume_single_frame()
595+
596+
assert conn.reset_streams == set([1])
597+
598+
# Awesome, we're done now.
599+
recv_event.set()
600+
601+
self.tear_down()
602+
603+
509604
class TestRequestsAdapter(SocketLevelTest):
510605
# This uses HTTP/2.
511606
h2 = True

0 commit comments

Comments
 (0)