Skip to content

Commit b811752

Browse files
authored
Fix race condition (#946)
Naveed will recognize this: we were trying to help Bijan, and this was a red herring along the way. The problem sequence of events is this: - We create a new `_ModuleHandler` (line 67), which constructs the `_SingletonEventLoopThread` (line 73). - Creating that singleton sets `self._loop` to None (old line 36), then spawns a separate thread (line 39) which would properly initialize `self._loop` (old line 42). - Before the separate thread can run, though, we call `module_handler.emit()` (line 85), which calls `self._worker.get_loop()` (line 94), which throws an exception because the loop isn't initialized yet, because the second thread hasn't had a chance to run yet. With the changes in the first commit, the loop is initialized immediately and merely run in the other thread. That way, other things (like `emit()`) can enqueue new coroutines in the loop before it starts running, and they'll execute shortly thereafter when the other thread gets going. While I was at this, I noticed that we were using [a non-thread-safe](https://docs.python.org/3/library/asyncio-sync.html#event) `asyncio.Event` to signal between multiple threads, so I changed it to use a `threading.Event` instead. That's in a separate commit, in case that turns out to be a bad idea (I still don't have enough experience with asyncio to feel confident about that stuff).
1 parent e41f0d4 commit b811752

File tree

1 file changed

+9
-8
lines changed

1 file changed

+9
-8
lines changed

src/viam/logging.py

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
from copy import copy
55
from datetime import datetime
66
from logging import DEBUG, ERROR, FATAL, INFO, WARN, WARNING # noqa: F401
7-
from threading import Lock, Thread
7+
from threading import Event, Lock, Thread
88
from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Union
99

1010
from grpclib.exceptions import StreamTerminatedError
@@ -23,8 +23,10 @@
2323
class _SingletonEventLoopThread:
2424
_instance = None
2525
_lock = Lock()
26-
_ready_event = asyncio.Event()
27-
_loop: Union[asyncio.AbstractEventLoop, None]
26+
# We use a threading.Event instead of an asyncio.Event because the latter are not thread safe,
27+
# and this is set in a separate thread than it is waited on.
28+
_ready_event = Event()
29+
_loop: asyncio.AbstractEventLoop
2830
_thread: Thread
2931

3032
def __new__(cls):
@@ -33,13 +35,12 @@ def __new__(cls):
3335
with cls._lock:
3436
if cls._instance is None:
3537
cls._instance = super(_SingletonEventLoopThread, cls).__new__(cls)
36-
cls._instance._loop = None
38+
cls._instance._loop = asyncio.new_event_loop()
3739
cls._instance._thread = Thread(target=cls._instance._run)
3840
cls._instance._thread.start()
3941
return cls._instance
4042

4143
def _run(self):
42-
self._loop = asyncio.new_event_loop()
4344
asyncio.set_event_loop(self._loop)
4445
self._ready_event.set()
4546
self._loop.run_forever()
@@ -54,8 +55,8 @@ def get_loop(self):
5455
raise RuntimeError("Event loop is None. Did you call .start() and .wait_until_ready()?")
5556
return self._loop
5657

57-
async def wait_until_ready(self):
58-
await self._ready_event.wait()
58+
def wait_until_ready(self):
59+
self._ready_event.wait()
5960

6061

6162
class _ModuleHandler(logging.Handler):
@@ -101,7 +102,7 @@ def emit(self, record: logging.LogRecord):
101102
self._logger.log(record.levelno, message)
102103

103104
async def _asynchronously_emit(self, record: logging.LogRecord, name: str, message: str, stack: str, time: datetime):
104-
await self._worker.wait_until_ready()
105+
self._worker.wait_until_ready()
105106
task = self._worker.get_loop().create_task(
106107
self._parent.log(name, record.levelname, time, message, stack),
107108
name=f"{viam._TASK_PREFIX}-LOG-{record.created}",

0 commit comments

Comments
 (0)