|
15 | 15 | import java.io.IOException; |
16 | 16 | import java.net.Socket; |
17 | 17 | import java.util.*; |
| 18 | +import java.util.concurrent.CountDownLatch; |
18 | 19 | import java.util.concurrent.ExecutorService; |
19 | 20 | import java.util.concurrent.Executors; |
20 | 21 | import java.util.concurrent.TimeUnit; |
@@ -448,4 +449,63 @@ public void run() { |
448 | 449 | assertEquals((i * LOOP * (N - i)), (long)counters.get(i)); |
449 | 450 | } |
450 | 451 | } |
| 452 | + |
| 453 | + @Test |
| 454 | + public void testFlushOnClose() throws Exception { |
| 455 | + // start mock fluentd |
| 456 | + int port = MockFluentd.randomPort(); |
| 457 | + String host = "localhost"; |
| 458 | + final List<Event> elist = new ArrayList<Event>(); |
| 459 | + final CountDownLatch latch = new CountDownLatch(1); |
| 460 | + MockFluentd fluentd = new MockFluentd(port, new MockFluentd.MockProcess() { |
| 461 | + public void process(MessagePack msgpack, Socket socket) throws IOException { |
| 462 | + BufferedInputStream in = new BufferedInputStream(socket.getInputStream()); |
| 463 | + try { |
| 464 | + Unpacker unpacker = msgpack.createUnpacker(in); |
| 465 | + while (true) { |
| 466 | + Event e = unpacker.read(Event.class); |
| 467 | + elist.add(e); |
| 468 | + latch.countDown(); |
| 469 | + } |
| 470 | + //socket.close(); |
| 471 | + } catch (EOFException e) { |
| 472 | + // ignore |
| 473 | + } |
| 474 | + } |
| 475 | + }); |
| 476 | + |
| 477 | + FixedThreadManager threadManager = new FixedThreadManager(1); |
| 478 | + |
| 479 | + // start loggers |
| 480 | + FluentLogger logger = FluentLogger.getLogger("prefix", host, port); |
| 481 | + { |
| 482 | + Map<String, Object> data = new HashMap<String, Object>(); |
| 483 | + data.put("k", "v"); |
| 484 | + // Fluentd hasn't started yet and the record will be buffered. |
| 485 | + logger.log("tag", data); |
| 486 | + } |
| 487 | + |
| 488 | + threadManager.submit(fluentd); |
| 489 | + Thread.sleep(1000); |
| 490 | + |
| 491 | + // close loggers and it should flush the buffer |
| 492 | + logger.close(); |
| 493 | + |
| 494 | + // wait for fluentd's getting at least one kv pair |
| 495 | + latch.await(3, TimeUnit.SECONDS); |
| 496 | + |
| 497 | + // close mock fluentd |
| 498 | + fluentd.close(); |
| 499 | + |
| 500 | + // wait for unpacking event data on fluentd |
| 501 | + threadManager.join(); |
| 502 | + |
| 503 | + // check data |
| 504 | + assertEquals(1, elist.size()); |
| 505 | + Event ev = elist.get(0); |
| 506 | + assertEquals("prefix.tag", ev.tag); |
| 507 | + assertEquals(1, ev.data.size()); |
| 508 | + assertTrue(ev.data.containsKey("k")); |
| 509 | + assertTrue(ev.data.containsValue("v")); |
| 510 | + } |
451 | 511 | } |
0 commit comments