Skip to content
This repository was archived by the owner on Jan 13, 2021. It is now read-only.

Commit 040887a

Browse files
author
Tim Emiola
committed
Improve synchronization
- ensure all checks for data to write and subsequent io are synchronized - ensure all access to _conn is synchronized via the appropriate lock
1 parent f1ca5dc commit 040887a

File tree

2 files changed

+64
-36
lines changed

2 files changed

+64
-36
lines changed

hyper/http20/connection.py

Lines changed: 62 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -347,7 +347,7 @@ def _send_preamble(self):
347347
self._conn.update_settings(
348348
{h2.settings.ENABLE_PUSH: int(self._enable_push)}
349349
)
350-
self._sock.sendall(self._conn.data_to_send())
350+
self._send_outstanding_data()
351351

352352
# The server will also send an initial settings frame, so get it.
353353
self._recv_cb()
@@ -380,14 +380,27 @@ def close(self, error_code=None):
380380
# Send GoAway frame to the server
381381
try:
382382
self._conn.close_connection(error_code or 0)
383-
self._send_cb(self._conn.data_to_send(), True)
383+
self._send_outstanding_data(tolerate_peer_gone=True)
384384
except Exception as e: # pragma: no cover
385385
log.warn("GoAway frame could not be sent: %s" % e)
386386

387387
if self._sock is not None:
388388
self._sock.close()
389389
self.__init_state()
390390

391+
def _send_outstanding_data(self, tolerate_peer_gone=False,
392+
send_empty=True):
393+
# Concurrency
394+
#
395+
# Hold _write_lock; getting and writing data from _conn are
396+
# synchronized
397+
#
398+
# i/o occurs while the lock is held; waiting threads will see a delay.
399+
with self._write_lock:
400+
data = self._conn.data_to_send()
401+
if data or send_empty:
402+
self._send_cb(data, tolerate_peer_gone=tolerate_peer_gone)
403+
391404
def putrequest(self, method, selector, **kwargs):
392405
"""
393406
This should be the first call for sending a given HTTP request to a
@@ -472,18 +485,16 @@ def endheaders(self, message_body=None, final=False, stream_id=None):
472485

473486
# Concurrency:
474487
#
475-
# It is necessary to hold the _write_lock here as the streams share the
476-
# connection's hpack encoder and decoder, which is used in stream.open.
477-
# Bad things happen when streams on different threads call stream.open
478-
# without synchronizing.
488+
# Hold _write_lock: synchronize access to the connection's hpack
489+
# encoder and decoder and the subsquent write to the connection
479490
with self._write_lock:
480491
stream.send_headers(headers_only)
481492

482-
# Send whatever data we have.
483-
if message_body is not None:
484-
stream.send_data(message_body, final)
493+
# Send whatever data we have.
494+
if message_body is not None:
495+
stream.send_data(message_body, final)
485496

486-
self._send_cb(self._conn.data_to_send())
497+
self._send_outstanding_data()
487498

488499
return
489500

@@ -512,9 +523,8 @@ def _new_stream(self, stream_id=None, local_closed=False):
512523
"""
513524
# Concurrency
514525
#
515-
# It's necessary to hold the lock here to ensure that multiple threads
516-
# accessing the connection see self.next_stream_id in a consistent
517-
# state
526+
# Hold _lock: ensure that threads accessing the connection see
527+
# self.next_stream_id in a consistent state
518528
#
519529
# No i/o occurs, the delay in waiting threads depends on their number.
520530
with self._lock:
@@ -540,7 +550,7 @@ def _send_cb(self, data, tolerate_peer_gone=False):
540550
"""
541551
# Concurrency
542552
#
543-
# Synchronizes data writes.
553+
# Hold _write_lock: ensures only writer at a time
544554
#
545555
# i/o occurs while the lock is held; waiting threads will see a delay.
546556
with self._write_lock:
@@ -556,11 +566,19 @@ def _adjust_receive_window(self, frame_len):
556566
Adjusts the window size in response to receiving a DATA frame of length
557567
``frame_len``. May send a WINDOWUPDATE frame if necessary.
558568
"""
559-
increment = self.window_manager._handle_frame(frame_len)
569+
# Concurrency
570+
#
571+
# Hold _write_lock; synchronize the window manager update, the
572+
# subsequent potential write to the connection
573+
#
574+
# i/o may occurs while the lock is held; waiting threads may see a
575+
# delay.
576+
with self._write_lock:
577+
increment = self.window_manager._handle_frame(frame_len)
560578

561-
if increment:
562-
self._conn.increment_flow_control_window(increment)
563-
self._send_cb(self._conn.data_to_send(), True)
579+
if increment:
580+
self._conn.increment_flow_control_window(increment)
581+
self._send_outstanding_data(tolerate_peer_gone=True)
564582

565583
return
566584

@@ -637,9 +655,7 @@ def _single_read(self):
637655
else:
638656
log.info("Received unhandled event %s", event)
639657

640-
data = self._conn.data_to_send()
641-
if data:
642-
self._send_cb(data, tolerate_peer_gone=True)
658+
self._send_outstanding_data(tolerate_peer_gone=True, send_empty=False)
643659

644660
def _recv_cb(self, stream_id=0):
645661
"""
@@ -706,20 +722,32 @@ def _send_rst_frame(self, stream_id, error_code):
706722
"""
707723
Send reset stream frame with error code and remove stream from map.
708724
"""
709-
self._conn.reset_stream(stream_id, error_code=error_code)
710-
self._send_cb(self._conn.data_to_send())
725+
# Concurrency
726+
#
727+
# Hold _write_lock; synchronize generating the reset frame and writing
728+
# it
729+
#
730+
# i/o occurs while the lock is held; waiting threads will see a delay.
731+
with self._write_lock:
732+
self._conn.reset_stream(stream_id, error_code=error_code)
733+
self._send_outstanding_data()
711734

712-
try:
713-
del self.streams[stream_id]
714-
self.recent_recv_streams.discard(stream_id)
715-
except KeyError as e: # pragma: no cover
716-
log.warn(
717-
"Stream with id %d does not exist: %s",
718-
stream_id, e)
719-
720-
# Keep track of the fact that we reset this stream in case there are
721-
# other frames in flight.
722-
self.reset_streams.add(stream_id)
735+
# Concurrency
736+
#
737+
# Hold _lock; the stram storage is being update. No i/o occurs, any
738+
# delay is proportional to the number of waiting threads.
739+
with self._lock:
740+
try:
741+
del self.streams[stream_id]
742+
self.recent_recv_streams.discard(stream_id)
743+
except KeyError as e: # pragma: no cover
744+
log.warn(
745+
"Stream with id %d does not exist: %s",
746+
stream_id, e)
747+
748+
# Keep track of the fact that we reset this stream in case there
749+
# are other frames in flight.
750+
self.reset_streams.add(stream_id)
723751

724752
def _stream_close_cb(self, stream_id):
725753
"""

test/test_hyper.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ def test_putheader_replaces_headers(self):
134134
]
135135

136136
def test_endheaders_sends_data(self, frame_buffer):
137-
def data_callback(chunk):
137+
def data_callback(chunk, **kwargs):
138138
frame_buffer.add_data(chunk)
139139

140140
c = HTTP20Connection('www.google.com')
@@ -149,7 +149,7 @@ def data_callback(chunk):
149149
assert isinstance(f, HeadersFrame)
150150

151151
def test_we_can_send_data_using_endheaders(self, frame_buffer):
152-
def data_callback(chunk):
152+
def data_callback(chunk, **kwargs):
153153
frame_buffer.add_data(chunk)
154154

155155
c = HTTP20Connection('www.google.com')

0 commit comments

Comments
 (0)