Skip to content

Commit 5e3e69f

Browse files
committed
Merge pull request #32 from fluent/error_handler
Add ServerErrorHandler
2 parents fedab54 + d934857 commit 5e3e69f

File tree

6 files changed

+116
-10
lines changed

6 files changed

+116
-10
lines changed

src/main/java/org/fluentd/logger/FluentLogger.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.util.HashMap;
2121
import java.util.Map;
2222

23+
import org.fluentd.logger.errorhandler.ErrorHandler;
2324
import org.fluentd.logger.sender.Reconnector;
2425
import org.fluentd.logger.sender.Sender;
2526

@@ -132,4 +133,23 @@ public void finalize() {
132133
public boolean isConnected() {
133134
return sender != null && sender.isConnected();
134135
}
136+
137+
public synchronized void setErrorHandler(ErrorHandler errorHandler) {
138+
if (errorHandler == null) {
139+
throw new IllegalArgumentException("errorHandler is null");
140+
}
141+
142+
if (sender != null) {
143+
sender.setErrorHandler(errorHandler);
144+
}
145+
}
146+
147+
public synchronized void removeErrorHandler() {
148+
if (sender != null) {
149+
sender.removeErrorHandler();
150+
}
151+
}
152+
153+
154+
135155
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package org.fluentd.logger.errorhandler;
2+
3+
import java.io.IOException;
4+
5+
public abstract class ErrorHandler {
6+
public void handleNetworkError(IOException ex) {};
7+
}

src/main/java/org/fluentd/logger/sender/NullSender.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
//
1818
package org.fluentd.logger.sender;
1919

20+
import org.fluentd.logger.errorhandler.ErrorHandler;
21+
2022
import java.util.Map;
2123

2224
public class NullSender implements Sender {
@@ -55,4 +57,12 @@ public String toString() {
5557
public boolean isConnected() {
5658
return true;
5759
}
60+
61+
@Override
62+
public void setErrorHandler(ErrorHandler errorHandler) {
63+
}
64+
65+
@Override
66+
public void removeErrorHandler() {
67+
}
5868
}

src/main/java/org/fluentd/logger/sender/RawSocketSender.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
//
1818
package org.fluentd.logger.sender;
1919

20+
import org.fluentd.logger.errorhandler.ErrorHandler;
2021
import org.msgpack.MessagePack;
2122
import org.slf4j.Logger;
2223
import org.slf4j.LoggerFactory;
@@ -33,6 +34,8 @@ public class RawSocketSender implements Sender {
3334

3435
private static final Logger LOG = LoggerFactory.getLogger(RawSocketSender.class);
3536

37+
private static final ErrorHandler DEFAULT_ERROR_HANLDER = new ErrorHandler() {};
38+
3639
private MessagePack msgpack;
3740

3841
private SocketAddress server;
@@ -49,6 +52,8 @@ public class RawSocketSender implements Sender {
4952

5053
private String name;
5154

55+
private ErrorHandler errorHandler = DEFAULT_ERROR_HANLDER;
56+
5257
public RawSocketSender() {
5358
this("localhost", 24224);
5459
}
@@ -185,6 +190,12 @@ public synchronized void flush() {
185190
clearBuffer();
186191
reconnector.clearErrorHistory();
187192
} catch (IOException e) {
193+
try {
194+
errorHandler.handleNetworkError(e);
195+
}
196+
catch (Exception handlerException) {
197+
LOG.warn("ErrorHandler.handleNetworkError failed", handlerException);
198+
}
188199
LOG.error(this.getClass().getName(), "flush", e);
189200
reconnector.addErrorHistory(System.currentTimeMillis());
190201
close();
@@ -217,4 +228,18 @@ public String toString() {
217228
public boolean isConnected() {
218229
return socket != null && !socket.isClosed() && socket.isConnected() && !socket.isOutputShutdown();
219230
}
231+
232+
@Override
233+
public void setErrorHandler(ErrorHandler errorHandler) {
234+
if (errorHandler == null) {
235+
throw new IllegalArgumentException("errorHandler is null");
236+
}
237+
238+
this.errorHandler = errorHandler;
239+
}
240+
241+
@Override
242+
public void removeErrorHandler() {
243+
this.errorHandler = DEFAULT_ERROR_HANLDER;
244+
}
220245
}

src/main/java/org/fluentd/logger/sender/Sender.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
//
1818
package org.fluentd.logger.sender;
1919

20+
import org.fluentd.logger.errorhandler.ErrorHandler;
21+
2022
import java.util.Map;
2123

2224
public interface Sender {
@@ -31,4 +33,8 @@ public interface Sender {
3133
String getName();
3234

3335
boolean isConnected();
36+
37+
void setErrorHandler(ErrorHandler errorHandler);
38+
39+
void removeErrorHandler();
3440
}

src/test/java/org/fluentd/logger/TestFluentLogger.java

Lines changed: 48 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package org.fluentd.logger;
22

3+
import org.fluentd.logger.errorhandler.ErrorHandler;
34
import org.fluentd.logger.sender.Event;
45
import org.fluentd.logger.sender.NullSender;
56
import org.fluentd.logger.util.MockFluentd;
@@ -17,10 +18,9 @@
1718
import java.util.concurrent.ExecutorService;
1819
import java.util.concurrent.Executors;
1920
import java.util.concurrent.TimeUnit;
21+
import java.util.concurrent.atomic.AtomicReference;
2022

21-
import static org.junit.Assert.assertEquals;
22-
import static org.junit.Assert.assertFalse;
23-
import static org.junit.Assert.assertTrue;
23+
import static org.junit.Assert.*;
2424

2525
public class TestFluentLogger {
2626
private Logger _logger = LoggerFactory.getLogger(TestFluentLogger.class);
@@ -203,9 +203,11 @@ public void testReconnection() throws Exception {
203203
int port = MockFluentd.randomPort();
204204
String host = "localhost";
205205
final List<Event> elist1 = new ArrayList<Event>();
206+
final AtomicReference<Exception> lastError = new AtomicReference<Exception>();
206207

207208
FixedThreadManager threadManager = new FixedThreadManager(2);
208209

210+
// run a fluentd
209211
MockFluentd fluentd1 = new MockFluentd(port, new MockFluentd.MockProcess() {
210212
public void process(MessagePack msgpack, Socket socket) throws IOException {
211213
BufferedInputStream in = new BufferedInputStream(socket.getInputStream());
@@ -226,8 +228,16 @@ public void process(MessagePack msgpack, Socket socket) throws IOException {
226228
});
227229
threadManager.submit(fluentd1);
228230

229-
// start loggers
230-
FluentLogger logger = FluentLogger.getLogger("testtag", host, port);
231+
// start a logger
232+
final FluentLogger logger = FluentLogger.getLogger("testtag", host, port);
233+
final ErrorHandler errorHandler = new ErrorHandler() {
234+
@Override
235+
public void handleNetworkError(IOException ex) {
236+
lastError.set(ex);
237+
}
238+
};
239+
logger.setErrorHandler(errorHandler);
240+
231241
assertFalse(logger.isConnected());
232242
{
233243
Map<String, Object> data = new HashMap<String, Object>();
@@ -237,21 +247,46 @@ public void process(MessagePack msgpack, Socket socket) throws IOException {
237247
}
238248
assertTrue(logger.isConnected());
239249

250+
// close the fluentd to make the situation that the fluentd gets down
240251
TimeUnit.MILLISECONDS.sleep(500);
241252
_logger.info("Closing the current fluentd instance");
242253
fluentd1.closeClientSockets();
243254
fluentd1.close();
244255

256+
// the logger should fail to send an event
245257
TimeUnit.MILLISECONDS.sleep(500);
246258
assertTrue(logger.isConnected());
259+
for (int i = 0; i < 2; i++) {
260+
// repeat twice to test both behaviors on socket write error and connection error
261+
assertNull(lastError.get());
262+
{
263+
// writing to the closed socket
264+
Map<String, Object> data = new HashMap<String, Object>();
265+
data.put("k3", "v3");
266+
data.put("k4", "v4");
267+
logger.log("test01", data);
268+
}
269+
assertTrue(lastError.get() instanceof IOException);
270+
lastError.set(null); // Clear the last error
271+
assertFalse(logger.isConnected());
272+
TimeUnit.MILLISECONDS.sleep(100);
273+
}
274+
275+
// the logger shouldn't call the error handler after calling removeErrorHandler()
276+
logger.removeErrorHandler();
247277
{
278+
// writing to the closed socket
248279
Map<String, Object> data = new HashMap<String, Object>();
249280
data.put("k3", "v3");
250281
data.put("k4", "v4");
251282
logger.log("test01", data);
252283
}
253-
assertFalse(logger.isConnected());
284+
assertNull(lastError.get());
285+
lastError.set(null); // Clear the last error
254286

287+
logger.setErrorHandler(errorHandler);
288+
289+
// run the fluentd again
255290
final List<Event> elist2 = new ArrayList<Event>();
256291
MockFluentd fluentd2 = new MockFluentd(port, new MockFluentd.MockProcess() {
257292
public void process(MessagePack msgpack, Socket socket) throws IOException {
@@ -270,14 +305,15 @@ public void process(MessagePack msgpack, Socket socket) throws IOException {
270305
});
271306
threadManager.submit(fluentd2);
272307

308+
// the logger should send an event successfully
273309
TimeUnit.MILLISECONDS.sleep(500);
274-
275310
{
276311
Map<String, Object> data = new HashMap<String, Object>();
277312
data.put("k5", "v5");
278313
data.put("k6", "v6");
279314
logger.log("test01", data);
280315
}
316+
assertNull(lastError.get());
281317
assertTrue(logger.isConnected());
282318

283319
// close loggers
@@ -289,14 +325,16 @@ public void process(MessagePack msgpack, Socket socket) throws IOException {
289325
// wait for unpacking event data on fluentd
290326
TimeUnit.MILLISECONDS.sleep(2000);
291327
threadManager.join();
328+
assertNull(lastError.get());
292329

293330
// check data
294331
assertEquals(1, elist1.size());
295332
assertEquals("testtag.test01", elist1.get(0).tag);
296333

297-
assertEquals(2, elist2.size());
298-
assertEquals("testtag.test01", elist2.get(0).tag);
299-
assertEquals("testtag.test01", elist2.get(1).tag);
334+
assertEquals(4, elist2.size());
335+
for (int i = 0; i < elist2.size(); i++) {
336+
assertEquals("testtag.test01", elist2.get(i).tag);
337+
}
300338
}
301339

302340
@Test

0 commit comments

Comments
 (0)