Skip to content

Commit 3f81acc

Browse files
committed
auto-emit WINDOW_UPDATE frames on unprocessed DATA frames
fixes #1210
1 parent adc6e25 commit 3f81acc

File tree

6 files changed

+211
-39
lines changed

6 files changed

+211
-39
lines changed

h2/connection.py

Lines changed: 42 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1634,6 +1634,35 @@ def _receive_push_promise_frame(self, frame):
16341634

16351635
return frames, events + stream_events
16361636

1637+
def _handle_data_on_closed_stream(self, events, exc, frame):
1638+
# This stream is already closed - and yet we received a DATA frame.
1639+
# The received DATA frame counts towards the connection flow window.
1640+
# We need to manually to acknowledge the DATA frame to update the flow
1641+
# window of the connection. Otherwise the whole connection stalls due
1642+
# the inbound flow window being 0.
1643+
frames = []
1644+
conn_manager = self._inbound_flow_control_window_manager
1645+
conn_increment = conn_manager.process_bytes(
1646+
frame.flow_controlled_length
1647+
)
1648+
if conn_increment:
1649+
f = WindowUpdateFrame(0)
1650+
f.window_increment = conn_increment
1651+
frames.append(f)
1652+
self.config.logger.debug(
1653+
"Received DATA frame on closed stream %d - "
1654+
"auto-emitted a WINDOW_UPDATE by %d",
1655+
frame.stream_id, conn_increment
1656+
)
1657+
f = RstStreamFrame(exc.stream_id)
1658+
f.error_code = exc.error_code
1659+
frames.append(f)
1660+
self.config.logger.debug(
1661+
"Stream %d already CLOSED or cleaned up - "
1662+
"auto-emitted a RST_FRAME" % frame.stream_id
1663+
)
1664+
return frames, events + exc._events
1665+
16371666
def _receive_data_frame(self, frame):
16381667
"""
16391668
Receive a data frame on the connection.
@@ -1646,12 +1675,19 @@ def _receive_data_frame(self, frame):
16461675
self._inbound_flow_control_window_manager.window_consumed(
16471676
flow_controlled_length
16481677
)
1649-
stream = self._get_stream_by_id(frame.stream_id)
1650-
frames, stream_events = stream.receive_data(
1651-
frame.data,
1652-
'END_STREAM' in frame.flags,
1653-
flow_controlled_length
1654-
)
1678+
1679+
try:
1680+
stream = self._get_stream_by_id(frame.stream_id)
1681+
frames, stream_events = stream.receive_data(
1682+
frame.data,
1683+
'END_STREAM' in frame.flags,
1684+
flow_controlled_length
1685+
)
1686+
except StreamClosedError as e:
1687+
# This stream is either marked as CLOSED or already gone from our
1688+
# internal state.
1689+
return self._handle_data_on_closed_stream(events, e, frame)
1690+
16551691
return frames, events + stream_events
16561692

16571693
def _receive_settings_frame(self, frame):

test/test_closed_streams.py

Lines changed: 103 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,6 @@ class TestStreamsClosedByEndStream(object):
195195
@pytest.mark.parametrize(
196196
"frame",
197197
[
198-
lambda self, ff: ff.build_data_frame(b'hello'),
199198
lambda self, ff: ff.build_headers_frame(
200199
self.example_request_headers, flags=['END_STREAM']),
201200
lambda self, ff: ff.build_headers_frame(
@@ -245,7 +244,6 @@ def test_frames_after_recv_end_will_error(self,
245244
@pytest.mark.parametrize(
246245
"frame",
247246
[
248-
lambda self, ff: ff.build_data_frame(b'hello'),
249247
lambda self, ff: ff.build_headers_frame(
250248
self.example_response_headers, flags=['END_STREAM']),
251249
lambda self, ff: ff.build_headers_frame(
@@ -344,15 +342,15 @@ class TestStreamsClosedByRstStream(object):
344342
self.example_request_headers),
345343
lambda self, ff: ff.build_headers_frame(
346344
self.example_request_headers, flags=['END_STREAM']),
347-
lambda self, ff: ff.build_data_frame(b'hello'),
348345
]
349346
)
350347
def test_resets_further_frames_after_recv_reset(self,
351348
frame_factory,
352349
frame):
353350
"""
354351
A stream that is closed by receive RST_STREAM can receive further
355-
frames: it simply sends RST_STREAM for it.
352+
frames: it simply sends RST_STREAM for it, and additionally
353+
WINDOW_UPDATE for DATA frames.
356354
"""
357355
c = h2.connection.H2Connection(config=self.server_config)
358356
c.receive_data(frame_factory.preamble())
@@ -396,14 +394,66 @@ def test_resets_further_frames_after_recv_reset(self,
396394
assert not events
397395
assert c.data_to_send() == rst_frame.serialize() * 3
398396

397+
def test_resets_further_data_frames_after_recv_reset(self,
398+
frame_factory):
399+
"""
400+
A stream that is closed by receive RST_STREAM can receive further
401+
DATA frames: it simply sends WINDOW_UPDATE for the connection flow
402+
window, and RST_STREAM for the stream.
403+
"""
404+
c = h2.connection.H2Connection(config=self.server_config)
405+
c.receive_data(frame_factory.preamble())
406+
c.initiate_connection()
407+
408+
header_frame = frame_factory.build_headers_frame(
409+
self.example_request_headers, flags=['END_STREAM']
410+
)
411+
c.receive_data(header_frame.serialize())
412+
413+
c.send_headers(
414+
stream_id=1,
415+
headers=self.example_response_headers,
416+
end_stream=False
417+
)
418+
419+
rst_frame = frame_factory.build_rst_stream_frame(
420+
1, h2.errors.ErrorCodes.STREAM_CLOSED
421+
)
422+
c.receive_data(rst_frame.serialize())
423+
c.clear_outbound_data_buffer()
424+
425+
f = frame_factory.build_data_frame(
426+
data=b'some data'
427+
)
428+
429+
events = c.receive_data(f.serialize())
430+
assert not events
431+
432+
expected = frame_factory.build_rst_stream_frame(
433+
stream_id=1,
434+
error_code=h2.errors.ErrorCodes.STREAM_CLOSED,
435+
).serialize()
436+
assert c.data_to_send() == expected
437+
438+
events = c.receive_data(f.serialize() * 3)
439+
assert not events
440+
assert c.data_to_send() == expected * 3
441+
442+
# Iterate over the streams to make sure it's gone, then confirm the
443+
# behaviour is unchanged.
444+
c.open_outbound_streams
445+
446+
events = c.receive_data(f.serialize() * 3)
447+
assert not events
448+
assert c.data_to_send() == expected * 3
449+
399450
@pytest.mark.parametrize(
400451
"frame",
401452
[
402453
lambda self, ff: ff.build_headers_frame(
403454
self.example_request_headers),
404455
lambda self, ff: ff.build_headers_frame(
405456
self.example_request_headers, flags=['END_STREAM']),
406-
lambda self, ff: ff.build_data_frame(b'hello'),
407457
]
408458
)
409459
def test_resets_further_frames_after_send_reset(self,
@@ -455,3 +505,51 @@ def test_resets_further_frames_after_send_reset(self,
455505
events = c.receive_data(f.serialize() * 3)
456506
assert not events
457507
assert c.data_to_send() == rst_frame.serialize() * 3
508+
509+
def test_resets_further_data_frames_after_send_reset(self,
510+
frame_factory):
511+
"""
512+
A stream that is closed by sent RST_STREAM can receive further
513+
data frames: it simply sends WINDOW_UPDATE and RST_STREAM for it.
514+
"""
515+
c = h2.connection.H2Connection(config=self.server_config)
516+
c.receive_data(frame_factory.preamble())
517+
c.initiate_connection()
518+
519+
header_frame = frame_factory.build_headers_frame(
520+
self.example_request_headers, flags=['END_STREAM']
521+
)
522+
c.receive_data(header_frame.serialize())
523+
524+
c.send_headers(
525+
stream_id=1,
526+
headers=self.example_response_headers,
527+
end_stream=False
528+
)
529+
530+
c.reset_stream(1, h2.errors.ErrorCodes.INTERNAL_ERROR)
531+
532+
c.clear_outbound_data_buffer()
533+
534+
f = frame_factory.build_data_frame(
535+
data=b'some data'
536+
)
537+
events = c.receive_data(f.serialize())
538+
assert not events
539+
expected = frame_factory.build_rst_stream_frame(
540+
stream_id=1,
541+
error_code=h2.errors.ErrorCodes.STREAM_CLOSED,
542+
).serialize()
543+
assert c.data_to_send() == expected
544+
545+
events = c.receive_data(f.serialize() * 3)
546+
assert not events
547+
assert c.data_to_send() == expected * 3
548+
549+
# Iterate over the streams to make sure it's gone, then confirm the
550+
# behaviour is unchanged.
551+
c.open_outbound_streams
552+
553+
events = c.receive_data(f.serialize() * 3)
554+
assert not events
555+
assert c.data_to_send() == expected * 3

test/test_flow_control_window.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -638,6 +638,42 @@ def test_reject_local_overlarge_increase_stream_window(self):
638638
with pytest.raises(h2.exceptions.FlowControlError):
639639
c.increment_flow_control_window(increment=increment, stream_id=1)
640640

641+
def test_send_update_on_closed_streams(self, frame_factory):
642+
c = h2.connection.H2Connection()
643+
c.initiate_connection()
644+
c.send_headers(1, self.example_request_headers)
645+
c.reset_stream(1)
646+
647+
c.clear_outbound_data_buffer()
648+
c.open_outbound_streams
649+
c.open_inbound_streams
650+
651+
f = frame_factory.build_data_frame(b'some data'*1500)
652+
events = c.receive_data(f.serialize()*3)
653+
assert not events
654+
655+
expected = frame_factory.build_rst_stream_frame(
656+
stream_id=1,
657+
error_code=h2.errors.ErrorCodes.STREAM_CLOSED,
658+
).serialize() * 2 + frame_factory.build_window_update_frame(
659+
stream_id=0,
660+
increment=40500,
661+
).serialize() + frame_factory.build_rst_stream_frame(
662+
stream_id=1,
663+
error_code=h2.errors.ErrorCodes.STREAM_CLOSED,
664+
).serialize()
665+
assert c.data_to_send() == expected
666+
667+
f = frame_factory.build_data_frame(b'')
668+
events = c.receive_data(f.serialize())
669+
assert not events
670+
671+
expected = frame_factory.build_rst_stream_frame(
672+
stream_id=1,
673+
error_code=h2.errors.ErrorCodes.STREAM_CLOSED,
674+
).serialize()
675+
assert c.data_to_send() == expected
676+
641677

642678
class TestAutomaticFlowControl(object):
643679
"""

test/test_h2_upgrade.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -260,11 +260,11 @@ def test_cannot_receive_data_stream_1(self, frame_factory):
260260
)
261261
c.receive_data(f.serialize())
262262

263-
expected_frame = frame_factory.build_rst_stream_frame(
263+
expected = frame_factory.build_rst_stream_frame(
264264
stream_id=1,
265265
error_code=h2.errors.ErrorCodes.STREAM_CLOSED,
266-
)
267-
assert c.data_to_send() == expected_frame.serialize()
266+
).serialize()
267+
assert c.data_to_send() == expected
268268

269269
def test_client_settings_are_applied(self, frame_factory):
270270
"""

test/test_invalid_frame_sequences.py

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -149,14 +149,16 @@ def test_reject_data_on_closed_streams(self, frame_factory):
149149
c.receive_data(f.serialize())
150150
c.clear_outbound_data_buffer()
151151

152-
bad_frame = frame_factory.build_data_frame(data=b'hello')
152+
bad_frame = frame_factory.build_data_frame(
153+
data=b'some data'
154+
)
153155
c.receive_data(bad_frame.serialize())
154156

155-
expected_frame = frame_factory.build_rst_stream_frame(
157+
expected = frame_factory.build_rst_stream_frame(
156158
stream_id=1,
157-
error_code=0x5,
158-
)
159-
assert c.data_to_send() == expected_frame.serialize()
159+
error_code=h2.errors.ErrorCodes.STREAM_CLOSED,
160+
).serialize()
161+
assert c.data_to_send() == expected
160162

161163
def test_unexpected_continuation_on_closed_stream(self, frame_factory):
162164
"""
@@ -293,15 +295,15 @@ def test_get_stream_reset_event_on_auto_reset(self, frame_factory):
293295
c.clear_outbound_data_buffer()
294296

295297
bad_frame = frame_factory.build_data_frame(
296-
data=b'hello'
298+
data=b'some data'
297299
)
298300
events = c.receive_data(bad_frame.serialize())
299301

300-
expected_frame = frame_factory.build_rst_stream_frame(
302+
expected = frame_factory.build_rst_stream_frame(
301303
stream_id=1,
302-
error_code=0x5,
303-
)
304-
assert c.data_to_send() == expected_frame.serialize()
304+
error_code=h2.errors.ErrorCodes.STREAM_CLOSED,
305+
).serialize()
306+
assert c.data_to_send() == expected
305307

306308
assert len(events) == 1
307309
event = events[0]
@@ -327,16 +329,16 @@ def test_one_one_stream_reset(self, frame_factory):
327329
c.clear_outbound_data_buffer()
328330

329331
bad_frame = frame_factory.build_data_frame(
330-
data=b'hello'
332+
data=b'some data'
331333
)
332334
# Receive 5 frames.
333335
events = c.receive_data(bad_frame.serialize() * 5)
334336

335-
expected_frame = frame_factory.build_rst_stream_frame(
337+
expected = frame_factory.build_rst_stream_frame(
336338
stream_id=1,
337-
error_code=0x5,
338-
)
339-
assert c.data_to_send() == expected_frame.serialize() * 5
339+
error_code=h2.errors.ErrorCodes.STREAM_CLOSED,
340+
).serialize()
341+
assert c.data_to_send() == expected * 5
340342

341343
assert len(events) == 1
342344
event = events[0]

test/test_stream_reset.py

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -70,8 +70,8 @@ def test_reset_stream_keeps_flow_control_correct(self,
7070
other_id,
7171
frame_factory):
7272
"""
73-
A stream that has been reset still affects the connection flow control
74-
window.
73+
A stream that has been reset does not affect the connection flow
74+
control window.
7575
"""
7676
c = h2.connection.H2Connection()
7777
c.initiate_connection()
@@ -89,19 +89,19 @@ def test_reset_stream_keeps_flow_control_correct(self,
8989
c.clear_outbound_data_buffer()
9090

9191
f = frame_factory.build_data_frame(
92-
data=b'some data!',
92+
data=b'some data',
9393
stream_id=close_id
9494
)
95-
events = c.receive_data(f.serialize())
95+
c.receive_data(f.serialize())
9696

97-
rst_frame = frame_factory.build_rst_stream_frame(
98-
close_id, h2.errors.ErrorCodes.STREAM_CLOSED
99-
)
100-
assert not events
101-
assert c.data_to_send() == rst_frame.serialize()
97+
expected = frame_factory.build_rst_stream_frame(
98+
stream_id=close_id,
99+
error_code=h2.errors.ErrorCodes.STREAM_CLOSED,
100+
).serialize()
101+
assert c.data_to_send() == expected
102102

103103
new_window = c.remote_flow_control_window(stream_id=other_id)
104-
assert initial_window - len(b'some data!') == new_window
104+
assert initial_window - len(b'some data') == new_window
105105

106106
@pytest.mark.parametrize('clear_streams', [True, False])
107107
def test_reset_stream_automatically_resets_pushed_streams(self,

0 commit comments

Comments
 (0)