Skip to content

Commit c0bb203

Browse files
committed
refactor: replace deprecated code
Signed-off-by: Chaedong Im <[email protected]>
1 parent df86512 commit c0bb203

File tree

2 files changed

+49
-32
lines changed

2 files changed

+49
-32
lines changed

spring-kafka/src/test/java/org/springframework/kafka/listener/AckModeRecordFilteredTest.java

Lines changed: 25 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -16,24 +16,33 @@
1616

1717
package org.springframework.kafka.listener;
1818

19+
import java.time.Duration;
20+
import java.util.ArrayList;
21+
import java.util.HashMap;
22+
import java.util.List;
23+
import java.util.Map;
24+
import java.util.concurrent.CountDownLatch;
25+
import java.util.concurrent.TimeUnit;
26+
1927
import org.apache.kafka.clients.consumer.Consumer;
2028
import org.apache.kafka.clients.consumer.ConsumerRecord;
2129
import org.apache.kafka.clients.consumer.ConsumerRecords;
2230
import org.apache.kafka.common.TopicPartition;
31+
2332
import org.junit.jupiter.api.Test;
33+
2434
import org.springframework.kafka.core.ConsumerFactory;
2535
import org.springframework.kafka.listener.adapter.FilteringMessageListenerAdapter;
2636
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
2737

28-
import java.time.Duration;
29-
import java.util.*;
30-
import java.util.concurrent.CountDownLatch;
31-
import java.util.concurrent.TimeUnit;
32-
3338
import static org.assertj.core.api.Assertions.assertThat;
39+
3440
import static org.mockito.ArgumentMatchers.any;
3541
import static org.mockito.BDDMockito.given;
36-
import static org.mockito.Mockito.*;
42+
import static org.mockito.Mockito.times;
43+
import static org.mockito.Mockito.verify;
44+
import static org.mockito.Mockito.mock;
45+
import static org.mockito.Mockito.never;
3746

3847
/**
3948
* Tests for the RECORD_FILTERED acknowledge mode.
@@ -45,7 +54,7 @@
4554
*/
4655
public class AckModeRecordFilteredTest {
4756

48-
@SuppressWarnings("unchecked")
57+
@SuppressWarnings({"unchecked", "deprecation"})
4958
@Test
5059
public void testRecordFilteredModeOnlyCommitsProcessedRecords() throws InterruptedException {
5160
// Given: A container with RECORD_FILTERED ack mode
@@ -88,7 +97,7 @@ public void testRecordFilteredModeOnlyCommitsProcessedRecords() throws Interrupt
8897

8998
given(consumer.poll(any(Duration.class)))
9099
.willReturn(consumerRecords)
91-
.willReturn(new ConsumerRecords<>(Collections.emptyMap()));
100+
.willReturn(ConsumerRecords.empty());
92101

93102
// When: Start the container and process records
94103
container.start();
@@ -102,7 +111,7 @@ public void testRecordFilteredModeOnlyCommitsProcessedRecords() throws Interrupt
102111
verify(consumer, times(2)).commitSync(any(), any(Duration.class));
103112
}
104113

105-
@SuppressWarnings("unchecked")
114+
@SuppressWarnings({"unchecked", "deprecation"})
106115
@Test
107116
public void testRecordFilteredModeWithAllRecordsFiltered() throws InterruptedException {
108117
// Given: All records are filtered
@@ -139,7 +148,7 @@ public void testRecordFilteredModeWithAllRecordsFiltered() throws InterruptedExc
139148

140149
given(consumer.poll(any(Duration.class)))
141150
.willReturn(consumerRecords)
142-
.willReturn(new ConsumerRecords<>(Collections.emptyMap()));
151+
.willReturn(ConsumerRecords.empty());
143152

144153
// When
145154
container.start();
@@ -150,7 +159,7 @@ public void testRecordFilteredModeWithAllRecordsFiltered() throws InterruptedExc
150159
verify(consumer, never()).commitSync(any(), any(Duration.class));
151160
}
152161

153-
@SuppressWarnings("unchecked")
162+
@SuppressWarnings({"unchecked", "deprecation"})
154163
@Test
155164
public void testRecordFilteredModeWithMixedPartitions() throws InterruptedException {
156165
// Given: Mixed partitions with different filtering scenarios
@@ -201,7 +210,7 @@ public void testRecordFilteredModeWithMixedPartitions() throws InterruptedExcept
201210

202211
given(consumer.poll(any(Duration.class)))
203212
.willReturn(consumerRecords)
204-
.willReturn(new ConsumerRecords<>(Collections.emptyMap()));
213+
.willReturn(ConsumerRecords.empty());
205214

206215
// When
207216
container.start();
@@ -213,7 +222,7 @@ public void testRecordFilteredModeWithMixedPartitions() throws InterruptedExcept
213222
verify(consumer, times(3)).commitSync(any(), any(Duration.class));
214223
}
215224

216-
@SuppressWarnings("unchecked")
225+
@SuppressWarnings({"unchecked", "deprecation"})
217226
@Test
218227
public void testRecordFilteredModeEfficiencyGains() throws InterruptedException {
219228
ConsumerFactory<String, String> consumerFactory = mock(ConsumerFactory.class);
@@ -253,7 +262,7 @@ public void testRecordFilteredModeEfficiencyGains() throws InterruptedException
253262

254263
given(consumer.poll(any(Duration.class)))
255264
.willReturn(consumerRecords)
256-
.willReturn(new ConsumerRecords<>(Collections.emptyMap()));
265+
.willReturn(ConsumerRecords.empty());
257266

258267
// When
259268
container.start();
@@ -266,7 +275,7 @@ public void testRecordFilteredModeEfficiencyGains() throws InterruptedException
266275
verify(consumer, times(1)).commitSync(any(), any(Duration.class));
267276
}
268277

269-
@SuppressWarnings("unchecked")
278+
@SuppressWarnings({"unchecked", "deprecation"})
270279
@Test
271280
public void testRecordFilteredModeDoesNotBreakNormalProcessing() throws InterruptedException {
272281
ConsumerFactory<String, String> consumerFactory = mock(ConsumerFactory.class);
@@ -307,7 +316,7 @@ public void testRecordFilteredModeDoesNotBreakNormalProcessing() throws Interrup
307316

308317
given(consumer.poll(any(Duration.class)))
309318
.willReturn(consumerRecords)
310-
.willReturn(new ConsumerRecords<>(Collections.emptyMap()));
319+
.willReturn(ConsumerRecords.empty());
311320

312321
// When
313322
container.start();

spring-kafka/src/test/java/org/springframework/kafka/listener/AckModeRecordWithFilteringTest.java

Lines changed: 24 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -16,25 +16,33 @@
1616

1717
package org.springframework.kafka.listener;
1818

19+
import java.time.Duration;
20+
import java.util.ArrayList;
21+
import java.util.HashMap;
22+
import java.util.List;
23+
import java.util.Map;
24+
import java.util.concurrent.CountDownLatch;
25+
import java.util.concurrent.TimeUnit;
26+
1927
import org.apache.kafka.clients.consumer.Consumer;
2028
import org.apache.kafka.clients.consumer.ConsumerRecord;
2129
import org.apache.kafka.clients.consumer.ConsumerRecords;
2230
import org.apache.kafka.common.TopicPartition;
31+
2332
import org.junit.jupiter.api.Test;
33+
2434
import org.springframework.kafka.core.ConsumerFactory;
2535
import org.springframework.kafka.listener.adapter.FilteringMessageListenerAdapter;
2636
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
2737

28-
import java.time.Duration;
29-
import java.util.*;
30-
import java.util.concurrent.CountDownLatch;
31-
import java.util.concurrent.TimeUnit;
32-
3338
import static org.assertj.core.api.Assertions.assertThat;
3439
import static org.mockito.ArgumentMatchers.any;
3540
import static org.mockito.ArgumentMatchers.anyMap;
3641
import static org.mockito.BDDMockito.given;
37-
import static org.mockito.Mockito.*;
42+
43+
import static org.mockito.Mockito.mock;
44+
import static org.mockito.Mockito.verify;
45+
import static org.mockito.Mockito.times;
3846

3947
/**
4048
* Tests to verify the behavior of RECORD acknowledge mode when used with filtering strategies.
@@ -46,7 +54,7 @@
4654
*/
4755
public class AckModeRecordWithFilteringTest {
4856

49-
@SuppressWarnings("unchecked")
57+
@SuppressWarnings({"unchecked", "deprecation"})
5058
@Test
5159
public void testCurrentRecordModeCommitsAllRecords() throws InterruptedException {
5260
// Given: A container with RECORD ack mode and a filter that filters out even offsets
@@ -89,7 +97,7 @@ public void testCurrentRecordModeCommitsAllRecords() throws InterruptedException
8997

9098
given(consumer.poll(any(Duration.class)))
9199
.willReturn(consumerRecords)
92-
.willReturn(new ConsumerRecords<>(Collections.emptyMap()));
100+
.willReturn(ConsumerRecords.empty());
93101

94102
// When: Start the container and process records
95103
container.start();
@@ -103,7 +111,7 @@ public void testCurrentRecordModeCommitsAllRecords() throws InterruptedException
103111
verify(consumer, times(4)).commitSync(any(), any(Duration.class));
104112
}
105113

106-
@SuppressWarnings("unchecked")
114+
@SuppressWarnings({"unchecked", "deprecation"})
107115
@Test
108116
public void testAllRecordsFilteredStillCommits() throws InterruptedException {
109117
// Given: A container where all records are filtered
@@ -139,7 +147,7 @@ public void testAllRecordsFilteredStillCommits() throws InterruptedException {
139147

140148
given(consumer.poll(any(Duration.class)))
141149
.willReturn(consumerRecords)
142-
.willReturn(new ConsumerRecords<>(Collections.emptyMap()));
150+
.willReturn(ConsumerRecords.empty());
143151

144152
// When: Start the container
145153
container.start();
@@ -151,7 +159,7 @@ public void testAllRecordsFilteredStillCommits() throws InterruptedException {
151159
verify(consumer, times(2)).commitSync(any(), any(Duration.class));
152160
}
153161

154-
@SuppressWarnings("unchecked")
162+
@SuppressWarnings({"unchecked", "deprecation"})
155163
@Test
156164
public void testMixedPartitionsWithFiltering() throws InterruptedException {
157165
// Given: Multiple partitions with different records
@@ -201,7 +209,7 @@ record -> record.value().contains("skip");
201209

202210
given(consumer.poll(any(Duration.class)))
203211
.willReturn(consumerRecords)
204-
.willReturn(new ConsumerRecords<>(Collections.emptyMap()));
212+
.willReturn(ConsumerRecords.empty());
205213

206214
// When: Start container
207215
container.start();
@@ -215,7 +223,7 @@ record -> record.value().contains("skip");
215223
verify(consumer, times(5)).commitSync(any(), any(Duration.class));
216224
}
217225

218-
@SuppressWarnings("unchecked")
226+
@SuppressWarnings({"unchecked", "deprecation"})
219227
@Test
220228
public void testCommitLogging() throws InterruptedException {
221229
ConsumerFactory<String, String> consumerFactory = mock(ConsumerFactory.class);
@@ -251,7 +259,7 @@ public void testCommitLogging() throws InterruptedException {
251259

252260
given(consumer.poll(any(Duration.class)))
253261
.willReturn(consumerRecords)
254-
.willReturn(new ConsumerRecords<>(Collections.emptyMap()));
262+
.willReturn(ConsumerRecords.empty());
255263

256264
// When
257265
container.start();
@@ -262,7 +270,7 @@ public void testCommitLogging() throws InterruptedException {
262270
verify(consumer, times(2)).commitSync(anyMap(), any(Duration.class));
263271
}
264272

265-
@SuppressWarnings("unchecked")
273+
@SuppressWarnings({"unchecked", "deprecation"})
266274
@Test
267275
public void testAckDiscardedParameterBehavior() throws InterruptedException {
268276
ConsumerFactory<String, String> consumerFactory = mock(ConsumerFactory.class);
@@ -303,7 +311,7 @@ public void testAckDiscardedParameterBehavior() throws InterruptedException {
303311

304312
given(consumer.poll(any(Duration.class)))
305313
.willReturn(consumerRecords)
306-
.willReturn(new ConsumerRecords<>(Collections.emptyMap()));
314+
.willReturn(ConsumerRecords.empty());
307315

308316
container.start();
309317
assertThat(processedLatch.await(5, TimeUnit.SECONDS)).isTrue();

0 commit comments

Comments
 (0)