Skip to content

Commit 411e60e

Browse files
authored
Merge pull request datastax#941 from datastax/python-697
PYTHON-697 asyncore reactor should use a global variable instead of a class v…
2 parents defd9eb + 6bce25c commit 411e60e

File tree

4 files changed

+47
-25
lines changed

4 files changed

+47
-25
lines changed

CHANGELOG.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ Features
1515
* Allow filter queries with fields that have an index managed outside of cqlengine (PYTHON-966)
1616
* Twisted SSL Support (PYTHON-343)
1717
* Support IS NOT NULL operator in cqlengine (PYTHON-968)
18+
* Asyncore reactors should use a global variable instead of a class variable for the event loop (PYTHON-697)
1819

1920
Other
2021
-----

cassandra/io/asyncorereactor.py

Lines changed: 20 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -43,13 +43,9 @@
4343

4444
_dispatcher_map = {}
4545

46-
def _cleanup(loop_weakref):
47-
try:
48-
loop = loop_weakref()
49-
except ReferenceError:
50-
return
51-
52-
loop._cleanup()
46+
def _cleanup(loop):
47+
if loop:
48+
loop._cleanup()
5349

5450

5551
class WaitableTimer(Timer):
@@ -228,8 +224,6 @@ def __init__(self):
228224
dispatcher = _BusyWaitDispatcher()
229225
self._loop_dispatcher = dispatcher
230226

231-
atexit.register(partial(_cleanup, weakref.ref(self)))
232-
233227
def maybe_start(self):
234228
should_start = False
235229
did_acquire = False
@@ -299,40 +293,43 @@ def _cleanup(self):
299293
log.debug("Dispatchers were closed")
300294

301295

296+
_global_loop = None
297+
atexit.register(partial(_cleanup, _global_loop))
298+
299+
302300
class AsyncoreConnection(Connection, asyncore.dispatcher):
303301
"""
304302
An implementation of :class:`.Connection` that uses the ``asyncore``
305303
module in the Python standard library for its event loop.
306304
"""
307305

308-
_loop = None
309-
310306
_writable = False
311307
_readable = False
312308

313309
@classmethod
314310
def initialize_reactor(cls):
315-
if not cls._loop:
316-
cls._loop = AsyncoreLoop()
311+
global _global_loop
312+
if not _global_loop:
313+
_global_loop = AsyncoreLoop()
317314
else:
318315
current_pid = os.getpid()
319-
if cls._loop._pid != current_pid:
316+
if _global_loop._pid != current_pid:
320317
log.debug("Detected fork, clearing and reinitializing reactor state")
321318
cls.handle_fork()
322-
cls._loop = AsyncoreLoop()
319+
_global_loop = AsyncoreLoop()
323320

324321
@classmethod
325322
def handle_fork(cls):
326-
global _dispatcher_map
323+
global _dispatcher_map, _global_loop
327324
_dispatcher_map = {}
328-
if cls._loop:
329-
cls._loop._cleanup()
330-
cls._loop = None
325+
if _global_loop:
326+
_global_loop._cleanup()
327+
_global_loop = None
331328

332329
@classmethod
333330
def create_timer(cls, timeout, callback):
334331
timer = Timer(timeout, callback)
335-
cls._loop.add_timer(timer)
332+
_global_loop.add_timer(timer)
336333
return timer
337334

338335
def __init__(self, *args, **kwargs):
@@ -344,14 +341,14 @@ def __init__(self, *args, **kwargs):
344341
self._connect_socket()
345342

346343
# start the event loop if needed
347-
self._loop.maybe_start()
344+
_global_loop.maybe_start()
348345

349346
init_handler = WaitableTimer(
350347
timeout=0,
351348
callback=partial(asyncore.dispatcher.__init__,
352349
self, self._socket, _dispatcher_map)
353350
)
354-
self._loop.add_timer(init_handler)
351+
_global_loop.add_timer(init_handler)
355352
init_handler.wait(kwargs["connect_timeout"])
356353

357354
self._writable = True
@@ -451,7 +448,7 @@ def push(self, data):
451448
with self.deque_lock:
452449
self.deque.extend(chunks)
453450
self._writable = True
454-
self._loop.wake_loop()
451+
_global_loop.wake_loop()
455452

456453
def writable(self):
457454
return self._writable

tests/integration/standard/test_connection.py

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,13 @@
2020
from functools import partial
2121
from six.moves import range
2222
import sys
23+
import threading
2324
from threading import Thread, Event
2425
import time
25-
import weakref
2626

2727
from cassandra import ConsistencyLevel, OperationTimedOut
2828
from cassandra.cluster import NoHostAvailable, ConnectionShutdown, Cluster
29+
import cassandra.io.asyncorereactor
2930
from cassandra.io.asyncorereactor import AsyncoreConnection
3031
from cassandra.protocol import QueryMessage
3132
from cassandra.connection import Connection
@@ -401,6 +402,28 @@ def setUp(self):
401402
raise unittest.SkipTest("Can't test asyncore with monkey patching")
402403
ConnectionTests.setUp(self)
403404

405+
def test_subclasses_share_loop(self):
406+
class C1(AsyncoreConnection):
407+
pass
408+
409+
class C2(AsyncoreConnection):
410+
pass
411+
412+
cassandra.io.asyncorereactor._global_loop._cleanup()
413+
cassandra.io.asyncorereactor._global_loop = None
414+
415+
clusterC1 = Cluster(connection_class=C1)
416+
clusterC1.connect(wait_for_all_pools=True)
417+
418+
clusterC2 = Cluster(connection_class=C2)
419+
clusterC2.connect(wait_for_all_pools=True)
420+
self.addCleanup(clusterC1.shutdown)
421+
self.addCleanup(clusterC2.shutdown)
422+
423+
event_loops_threads = [thread for thread in threading.enumerate() if
424+
thread.name == "cassandra_driver_event_loop"]
425+
self.assertEqual(len(event_loops_threads), 1)
426+
404427

405428
class LibevConnectionTests(ConnectionTests, unittest.TestCase):
406429

tests/unit/io/test_asyncorereactor.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
from mock import patch
2020
import socket
21+
import cassandra.io.asyncorereactor
2122
from cassandra.io.asyncorereactor import AsyncoreConnection
2223
from tests import is_monkey_patched
2324
from tests.unit.io.utils import ReactorTestMixin, TimerTestMixin, noop_if_monkey_patched
@@ -76,7 +77,7 @@ def create_timer(self):
7677

7778
@property
7879
def _timers(self):
79-
return self.connection._loop._timers
80+
return cassandra.io.asyncorereactor._global_loop._timers
8081

8182
def setUp(self):
8283
if is_monkey_patched():

0 commit comments

Comments
 (0)