Skip to content
This repository was archived by the owner on Jan 13, 2021. It is now read-only.

Commit 23a1555

Browse files
authored
Merge pull request #280 from plucury/development
try to avoid deadlock
2 parents 88da7c3 + 0e2fca8 commit 23a1555

File tree

2 files changed

+55
-44
lines changed

2 files changed

+55
-44
lines changed

hyper/http20/connection.py

Lines changed: 16 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -139,36 +139,9 @@ def __init__(self, host, port=None, secure=None, window_manager=None,
139139

140140
# Concurrency
141141
#
142-
# Use one lock (_lock) to synchronize any interaction with global
143-
# connection state, e.g. stream creation/deletion.
144-
#
145-
# It's ok to use the same in lock all these cases as they occur at
146-
# different/linked points in the connection's lifecycle.
147-
#
148-
# Use another 2 locks (_write_lock, _read_lock) to synchronize
149-
# - _send_cb
150-
# - _recv_cb
151-
# respectively.
152-
#
153-
# I.e, send/recieve on the connection and its streams are serialized
154-
# separately across the threads accessing the connection. This is a
155-
# simple way of providing thread-safety.
156-
#
157-
# _write_lock and _read_lock synchronize all interactions between
158-
# streams and the connnection. There is a third I/O callback,
159-
# _close_stream, passed to a stream's constructor. It does not need to
160-
# be synchronized, it uses _send_cb internally (which is serialized);
161-
# its other activity (safe deletion of the stream from self.streams)
162-
# does not require synchronization.
163-
#
164-
# _read_lock may be acquired when already holding the _write_lock,
165-
# when they both held it is always by acquiring _write_lock first.
166-
#
167-
# Either _read_lock or _write_lock may be acquired whilst holding _lock
168-
# which should always be acquired before either of the other two.
142+
# Use one universal lock (_lock) to synchronize all interaction
143+
# with global connection state, _send_cb and _recv_cb.
169144
self._lock = threading.RLock()
170-
self._write_lock = threading.RLock()
171-
self._read_lock = threading.RLock()
172145

173146
# Create the mutable state.
174147
self.__wm_class = window_manager or FlowControlManager
@@ -232,7 +205,7 @@ def ping(self, opaque_data):
232205
:returns: Nothing
233206
"""
234207
self.connect()
235-
with self._write_lock:
208+
with self._lock:
236209
with self._conn as conn:
237210
conn.ping(to_bytestring(opaque_data))
238211
self._send_outstanding_data()
@@ -271,7 +244,7 @@ def request(self, method, url, body=None, headers=None):
271244
# If threads interleave these operations, it could result in messages
272245
# being sent in the wrong order, which can lead to the out-of-order
273246
# messages with lower stream IDs being closed prematurely.
274-
with self._write_lock:
247+
with self._lock:
275248
stream_id = self.putrequest(method, url)
276249

277250
default_headers = (':method', ':scheme', ':authority', ':path')
@@ -464,10 +437,10 @@ def _send_outstanding_data(self, tolerate_peer_gone=False,
464437
send_empty=True):
465438
# Concurrency
466439
#
467-
# Hold _write_lock; getting and writing data from _conn is synchronized
440+
# Hold _lock; getting and writing data from _conn is synchronized
468441
#
469442
# I/O occurs while the lock is held; waiting threads will see a delay.
470-
with self._write_lock:
443+
with self._lock:
471444
with self._conn as conn:
472445
data = conn.data_to_send()
473446
if data or send_empty:
@@ -557,9 +530,9 @@ def endheaders(self, message_body=None, final=False, stream_id=None):
557530

558531
# Concurrency:
559532
#
560-
# Hold _write_lock: synchronize access to the connection's HPACK
533+
# Hold _lock: synchronize access to the connection's HPACK
561534
# encoder and decoder and the subsquent write to the connection
562-
with self._write_lock:
535+
with self._lock:
563536
stream.send_headers(headers_only)
564537

565538
# Send whatever data we have.
@@ -622,10 +595,10 @@ def _send_cb(self, data, tolerate_peer_gone=False):
622595
"""
623596
# Concurrency
624597
#
625-
# Hold _write_lock: ensures only writer at a time
598+
# Hold _lock: ensures only writer at a time
626599
#
627600
# I/O occurs while the lock is held; waiting threads will see a delay.
628-
with self._write_lock:
601+
with self._lock:
629602
try:
630603
self._sock.sendall(data)
631604
except socket.error as e:
@@ -640,12 +613,12 @@ def _adjust_receive_window(self, frame_len):
640613
"""
641614
# Concurrency
642615
#
643-
# Hold _write_lock; synchronize the window manager update and the
616+
# Hold _lock; synchronize the window manager update and the
644617
# subsequent potential write to the connection
645618
#
646619
# I/O may occur while the lock is held; waiting threads may see a
647620
# delay.
648-
with self._write_lock:
621+
with self._lock:
649622
increment = self.window_manager._handle_frame(frame_len)
650623

651624
if increment:
@@ -667,7 +640,7 @@ def _single_read(self):
667640
# Synchronizes reading the data
668641
#
669642
# I/O occurs while the lock is held; waiting threads will see a delay.
670-
with self._read_lock:
643+
with self._lock:
671644
if self._sock is None:
672645
raise ConnectionError('tried to read after connection close')
673646
self._sock.fill()
@@ -761,7 +734,7 @@ def _recv_cb(self, stream_id=0):
761734
# re-acquired in the calls to self._single_read.
762735
#
763736
# I/O occurs while the lock is held; waiting threads will see a delay.
764-
with self._read_lock:
737+
with self._lock:
765738
log.debug('recv for stream %d with %s already present',
766739
stream_id,
767740
self.recent_recv_streams)
@@ -812,11 +785,11 @@ def _send_rst_frame(self, stream_id, error_code):
812785
"""
813786
# Concurrency
814787
#
815-
# Hold _write_lock; synchronize generating the reset frame and writing
788+
# Hold _lock; synchronize generating the reset frame and writing
816789
# it
817790
#
818791
# I/O occurs while the lock is held; waiting threads will see a delay.
819-
with self._write_lock:
792+
with self._lock:
820793
with self._conn as conn:
821794
conn.reset_stream(stream_id, error_code=error_code)
822795
self._send_outstanding_data()

test/test_integration.py

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import pytest
1515
from contextlib import contextmanager
1616
from mock import patch
17+
from concurrent.futures import ThreadPoolExecutor, TimeoutError
1718
from h2.frame_buffer import FrameBuffer
1819
from hyper.compat import ssl
1920
from hyper.contrib import HTTP20Adapter
@@ -1039,7 +1040,6 @@ def test_version_after_http_upgrade(self):
10391040

10401041
def socket_handler(listener):
10411042
sock = listener.accept()[0]
1042-
10431043
# We should get the initial request.
10441044
data = b''
10451045
while not data.endswith(b'\r\n\r\n'):
@@ -1089,6 +1089,44 @@ def socket_handler(listener):
10891089

10901090
self.tear_down()
10911091

1092+
def test_connection_and_send_simultaneously(self):
1093+
# Since deadlock occurs probabilistic,
1094+
# It still has deadlock probability
1095+
# even the testcase is passed.
1096+
self.set_up()
1097+
1098+
recv_event = threading.Event()
1099+
1100+
def socket_handler(listener):
1101+
sock = listener.accept()[0]
1102+
1103+
receive_preamble(sock)
1104+
sock.recv(65535)
1105+
1106+
recv_event.set()
1107+
sock.close()
1108+
1109+
def do_req(conn):
1110+
conn.request('GET', '/')
1111+
recv_event.wait()
1112+
1113+
def do_connect(conn):
1114+
conn.connect()
1115+
1116+
self._start_server(socket_handler)
1117+
conn = self.get_connection()
1118+
1119+
pool = ThreadPoolExecutor(max_workers=2)
1120+
pool.submit(do_connect, conn)
1121+
f = pool.submit(do_req, conn)
1122+
1123+
try:
1124+
f.result(timeout=10)
1125+
except TimeoutError:
1126+
assert False
1127+
1128+
self.tear_down()
1129+
10921130

10931131
@patch('hyper.http20.connection.H2_NPN_PROTOCOLS', PROTOCOLS)
10941132
class TestRequestsAdapter(SocketLevelTest):

0 commit comments

Comments
 (0)