@@ -248,4 +248,93 @@ public void run() {
248248 assertTrue (socketFinished .get ());
249249 executor .shutdownNow ();
250250 }
251+
252+ @ Test
253+ public void testBufferingAndResending () throws InterruptedException , IOException {
254+ final ConcurrentLinkedQueue <Event > readEvents = new ConcurrentLinkedQueue <Event >();
255+ final CountDownLatch countDownLatch = new CountDownLatch (4 );
256+ int port = MockFluentd .randomPort ();
257+ MockProcess mockProcess = new MockFluentd .MockProcess () {
258+ public void process (MessagePack msgpack , Socket socket ) throws IOException {
259+ BufferedInputStream in = new BufferedInputStream (socket .getInputStream ());
260+ try {
261+ Unpacker unpacker = msgpack .createUnpacker (in );
262+ while (true ) {
263+ Event e = unpacker .read (Event .class );
264+ readEvents .add (e );
265+ countDownLatch .countDown ();
266+ }
267+ } catch (EOFException e ) {
268+ // e.printStackTrace();
269+ }
270+ }
271+ };
272+
273+ MockFluentd fluentd = new MockFluentd (port , mockProcess );
274+ fluentd .start ();
275+
276+ Sender asyncSender = new AsyncRawSocketSender ("localhost" , port );
277+ assertFalse (asyncSender .isConnected ());
278+ Map <String , Object > data = new HashMap <String , Object >();
279+ data .put ("key0" , "v0" );
280+ boolean emitted1 = asyncSender .emit ("tag0" , data );
281+ assertTrue (emitted1 );
282+
283+ // close fluentd to make the next sending failed
284+ TimeUnit .MILLISECONDS .sleep (500 );
285+
286+ fluentd .closeClientSockets ();
287+
288+ TimeUnit .MILLISECONDS .sleep (500 );
289+
290+ data = new HashMap <String , Object >();
291+ data .put ("key0" , "v1" );
292+ boolean emitted2 = asyncSender .emit ("tag0" , data );
293+ assertTrue (emitted2 );
294+
295+ // wait to avoid the suppression of reconnection
296+ TimeUnit .MILLISECONDS .sleep (500 );
297+
298+ data = new HashMap <String , Object >();
299+ data .put ("key0" , "v2" );
300+ boolean emitted3 = asyncSender .emit ("tag0" , data );
301+ assertTrue (emitted3 );
302+
303+ data = new HashMap <String , Object >();
304+ data .put ("key0" , "v3" );
305+ boolean emitted4 = asyncSender .emit ("tag0" , data );
306+ assertTrue (emitted4 );
307+
308+ countDownLatch .await (500 , TimeUnit .MILLISECONDS );
309+
310+ asyncSender .close ();
311+
312+ fluentd .close ();
313+
314+ assertEquals (4 , readEvents .size ());
315+
316+ Event event = readEvents .poll ();
317+ assertEquals ("tag0" , event .tag );
318+ assertEquals (1 , event .data .size ());
319+ assertTrue (event .data .keySet ().contains ("key0" ));
320+ assertTrue (event .data .values ().contains ("v0" ));
321+
322+ event = readEvents .poll ();
323+ assertEquals ("tag0" , event .tag );
324+ assertEquals (1 , event .data .size ());
325+ assertTrue (event .data .keySet ().contains ("key0" ));
326+ assertTrue (event .data .values ().contains ("v1" ));
327+
328+ event = readEvents .poll ();
329+ assertEquals ("tag0" , event .tag );
330+ assertEquals (1 , event .data .size ());
331+ assertTrue (event .data .keySet ().contains ("key0" ));
332+ assertTrue (event .data .values ().contains ("v2" ));
333+
334+ event = readEvents .poll ();
335+ assertEquals ("tag0" , event .tag );
336+ assertEquals (1 , event .data .size ());
337+ assertTrue (event .data .keySet ().contains ("key0" ));
338+ assertTrue (event .data .values ().contains ("v3" ));
339+ }
251340}
0 commit comments