Skip to content

Commit a6ba413

Browse files
committed
Add ServerErrorHandler
1 parent fedab54 commit a6ba413

File tree

6 files changed

+71
-12
lines changed

6 files changed

+71
-12
lines changed

src/main/java/org/fluentd/logger/FluentLogger.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,4 +132,10 @@ public void finalize() {
132132
public boolean isConnected() {
133133
return sender != null && sender.isConnected();
134134
}
135+
136+
public synchronized void setServerErrorHandler(ServerErrorHandler serverErrorHandler) {
137+
if (sender != null) {
138+
sender.setServerErrorHandler(serverErrorHandler);
139+
}
140+
}
135141
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package org.fluentd.logger;
2+
3+
import java.io.IOException;
4+
5+
public interface ServerErrorHandler {
6+
void handle(IOException ex);
7+
}

src/main/java/org/fluentd/logger/sender/NullSender.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
//
1818
package org.fluentd.logger.sender;
1919

20+
import org.fluentd.logger.ServerErrorHandler;
21+
2022
import java.util.Map;
2123

2224
public class NullSender implements Sender {
@@ -55,4 +57,8 @@ public String toString() {
5557
public boolean isConnected() {
5658
return true;
5759
}
60+
61+
@Override
62+
public void setServerErrorHandler(ServerErrorHandler serverErrorHandler) {
63+
}
5864
}

src/main/java/org/fluentd/logger/sender/RawSocketSender.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
//
1818
package org.fluentd.logger.sender;
1919

20+
import org.fluentd.logger.ServerErrorHandler;
2021
import org.msgpack.MessagePack;
2122
import org.slf4j.Logger;
2223
import org.slf4j.LoggerFactory;
@@ -49,6 +50,8 @@ public class RawSocketSender implements Sender {
4950

5051
private String name;
5152

53+
private ServerErrorHandler serverErrorHandler;
54+
5255
public RawSocketSender() {
5356
this("localhost", 24224);
5457
}
@@ -185,6 +188,14 @@ public synchronized void flush() {
185188
clearBuffer();
186189
reconnector.clearErrorHistory();
187190
} catch (IOException e) {
191+
try {
192+
if (serverErrorHandler != null) {
193+
serverErrorHandler.handle(e);
194+
}
195+
}
196+
catch (Exception handlerException) {
197+
LOG.warn("ServerErrorHandler.handle error", handlerException);
198+
}
188199
LOG.error(this.getClass().getName(), "flush", e);
189200
reconnector.addErrorHistory(System.currentTimeMillis());
190201
close();
@@ -217,4 +228,9 @@ public String toString() {
217228
public boolean isConnected() {
218229
return socket != null && !socket.isClosed() && socket.isConnected() && !socket.isOutputShutdown();
219230
}
231+
232+
@Override
233+
public void setServerErrorHandler(ServerErrorHandler serverErrorHandler) {
234+
this.serverErrorHandler = serverErrorHandler;
235+
}
220236
}

src/main/java/org/fluentd/logger/sender/Sender.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
//
1818
package org.fluentd.logger.sender;
1919

20+
import org.fluentd.logger.ServerErrorHandler;
21+
2022
import java.util.Map;
2123

2224
public interface Sender {
@@ -31,4 +33,6 @@ public interface Sender {
3133
String getName();
3234

3335
boolean isConnected();
36+
37+
void setServerErrorHandler(ServerErrorHandler serverErrorHandler);
3438
}

src/test/java/org/fluentd/logger/TestFluentLogger.java

Lines changed: 32 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,9 @@
1717
import java.util.concurrent.ExecutorService;
1818
import java.util.concurrent.Executors;
1919
import java.util.concurrent.TimeUnit;
20+
import java.util.concurrent.atomic.AtomicReference;
2021

21-
import static org.junit.Assert.assertEquals;
22-
import static org.junit.Assert.assertFalse;
23-
import static org.junit.Assert.assertTrue;
22+
import static org.junit.Assert.*;
2423

2524
public class TestFluentLogger {
2625
private Logger _logger = LoggerFactory.getLogger(TestFluentLogger.class);
@@ -203,9 +202,11 @@ public void testReconnection() throws Exception {
203202
int port = MockFluentd.randomPort();
204203
String host = "localhost";
205204
final List<Event> elist1 = new ArrayList<Event>();
205+
final AtomicReference<Exception> lastError = new AtomicReference<Exception>();
206206

207207
FixedThreadManager threadManager = new FixedThreadManager(2);
208208

209+
// run a fluentd
209210
MockFluentd fluentd1 = new MockFluentd(port, new MockFluentd.MockProcess() {
210211
public void process(MessagePack msgpack, Socket socket) throws IOException {
211212
BufferedInputStream in = new BufferedInputStream(socket.getInputStream());
@@ -226,8 +227,14 @@ public void process(MessagePack msgpack, Socket socket) throws IOException {
226227
});
227228
threadManager.submit(fluentd1);
228229

229-
// start loggers
230+
// start a logger
230231
FluentLogger logger = FluentLogger.getLogger("testtag", host, port);
232+
logger.setServerErrorHandler(new ServerErrorHandler() {
233+
@Override
234+
public void handle(IOException ex) {
235+
lastError.set(ex);
236+
}
237+
});
231238
assertFalse(logger.isConnected());
232239
{
233240
Map<String, Object> data = new HashMap<String, Object>();
@@ -237,21 +244,31 @@ public void process(MessagePack msgpack, Socket socket) throws IOException {
237244
}
238245
assertTrue(logger.isConnected());
239246

247+
// close the fluentd to make the situation that the fluentd gets down
240248
TimeUnit.MILLISECONDS.sleep(500);
241249
_logger.info("Closing the current fluentd instance");
242250
fluentd1.closeClientSockets();
243251
fluentd1.close();
244252

253+
// the logger should fail to send an event
245254
TimeUnit.MILLISECONDS.sleep(500);
246255
assertTrue(logger.isConnected());
247-
{
248-
Map<String, Object> data = new HashMap<String, Object>();
249-
data.put("k3", "v3");
250-
data.put("k4", "v4");
251-
logger.log("test01", data);
256+
for (int i = 0; i < 2; i++) {
257+
// repeat twice to test both behaviors on socket write error and connection error
258+
assertNull(lastError.get());
259+
{
260+
Map<String, Object> data = new HashMap<String, Object>();
261+
data.put("k3", "v3");
262+
data.put("k4", "v4");
263+
logger.log("test01", data);
264+
}
265+
assertTrue(lastError.get() instanceof IOException);
266+
lastError.set(null); // Clear the last error
267+
assertFalse(logger.isConnected());
268+
TimeUnit.MILLISECONDS.sleep(100);
252269
}
253-
assertFalse(logger.isConnected());
254270

271+
// run the fluentd again
255272
final List<Event> elist2 = new ArrayList<Event>();
256273
MockFluentd fluentd2 = new MockFluentd(port, new MockFluentd.MockProcess() {
257274
public void process(MessagePack msgpack, Socket socket) throws IOException {
@@ -270,14 +287,15 @@ public void process(MessagePack msgpack, Socket socket) throws IOException {
270287
});
271288
threadManager.submit(fluentd2);
272289

290+
// the logger should send an event successfully
273291
TimeUnit.MILLISECONDS.sleep(500);
274-
275292
{
276293
Map<String, Object> data = new HashMap<String, Object>();
277294
data.put("k5", "v5");
278295
data.put("k6", "v6");
279296
logger.log("test01", data);
280297
}
298+
assertNull(lastError.get());
281299
assertTrue(logger.isConnected());
282300

283301
// close loggers
@@ -289,14 +307,16 @@ public void process(MessagePack msgpack, Socket socket) throws IOException {
289307
// wait for unpacking event data on fluentd
290308
TimeUnit.MILLISECONDS.sleep(2000);
291309
threadManager.join();
310+
assertNull(lastError.get());
292311

293312
// check data
294313
assertEquals(1, elist1.size());
295314
assertEquals("testtag.test01", elist1.get(0).tag);
296315

297-
assertEquals(2, elist2.size());
316+
assertEquals(3, elist2.size());
298317
assertEquals("testtag.test01", elist2.get(0).tag);
299318
assertEquals("testtag.test01", elist2.get(1).tag);
319+
assertEquals("testtag.test01", elist2.get(2).tag);
300320
}
301321

302322
@Test

0 commit comments

Comments
 (0)