File tree Expand file tree Collapse file tree 3 files changed +10
-17
lines changed
src/test/java/cn/leancloud/kafka/consumer/integration Expand file tree Collapse file tree 3 files changed +10
-17
lines changed Original file line number Diff line number Diff line change @@ -48,13 +48,14 @@ public static void main(String[] args) throws Exception {
4848 final List <TestRunner > runners = prepareRunners ();
4949
5050 try {
51- for (TestRunner runner : runners ) {
52- runner .runTest (new SingleConsumerTest ());
53- runner .runTest (new TwoConsumerTest ());
54- runner .runTest (new JoinLeaveGroupTest ());
51+ for (int i = 0 ; i < runners .size (); i ++) {
52+ try (TestRunner runner = runners .get (i )){
53+ runner .runTest (new SingleConsumerTest ());
54+ runner .runTest (new TwoConsumerTest ());
55+ runner .runTest (new JoinLeaveGroupTest ());
56+ }
5557 }
5658 } finally {
57- shutdown (runners );
5859 workerPool .shutdown ();
5960 }
6061 }
@@ -108,12 +109,6 @@ private static List<TestRunner> prepareRunners() {
108109 return runners ;
109110 }
110111
111- private static void shutdown (List <TestRunner > runners ) throws Exception {
112- for (TestRunner runner : runners ) {
113- runner .close ();
114- }
115- }
116-
117112 private static class AutoCommitWithWorkerPoolConsumerFactory implements ConsumerFactory {
118113 @ Override
119114 public String type () {
Original file line number Diff line number Diff line change @@ -22,8 +22,4 @@ public TestingProducer producer() {
2222 public String topic () {
2323 return topic ;
2424 }
25-
26- public TestStatistics newStatistics () {
27- return new TestStatistics ();
28- }
2925}
Original file line number Diff line number Diff line change 99
1010public class TestRunner implements Closeable {
1111 private static final Logger logger = LoggerFactory .getLogger (TestRunner .class );
12+ private static final Duration defaultProducerSendMsgInterval = Duration .ofMillis (100 );
13+ private static final int defaultConcurrentProducerCount = 2 ;
1214
1315 private final TestingProducer producer ;
1416 private final ConsumerFactory factory ;
1517 private final TestContext context ;
1618
1719 TestRunner (String topic , ConsumerFactory factory ) {
1820 this .factory = factory ;
19- this .producer = new TestingProducer (Duration . ofMillis ( 100 ), 2 );
21+ this .producer = new TestingProducer (defaultProducerSendMsgInterval , defaultConcurrentProducerCount );
2022 this .context = new TestContext (topic , producer , factory );
2123 }
2224
2325 void runTest (IntegrationTest test ) throws Exception {
2426 logger .info ("\n \n \n ------------------------------------ Start {} with consumer: {} ------------------------------------\n " ,
2527 test .name (), factory .type ());
2628
27- final TestStatistics statistics = context . newStatistics ();
29+ final TestStatistics statistics = new TestStatistics ();
2830 try {
2931 test .runTest (context , statistics );
3032
You can’t perform that action at this time.
0 commit comments