22
33import org .apache .kafka .clients .consumer .MockConsumer ;
44import org .apache .kafka .clients .consumer .OffsetResetStrategy ;
5+ import org .apache .kafka .common .PartitionInfo ;
6+ import org .apache .kafka .common .TopicPartition ;
57import org .junit .After ;
68import org .junit .Before ;
79import org .junit .Test ;
810
9- import java .util .Collections ;
10- import java .util .HashMap ;
11- import java .util .List ;
12- import java .util .Map ;
11+ import java .util .*;
1312import java .util .concurrent .ExecutorService ;
14-
13+ import java .util .function .Function ;
14+ import java .util .regex .Pattern ;
15+ import java .util .stream .Collectors ;
16+ import java .util .stream .IntStream ;
17+
18+ import static cn .leancloud .kafka .consumer .TestingUtils .assignPartitions ;
19+ import static cn .leancloud .kafka .consumer .TestingUtils .testingTopic ;
20+ import static java .util .stream .Collectors .toList ;
21+ import static java .util .stream .Collectors .toMap ;
1522import static org .assertj .core .api .Assertions .assertThat ;
1623import static org .assertj .core .api .Assertions .assertThatThrownBy ;
1724import static org .mockito .ArgumentMatchers .any ;
@@ -41,6 +48,17 @@ public void tearDown() throws Exception {
4148 consumer .close ();
4249 }
4350
51+ @ Test
52+ public void testSubscribeNullTopics () {
53+ consumer = LcKafkaConsumerBuilder .newBuilder (configs , consumerRecordHandler )
54+ .mockKafkaConsumer (new MockConsumer <>(OffsetResetStrategy .LATEST ))
55+ .buildSync ();
56+ Collection <String > topics = null ;
57+ assertThatThrownBy (() -> consumer .subscribe (topics ))
58+ .isInstanceOf (NullPointerException .class )
59+ .hasMessage ("topics" );
60+ }
61+
4462 @ Test
4563 public void testSubscribeWithEmptyTopics () {
4664 consumer = LcKafkaConsumerBuilder .newBuilder (configs , consumerRecordHandler )
@@ -52,7 +70,38 @@ public void testSubscribeWithEmptyTopics() {
5270 }
5371
5472 @ Test
55- public void testSubscribe () {
73+ public void testSubscribeContainsEmptyTopics () {
74+ consumer = LcKafkaConsumerBuilder .newBuilder (configs , consumerRecordHandler )
75+ .mockKafkaConsumer (new MockConsumer <>(OffsetResetStrategy .LATEST ))
76+ .buildSync ();
77+ assertThatThrownBy (() -> consumer .subscribe (Arrays .asList ("Topic" , "" )))
78+ .isInstanceOf (IllegalArgumentException .class )
79+ .hasMessage ("topic collection to subscribe to cannot contain null or empty topic" );
80+ }
81+
82+ @ Test
83+ public void testSubscribeContainsNullPattern () {
84+ consumer = LcKafkaConsumerBuilder .newBuilder (configs , consumerRecordHandler )
85+ .mockKafkaConsumer (new MockConsumer <>(OffsetResetStrategy .LATEST ))
86+ .buildSync ();
87+ Pattern pattern = null ;
88+ assertThatThrownBy (() -> consumer .subscribe (pattern ))
89+ .isInstanceOf (NullPointerException .class )
90+ .hasMessage ("pattern" );
91+ }
92+
93+ @ Test
94+ public void testSubscribeNull () {
95+ consumer = LcKafkaConsumerBuilder .newBuilder (configs , consumerRecordHandler )
96+ .mockKafkaConsumer (new MockConsumer <>(OffsetResetStrategy .LATEST ))
97+ .buildSync ();
98+ assertThatThrownBy (() -> consumer .subscribe (Arrays .asList ("Topic" , null )))
99+ .isInstanceOf (IllegalArgumentException .class )
100+ .hasMessage ("topic collection to subscribe to cannot contain null or empty topic" );
101+ }
102+
103+ @ Test
104+ public void testSubscribeTopics () {
56105 final MockConsumer <Object , Object > kafkaConsumer = new MockConsumer <>(OffsetResetStrategy .LATEST );
57106 consumer = LcKafkaConsumerBuilder .newBuilder (configs , consumerRecordHandler )
58107 .mockKafkaConsumer (kafkaConsumer )
@@ -63,6 +112,23 @@ public void testSubscribe() {
63112 assertThat (consumer .subscribed ()).isTrue ();
64113 }
65114
115+ @ Test
116+ public void testSubscribePattern () {
117+ final MockConsumer <Object , Object > kafkaConsumer = new MockConsumer <>(OffsetResetStrategy .LATEST );
118+ consumer = LcKafkaConsumerBuilder .newBuilder (configs , consumerRecordHandler )
119+ .mockKafkaConsumer (kafkaConsumer )
120+ .buildSync ();
121+
122+ final List <Integer > partitions = IntStream .range (0 , 30 ).boxed ().collect (toList ());
123+ kafkaConsumer .updateEndOffsets (generateEndOffsets (partitions , 0 ));
124+ kafkaConsumer .updatePartitions (testingTopic , generatePartitionInfos (partitions ));
125+ final Pattern pattern = Pattern .compile ("Test.*" );
126+ consumer .subscribe (pattern );
127+
128+ assertThat (kafkaConsumer .subscription ()).containsExactlyElementsOf (testingTopics );
129+ assertThat (consumer .subscribed ()).isTrue ();
130+ }
131+
66132 @ Test
67133 public void testSubscribedTwice () {
68134 consumer = LcKafkaConsumerBuilder .newBuilder (configs , consumerRecordHandler )
@@ -108,4 +174,19 @@ public void testGracefulShutdownWithoutShutdownWorkerPool() throws Exception {
108174 assertThat (consumer .closed ()).isTrue ();
109175 }
110176
177+ private List <PartitionInfo > generatePartitionInfos (List <Integer > partitions ) {
178+ return partitions
179+ .stream ()
180+ .map (p -> new PartitionInfo (testingTopic , p , null , null , null ))
181+ .collect (toList ());
182+ }
183+
184+ private Map <TopicPartition , Long > generateEndOffsets (List <Integer > partitions , long endOffset ) {
185+ return partitions
186+ .stream ()
187+ .map (p -> new TopicPartition (testingTopic , p ))
188+ .collect (toMap (Function .identity (), (p ) -> endOffset ));
189+
190+
191+ }
111192}
0 commit comments