Skip to content
This repository was archived by the owner on Jan 13, 2021. It is now read-only.

Commit 3b12f7a

Browse files
author
Tim Emiola
committed
Ensure synchronized access to the H2Connection
- introduce _LockedH2Connection that wraps the h2.connection.H2Connection - use it to prevent unsynchronized access to the connection
1 parent cb5b604 commit 3b12f7a

File tree

1 file changed

+36
-20
lines changed

1 file changed

+36
-20
lines changed

hyper/http20/connection.py

Lines changed: 36 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,20 @@
3535
TRANSIENT_SSL_ERRORS = (ssl.SSL_ERROR_WANT_READ, ssl.SSL_ERROR_WANT_WRITE)
3636

3737

38+
class _LockedObject(object):
39+
40+
def __init__(self, obj):
41+
self.lock = threading.RLock()
42+
self._obj = obj
43+
44+
def __enter__(self):
45+
self.lock.acquire()
46+
return self._obj
47+
48+
def __exit__(self, _exc_type, _exc_val, _exc_tb):
49+
self.lock.release()
50+
51+
3852
class HTTP20Connection(object):
3953
"""
4054
An object representing a single HTTP/2 connection to a server.
@@ -112,14 +126,10 @@ def __init__(self, host, port=None, secure=None, window_manager=None,
112126
self.force_proto = force_proto
113127
# Concurrency
114128
#
115-
# Use one lock (_lock) to synchronize
116-
# - connect()
117-
# ensures that socket creation, sending the preamble, and the initial
118-
# settings exchange occur just once.
119-
# - _new_stream()
120-
# ensures that stream creation sees a consistent connection state
121-
# - close()
122-
# ensures that closing threads see a consistent connection state
129+
# Use one lock (_lock) to synchronize any interaction with the
130+
# connection. This is because the h2.connection.H2Connection is
131+
# completely unthreadsafe, for thread safety, all access to it must be
132+
# synchronized.
123133
#
124134
# It's ok to use the same in lock all these cases as they occur at
125135
# different/linked points in the connection's lifecycle.
@@ -169,7 +179,7 @@ def __init_state(self):
169179
users should be strongly discouraged from messing about with connection
170180
objects themselves.
171181
"""
172-
self._conn = h2.connection.H2Connection()
182+
self._conn = _LockedObject(h2.connection.H2Connection())
173183

174184
# Streams are stored in a dictionary keyed off their stream IDs. We
175185
# also save the most recent one for easy access without having to walk
@@ -343,10 +353,11 @@ def _send_preamble(self):
343353
"""
344354
# We need to send the connection header immediately on this
345355
# connection, followed by an initial settings frame.
346-
self._conn.initiate_connection()
347-
self._conn.update_settings(
348-
{h2.settings.ENABLE_PUSH: int(self._enable_push)}
349-
)
356+
with self._conn as conn:
357+
conn.initiate_connection()
358+
conn.update_settings(
359+
{h2.settings.ENABLE_PUSH: int(self._enable_push)}
360+
)
350361
self._send_outstanding_data()
351362

352363
# The server will also send an initial settings frame, so get it.
@@ -379,7 +390,8 @@ def close(self, error_code=None):
379390

380391
# Send GoAway frame to the server
381392
try:
382-
self._conn.close_connection(error_code or 0)
393+
with self._conn as conn:
394+
conn.close_connection(error_code or 0)
383395
self._send_outstanding_data(tolerate_peer_gone=True)
384396
except Exception as e: # pragma: no cover
385397
log.warn("GoAway frame could not be sent: %s" % e)
@@ -397,7 +409,8 @@ def _send_outstanding_data(self, tolerate_peer_gone=False,
397409
#
398410
# i/o occurs while the lock is held; waiting threads will see a delay.
399411
with self._write_lock:
400-
data = self._conn.data_to_send()
412+
with self._conn as conn:
413+
data = conn.data_to_send()
401414
if data or send_empty:
402415
self._send_cb(data, tolerate_peer_gone=tolerate_peer_gone)
403416

@@ -527,11 +540,11 @@ def _new_stream(self, stream_id=None, local_closed=False):
527540
# self.next_stream_id in a consistent state
528541
#
529542
# No i/o occurs, the delay in waiting threads depends on their number.
530-
with self._lock:
543+
with self._lock, self._conn as conn:
531544
s = Stream(
532545
stream_id or self.next_stream_id,
533546
self.__wm_class(DEFAULT_WINDOW_SIZE),
534-
self._conn,
547+
conn,
535548
self._send_cb,
536549
self._recv_cb,
537550
self._stream_close_cb,
@@ -577,7 +590,8 @@ def _adjust_receive_window(self, frame_len):
577590
increment = self.window_manager._handle_frame(frame_len)
578591

579592
if increment:
580-
self._conn.increment_flow_control_window(increment)
593+
with self._conn as conn:
594+
conn.increment_flow_control_window(increment)
581595
self._send_outstanding_data(tolerate_peer_gone=True)
582596

583597
return
@@ -598,7 +612,8 @@ def _single_read(self):
598612
self._sock.fill()
599613
data = self._sock.buffer.tobytes()
600614
self._sock.advance_buffer(len(data))
601-
events = self._conn.receive_data(data)
615+
with self._conn as conn:
616+
events = conn.receive_data(data)
602617
stream_ids = set(getattr(e, 'stream_id', -1) for e in events)
603618
stream_ids.discard(-1) # sentinel
604619
stream_ids.discard(0) # connection events
@@ -735,7 +750,8 @@ def _send_rst_frame(self, stream_id, error_code):
735750
#
736751
# i/o occurs while the lock is held; waiting threads will see a delay.
737752
with self._write_lock:
738-
self._conn.reset_stream(stream_id, error_code=error_code)
753+
with self._conn as conn:
754+
conn.reset_stream(stream_id, error_code=error_code)
739755
self._send_outstanding_data()
740756

741757
# Concurrency

0 commit comments

Comments
 (0)