1818import java .util .concurrent .ExecutorService ;
1919import java .util .concurrent .Executors ;
2020import java .util .concurrent .TimeUnit ;
21+ import java .util .concurrent .atomic .AtomicBoolean ;
2122import java .util .concurrent .atomic .AtomicReference ;
2223
2324import static org .junit .Assert .*;
@@ -455,6 +456,7 @@ public void testFlushOnClose() throws Exception {
455456 int port = MockFluentd .randomPort ();
456457 String host = "localhost" ;
457458 final List <Event > elist = new ArrayList <Event >();
459+ final AtomicBoolean received = new AtomicBoolean (false );
458460 MockFluentd fluentd = new MockFluentd (port , new MockFluentd .MockProcess () {
459461 public void process (MessagePack msgpack , Socket socket ) throws IOException {
460462 BufferedInputStream in = new BufferedInputStream (socket .getInputStream ());
@@ -463,6 +465,7 @@ public void process(MessagePack msgpack, Socket socket) throws IOException {
463465 while (true ) {
464466 Event e = unpacker .read (Event .class );
465467 elist .add (e );
468+ received .set (true );
466469 }
467470 //socket.close();
468471 } catch (EOFException e ) {
@@ -488,7 +491,10 @@ public void process(MessagePack msgpack, Socket socket) throws IOException {
488491 // close loggers and it should flush the buffer
489492 logger .close ();
490493
491- Thread .sleep (1000 );
494+ // wait for fluentd's getting at least one kv pair
495+ while (!received .get ()) {
496+ Thread .sleep (100 );
497+ }
492498
493499 // close mock fluentd
494500 fluentd .close ();
0 commit comments