Skip to content

Commit a726c6e

Browse files
authored
Merge pull request datastax#950 from datastax/python-973
PYTHON-973: Use global variable for libev loops so it can be subclassed
2 parents 29e7f8f + 5452e1e commit a726c6e

File tree

4 files changed

+67
-48
lines changed

4 files changed

+67
-48
lines changed

CHANGELOG.rst

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,10 @@ Bug Fixes
44
---------
55
* Tokenmap.get_replicas returns the wrong value if token coincides with the end of the range (PYTHON-978)
66

7+
Other
8+
-----
9+
* Use global variable for libev loops so it can be subclassed (PYTHON-973)
10+
711
3.14.0
812
======
913
April 17, 2018

cassandra/io/libevreactor.py

Lines changed: 25 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -41,12 +41,9 @@
4141
log = logging.getLogger(__name__)
4242

4343

44-
def _cleanup(loop_weakref):
45-
try:
46-
loop = loop_weakref()
47-
except ReferenceError:
48-
return
49-
loop._cleanup()
44+
def _cleanup(loop):
45+
if loop:
46+
loop._cleanup()
5047

5148

5249
class LibevLoop(object):
@@ -84,8 +81,6 @@ def __init__(self):
8481
self._timers = TimerManager()
8582
self._loop_timer = libev.Timer(self._loop, self._on_loop_timer)
8683

87-
atexit.register(partial(_cleanup, weakref.ref(self)))
88-
8984
def maybe_start(self):
9085
should_start = False
9186
with self._lock:
@@ -228,36 +223,41 @@ def _loop_will_run(self, prepare):
228223
self._notifier.send()
229224

230225

226+
_global_loop = None
227+
atexit.register(partial(_cleanup, _global_loop))
228+
229+
231230
class LibevConnection(Connection):
232231
"""
233232
An implementation of :class:`.Connection` that uses libev for its event loop.
234233
"""
235-
_libevloop = None
236234
_write_watcher_is_active = False
237235
_read_watcher = None
238236
_write_watcher = None
239237
_socket = None
240238

241239
@classmethod
242240
def initialize_reactor(cls):
243-
if not cls._libevloop:
244-
cls._libevloop = LibevLoop()
241+
global _global_loop
242+
if not _global_loop:
243+
_global_loop = LibevLoop()
245244
else:
246-
if cls._libevloop._pid != os.getpid():
245+
if _global_loop._pid != os.getpid():
247246
log.debug("Detected fork, clearing and reinitializing reactor state")
248247
cls.handle_fork()
249-
cls._libevloop = LibevLoop()
248+
_global_loop = LibevLoop()
250249

251250
@classmethod
252251
def handle_fork(cls):
253-
if cls._libevloop:
254-
cls._libevloop._cleanup()
255-
cls._libevloop = None
252+
global _global_loop
253+
if _global_loop:
254+
_global_loop._cleanup()
255+
_global_loop = None
256256

257257
@classmethod
258258
def create_timer(cls, timeout, callback):
259259
timer = Timer(timeout, callback)
260-
cls._libevloop.add_timer(timer)
260+
_global_loop.add_timer(timer)
261261
return timer
262262

263263
def __init__(self, *args, **kwargs):
@@ -268,16 +268,16 @@ def __init__(self, *args, **kwargs):
268268
self._connect_socket()
269269
self._socket.setblocking(0)
270270

271-
with self._libevloop._lock:
272-
self._read_watcher = libev.IO(self._socket.fileno(), libev.EV_READ, self._libevloop._loop, self.handle_read)
273-
self._write_watcher = libev.IO(self._socket.fileno(), libev.EV_WRITE, self._libevloop._loop, self.handle_write)
271+
with _global_loop._lock:
272+
self._read_watcher = libev.IO(self._socket.fileno(), libev.EV_READ, _global_loop._loop, self.handle_read)
273+
self._write_watcher = libev.IO(self._socket.fileno(), libev.EV_WRITE, _global_loop._loop, self.handle_write)
274274

275275
self._send_options_message()
276276

277-
self._libevloop.connection_created(self)
277+
_global_loop.connection_created(self)
278278

279279
# start the global event loop if needed
280-
self._libevloop.maybe_start()
280+
_global_loop.maybe_start()
281281

282282
def close(self):
283283
with self.lock:
@@ -286,7 +286,8 @@ def close(self):
286286
self.is_closed = True
287287

288288
log.debug("Closing connection (%s) to %s", id(self), self.host)
289-
self._libevloop.connection_destroyed(self)
289+
290+
_global_loop.connection_destroyed(self)
290291
self._socket.close()
291292
log.debug("Closed socket to %s", self.host)
292293

@@ -367,4 +368,4 @@ def push(self, data):
367368

368369
with self._deque_lock:
369370
self.deque.extend(chunks)
370-
self._libevloop.notify()
371+
_global_loop.notify()

tests/integration/standard/test_connection.py

Lines changed: 29 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
requiresmallclockgranularity, greaterthancass20
3939
try:
4040
from cassandra.io.libevreactor import LibevConnection
41+
import cassandra.io.libevreactor
4142
except ImportError:
4243
LibevConnection = None
4344

@@ -392,26 +393,13 @@ def test_connect_timeout(self):
392393
break
393394
self.assertTrue(exception_thrown)
394395

395-
396-
class AsyncoreConnectionTests(ConnectionTests, unittest.TestCase):
397-
398-
klass = AsyncoreConnection
399-
400-
def setUp(self):
401-
if is_monkey_patched():
402-
raise unittest.SkipTest("Can't test asyncore with monkey patching")
403-
ConnectionTests.setUp(self)
404-
405396
def test_subclasses_share_loop(self):
406397
class C1(AsyncoreConnection):
407398
pass
408399

409400
class C2(AsyncoreConnection):
410401
pass
411402

412-
cassandra.io.asyncorereactor._global_loop._cleanup()
413-
cassandra.io.asyncorereactor._global_loop = None
414-
415403
clusterC1 = Cluster(connection_class=C1)
416404
clusterC1.connect(wait_for_all_pools=True)
417405

@@ -420,14 +408,35 @@ class C2(AsyncoreConnection):
420408
self.addCleanup(clusterC1.shutdown)
421409
self.addCleanup(clusterC2.shutdown)
422410

423-
event_loops_threads = [thread for thread in threading.enumerate() if
424-
thread.name == "cassandra_driver_event_loop"]
425-
self.assertEqual(len(event_loops_threads), 1)
411+
self.assertEqual(len(get_eventloop_threads(self.event_loop_name)), 1)
412+
413+
414+
def get_eventloop_threads(name):
415+
import threading
416+
event_loops_threads = [thread for thread in threading.enumerate() if name == thread.name]
417+
418+
return event_loops_threads
419+
420+
421+
class AsyncoreConnectionTests(ConnectionTests, unittest.TestCase):
422+
423+
klass = AsyncoreConnection
424+
event_loop_name = "cassandra_driver_event_loop"
425+
426+
def setUp(self):
427+
if is_monkey_patched():
428+
raise unittest.SkipTest("Can't test asyncore with monkey patching")
429+
ConnectionTests.setUp(self)
430+
431+
def clean_global_loop(self):
432+
cassandra.io.asyncorereactor._global_loop._cleanup()
433+
cassandra.io.asyncorereactor._global_loop = None
426434

427435

428436
class LibevConnectionTests(ConnectionTests, unittest.TestCase):
429437

430438
klass = LibevConnection
439+
event_loop_name = "event_loop"
431440

432441
def setUp(self):
433442
if is_monkey_patched():
@@ -436,3 +445,7 @@ def setUp(self):
436445
raise unittest.SkipTest(
437446
'libev does not appear to be installed properly')
438447
ConnectionTests.setUp(self)
448+
449+
def clean_global_loop(self):
450+
cassandra.io.libevreactor._global_loop._cleanup()
451+
cassandra.io.libevreactor._global_loop = None

tests/unit/io/test_libevreactor.py

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@ class LibevConnectionTest(ReactorTestMixin, unittest.TestCase):
3535

3636
connection_class = LibevConnection
3737
socket_attr_name = '_socket'
38-
loop_attr_name = '_libevloop'
3938
null_handle_function_args = None, 0
4039

4140
def setUp(self):
@@ -62,7 +61,7 @@ def test_watchers_are_finished(self):
6261
Test for asserting that watchers are closed in LibevConnection
6362
6463
This test simulates a process termination without calling cluster.shutdown(), which would trigger
65-
LibevConnection._libevloop._cleanup. It will check the watchers have been closed
64+
_global_loop._cleanup. It will check the watchers have been closed
6665
Finally it will restore the LibevConnection reactor so it doesn't affect
6766
the rest of the tests
6867
@@ -72,24 +71,25 @@ def test_watchers_are_finished(self):
7271
7372
@test_category connection
7473
"""
75-
with patch.object(LibevConnection._libevloop, "_thread"),\
76-
patch.object(LibevConnection._libevloop, "notify"):
74+
from cassandra.io.libevreactor import _global_loop
75+
with patch.object(_global_loop, "_thread"),\
76+
patch.object(_global_loop, "notify"):
7777

7878
self.make_connection()
7979

8080
# We have to make a copy because the connections shouldn't
8181
# be alive when we verify them
82-
live_connections = set(LibevConnection._libevloop._live_conns)
82+
live_connections = set(_global_loop._live_conns)
8383

8484
# This simulates the process ending without cluster.shutdown()
8585
# being called, then with atexit _cleanup for libevreactor would
8686
# be called
87-
libev__cleanup(weakref.ref(LibevConnection._libevloop))
87+
libev__cleanup(_global_loop)
8888
for conn in live_connections:
8989
self.assertTrue(conn._write_watcher.stop.mock_calls)
9090
self.assertTrue(conn._read_watcher.stop.mock_calls)
9191

92-
LibevConnection._libevloop._shutdown = False
92+
_global_loop._shutdown = False
9393

9494

9595
class LibevTimerPatcher(unittest.TestCase):
@@ -125,7 +125,8 @@ def create_timer(self):
125125

126126
@property
127127
def _timers(self):
128-
return self.connection._libevloop._timers
128+
from cassandra.io.libevreactor import _global_loop
129+
return _global_loop._timers
129130

130131
def make_connection(self):
131132
c = LibevConnection('1.2.3.4', cql_version='3.0.1')

0 commit comments

Comments
 (0)