Skip to content

Commit c590560

Browse files
authored
Merge pull request #59 from fluent/save-last-error
Save last error
2 parents a8a08b0 + c64af02 commit c590560

File tree

7 files changed

+162
-17
lines changed

7 files changed

+162
-17
lines changed

.travis.yml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
sudo: false
22
language: python
33
python:
4-
- "2.6"
54
- "2.7"
65
- "3.2"
76
- "3.3"

README.rst

Lines changed: 55 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -60,15 +60,60 @@ To quickly test your setup, add a matcher that logs to the stdout:
6060
Usage
6161
-----
6262

63+
FluentSender Interface
64+
~~~~~~~~~~~~~~~~~~~~~~
65+
66+
`sender.FluentSender` is a structured event logger for Fluentd.
67+
68+
By default, the logger assumes fluentd daemon is launched locally. You
69+
can also specify remote logger by passing the options.
70+
71+
.. code:: python
72+
73+
from fluent import sender
74+
75+
# for local fluent
76+
logger = sender.FluentSender('app')
77+
78+
# for remote fluent
79+
logger = sender.FluentSender('app', host='host', port=24224)
80+
81+
For sending event, call `emit` method with your event. Following example will send the event to
82+
fluentd, with tag 'app.follow' and the attributes 'from' and 'to'.
83+
84+
.. code:: python
85+
86+
# Use current time
87+
logger.emit('follow', {'from': 'userA', 'to': 'userB'})
88+
89+
# Specify optional time
90+
cur_time = int(time.time())
91+
logger.emit_with_time('follow', cur_time, {'from': 'userA', 'to':'userB'})
92+
93+
You can detect an error via return value of `emit`. If an error happens in `emit`, `emit` returns `False` and get an error object using `last_error` method.
94+
95+
.. code:: python
96+
97+
if not logger.emit('follow', {'from': 'userA', 'to': 'userB'}):
98+
print(logger.last_error)
99+
logger.clear_last_error() # clear stored error after handled errors
100+
101+
If you want to shutdown the client, call `close()` method.
102+
103+
.. code:: python
104+
105+
logger.close()
106+
63107
Event-Based Interface
64108
~~~~~~~~~~~~~~~~~~~~~
65109

66-
First, you need to call ``logger.setup()`` to create global logger
110+
This API is a wrapper for `sender.FluentSender`.
111+
112+
First, you need to call ``sender.setup()`` to create global `sender.FluentSender` logger
67113
instance. This call needs to be called only once, at the beggining of
68114
the application for example.
69115

70-
By default, the logger assumes fluentd daemon is launched locally. You
71-
can also specify remote logger by passing the options.
116+
Initialization code of Event-Based API is below:
72117

73118
.. code:: python
74119
@@ -81,7 +126,7 @@ can also specify remote logger by passing the options.
81126
sender.setup('app', host='host', port=24224)
82127
83128
Then, please create the events like this. This will send the event to
84-
fluent, with tag 'app.follow' and the attributes 'from' and 'to'.
129+
fluentd, with tag 'app.follow' and the attributes 'from' and 'to'.
85130

86131
.. code:: python
87132
@@ -93,11 +138,14 @@ fluent, with tag 'app.follow' and the attributes 'from' and 'to'.
93138
'to': 'userB'
94139
})
95140
96-
If you want to shutdown the client, call `close()` method.
141+
`event.Event` has one limitation which can't return success/failure result.
142+
143+
Other methods for Event-Based Interface.
97144

98145
.. code:: python
99146
100-
sender.close()
147+
sender.get_global_sender # get instance of global sender
148+
sender.close # Call FluentSender#close
101149
102150
Handler for buffer overflow
103151
~~~~~~~~~~~~~~~~~~~~~~~~~~~
@@ -114,7 +162,7 @@ You can inject your own custom proc to handle buffer overflow in the event of co
114162
for unpacked in unpacker:
115163
print(unpacked)
116164
117-
sender.setup('app', host='host', port=24224, buffer_overflow_handler=handler)
165+
logger = sender.FluentSender('app', host='host', port=24224, buffer_overflow_handler=handler)
118166
119167
You should handle any exception in handler. fluent-logger ignores exceptions from ``buffer_overflow_handler``.
120168

fluent/handler.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ def __init__(self,
106106

107107
def emit(self, record):
108108
data = self.format(record)
109-
self.sender.emit(None, data)
109+
return self.sender.emit(None, data)
110110

111111
def close(self):
112112
self.acquire()

fluent/sender.py

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ def __init__(self,
5252
self.socket = None
5353
self.pendings = None
5454
self.lock = threading.Lock()
55+
self._last_error_threadlocal = threading.local()
5556

5657
try:
5758
self._reconnect()
@@ -61,17 +62,18 @@ def __init__(self,
6162

6263
def emit(self, label, data):
6364
cur_time = int(time.time())
64-
self.emit_with_time(label, cur_time, data)
65+
return self.emit_with_time(label, cur_time, data)
6566

6667
def emit_with_time(self, label, timestamp, data):
6768
try:
6869
bytes_ = self._make_packet(label, timestamp, data)
69-
except Exception:
70+
except Exception as e:
71+
self.last_error = e
7072
bytes_ = self._make_packet(label, timestamp,
7173
{"level": "CRITICAL",
7274
"message": "Can't output to log",
7375
"traceback": traceback.format_exc()})
74-
self._send(bytes_)
76+
return self._send(bytes_)
7577

7678
def close(self):
7779
self.lock.acquire()
@@ -101,7 +103,7 @@ def _make_packet(self, label, timestamp, data):
101103
def _send(self, bytes_):
102104
self.lock.acquire()
103105
try:
104-
self._send_internal(bytes_)
106+
return self._send_internal(bytes_)
105107
finally:
106108
self.lock.release()
107109

@@ -116,16 +118,24 @@ def _send_internal(self, bytes_):
116118

117119
# send finished
118120
self.pendings = None
121+
122+
return True
119123
except socket.error as e:
124+
#except Exception as e:
125+
self.last_error = e
126+
120127
# close socket
121128
self._close()
129+
122130
# clear buffer if it exceeds max bufer size
123131
if self.pendings and (len(self.pendings) > self.bufmax):
124132
self._call_buffer_overflow_handler(self.pendings)
125133
self.pendings = None
126134
else:
127135
self.pendings = bytes_
128136

137+
return False
138+
129139
def _send_data(self, bytes_):
130140
# reconnect if possible
131141
self._reconnect()
@@ -152,6 +162,18 @@ def _call_buffer_overflow_handler(self, pending_events):
152162
# User should care any exception in handler
153163
pass
154164

165+
@property
166+
def last_error(self):
167+
return getattr(self._last_error_threadlocal, 'exception', None)
168+
169+
@last_error.setter
170+
def last_error(self, err):
171+
self._last_error_threadlocal.exception = err
172+
173+
def clear_last_error(self, _thread_id = None):
174+
if hasattr(self._last_error_threadlocal, 'exception'):
175+
delattr(self._last_error_threadlocal, 'exception')
176+
155177
def _close(self):
156178
if self.socket:
157179
self.socket.close()

tests/test_event.py

Lines changed: 45 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,21 +3,64 @@
33
import unittest
44

55
from fluent import event, sender
6+
from tests import mockserver
67

8+
class TestException(BaseException): pass
79

8-
sender.setup(server='localhost', tag='app')
10+
class TestEvent(unittest.TestCase):
11+
def setUp(self):
12+
self._server = mockserver.MockRecvServer('localhost')
13+
sender.setup('app', port=self._server.port)
914

15+
def tearDown(self):
16+
from fluent.sender import _set_global_sender
17+
sender.close()
18+
_set_global_sender(None)
1019

11-
class TestEvent(unittest.TestCase):
1220
def test_logging(self):
21+
# XXX: This tests succeeds even if the fluentd connection failed
1322
# send event with tag app.follow
1423
event.Event('follow', {
1524
'from': 'userA',
1625
'to': 'userB'
1726
})
1827

28+
def test_logging_with_timestamp(self):
29+
# XXX: This tests succeeds even if the fluentd connection failed
30+
1931
# send event with tag app.follow, with timestamp
2032
event.Event('follow', {
2133
'from': 'userA',
2234
'to': 'userB'
2335
}, time=int(0))
36+
37+
def test_no_last_error_on_successful_event(self):
38+
global_sender = sender.get_global_sender()
39+
event.Event('unfollow', {
40+
'from': 'userC',
41+
'to': 'userD'
42+
})
43+
44+
self.assertEqual(global_sender.last_error, None)
45+
sender.close()
46+
47+
@unittest.skip("This test failed with 'TypeError: catching classes that do not inherit from BaseException is not allowed' so skipped")
48+
#@patch('fluent.sender.socket')
49+
def test_connect_exception_during_event_send(self, mock_socket):
50+
# Make the socket.socket().connect() call raise a custom exception
51+
mock_connect = mock_socket.socket.return_value.connect
52+
EXCEPTION_MSG = "a event send socket connect() exception"
53+
mock_connect.side_effect = TestException(EXCEPTION_MSG)
54+
55+
# Force the socket to reconnect while trying to emit the event
56+
global_sender = sender.get_global_sender()
57+
global_sender._close()
58+
59+
event.Event('unfollow', {
60+
'from': 'userE',
61+
'to': 'userF'
62+
})
63+
64+
ex = global_sender.last_error
65+
self.assertEqual(ex.args, EXCEPTION_MSG)
66+
global_sender.clear_last_error()

tests/test_sender.py

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@
22

33
from __future__ import print_function
44
import unittest
5+
import socket
56

67
import fluent.sender
7-
88
from tests import mockserver
99

1010

@@ -45,6 +45,9 @@ def setUp(self):
4545
self._sender = fluent.sender.FluentSender(tag='test',
4646
port=self._server.port)
4747

48+
def tearDown(self):
49+
self._sender.close()
50+
4851
def get_data(self):
4952
return self._server.get_recieved()
5053

@@ -60,3 +63,33 @@ def test_simple(self):
6063
eq({'bar': 'baz'}, data[0][2])
6164
self.assertTrue(data[0][1])
6265
self.assertTrue(isinstance(data[0][1], int))
66+
67+
def test_no_last_error_on_successful_emit(self):
68+
sender = self._sender
69+
sender.emit('foo', {'bar': 'baz'})
70+
sender._close()
71+
72+
self.assertEqual(sender.last_error, None)
73+
74+
def test_last_error_property(self):
75+
EXCEPTION_MSG = "custom exception for testing last_error property"
76+
self._sender.last_error = socket.error(EXCEPTION_MSG)
77+
78+
self.assertEqual(self._sender.last_error.args[0], EXCEPTION_MSG)
79+
80+
def test_clear_last_error(self):
81+
EXCEPTION_MSG = "custom exception for testing clear_last_error"
82+
self._sender.last_error = socket.error(EXCEPTION_MSG)
83+
self._sender.clear_last_error()
84+
85+
self.assertEqual(self._sender.last_error, None)
86+
87+
@unittest.skip("This test failed with 'TypeError: catching classes that do not inherit from BaseException is not allowed' so skipped")
88+
#@patch('fluent.sender.socket')
89+
def test_connect_exception_during_sender_init(self, mock_socket):
90+
# Make the socket.socket().connect() call raise a custom exception
91+
mock_connect = mock_socket.socket.return_value.connect
92+
EXCEPTION_MSG = "a sender init socket connect() exception"
93+
mock_connect.side_effect = socket.error(EXCEPTION_MSG)
94+
95+
self.assertEqual(self._sender.last_error.args[0], EXCEPTION_MSG)

tox.ini

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[tox]
22
minversion = 1.7.2
3-
envlist = py26, py27, py32, py33, py34, py35
3+
envlist = py27, py32, py33, py34, py35
44
skip_missing_interpreters = True
55

66
[testenv]

0 commit comments

Comments
 (0)