Skip to content
This repository was archived by the owner on Jan 13, 2021. It is now read-only.

Commit 15760f4

Browse files
committed
Provide some functionality for closing streams.
1 parent 3beec82 commit 15760f4

File tree

5 files changed

+126
-15
lines changed

5 files changed

+126
-15
lines changed

hyper/http20/connection.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -329,8 +329,8 @@ def _new_stream(self):
329329
Returns a new stream object for this connection.
330330
"""
331331
s = Stream(
332-
self.next_stream_id, self._send_cb, self._recv_cb, self.encoder,
333-
self.decoder
332+
self.next_stream_id, self._send_cb, self._recv_cb,
333+
self._close_stream, self.encoder, self.decoder
334334
)
335335
s._out_flow_control_window = (
336336
self._settings[SettingsFrame.INITIAL_WINDOW_SIZE]
@@ -339,6 +339,12 @@ def _new_stream(self):
339339

340340
return s
341341

342+
def _close_stream(self, stream_id):
343+
"""
344+
Called by a stream when it would like to be 'closed'.
345+
"""
346+
del self.streams[stream_id]
347+
342348
def _send_cb(self, frame):
343349
"""
344350
This is the callback used by streams to send data on the connection.

hyper/http20/response.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,3 +155,11 @@ def fileno(self):
155155
currently not implemented.
156156
"""
157157
pass
158+
159+
def close(self):
160+
"""
161+
Close the response. In effect this closes the backing HTTP/2.0 stream.
162+
163+
:returns: Nothing.
164+
"""
165+
self._stream.close()

hyper/http20/stream.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ def __init__(self,
4848
stream_id,
4949
data_cb,
5050
recv_cb,
51+
close_cb,
5152
header_encoder,
5253
header_decoder):
5354
self.stream_id = stream_id
@@ -69,6 +70,9 @@ def __init__(self,
6970
# connection.
7071
self._recv_cb = recv_cb
7172

73+
# This is the callback to be called when the stream is closed.
74+
self._close_cb = close_cb
75+
7276
# A reference to the header encoder and decoder objects belonging to
7377
# the parent connection.
7478
self._encoder = header_encoder
@@ -230,6 +234,17 @@ def getresponse(self):
230234
# Create the HTTP response.
231235
return HTTP20Response(headers, self)
232236

237+
def close(self):
238+
"""
239+
Closes the stream. If the stream is currently open, attempts to close
240+
it as gracefully as possible.
241+
242+
:returns: Nothing.
243+
"""
244+
# Right now let's not bother with grace, let's just call close on the
245+
# connection.
246+
self._close_cb(self.stream_id)
247+
233248
def _send_chunk(self, data, final):
234249
"""
235250
Implements most of the sending logic.

test/test_hyper.py

Lines changed: 44 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -860,18 +860,35 @@ def test_connection_doesnt_send_window_update_on_zero_length_data_frame(self):
860860
# No frame should have been sent on the connection.
861861
assert len(sock.queue) == 0
862862

863+
def test_streams_are_cleared_from_connections_on_close(self):
864+
# Prepare a socket so we can open a stream.
865+
sock = DummySocket()
866+
c = HTTP20Connection('www.google.com')
867+
c._sock = sock
868+
869+
# Open a request (which creates a stream)
870+
c.request('GET', '/')
871+
872+
# Close the stream.
873+
c.streams[1].close()
874+
875+
# There should be nothing left, but the next stream ID should be
876+
# unchanged.
877+
assert not c.streams
878+
assert c.next_stream_id == 3
879+
863880

864881
class TestHyperStream(object):
865882
def test_streams_have_ids(self):
866-
s = Stream(1, None, None, None, None)
883+
s = Stream(1, None, None, None, None, None)
867884
assert s.stream_id == 1
868885

869886
def test_streams_initially_have_no_headers(self):
870-
s = Stream(1, None, None, None, None)
887+
s = Stream(1, None, None, None, None, None)
871888
assert s.headers == []
872889

873890
def test_streams_can_have_headers(self):
874-
s = Stream(1, None, None, None, None)
891+
s = Stream(1, None, None, None, None, None)
875892
s.add_header("name", "value")
876893
assert s.headers == [("name", "value")]
877894

@@ -881,14 +898,14 @@ def data_callback(frame):
881898
assert frame.data == 'testkeyTestVal'
882899
assert frame.flags == set(['END_STREAM', 'END_HEADERS'])
883900

884-
s = Stream(1, data_callback, None, NullEncoder, None)
901+
s = Stream(1, data_callback, None, None, NullEncoder, None)
885902
s.add_header("TestKey", "TestVal")
886903
s.open(True)
887904

888905
assert s.state == STATE_HALF_CLOSED_LOCAL
889906

890907
def test_receiving_a_frame_queues_it(self):
891-
s = Stream(1, None, None, None, None)
908+
s = Stream(1, None, None, None, None, None)
892909
s.receive_frame(Frame(0))
893910
assert len(s._queued_frames) == 1
894911

@@ -898,7 +915,7 @@ def data_callback(frame):
898915
assert frame.data == b'Hi there!'
899916
assert frame.flags == set(['END_STREAM'])
900917

901-
s = Stream(1, data_callback, None, NullEncoder, None)
918+
s = Stream(1, data_callback, None, None, NullEncoder, None)
902919
s.state = STATE_OPEN
903920
s.send_data(BytesIO(b'Hi there!'), True)
904921

@@ -917,7 +934,7 @@ def data_callback(frame):
917934

918935
data = b'test' * (MAX_CHUNK + 1)
919936

920-
s = Stream(1, data_callback, None, NullEncoder, None)
937+
s = Stream(1, data_callback, None, None, NullEncoder, None)
921938
s.state = STATE_OPEN
922939
s.send_data(BytesIO(data), True)
923940

@@ -932,7 +949,7 @@ def data_callback(frame):
932949
assert frame.data == b'Hi there!'
933950
assert frame.flags == set(['END_STREAM'])
934951

935-
s = Stream(1, data_callback, None, NullEncoder, None)
952+
s = Stream(1, data_callback, None, None, NullEncoder, None)
936953
s.state = STATE_OPEN
937954
s.send_data(b'Hi there!', True)
938955

@@ -951,7 +968,7 @@ def data_callback(frame):
951968

952969
data = b'test' * (MAX_CHUNK + 1)
953970

954-
s = Stream(1, data_callback, None, NullEncoder, None)
971+
s = Stream(1, data_callback, None, None, NullEncoder, None)
955972
s.state = STATE_OPEN
956973
s.send_data(data, True)
957974

@@ -961,7 +978,7 @@ def data_callback(frame):
961978
assert s._out_flow_control_window == 65535 - len(data)
962979

963980
def test_windowupdate_frames_update_windows(self):
964-
s = Stream(1, None, None, None, None)
981+
s = Stream(1, None, None, None, None, None)
965982
f = WindowUpdateFrame(1)
966983
f.window_increment = 1000
967984
s.receive_frame(f)
@@ -980,7 +997,7 @@ def inner():
980997
s.receive_frame(in_frames.pop(0))
981998
return inner
982999

983-
s = Stream(1, send_cb, None, None, None)
1000+
s = Stream(1, send_cb, None, None, None, None)
9841001
s._recv_cb = recv_cb(s)
9851002
s.state = STATE_HALF_CLOSED_LOCAL
9861003

@@ -1006,7 +1023,7 @@ def inner():
10061023
s.receive_frame(in_frames.pop(0))
10071024
return inner
10081025

1009-
s = Stream(1, send_cb, None, None, None)
1026+
s = Stream(1, send_cb, None, None, None, None)
10101027
s._recv_cb = recv_cb(s)
10111028
s.state = STATE_HALF_CLOSED_LOCAL
10121029

@@ -1038,7 +1055,7 @@ def inner():
10381055
s.receive_frame(in_frames.pop(0))
10391056
return inner
10401057

1041-
s = Stream(1, send_cb, None, None, None)
1058+
s = Stream(1, send_cb, None, None, None, None)
10421059
s._recv_cb = recv_cb(s)
10431060
s.state = STATE_HALF_CLOSED_LOCAL
10441061

@@ -1099,6 +1116,13 @@ def test_response_transparently_decrypts_wrong_deflate(self):
10991116

11001117
assert resp.read() == b'this is test data'
11011118

1119+
def test_response_calls_stream_close(self):
1120+
stream = DummyStream('')
1121+
resp = HTTP20Response({':status': '200'}, stream)
1122+
resp.close()
1123+
1124+
assert stream.closed
1125+
11021126

11031127
class TestHTTP20Adapter(object):
11041128
def test_adapter_reuses_connections(self):
@@ -1131,6 +1155,13 @@ def close(self):
11311155
class DummyStream(object):
11321156
def __init__(self, data):
11331157
self.data = data
1158+
self.closed = False
11341159

11351160
def _read(self, *args, **kwargs):
11361161
return self.data
1162+
1163+
def close(self):
1164+
if not self.closed:
1165+
self.closed = True
1166+
else:
1167+
assert False

test/test_integration.py

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,11 @@
1515
from hyper.http20.frame import (
1616
Frame, SettingsFrame, WindowUpdateFrame, DataFrame, HeadersFrame
1717
)
18+
from hyper.http20.hpack import Encoder
19+
from hyper.http20.huffman import HuffmanEncoder
20+
from hyper.http20.huffman_constants import (
21+
RESPONSE_CODES, RESPONSE_CODES_LENGTH
22+
)
1823
from server import SocketLevelTest
1924

2025
# Turn off certificate verification for the tests.
@@ -27,6 +32,16 @@ def decode_frame(frame_data):
2732
assert 8 + length == len(frame_data)
2833
return f
2934

35+
36+
def build_headers_frame(headers):
37+
f = HeadersFrame(1)
38+
e = Encoder()
39+
e.huffman_coder = HuffmanEncoder(RESPONSE_CODES, RESPONSE_CODES_LENGTH)
40+
f.data = e.encode(headers)
41+
f.flags.add('END_HEADERS')
42+
return f
43+
44+
3045
class TestHyperIntegration(SocketLevelTest):
3146
def test_connection_string(self):
3247
self.set_up()
@@ -200,6 +215,42 @@ def socket_handler(listener):
200215

201216
self.tear_down()
202217

218+
def test_closed_responses_remove_their_streams_from_conn(self):
219+
self.set_up()
220+
221+
recv_event = threading.Event()
222+
223+
def socket_handler(listener):
224+
sock = listener.accept()[0]
225+
226+
# We're going to get the two messages for the connection open, then
227+
# a headers frame.
228+
sock.recv(65535)
229+
sock.recv(65535)
230+
sock.send(SettingsFrame(0).serialize())
231+
sock.recv(65535)
232+
233+
# Now, send the headers for the response.
234+
f = build_headers_frame([(':status', '200')])
235+
f.stream_id = 1
236+
sock.send(f.serialize())
237+
238+
# Wait for the message from the main thread.
239+
recv_event.wait()
240+
sock.close()
241+
242+
self._start_server(socket_handler)
243+
conn = HTTP20Connection(self.host, self.port)
244+
conn.request('GET', '/')
245+
resp = conn.getresponse()
246+
247+
# Close the response.
248+
resp.close()
249+
250+
recv_event.set()
251+
252+
assert not conn.streams
253+
203254

204255
class TestRequestsAdapter(SocketLevelTest):
205256
def test_adapter_received_values(self):

0 commit comments

Comments
 (0)