1515import java .io .IOException ;
1616import java .net .Socket ;
1717import java .util .*;
18+ import java .util .concurrent .CountDownLatch ;
1819import java .util .concurrent .ExecutorService ;
1920import java .util .concurrent .Executors ;
2021import java .util .concurrent .TimeUnit ;
21- import java .util .concurrent .atomic .AtomicBoolean ;
2222import java .util .concurrent .atomic .AtomicReference ;
2323
2424import static org .junit .Assert .*;
@@ -456,7 +456,7 @@ public void testFlushOnClose() throws Exception {
456456 int port = MockFluentd .randomPort ();
457457 String host = "localhost" ;
458458 final List <Event > elist = new ArrayList <Event >();
459- final AtomicBoolean received = new AtomicBoolean ( false );
459+ final CountDownLatch latch = new CountDownLatch ( 1 );
460460 MockFluentd fluentd = new MockFluentd (port , new MockFluentd .MockProcess () {
461461 public void process (MessagePack msgpack , Socket socket ) throws IOException {
462462 BufferedInputStream in = new BufferedInputStream (socket .getInputStream ());
@@ -465,7 +465,7 @@ public void process(MessagePack msgpack, Socket socket) throws IOException {
465465 while (true ) {
466466 Event e = unpacker .read (Event .class );
467467 elist .add (e );
468- received . set ( true );
468+ latch . countDown ( );
469469 }
470470 //socket.close();
471471 } catch (EOFException e ) {
@@ -492,9 +492,7 @@ public void process(MessagePack msgpack, Socket socket) throws IOException {
492492 logger .close ();
493493
494494 // wait for fluentd's getting at least one kv pair
495- while (!received .get ()) {
496- Thread .sleep (100 );
497- }
495+ latch .await (3 , TimeUnit .SECONDS );
498496
499497 // close mock fluentd
500498 fluentd .close ();
0 commit comments