1+ package cn .leancloud .kafka .consumer ;
2+
3+ import org .apache .kafka .clients .consumer .ConsumerRecord ;
4+ import org .apache .kafka .clients .consumer .MockConsumer ;
5+ import org .apache .kafka .clients .consumer .OffsetResetStrategy ;
6+ import org .apache .kafka .common .TopicPartition ;
7+ import org .junit .After ;
8+ import org .junit .Before ;
9+ import org .junit .Test ;
10+
11+ import java .util .List ;
12+ import java .util .stream .IntStream ;
13+
14+ import static cn .leancloud .kafka .consumer .TestingUtils .*;
15+ import static java .util .stream .Collectors .toList ;
16+ import static org .assertj .core .api .Assertions .assertThat ;
17+
18+ public class AutoCommitPolicyTest {
19+ private MockConsumer <Object , Object > consumer ;
20+ private AutoCommitPolicy <Object , Object > policy ;
21+
22+ @ Before
23+ public void setUp () {
24+ consumer = new MockConsumer <>(OffsetResetStrategy .LATEST );
25+ policy = new AutoCommitPolicy <>(consumer );
26+ }
27+
28+ @ After
29+ public void tearDown () {
30+ consumer .close ();
31+ }
32+
33+ @ Test
34+ public void testNoCompleteRecords () {
35+ final List <TopicPartition > partitions = toPartitions (IntStream .range (0 , 30 ).boxed ().collect (toList ()));
36+ preparePendingRecords (partitions , 1 );
37+ assertThat (policy .tryCommit (true )).isEmpty ();
38+ for (TopicPartition partition : partitions ) {
39+ assertThat (consumer .committed (partition )).isNull ();
40+ }
41+ }
42+
43+ @ Test
44+ public void testPartialComplete () {
45+ final List <TopicPartition > partitions = toPartitions (IntStream .range (0 , 30 ).boxed ().collect (toList ()));
46+ final List <ConsumerRecord <Object , Object >> pendingRecords = preparePendingRecords (partitions , 2 );
47+ // two records for each partitions
48+ for (ConsumerRecord <Object , Object > record : pendingRecords ) {
49+ policy .addPendingRecord (record );
50+ }
51+
52+ // complete the first half of the partitions
53+ for (ConsumerRecord <Object , Object > record : pendingRecords ) {
54+ if (record .partition () < partitions .size () / 2 && record .offset () < 3 ) {
55+ policy .completeRecord (record );
56+ }
57+ }
58+
59+ assertThat (policy .tryCommit (false ))
60+ .hasSize (partitions .size () / 2 )
61+ .containsExactlyInAnyOrderElementsOf (partitions .subList (0 , partitions .size () / 2 ));
62+ for (TopicPartition partition : partitions ) {
63+ // first half of the partitions is completed and cleaned
64+ // second half of the partitions is not completed and the topic offset mark is still there
65+ if (partition .partition () < partitions .size () / 2 ) {
66+ assertThat (policy .topicOffsetHighWaterMark ().get (partition )).isNull ();
67+ } else {
68+ assertThat (policy .topicOffsetHighWaterMark ().get (partition )).isEqualTo (3 );
69+ }
70+ }
71+
72+ assertThat (policy .completedTopicOffsets ()).isEmpty ();
73+ }
74+
75+ private List <ConsumerRecord <Object , Object >> preparePendingRecords (List <TopicPartition > partitions , int size ) {
76+ final List <ConsumerRecord <Object , Object >> pendingRecords = prepareConsumerRecords (partitions , 1 , size );
77+ assignPartitions (consumer , partitions , 0L );
78+ fireConsumerRecords (consumer , pendingRecords );
79+ consumer .poll (0 );
80+ return pendingRecords ;
81+ }
82+
83+ }
0 commit comments