@@ -448,4 +448,60 @@ public void run() {
448448 assertEquals ((i * LOOP * (N - i )), (long )counters .get (i ));
449449 }
450450 }
451+
452+ @ Test
453+ public void testFlushOnClose () throws Exception {
454+ // start mock fluentd
455+ int port = MockFluentd .randomPort ();
456+ String host = "localhost" ;
457+ final List <Event > elist = new ArrayList <Event >();
458+ MockFluentd fluentd = new MockFluentd (port , new MockFluentd .MockProcess () {
459+ public void process (MessagePack msgpack , Socket socket ) throws IOException {
460+ BufferedInputStream in = new BufferedInputStream (socket .getInputStream ());
461+ try {
462+ Unpacker unpacker = msgpack .createUnpacker (in );
463+ while (true ) {
464+ Event e = unpacker .read (Event .class );
465+ elist .add (e );
466+ }
467+ //socket.close();
468+ } catch (EOFException e ) {
469+ // ignore
470+ }
471+ }
472+ });
473+
474+ FixedThreadManager threadManager = new FixedThreadManager (1 );
475+
476+ // start loggers
477+ FluentLogger logger = FluentLogger .getLogger ("prefix" , host , port );
478+ {
479+ Map <String , Object > data = new HashMap <String , Object >();
480+ data .put ("k" , "v" );
481+ // Fluentd hasn't started yet and the record will be buffered.
482+ logger .log ("tag" , data );
483+ }
484+
485+ threadManager .submit (fluentd );
486+ Thread .sleep (1000 );
487+
488+ // close loggers and it should flush the buffer
489+ logger .close ();
490+
491+ Thread .sleep (1000 );
492+
493+ // close mock fluentd
494+ fluentd .close ();
495+
496+ // wait for unpacking event data on fluentd
497+ threadManager .join ();
498+
499+ // check data
500+ assertEquals (1 , elist .size ());
501+ Event ev = elist .get (0 );
502+ assertEquals ("prefix.tag" , ev .tag );
503+ assertEquals (1 , ev .data .size ());
504+ assertTrue (ev .data .containsKey ("k" ));
505+ assertTrue (ev .data .containsValue ("v" ));
506+ }
451507}
0 commit comments