Skip to content

Commit 97cbab0

Browse files
beltranmambocab
authored andcommitted
PYTHON-973: Use global variable for libev loops so it can be subclassed
1 parent 29e7f8f commit 97cbab0

File tree

2 files changed

+29
-24
lines changed

2 files changed

+29
-24
lines changed

CHANGELOG.rst

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,10 @@ Bug Fixes
44
---------
55
* Tokenmap.get_replicas returns the wrong value if token coincides with the end of the range (PYTHON-978)
66

7+
Other
8+
-----
9+
* Use global variable for libev loops so it can be subclassed (PYTHON-973)
10+
711
3.14.0
812
======
913
April 17, 2018

cassandra/io/libevreactor.py

Lines changed: 25 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -41,12 +41,9 @@
4141
log = logging.getLogger(__name__)
4242

4343

44-
def _cleanup(loop_weakref):
45-
try:
46-
loop = loop_weakref()
47-
except ReferenceError:
48-
return
49-
loop._cleanup()
44+
def _cleanup(loop):
45+
if loop:
46+
loop._cleanup()
5047

5148

5249
class LibevLoop(object):
@@ -84,8 +81,6 @@ def __init__(self):
8481
self._timers = TimerManager()
8582
self._loop_timer = libev.Timer(self._loop, self._on_loop_timer)
8683

87-
atexit.register(partial(_cleanup, weakref.ref(self)))
88-
8984
def maybe_start(self):
9085
should_start = False
9186
with self._lock:
@@ -228,36 +223,41 @@ def _loop_will_run(self, prepare):
228223
self._notifier.send()
229224

230225

226+
_global_loop = None
227+
atexit.register(partial(_cleanup, _global_loop))
228+
229+
231230
class LibevConnection(Connection):
232231
"""
233232
An implementation of :class:`.Connection` that uses libev for its event loop.
234233
"""
235-
_libevloop = None
236234
_write_watcher_is_active = False
237235
_read_watcher = None
238236
_write_watcher = None
239237
_socket = None
240238

241239
@classmethod
242240
def initialize_reactor(cls):
243-
if not cls._libevloop:
244-
cls._libevloop = LibevLoop()
241+
global _global_loop
242+
if not _global_loop:
243+
_global_loop = LibevLoop()
245244
else:
246-
if cls._libevloop._pid != os.getpid():
245+
if _global_loop._pid != os.getpid():
247246
log.debug("Detected fork, clearing and reinitializing reactor state")
248247
cls.handle_fork()
249-
cls._libevloop = LibevLoop()
248+
_global_loop = LibevLoop()
250249

251250
@classmethod
252251
def handle_fork(cls):
253-
if cls._libevloop:
254-
cls._libevloop._cleanup()
255-
cls._libevloop = None
252+
global _global_loop
253+
if _global_loop:
254+
_global_loop._cleanup()
255+
_global_loop = None
256256

257257
@classmethod
258258
def create_timer(cls, timeout, callback):
259259
timer = Timer(timeout, callback)
260-
cls._libevloop.add_timer(timer)
260+
_global_loop.add_timer(timer)
261261
return timer
262262

263263
def __init__(self, *args, **kwargs):
@@ -268,16 +268,16 @@ def __init__(self, *args, **kwargs):
268268
self._connect_socket()
269269
self._socket.setblocking(0)
270270

271-
with self._libevloop._lock:
272-
self._read_watcher = libev.IO(self._socket.fileno(), libev.EV_READ, self._libevloop._loop, self.handle_read)
273-
self._write_watcher = libev.IO(self._socket.fileno(), libev.EV_WRITE, self._libevloop._loop, self.handle_write)
271+
with _global_loop._lock:
272+
self._read_watcher = libev.IO(self._socket.fileno(), libev.EV_READ, _global_loop._loop, self.handle_read)
273+
self._write_watcher = libev.IO(self._socket.fileno(), libev.EV_WRITE, _global_loop._loop, self.handle_write)
274274

275275
self._send_options_message()
276276

277-
self._libevloop.connection_created(self)
277+
_global_loop.connection_created(self)
278278

279279
# start the global event loop if needed
280-
self._libevloop.maybe_start()
280+
_global_loop.maybe_start()
281281

282282
def close(self):
283283
with self.lock:
@@ -286,7 +286,8 @@ def close(self):
286286
self.is_closed = True
287287

288288
log.debug("Closing connection (%s) to %s", id(self), self.host)
289-
self._libevloop.connection_destroyed(self)
289+
290+
_global_loop.connection_destroyed(self)
290291
self._socket.close()
291292
log.debug("Closed socket to %s", self.host)
292293

@@ -367,4 +368,4 @@ def push(self, data):
367368

368369
with self._deque_lock:
369370
self.deque.extend(chunks)
370-
self._libevloop.notify()
371+
_global_loop.notify()

0 commit comments

Comments
 (0)