|
16 | 16 |
|
17 | 17 | package org.springframework.kafka.listener; |
18 | 18 |
|
| 19 | +import java.time.Duration; |
| 20 | +import java.util.*; |
| 21 | +import java.util.concurrent.CountDownLatch; |
| 22 | +import java.util.concurrent.TimeUnit; |
| 23 | + |
19 | 24 | import org.apache.kafka.clients.consumer.Consumer; |
20 | 25 | import org.apache.kafka.clients.consumer.ConsumerRecord; |
21 | 26 | import org.apache.kafka.clients.consumer.ConsumerRecords; |
|
25 | 30 | import org.springframework.kafka.listener.adapter.FilteringMessageListenerAdapter; |
26 | 31 | import org.springframework.kafka.listener.adapter.RecordFilterStrategy; |
27 | 32 |
|
28 | | -import java.time.Duration; |
29 | | -import java.util.*; |
30 | | -import java.util.concurrent.CountDownLatch; |
31 | | -import java.util.concurrent.TimeUnit; |
32 | | - |
33 | 33 | import static org.assertj.core.api.Assertions.assertThat; |
34 | 34 | import static org.mockito.ArgumentMatchers.any; |
35 | 35 | import static org.mockito.ArgumentMatchers.anyMap; |
@@ -85,6 +85,8 @@ public void testCurrentRecordModeCommitsAllRecords() throws InterruptedException |
85 | 85 |
|
86 | 86 | Map<TopicPartition, List<ConsumerRecord<String, String>>> recordsMap = new HashMap<>(); |
87 | 87 | recordsMap.put(tp, records); |
| 88 | + |
| 89 | + @SuppressWarnings("deprecation") |
88 | 90 | ConsumerRecords<String, String> consumerRecords = new ConsumerRecords<>(recordsMap); |
89 | 91 |
|
90 | 92 | given(consumer.poll(any(Duration.class))) |
@@ -135,6 +137,8 @@ public void testAllRecordsFilteredStillCommits() throws InterruptedException { |
135 | 137 |
|
136 | 138 | Map<TopicPartition, List<ConsumerRecord<String, String>>> recordsMap = new HashMap<>(); |
137 | 139 | recordsMap.put(tp, records); |
| 140 | + |
| 141 | + @SuppressWarnings("deprecation") |
138 | 142 | ConsumerRecords<String, String> consumerRecords = new ConsumerRecords<>(recordsMap); |
139 | 143 |
|
140 | 144 | given(consumer.poll(any(Duration.class))) |
@@ -197,6 +201,8 @@ record -> record.value().contains("skip"); |
197 | 201 | Map<TopicPartition, List<ConsumerRecord<String, String>>> recordsMap = new HashMap<>(); |
198 | 202 | recordsMap.put(tp0, records.subList(0, 2)); |
199 | 203 | recordsMap.put(tp1, records.subList(2, 5)); |
| 204 | + |
| 205 | + @SuppressWarnings("deprecation") |
200 | 206 | ConsumerRecords<String, String> consumerRecords = new ConsumerRecords<>(recordsMap); |
201 | 207 |
|
202 | 208 | given(consumer.poll(any(Duration.class))) |
@@ -247,6 +253,8 @@ public void testCommitLogging() throws InterruptedException { |
247 | 253 |
|
248 | 254 | Map<TopicPartition, List<ConsumerRecord<String, String>>> recordsMap = new HashMap<>(); |
249 | 255 | recordsMap.put(tp, records); |
| 256 | + |
| 257 | + @SuppressWarnings("deprecation") |
250 | 258 | ConsumerRecords<String, String> consumerRecords = new ConsumerRecords<>(recordsMap); |
251 | 259 |
|
252 | 260 | given(consumer.poll(any(Duration.class))) |
@@ -299,6 +307,8 @@ public void testAckDiscardedParameterBehavior() throws InterruptedException { |
299 | 307 |
|
300 | 308 | Map<TopicPartition, List<ConsumerRecord<String, String>>> recordsMap = new HashMap<>(); |
301 | 309 | recordsMap.put(tp, records); |
| 310 | + |
| 311 | + @SuppressWarnings("deprecation") |
302 | 312 | ConsumerRecords<String, String> consumerRecords = new ConsumerRecords<>(recordsMap); |
303 | 313 |
|
304 | 314 | given(consumer.poll(any(Duration.class))) |
|
0 commit comments