@@ -1863,6 +1863,71 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
18631863 container .stop ();
18641864 }
18651865
1866+ @ SuppressWarnings ({ "unchecked" , "rawtypes" })
1867+ @ Test
1868+ public void testAckModeCount () throws Exception {
1869+ ConsumerFactory <Integer , String > cf = mock (ConsumerFactory .class );
1870+ Consumer <Integer , String > consumer = mock (Consumer .class );
1871+ given (cf .createConsumer (isNull (), eq ("clientId" ), isNull ())).willReturn (consumer );
1872+ TopicPartition topicPartition = new TopicPartition ("foo" , 0 );
1873+ final Map <TopicPartition , List <ConsumerRecord <Integer , String >>> records1 = new HashMap <>();
1874+ records1 .put (topicPartition , Arrays .asList (
1875+ new ConsumerRecord <>("foo" , 0 , 0L , 1 , "foo" ),
1876+ new ConsumerRecord <>("foo" , 0 , 1L , 1 , "bar" )));
1877+ final Map <TopicPartition , List <ConsumerRecord <Integer , String >>> records2 = new HashMap <>();
1878+ records2 .put (topicPartition , Arrays .asList (
1879+ new ConsumerRecord <>("foo" , 0 , 2L , 1 , "baz" ),
1880+ new ConsumerRecord <>("foo" , 0 , 3L , 1 , "qux" ))); // commit (4 >= 3)
1881+ final Map <TopicPartition , List <ConsumerRecord <Integer , String >>> records3 = new HashMap <>();
1882+ records3 .put (topicPartition , Arrays .asList (
1883+ new ConsumerRecord <>("foo" , 0 , 4L , 1 , "fiz" ),
1884+ new ConsumerRecord <>("foo" , 0 , 5L , 1 , "buz" ),
1885+ new ConsumerRecord <>("foo" , 0 , 6L , 1 , "bif" ))); // commit (3 >= 3)
1886+ ConsumerRecords <Integer , String > consumerRecords1 = new ConsumerRecords <>(records1 );
1887+ ConsumerRecords <Integer , String > consumerRecords2 = new ConsumerRecords <>(records2 );
1888+ ConsumerRecords <Integer , String > consumerRecords3 = new ConsumerRecords <>(records3 );
1889+ ConsumerRecords <Integer , String > emptyRecords = new ConsumerRecords <>(Collections .emptyMap ());
1890+ AtomicInteger which = new AtomicInteger ();
1891+ given (consumer .poll (anyLong ())).willAnswer (i -> {
1892+ Thread .sleep (50 );
1893+ int recordsToUse = which .incrementAndGet ();
1894+ switch (recordsToUse ) {
1895+ case 1 :
1896+ return consumerRecords1 ;
1897+ case 2 :
1898+ return consumerRecords2 ;
1899+ case 3 :
1900+ return consumerRecords3 ;
1901+ default :
1902+ return emptyRecords ;
1903+ }
1904+ });
1905+ final CountDownLatch commitLatch = new CountDownLatch (2 );
1906+ willAnswer (i -> {
1907+ commitLatch .countDown ();
1908+ return null ;
1909+ }).given (consumer ).commitSync (any (Map .class ));
1910+ given (consumer .assignment ()).willReturn (records1 .keySet ());
1911+ TopicPartitionInitialOffset [] topicPartitionOffset = new TopicPartitionInitialOffset [] {
1912+ new TopicPartitionInitialOffset ("foo" , 0 ) };
1913+ ContainerProperties containerProps = new ContainerProperties (topicPartitionOffset );
1914+ containerProps .setAckMode (AckMode .COUNT );
1915+ containerProps .setAckCount (3 );
1916+ containerProps .setClientId ("clientId" );
1917+ AtomicInteger recordCount = new AtomicInteger ();
1918+ containerProps .setMessageListener ((MessageListener ) r -> {
1919+ recordCount .incrementAndGet ();
1920+ });
1921+ KafkaMessageListenerContainer <Integer , String > container =
1922+ new KafkaMessageListenerContainer <>(cf , containerProps );
1923+ container .start ();
1924+ assertThat (commitLatch .await (10 , TimeUnit .SECONDS )).isTrue ();
1925+ assertThat (recordCount .get ()).isEqualTo (7 );
1926+ verify (consumer ).commitSync (Collections .singletonMap (topicPartition , new OffsetAndMetadata (4L )));
1927+ verify (consumer ).commitSync (Collections .singletonMap (topicPartition , new OffsetAndMetadata (7L )));
1928+ container .stop ();
1929+ }
1930+
18661931 private Consumer <?, ?> spyOnConsumer (KafkaMessageListenerContainer <Integer , String > container ) {
18671932 Consumer <?, ?> consumer = spy (
18681933 KafkaTestUtils .getPropertyValue (container , "listenerConsumer.consumer" , Consumer .class ));
0 commit comments