Skip to content

Commit 1eb6664

Browse files
committed
Add test cases for AsyncRawSocketSender
* Added normal cases for now.
1 parent 895da7a commit 1eb6664

File tree

1 file changed

+251
-0
lines changed

1 file changed

+251
-0
lines changed
Lines changed: 251 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,251 @@
1+
package org.fluentd.logger.sender;
2+
import org.fluentd.logger.util.MockFluentd;
3+
import org.fluentd.logger.util.MockFluentd.MockProcess;
4+
import org.junit.Test;
5+
import org.msgpack.MessagePack;
6+
import org.msgpack.unpacker.Unpacker;
7+
import org.slf4j.Logger;
8+
import org.slf4j.LoggerFactory;
9+
10+
import java.io.BufferedInputStream;
11+
import java.io.EOFException;
12+
import java.io.IOException;
13+
import java.net.Socket;
14+
import java.util.ArrayList;
15+
import java.util.HashMap;
16+
import java.util.List;
17+
import java.util.Map;
18+
import java.util.concurrent.ConcurrentLinkedQueue;
19+
import java.util.concurrent.CountDownLatch;
20+
import java.util.concurrent.ExecutorService;
21+
import java.util.concurrent.Executors;
22+
import java.util.concurrent.TimeUnit;
23+
import java.util.concurrent.atomic.AtomicBoolean;
24+
25+
import static org.junit.Assert.assertEquals;
26+
import static org.junit.Assert.assertFalse;
27+
import static org.junit.Assert.assertTrue;
28+
29+
public class TestAsyncRawSocketSender {
30+
31+
@Test
32+
public void testNormal01() throws Exception {
33+
// start mock fluentd
34+
int port = MockFluentd.randomPort();
35+
final List<Event> elist = new ArrayList<Event>();
36+
MockFluentd fluentd = new MockFluentd(port, new MockFluentd.MockProcess() {
37+
public void process(MessagePack msgpack, Socket socket) throws IOException {
38+
BufferedInputStream in = new BufferedInputStream(socket.getInputStream());
39+
try {
40+
Unpacker unpacker = msgpack.createUnpacker(in);
41+
while (true) {
42+
Event e = unpacker.read(Event.class);
43+
elist.add(e);
44+
}
45+
//socket.close();
46+
} catch (EOFException e) {
47+
// ignore
48+
}
49+
}
50+
});
51+
fluentd.start();
52+
53+
// start asyncSenders
54+
Sender asyncSender = new AsyncRawSocketSender("localhost", port);
55+
Map<String, Object> data = new HashMap<String, Object>();
56+
data.put("t1k1", "t1v1");
57+
data.put("t1k2", "t1v2");
58+
asyncSender.emit("tag.label1", data);
59+
60+
Map<String, Object> data2 = new HashMap<String, Object>();
61+
data2.put("t2k1", "t2v1");
62+
data2.put("t2k2", "t2v2");
63+
asyncSender.emit("tag.label2", data2);
64+
65+
// close asyncSender sockets
66+
asyncSender.close();
67+
68+
// wait for unpacking event data on fluentd
69+
Thread.sleep(2000);
70+
71+
// close mock server sockets
72+
fluentd.close();
73+
74+
75+
// check data
76+
assertEquals(2, elist.size());
77+
{
78+
Event e = elist.get(0);
79+
assertEquals("tag.label1", e.tag);
80+
assertEquals("t1v1", e.data.get("t1k1"));
81+
assertEquals("t1v2", e.data.get("t1k2"));
82+
}
83+
{
84+
Event e = elist.get(1);
85+
assertEquals("tag.label2", e.tag);
86+
assertEquals("t2v1", e.data.get("t2k1"));
87+
assertEquals("t2v2", e.data.get("t2k2"));
88+
}
89+
}
90+
91+
92+
93+
@Test
94+
public void testNormal02() throws Exception {
95+
// start mock fluentd
96+
int port = MockFluentd.randomPort(); // Use a random port available
97+
final List<Event> elist = new ArrayList<Event>();
98+
MockFluentd fluentd = new MockFluentd(port, new MockFluentd.MockProcess() {
99+
public void process(MessagePack msgpack, Socket socket) throws IOException {
100+
BufferedInputStream in = new BufferedInputStream(socket.getInputStream());
101+
try {
102+
Unpacker unpacker = msgpack.createUnpacker(in);
103+
while (true) {
104+
Event e = unpacker.read(Event.class);
105+
elist.add(e);
106+
}
107+
//socket.close();
108+
} catch (EOFException e) {
109+
// ignore
110+
}
111+
}
112+
});
113+
fluentd.start();
114+
115+
// start asyncSenders
116+
Sender asyncSender = new AsyncRawSocketSender("localhost", port);
117+
int count = 10000;
118+
for (int i = 0; i < count; i++) {
119+
String tag = "tag:i";
120+
Map<String, Object> record = new HashMap<String, Object>();
121+
record.put("i", i);
122+
record.put("n", "name:" + i);
123+
asyncSender.emit(tag, record);
124+
}
125+
126+
// close asyncSender sockets
127+
asyncSender.close();
128+
129+
// wait for unpacking event data on fluentd
130+
Thread.sleep(2000);
131+
132+
// close mock server sockets
133+
fluentd.close();
134+
135+
136+
// check data
137+
assertEquals(count, elist.size());
138+
}
139+
140+
@Test
141+
public void testNormal03() throws Exception {
142+
// start mock fluentds
143+
final MockFluentd[] fluentds = new MockFluentd[2];
144+
final List[] elists = new List[2];
145+
final int[] ports = new int[2];
146+
ports[0] = MockFluentd.randomPort();
147+
AsyncRawSocketSender asyncRawSocketSender = new AsyncRawSocketSender("localhost", ports[0]); // it should be failed to connect to fluentd
148+
elists[0] = new ArrayList<Event>();
149+
fluentds[0] = new MockFluentd(ports[0], new MockFluentd.MockProcess() {
150+
public void process(MessagePack msgpack, Socket socket) throws IOException {
151+
BufferedInputStream in = new BufferedInputStream(socket.getInputStream());
152+
try {
153+
Unpacker unpacker = msgpack.createUnpacker(in);
154+
while (true) {
155+
Event e = unpacker.read(Event.class);
156+
elists[0].add(e);
157+
}
158+
//socket.close();
159+
} catch (EOFException e) {
160+
// ignore
161+
}
162+
}
163+
});
164+
fluentds[0].start();
165+
ports[1] = MockFluentd.randomPort();
166+
elists[1] = new ArrayList<Event>();
167+
fluentds[1] = new MockFluentd(ports[1], new MockFluentd.MockProcess() {
168+
public void process(MessagePack msgpack, Socket socket) throws IOException {
169+
BufferedInputStream in = new BufferedInputStream(socket.getInputStream());
170+
try {
171+
Unpacker unpacker = msgpack.createUnpacker(in);
172+
while (true) {
173+
Event e = unpacker.read(Event.class);
174+
elists[1].add(e);
175+
}
176+
//socket.close();
177+
} catch (EOFException e) {
178+
// ignore
179+
}
180+
}
181+
});
182+
fluentds[1].start();
183+
184+
// start AsyncSenders
185+
Sender[] asyncSenders = new Sender[2];
186+
int[] counts = new int[2];
187+
asyncSenders[0] = asyncRawSocketSender;
188+
counts[0] = 10000;
189+
for (int i = 0; i < counts[0]; i++) {
190+
String tag = "tag:i";
191+
Map<String, Object> record = new HashMap<String, Object>();
192+
record.put("i", i);
193+
record.put("n", "name:" + i);
194+
asyncSenders[0].emit(tag, record);
195+
}
196+
asyncSenders[1] = new AsyncRawSocketSender("localhost", ports[1]);
197+
counts[1] = 10000;
198+
for (int i = 0; i < counts[1]; i++) {
199+
String tag = "tag:i";
200+
Map<String, Object> record = new HashMap<String, Object>();
201+
record.put("i", i);
202+
record.put("n", "name:" + i);
203+
asyncSenders[1].emit(tag, record);
204+
}
205+
206+
// close sender sockets
207+
asyncSenders[0].close();
208+
asyncSenders[1].close();
209+
210+
// wait for unpacking event data on fluentd
211+
Thread.sleep(2000);
212+
213+
// close mock server sockets
214+
fluentds[0].close();
215+
fluentds[1].close();
216+
217+
218+
// check data
219+
assertEquals(counts[0], elists[0].size());
220+
assertEquals(counts[1], elists[1].size());
221+
}
222+
223+
@Test
224+
public void testTimeout() throws InterruptedException {
225+
final AtomicBoolean socketFinished = new AtomicBoolean(false);
226+
ExecutorService executor = Executors.newSingleThreadExecutor();
227+
228+
executor.execute(new Runnable() {
229+
@Override
230+
public void run() {
231+
AsyncRawSocketSender asyncRawSocketSender = null;
232+
try {
233+
// try to connect to test network
234+
asyncRawSocketSender = new AsyncRawSocketSender("192.0.2.1", 24224, 200, 8 * 1024);
235+
}
236+
finally {
237+
if (asyncRawSocketSender != null) {
238+
asyncRawSocketSender.close();
239+
}
240+
socketFinished.set(true);
241+
}
242+
}
243+
});
244+
245+
while(!socketFinished.get())
246+
Thread.yield();
247+
248+
assertTrue(socketFinished.get());
249+
executor.shutdownNow();
250+
}
251+
}

0 commit comments

Comments
 (0)