Skip to content

Commit 71d1227

Browse files
committed
PYTHON-2115 Remove threading.Lock() from SocketChecker
1 parent 9cc3652 commit 71d1227

File tree

3 files changed

+14
-34
lines changed

3 files changed

+14
-34
lines changed

pymongo/pool.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -487,6 +487,7 @@ def __init__(self, sock, pool, address, id):
487487
self.enabled_for_cmap = pool.enabled_for_cmap
488488
self.compression_settings = pool.opts.compression_settings
489489
self.compression_context = None
490+
self.socket_checker = SocketChecker()
490491

491492
# The pool's generation changes with each reset() so we can close
492493
# sockets created before the last reset.
@@ -752,6 +753,10 @@ def close_socket(self, reason):
752753
self.listeners.publish_connection_closed(
753754
self.address, self.id, reason)
754755

756+
def socket_closed(self):
757+
"""Return True if we know socket has been closed, False otherwise."""
758+
return self.socket_checker.socket_closed(self.sock)
759+
755760
def send_cluster_time(self, command, session, client):
756761
"""Add cluster time for MongoDB >= 3.6."""
757762
if self.max_wire_version >= 6 and client:
@@ -976,7 +981,6 @@ def __init__(self, address, options, handshake=True):
976981

977982
self._socket_semaphore = thread_util.create_semaphore(
978983
self.opts.max_pool_size, max_waiters)
979-
self.socket_checker = SocketChecker()
980984
if self.enabled_for_cmap:
981985
self.opts.event_listeners.publish_pool_created(
982986
self.address, self.opts.non_default_options)
@@ -1244,7 +1248,7 @@ def _perished(self, sock_info):
12441248
if (self._check_interval_seconds is not None and (
12451249
0 == self._check_interval_seconds or
12461250
idle_time_seconds > self._check_interval_seconds)):
1247-
if self.socket_checker.socket_closed(sock_info.sock):
1251+
if sock_info.socket_closed():
12481252
sock_info.close_socket(ConnectionClosedReason.ERROR)
12491253
return True
12501254

pymongo/socket_checker.py

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616

1717
import errno
1818
import select
19-
import threading
2019

2120
_HAVE_POLL = hasattr(select, "poll")
2221
_SelectError = getattr(select, "error", OSError)
@@ -34,10 +33,8 @@ class SocketChecker(object):
3433

3534
def __init__(self):
3635
if _HAVE_POLL:
37-
self._lock = threading.Lock()
3836
self._poller = select.poll()
3937
else:
40-
self._lock = None
4138
self._poller = None
4239

4340
def select(self, sock, read=False, write=False, timeout=0):
@@ -50,14 +47,13 @@ def select(self, sock, read=False, write=False, timeout=0):
5047
mask = mask | select.POLLIN | select.POLLPRI
5148
if write:
5249
mask = mask | select.POLLOUT
53-
with self._lock:
54-
self._poller.register(sock, mask)
55-
try:
56-
# poll() timeout is in milliseconds. select()
57-
# timeout is in seconds.
58-
res = self._poller.poll(timeout * 1000)
59-
finally:
60-
self._poller.unregister(sock)
50+
self._poller.register(sock, mask)
51+
try:
52+
# poll() timeout is in milliseconds. select()
53+
# timeout is in seconds.
54+
res = self._poller.poll(timeout * 1000)
55+
finally:
56+
self._poller.unregister(sock)
6157
else:
6258
rlist = [sock] if read else []
6359
wlist = [sock] if write else []

test/test_pooling.py

Lines changed: 1 addition & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -236,8 +236,7 @@ def test_pool_removes_dead_socket(self):
236236
# Simulate a closed socket without telling the SocketInfo it's
237237
# closed.
238238
sock_info.sock.close()
239-
self.assertTrue(
240-
cx_pool.socket_checker.socket_closed(sock_info.sock))
239+
self.assertTrue(sock_info.socket_closed())
241240

242241
with cx_pool.get_socket({}) as new_sock_info:
243242
self.assertEqual(0, len(cx_pool.sockets))
@@ -257,25 +256,6 @@ def test_socket_closed(self):
257256
s.close()
258257
self.assertTrue(socket_checker.socket_closed(s))
259258

260-
def test_socket_closed_thread_safe(self):
261-
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
262-
s.connect((client_context.host, client_context.port))
263-
self.addCleanup(s.close)
264-
socket_checker = SocketChecker()
265-
266-
def check_socket():
267-
for _ in range(1000):
268-
self.assertFalse(socket_checker.socket_closed(s))
269-
270-
threads = []
271-
for i in range(3):
272-
thread = threading.Thread(target=check_socket)
273-
thread.start()
274-
threads.append(thread)
275-
276-
for thread in threads:
277-
thread.join()
278-
279259
def test_return_socket_after_reset(self):
280260
pool = self.create_pool()
281261
with pool.get_socket({}) as sock:

0 commit comments

Comments
 (0)