Skip to content
This repository was archived by the owner on Jan 13, 2021. It is now read-only.

Commit 2223f0d

Browse files
committed
Merge remote-tracking branch 'upstream/development' into secure_proxy
# Conflicts: # hyper/http20/connection.py
2 parents fa238d8 + 0af177d commit 2223f0d

File tree

5 files changed

+62
-46
lines changed

5 files changed

+62
-46
lines changed

hyper/http11/connection.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -451,7 +451,8 @@ def close(self):
451451
.. warning:: This method should absolutely only be called when you are
452452
certain the connection object is no longer needed.
453453
"""
454-
self._sock.close()
454+
if self._sock is not None:
455+
self._sock.close()
455456
self._sock = None
456457

457458
# The following two methods are the implementation of the context manager

hyper/http20/connection.py

Lines changed: 16 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -143,36 +143,9 @@ def __init__(self, host, port=None, secure=None, window_manager=None,
143143

144144
# Concurrency
145145
#
146-
# Use one lock (_lock) to synchronize any interaction with global
147-
# connection state, e.g. stream creation/deletion.
148-
#
149-
# It's ok to use the same in lock all these cases as they occur at
150-
# different/linked points in the connection's lifecycle.
151-
#
152-
# Use another 2 locks (_write_lock, _read_lock) to synchronize
153-
# - _send_cb
154-
# - _recv_cb
155-
# respectively.
156-
#
157-
# I.e, send/recieve on the connection and its streams are serialized
158-
# separately across the threads accessing the connection. This is a
159-
# simple way of providing thread-safety.
160-
#
161-
# _write_lock and _read_lock synchronize all interactions between
162-
# streams and the connnection. There is a third I/O callback,
163-
# _close_stream, passed to a stream's constructor. It does not need to
164-
# be synchronized, it uses _send_cb internally (which is serialized);
165-
# its other activity (safe deletion of the stream from self.streams)
166-
# does not require synchronization.
167-
#
168-
# _read_lock may be acquired when already holding the _write_lock,
169-
# when they both held it is always by acquiring _write_lock first.
170-
#
171-
# Either _read_lock or _write_lock may be acquired whilst holding _lock
172-
# which should always be acquired before either of the other two.
146+
# Use one universal lock (_lock) to synchronize all interaction
147+
# with global connection state, _send_cb and _recv_cb.
173148
self._lock = threading.RLock()
174-
self._write_lock = threading.RLock()
175-
self._read_lock = threading.RLock()
176149

177150
# Create the mutable state.
178151
self.__wm_class = window_manager or FlowControlManager
@@ -236,7 +209,7 @@ def ping(self, opaque_data):
236209
:returns: Nothing
237210
"""
238211
self.connect()
239-
with self._write_lock:
212+
with self._lock:
240213
with self._conn as conn:
241214
conn.ping(to_bytestring(opaque_data))
242215
self._send_outstanding_data()
@@ -275,7 +248,7 @@ def request(self, method, url, body=None, headers=None):
275248
# If threads interleave these operations, it could result in messages
276249
# being sent in the wrong order, which can lead to the out-of-order
277250
# messages with lower stream IDs being closed prematurely.
278-
with self._write_lock:
251+
with self._lock:
279252
# Unlike HTTP/1.1, HTTP/2 (according to RFC 7540) doesn't require
280253
# to use absolute URI when proxying.
281254

@@ -483,10 +456,10 @@ def _send_outstanding_data(self, tolerate_peer_gone=False,
483456
send_empty=True):
484457
# Concurrency
485458
#
486-
# Hold _write_lock; getting and writing data from _conn is synchronized
459+
# Hold _lock; getting and writing data from _conn is synchronized
487460
#
488461
# I/O occurs while the lock is held; waiting threads will see a delay.
489-
with self._write_lock:
462+
with self._lock:
490463
with self._conn as conn:
491464
data = conn.data_to_send()
492465
if data or send_empty:
@@ -576,9 +549,9 @@ def endheaders(self, message_body=None, final=False, stream_id=None):
576549

577550
# Concurrency:
578551
#
579-
# Hold _write_lock: synchronize access to the connection's HPACK
552+
# Hold _lock: synchronize access to the connection's HPACK
580553
# encoder and decoder and the subsquent write to the connection
581-
with self._write_lock:
554+
with self._lock:
582555
stream.send_headers(headers_only)
583556

584557
# Send whatever data we have.
@@ -641,10 +614,10 @@ def _send_cb(self, data, tolerate_peer_gone=False):
641614
"""
642615
# Concurrency
643616
#
644-
# Hold _write_lock: ensures only writer at a time
617+
# Hold _lock: ensures only writer at a time
645618
#
646619
# I/O occurs while the lock is held; waiting threads will see a delay.
647-
with self._write_lock:
620+
with self._lock:
648621
try:
649622
self._sock.sendall(data)
650623
except socket.error as e:
@@ -659,12 +632,12 @@ def _adjust_receive_window(self, frame_len):
659632
"""
660633
# Concurrency
661634
#
662-
# Hold _write_lock; synchronize the window manager update and the
635+
# Hold _lock; synchronize the window manager update and the
663636
# subsequent potential write to the connection
664637
#
665638
# I/O may occur while the lock is held; waiting threads may see a
666639
# delay.
667-
with self._write_lock:
640+
with self._lock:
668641
increment = self.window_manager._handle_frame(frame_len)
669642

670643
if increment:
@@ -686,7 +659,7 @@ def _single_read(self):
686659
# Synchronizes reading the data
687660
#
688661
# I/O occurs while the lock is held; waiting threads will see a delay.
689-
with self._read_lock:
662+
with self._lock:
690663
if self._sock is None:
691664
raise ConnectionError('tried to read after connection close')
692665
self._sock.fill()
@@ -780,7 +753,7 @@ def _recv_cb(self, stream_id=0):
780753
# re-acquired in the calls to self._single_read.
781754
#
782755
# I/O occurs while the lock is held; waiting threads will see a delay.
783-
with self._read_lock:
756+
with self._lock:
784757
log.debug('recv for stream %d with %s already present',
785758
stream_id,
786759
self.recent_recv_streams)
@@ -831,11 +804,11 @@ def _send_rst_frame(self, stream_id, error_code):
831804
"""
832805
# Concurrency
833806
#
834-
# Hold _write_lock; synchronize generating the reset frame and writing
807+
# Hold _lock; synchronize generating the reset frame and writing
835808
# it
836809
#
837810
# I/O occurs while the lock is held; waiting threads will see a delay.
838-
with self._write_lock:
811+
with self._lock:
839812
with self._conn as conn:
840813
conn.reset_stream(stream_id, error_code=error_code)
841814
self._send_outstanding_data()

test/test_http11.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -588,6 +588,10 @@ def read(self, size):
588588
assert 'File-like bodies must return bytestrings. ' \
589589
'Got: {}'.format(int) in str(exc_info)
590590

591+
def test_close_with_uninitialized_socket(self):
592+
c = HTTP11Connection('httpbin.org')
593+
c.close()
594+
591595

592596
class TestHTTP11Response(object):
593597
def test_short_circuit_read(self):

test/test_integration.py

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import pytest
1616
from contextlib import contextmanager
1717
from mock import patch
18+
from concurrent.futures import ThreadPoolExecutor, TimeoutError
1819
from h2.frame_buffer import FrameBuffer
1920
from hyper.compat import ssl
2021
from hyper.contrib import HTTP20Adapter
@@ -1142,7 +1143,6 @@ def test_version_after_http_upgrade(self):
11421143

11431144
def socket_handler(listener):
11441145
sock = listener.accept()[0]
1145-
11461146
# We should get the initial request.
11471147
data = b''
11481148
while not data.endswith(b'\r\n\r\n'):
@@ -1192,6 +1192,44 @@ def socket_handler(listener):
11921192

11931193
self.tear_down()
11941194

1195+
def test_connection_and_send_simultaneously(self):
1196+
# Since deadlock occurs probabilistic,
1197+
# It still has deadlock probability
1198+
# even the testcase is passed.
1199+
self.set_up()
1200+
1201+
recv_event = threading.Event()
1202+
1203+
def socket_handler(listener):
1204+
sock = listener.accept()[0]
1205+
1206+
receive_preamble(sock)
1207+
sock.recv(65535)
1208+
1209+
recv_event.set()
1210+
sock.close()
1211+
1212+
def do_req(conn):
1213+
conn.request('GET', '/')
1214+
recv_event.wait()
1215+
1216+
def do_connect(conn):
1217+
conn.connect()
1218+
1219+
self._start_server(socket_handler)
1220+
conn = self.get_connection()
1221+
1222+
pool = ThreadPoolExecutor(max_workers=2)
1223+
pool.submit(do_connect, conn)
1224+
f = pool.submit(do_req, conn)
1225+
1226+
try:
1227+
f.result(timeout=10)
1228+
except TimeoutError:
1229+
assert False
1230+
1231+
self.tear_down()
1232+
11951233

11961234
@patch('hyper.http20.connection.H2_NPN_PROTOCOLS', PROTOCOLS)
11971235
class TestRequestsAdapter(SocketLevelTest):

test_requirements.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
pytest
1+
pytest>=2.7
22
pytest-xdist
33
pytest-cov
44
requests

0 commit comments

Comments
 (0)