Skip to content

Commit 7be1f28

Browse files
authored
Merge pull request #1211 from Kriechi/fix-starved-flow-window
auto-emit WINDOW_UPDATE frames on unprocessed DATA frames
2 parents 306609a + f3cc06d commit 7be1f28

13 files changed

+247
-49
lines changed

.travis.yml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,9 @@ matrix:
2828
- python: "3.7"
2929
dist: xenial
3030
env: TOXENV=py37
31+
- python: "3.8"
32+
dist: xenial
33+
env: TOXENV=py38
3134
- python: "pypy-5.3.1"
3235
dist: trusty
3336
env: TOXENV=pypy

HISTORY.rst

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,19 @@ Release History
44
3.1.1dev0
55
---------
66

7+
Bugfixes
8+
~~~~~~~~
9+
10+
- Receiving DATA frames on closed (or reset) streams now properly emit a
11+
WINDOW_UPDATE to keep the connection flow window topped up.
12+
13+
API Changes (Backward-Incompatible)
14+
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
15+
16+
- ``h2.config.logger`` now uses a `trace(...)` function, in addition
17+
to `debug(...)`. If you defined a custom logger object, you need to handle
18+
these new function calls.
19+
720

821
3.1.1 (2019-08-02)
922
------------------

h2/config.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ class DummyLogger(object):
2929
"""
3030
An Logger object that does not actual logging, hence a DummyLogger.
3131
32-
For the class the log operation is merely a no-op. The intent is to avoid
32+
For the class the log operation is merely a no-op. The intent is to avoid
3333
conditionals being sprinkled throughout the hyper-h2 code for calls to
3434
logging functions when no logger is passed into the corresponding object.
3535
"""
@@ -42,6 +42,12 @@ def debug(self, *vargs, **kwargs):
4242
"""
4343
pass
4444

45+
def trace(self, *vargs, **kwargs):
46+
"""
47+
No-op logging. Only level needed for now.
48+
"""
49+
pass
50+
4551

4652
class H2Configuration(object):
4753
"""

h2/connection.py

Lines changed: 43 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1450,7 +1450,7 @@ def receive_data(self, data):
14501450
:returns: A list of events that the remote peer triggered by sending
14511451
this data.
14521452
"""
1453-
self.config.logger.debug(
1453+
self.config.logger.trace(
14541454
"Process received data on connection. Received data: %r", data
14551455
)
14561456

@@ -1634,6 +1634,35 @@ def _receive_push_promise_frame(self, frame):
16341634

16351635
return frames, events + stream_events
16361636

1637+
def _handle_data_on_closed_stream(self, events, exc, frame):
1638+
# This stream is already closed - and yet we received a DATA frame.
1639+
# The received DATA frame counts towards the connection flow window.
1640+
# We need to manually to acknowledge the DATA frame to update the flow
1641+
# window of the connection. Otherwise the whole connection stalls due
1642+
# the inbound flow window being 0.
1643+
frames = []
1644+
conn_manager = self._inbound_flow_control_window_manager
1645+
conn_increment = conn_manager.process_bytes(
1646+
frame.flow_controlled_length
1647+
)
1648+
if conn_increment:
1649+
f = WindowUpdateFrame(0)
1650+
f.window_increment = conn_increment
1651+
frames.append(f)
1652+
self.config.logger.debug(
1653+
"Received DATA frame on closed stream %d - "
1654+
"auto-emitted a WINDOW_UPDATE by %d",
1655+
frame.stream_id, conn_increment
1656+
)
1657+
f = RstStreamFrame(exc.stream_id)
1658+
f.error_code = exc.error_code
1659+
frames.append(f)
1660+
self.config.logger.debug(
1661+
"Stream %d already CLOSED or cleaned up - "
1662+
"auto-emitted a RST_FRAME" % frame.stream_id
1663+
)
1664+
return frames, events + exc._events
1665+
16371666
def _receive_data_frame(self, frame):
16381667
"""
16391668
Receive a data frame on the connection.
@@ -1646,12 +1675,19 @@ def _receive_data_frame(self, frame):
16461675
self._inbound_flow_control_window_manager.window_consumed(
16471676
flow_controlled_length
16481677
)
1649-
stream = self._get_stream_by_id(frame.stream_id)
1650-
frames, stream_events = stream.receive_data(
1651-
frame.data,
1652-
'END_STREAM' in frame.flags,
1653-
flow_controlled_length
1654-
)
1678+
1679+
try:
1680+
stream = self._get_stream_by_id(frame.stream_id)
1681+
frames, stream_events = stream.receive_data(
1682+
frame.data,
1683+
'END_STREAM' in frame.flags,
1684+
flow_controlled_length
1685+
)
1686+
except StreamClosedError as e:
1687+
# This stream is either marked as CLOSED or already gone from our
1688+
# internal state.
1689+
return self._handle_data_on_closed_stream(events, e, frame)
1690+
16551691
return frames, events + stream_events
16561692

16571693
def _receive_settings_frame(self, frame):

setup.cfg

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,5 @@
1+
[tool:pytest]
2+
testpaths = test
3+
14
[wheel]
25
universal = 1

setup.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,18 +58,19 @@
5858
'Programming Language :: Python :: 2',
5959
'Programming Language :: Python :: 2.7',
6060
'Programming Language :: Python :: 3',
61-
'Programming Language :: Python :: 3.3',
6261
'Programming Language :: Python :: 3.4',
6362
'Programming Language :: Python :: 3.5',
6463
'Programming Language :: Python :: 3.6',
64+
'Programming Language :: Python :: 3.7',
65+
'Programming Language :: Python :: 3.8',
6566
'Programming Language :: Python :: Implementation :: CPython',
6667
'Programming Language :: Python :: Implementation :: PyPy',
6768
],
6869
install_requires=[
6970
'hyperframe>=5.2.0, <6',
70-
'hpack>=2.3,<4',
71+
'hpack>=3.0,<4',
7172
],
7273
extras_require={
73-
':python_version == "2.7" or python_version == "3.3"': ['enum34>=1.1.6, <2'],
74+
':python_version == "2.7"': ['enum34>=1.1.6, <2'],
7475
}
7576
)

test/test_closed_streams.py

Lines changed: 103 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,6 @@ class TestStreamsClosedByEndStream(object):
195195
@pytest.mark.parametrize(
196196
"frame",
197197
[
198-
lambda self, ff: ff.build_data_frame(b'hello'),
199198
lambda self, ff: ff.build_headers_frame(
200199
self.example_request_headers, flags=['END_STREAM']),
201200
lambda self, ff: ff.build_headers_frame(
@@ -245,7 +244,6 @@ def test_frames_after_recv_end_will_error(self,
245244
@pytest.mark.parametrize(
246245
"frame",
247246
[
248-
lambda self, ff: ff.build_data_frame(b'hello'),
249247
lambda self, ff: ff.build_headers_frame(
250248
self.example_response_headers, flags=['END_STREAM']),
251249
lambda self, ff: ff.build_headers_frame(
@@ -344,15 +342,15 @@ class TestStreamsClosedByRstStream(object):
344342
self.example_request_headers),
345343
lambda self, ff: ff.build_headers_frame(
346344
self.example_request_headers, flags=['END_STREAM']),
347-
lambda self, ff: ff.build_data_frame(b'hello'),
348345
]
349346
)
350347
def test_resets_further_frames_after_recv_reset(self,
351348
frame_factory,
352349
frame):
353350
"""
354351
A stream that is closed by receive RST_STREAM can receive further
355-
frames: it simply sends RST_STREAM for it.
352+
frames: it simply sends RST_STREAM for it, and additionally
353+
WINDOW_UPDATE for DATA frames.
356354
"""
357355
c = h2.connection.H2Connection(config=self.server_config)
358356
c.receive_data(frame_factory.preamble())
@@ -396,14 +394,66 @@ def test_resets_further_frames_after_recv_reset(self,
396394
assert not events
397395
assert c.data_to_send() == rst_frame.serialize() * 3
398396

397+
def test_resets_further_data_frames_after_recv_reset(self,
398+
frame_factory):
399+
"""
400+
A stream that is closed by receive RST_STREAM can receive further
401+
DATA frames: it simply sends WINDOW_UPDATE for the connection flow
402+
window, and RST_STREAM for the stream.
403+
"""
404+
c = h2.connection.H2Connection(config=self.server_config)
405+
c.receive_data(frame_factory.preamble())
406+
c.initiate_connection()
407+
408+
header_frame = frame_factory.build_headers_frame(
409+
self.example_request_headers, flags=['END_STREAM']
410+
)
411+
c.receive_data(header_frame.serialize())
412+
413+
c.send_headers(
414+
stream_id=1,
415+
headers=self.example_response_headers,
416+
end_stream=False
417+
)
418+
419+
rst_frame = frame_factory.build_rst_stream_frame(
420+
1, h2.errors.ErrorCodes.STREAM_CLOSED
421+
)
422+
c.receive_data(rst_frame.serialize())
423+
c.clear_outbound_data_buffer()
424+
425+
f = frame_factory.build_data_frame(
426+
data=b'some data'
427+
)
428+
429+
events = c.receive_data(f.serialize())
430+
assert not events
431+
432+
expected = frame_factory.build_rst_stream_frame(
433+
stream_id=1,
434+
error_code=h2.errors.ErrorCodes.STREAM_CLOSED,
435+
).serialize()
436+
assert c.data_to_send() == expected
437+
438+
events = c.receive_data(f.serialize() * 3)
439+
assert not events
440+
assert c.data_to_send() == expected * 3
441+
442+
# Iterate over the streams to make sure it's gone, then confirm the
443+
# behaviour is unchanged.
444+
c.open_outbound_streams
445+
446+
events = c.receive_data(f.serialize() * 3)
447+
assert not events
448+
assert c.data_to_send() == expected * 3
449+
399450
@pytest.mark.parametrize(
400451
"frame",
401452
[
402453
lambda self, ff: ff.build_headers_frame(
403454
self.example_request_headers),
404455
lambda self, ff: ff.build_headers_frame(
405456
self.example_request_headers, flags=['END_STREAM']),
406-
lambda self, ff: ff.build_data_frame(b'hello'),
407457
]
408458
)
409459
def test_resets_further_frames_after_send_reset(self,
@@ -455,3 +505,51 @@ def test_resets_further_frames_after_send_reset(self,
455505
events = c.receive_data(f.serialize() * 3)
456506
assert not events
457507
assert c.data_to_send() == rst_frame.serialize() * 3
508+
509+
def test_resets_further_data_frames_after_send_reset(self,
510+
frame_factory):
511+
"""
512+
A stream that is closed by sent RST_STREAM can receive further
513+
data frames: it simply sends WINDOW_UPDATE and RST_STREAM for it.
514+
"""
515+
c = h2.connection.H2Connection(config=self.server_config)
516+
c.receive_data(frame_factory.preamble())
517+
c.initiate_connection()
518+
519+
header_frame = frame_factory.build_headers_frame(
520+
self.example_request_headers, flags=['END_STREAM']
521+
)
522+
c.receive_data(header_frame.serialize())
523+
524+
c.send_headers(
525+
stream_id=1,
526+
headers=self.example_response_headers,
527+
end_stream=False
528+
)
529+
530+
c.reset_stream(1, h2.errors.ErrorCodes.INTERNAL_ERROR)
531+
532+
c.clear_outbound_data_buffer()
533+
534+
f = frame_factory.build_data_frame(
535+
data=b'some data'
536+
)
537+
events = c.receive_data(f.serialize())
538+
assert not events
539+
expected = frame_factory.build_rst_stream_frame(
540+
stream_id=1,
541+
error_code=h2.errors.ErrorCodes.STREAM_CLOSED,
542+
).serialize()
543+
assert c.data_to_send() == expected
544+
545+
events = c.receive_data(f.serialize() * 3)
546+
assert not events
547+
assert c.data_to_send() == expected * 3
548+
549+
# Iterate over the streams to make sure it's gone, then confirm the
550+
# behaviour is unchanged.
551+
c.open_outbound_streams
552+
553+
events = c.receive_data(f.serialize() * 3)
554+
assert not events
555+
assert c.data_to_send() == expected * 3

test/test_flow_control_window.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -638,6 +638,42 @@ def test_reject_local_overlarge_increase_stream_window(self):
638638
with pytest.raises(h2.exceptions.FlowControlError):
639639
c.increment_flow_control_window(increment=increment, stream_id=1)
640640

641+
def test_send_update_on_closed_streams(self, frame_factory):
642+
c = h2.connection.H2Connection()
643+
c.initiate_connection()
644+
c.send_headers(1, self.example_request_headers)
645+
c.reset_stream(1)
646+
647+
c.clear_outbound_data_buffer()
648+
c.open_outbound_streams
649+
c.open_inbound_streams
650+
651+
f = frame_factory.build_data_frame(b'some data'*1500)
652+
events = c.receive_data(f.serialize()*3)
653+
assert not events
654+
655+
expected = frame_factory.build_rst_stream_frame(
656+
stream_id=1,
657+
error_code=h2.errors.ErrorCodes.STREAM_CLOSED,
658+
).serialize() * 2 + frame_factory.build_window_update_frame(
659+
stream_id=0,
660+
increment=40500,
661+
).serialize() + frame_factory.build_rst_stream_frame(
662+
stream_id=1,
663+
error_code=h2.errors.ErrorCodes.STREAM_CLOSED,
664+
).serialize()
665+
assert c.data_to_send() == expected
666+
667+
f = frame_factory.build_data_frame(b'')
668+
events = c.receive_data(f.serialize())
669+
assert not events
670+
671+
expected = frame_factory.build_rst_stream_frame(
672+
stream_id=1,
673+
error_code=h2.errors.ErrorCodes.STREAM_CLOSED,
674+
).serialize()
675+
assert c.data_to_send() == expected
676+
641677

642678
class TestAutomaticFlowControl(object):
643679
"""

test/test_h2_upgrade.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -260,11 +260,11 @@ def test_cannot_receive_data_stream_1(self, frame_factory):
260260
)
261261
c.receive_data(f.serialize())
262262

263-
expected_frame = frame_factory.build_rst_stream_frame(
263+
expected = frame_factory.build_rst_stream_frame(
264264
stream_id=1,
265265
error_code=h2.errors.ErrorCodes.STREAM_CLOSED,
266-
)
267-
assert c.data_to_send() == expected_frame.serialize()
266+
).serialize()
267+
assert c.data_to_send() == expected
268268

269269
def test_client_settings_are_applied(self, frame_factory):
270270
"""

0 commit comments

Comments
 (0)