Skip to content

Commit 4ffe500

Browse files
committed
Add error handling to background threads
Added error resiliency to the server and client threads, so that exceptions which occur are logged and the thread continues to run. Additionally, fixed the logging handler in the server plugin so that stack traces for errors are added to the output. Fixes: #5 Change-Id: I6d875a1b3f17f35d29cf7c28297967e0f5045d64
1 parent b7c8bdc commit 4ffe500

File tree

7 files changed

+193
-15
lines changed

7 files changed

+193
-15
lines changed
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
import threading
2+
import unittest
3+
4+
import mock
5+
6+
from .. import worker
7+
8+
9+
class WorkerTest(unittest.TestCase):
10+
def test_worker_resilency(self):
11+
canary = mock.Mock()
12+
13+
collection_one = ["one", KeyError("random internal error"), "three"]
14+
collection_two = ["one", "two", SystemExit()]
15+
16+
def send(collection, collection_target, now, interval, pid):
17+
obj = collection.pop(0)
18+
if isinstance(obj, BaseException):
19+
raise obj
20+
else:
21+
canary.send(collection_target, obj)
22+
23+
the_time = [100]
24+
25+
mutex = threading.Lock()
26+
27+
# worker thread will call this at the top of its main loop.
28+
def start_loop():
29+
# still supporting Python 2, in Py3k only can instead use
30+
# nonlocal for "the_time"
31+
mutex.acquire()
32+
try:
33+
return the_time[0]
34+
finally:
35+
the_time[0] += 5
36+
mutex.release()
37+
38+
with mock.patch.object(
39+
worker, "log"
40+
) as mock_logger, mock.patch.object(
41+
worker.time, "time", mock.Mock(side_effect=start_loop)
42+
), mock.patch.object(
43+
worker.time, "sleep"
44+
):
45+
mutex.acquire()
46+
try:
47+
# this adds the target and also starts the worker thread.
48+
# however we have it blocked from doing anything via the
49+
# mutex above...
50+
worker.add_target(
51+
collection_one, "target one", mock.Mock(send=send)
52+
)
53+
54+
# ...so that we can also add this target and get deterministic
55+
# results
56+
worker.add_target(
57+
collection_two, "target two", mock.Mock(send=send)
58+
)
59+
finally:
60+
# worker thread is unblocked
61+
mutex.release()
62+
63+
# now wait, it will hit the SystemExit and exit.
64+
# if it times out, we failed.
65+
worker._WORKER_THREAD.join(1)
66+
67+
# see that it did what we asked.
68+
self.assertEqual(
69+
[
70+
mock.call.send("target one", "one"),
71+
mock.call.send("target two", "one"),
72+
mock.call.send("target two", "two"),
73+
mock.call.send("target one", "three"),
74+
],
75+
canary.mock_calls,
76+
)
77+
78+
self.assertEqual(
79+
[
80+
mock.call.info("Starting process thread in pid %s", mock.ANY),
81+
mock.call.error("error sending stats", exc_info=True),
82+
mock.call.info(
83+
"message sender thread caught SystemExit "
84+
"exception, exiting"
85+
),
86+
],
87+
mock_logger.mock_calls,
88+
)

sqlalchemy_collectd/client/worker.py

Lines changed: 24 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -27,19 +27,30 @@ def _process(interval):
2727
pid = os.getpid()
2828
log.info("Starting process thread in pid %s", pid)
2929

30-
while True:
31-
now = time.time()
32-
for (
33-
collection_target,
34-
connection,
35-
sender,
36-
last_called,
37-
) in _collection_targets:
38-
if now - last_called[0] > interval:
39-
last_called[0] = now
40-
sender.send(connection, collection_target, now, interval, pid)
41-
42-
time.sleep(0.2)
30+
try:
31+
while True:
32+
now = time.time()
33+
for (
34+
collection_target,
35+
connection,
36+
sender,
37+
last_called,
38+
) in _collection_targets:
39+
if now - last_called[0] > interval:
40+
last_called[0] = now
41+
try:
42+
sender.send(
43+
connection, collection_target, now, interval, pid
44+
)
45+
except Exception:
46+
log.error("error sending stats", exc_info=True)
47+
48+
time.sleep(0.2)
49+
except BaseException as be:
50+
log.info(
51+
"message sender thread caught %s exception, exiting"
52+
% type(be).__name__
53+
)
4354

4455

4556
def add_target(connection, collection_target, sender):

sqlalchemy_collectd/server/listener.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,25 @@
1+
import logging
12
import threading
23

4+
log = logging.getLogger(__name__)
5+
36

47
def _receive(connection, receiver):
58
while True:
6-
receiver.receive(connection)
9+
try:
10+
receiver.receive(connection)
11+
except Exception:
12+
log.error("message receiver caught an exception", exc_info=True)
13+
except BaseException as be:
14+
log.info(
15+
"message receiver thread caught %s exception, exiting"
16+
% type(be).__name__
17+
)
18+
break
719

820

921
def listen(connection, receiver):
22+
global listen_thread
1023
listen_thread = threading.Thread(
1124
target=_receive, args=(connection, receiver)
1225
)

sqlalchemy_collectd/server/plugin.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,8 @@ class CollectdHandler(logging.Handler):
2727

2828
def emit(self, record):
2929
fn = self.levels[record.levelno]
30-
fn(record.msg % record.args)
30+
record.msg = "[sqlalchemy-collectd] " + record.msg
31+
fn(self.format(record))
3132

3233

3334
def get_config(config):
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
import unittest
2+
3+
import mock
4+
5+
from .. import listener
6+
7+
8+
class ListenerTest(unittest.TestCase):
9+
def test_receive_resilency(self):
10+
canary = mock.Mock()
11+
12+
collection = [
13+
"one",
14+
"two",
15+
"three",
16+
KeyError("random internal error"),
17+
"four",
18+
SystemExit(),
19+
"five",
20+
]
21+
22+
def receive(connection):
23+
24+
obj = collection.pop(0)
25+
if isinstance(obj, BaseException):
26+
raise obj
27+
else:
28+
canary.receive(obj)
29+
30+
with mock.patch.object(listener, "log") as mock_logger:
31+
listener.listen(mock.Mock(), mock.Mock(receive=receive))
32+
33+
listener.listen_thread.join(1)
34+
35+
# call "five" doesn't happen because we should have exited
36+
self.assertEqual(
37+
[
38+
mock.call.receive("one"),
39+
mock.call.receive("two"),
40+
mock.call.receive("three"),
41+
mock.call.receive("four"),
42+
],
43+
canary.mock_calls,
44+
)
45+
self.assertEqual(
46+
[
47+
mock.call.error(
48+
"message receiver caught an exception", exc_info=True
49+
),
50+
mock.call.info(
51+
"message receiver thread caught SystemExit "
52+
"exception, exiting"
53+
),
54+
],
55+
mock_logger.mock_calls,
56+
)

tox.ini

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ deps=
2929
flake8-builtins
3030
flake8-docstrings
3131
flake8-rst-docstrings
32+
pydocstyle<4.0.0
3233
# used by flake8-rst-docstrings
3334
pygments
3435
commands = flake8 ./sqlalchemy_collectd/ setup.py

unreleased_changes/5.rst

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
.. change::
2+
:tags: bug
3+
:tickets: 5
4+
5+
Added error resiliency to the server and client threads, so that exceptions
6+
which occur are logged and the thread continues to run. Additionally, fixed
7+
the logging handler in the server plugin so that stack traces for errors
8+
are added to the output.

0 commit comments

Comments
 (0)