Skip to content

Commit 42b1b2f

Browse files
committed
Put back "Replace ListenableFuture with CompletableFuture"
Related to a165247.
1 parent bd25df6 commit 42b1b2f

File tree

3 files changed

+23
-24
lines changed

3 files changed

+23
-24
lines changed

spring-batch-infrastructure/src/main/java/org/springframework/batch/item/kafka/KafkaItemWriter.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2019-2021 the original author or authors.
2+
* Copyright 2019-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -21,10 +21,10 @@
2121
import org.springframework.kafka.core.KafkaTemplate;
2222
import org.springframework.kafka.support.SendResult;
2323
import org.springframework.util.Assert;
24-
import org.springframework.util.concurrent.ListenableFuture;
2524

2625
import java.util.ArrayList;
2726
import java.util.List;
27+
import java.util.concurrent.CompletableFuture;
2828
import java.util.concurrent.TimeUnit;
2929

3030
/**
@@ -42,32 +42,32 @@ public class KafkaItemWriter<K, T> extends KeyValueItemWriter<K, T> {
4242

4343
protected KafkaTemplate<K, T> kafkaTemplate;
4444

45-
private final List<ListenableFuture<SendResult<K, T>>> listenableFutures = new ArrayList<>();
45+
private final List<CompletableFuture<SendResult<K, T>>> completableFutures = new ArrayList<>();
4646

4747
private long timeout = -1;
4848

4949
@Override
5050
protected void writeKeyValue(K key, T value) {
5151
if (this.delete) {
52-
this.listenableFutures.add(this.kafkaTemplate.sendDefault(key, null));
52+
this.completableFutures.add(this.kafkaTemplate.sendDefault(key, null));
5353
}
5454
else {
55-
this.listenableFutures.add(this.kafkaTemplate.sendDefault(key, value));
55+
this.completableFutures.add(this.kafkaTemplate.sendDefault(key, value));
5656
}
5757
}
5858

5959
@Override
6060
protected void flush() throws Exception {
6161
this.kafkaTemplate.flush();
62-
for (ListenableFuture<SendResult<K, T>> future : this.listenableFutures) {
62+
for (var future : this.completableFutures) {
6363
if (this.timeout >= 0) {
6464
future.get(this.timeout, TimeUnit.MILLISECONDS);
6565
}
6666
else {
6767
future.get();
6868
}
6969
}
70-
this.listenableFutures.clear();
70+
this.completableFutures.clear();
7171
}
7272

7373
@Override

spring-batch-infrastructure/src/test/java/org/springframework/batch/item/kafka/KafkaItemReaderTests.java

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.List;
2323
import java.util.Map;
2424
import java.util.Properties;
25+
import java.util.concurrent.CompletableFuture;
2526
import java.util.concurrent.ExecutionException;
2627

2728
import org.apache.kafka.clients.admin.NewTopic;
@@ -39,12 +40,10 @@
3940
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
4041
import org.springframework.kafka.core.KafkaTemplate;
4142
import org.springframework.kafka.core.ProducerFactory;
42-
import org.springframework.kafka.support.SendResult;
4343
import org.springframework.kafka.test.EmbeddedKafkaBroker;
4444
import org.springframework.kafka.test.context.EmbeddedKafka;
4545
import org.springframework.kafka.test.utils.KafkaTestUtils;
4646
import org.springframework.test.context.junit.jupiter.SpringExtension;
47-
import org.springframework.util.concurrent.ListenableFuture;
4847

4948
import static org.hamcrest.MatcherAssert.assertThat;
5049
import static org.hamcrest.Matchers.containsInAnyOrder;
@@ -143,12 +142,12 @@ void testValidation() {
143142
@Test
144143
void testReadFromSinglePartition() throws ExecutionException, InterruptedException {
145144
this.template.setDefaultTopic("topic1");
146-
List<ListenableFuture<SendResult<String, String>>> futures = new ArrayList<>();
145+
var futures = new ArrayList<CompletableFuture<?>>();
147146
futures.add(this.template.sendDefault("val0"));
148147
futures.add(this.template.sendDefault("val1"));
149148
futures.add(this.template.sendDefault("val2"));
150149
futures.add(this.template.sendDefault("val3"));
151-
for (ListenableFuture<SendResult<String, String>> future : futures) {
150+
for (var future : futures) {
152151
future.get();
153152
}
154153

@@ -177,12 +176,12 @@ void testReadFromSinglePartition() throws ExecutionException, InterruptedExcepti
177176
@Test
178177
void testReadFromSinglePartitionFromCustomOffset() throws ExecutionException, InterruptedException {
179178
this.template.setDefaultTopic("topic5");
180-
List<ListenableFuture<SendResult<String, String>>> futures = new ArrayList<>();
179+
var futures = new ArrayList<CompletableFuture<?>>();
181180
futures.add(this.template.sendDefault("val0")); // <-- offset 0
182181
futures.add(this.template.sendDefault("val1")); // <-- offset 1
183182
futures.add(this.template.sendDefault("val2")); // <-- offset 2
184183
futures.add(this.template.sendDefault("val3")); // <-- offset 3
185-
for (ListenableFuture<SendResult<String, String>> future : futures) {
184+
for (var future : futures) {
186185
future.get();
187186
}
188187

@@ -213,10 +212,10 @@ void testReadFromSinglePartitionFromTheOffsetStoredInKafka() throws Exception {
213212
// first run: read a topic from the beginning
214213

215214
this.template.setDefaultTopic("topic6");
216-
List<ListenableFuture<SendResult<String, String>>> futures = new ArrayList<>();
215+
var futures = new ArrayList<CompletableFuture<?>>();
217216
futures.add(this.template.sendDefault("val0")); // <-- offset 0
218217
futures.add(this.template.sendDefault("val1")); // <-- offset 1
219-
for (ListenableFuture<SendResult<String, String>> future : futures) {
218+
for (var future : futures) {
220219
future.get();
221220
}
222221
this.reader = new KafkaItemReader<>(this.consumerProperties, "topic6", 0);
@@ -267,12 +266,12 @@ void testReadFromSinglePartitionFromTheOffsetStoredInKafka() throws Exception {
267266
@Test
268267
void testReadFromMultiplePartitions() throws ExecutionException, InterruptedException {
269268
this.template.setDefaultTopic("topic2");
270-
List<ListenableFuture<SendResult<String, String>>> futures = new ArrayList<>();
269+
var futures = new ArrayList<CompletableFuture<?>>();
271270
futures.add(this.template.sendDefault("val0"));
272271
futures.add(this.template.sendDefault("val1"));
273272
futures.add(this.template.sendDefault("val2"));
274273
futures.add(this.template.sendDefault("val3"));
275-
for (ListenableFuture<SendResult<String, String>> future : futures) {
274+
for (var future : futures) {
276275
future.get();
277276
}
278277

@@ -295,13 +294,13 @@ void testReadFromMultiplePartitions() throws ExecutionException, InterruptedExce
295294
@Test
296295
void testReadFromSinglePartitionAfterRestart() throws ExecutionException, InterruptedException {
297296
this.template.setDefaultTopic("topic3");
298-
List<ListenableFuture<SendResult<String, String>>> futures = new ArrayList<>();
297+
var futures = new ArrayList<CompletableFuture<?>>();
299298
futures.add(this.template.sendDefault("val0"));
300299
futures.add(this.template.sendDefault("val1"));
301300
futures.add(this.template.sendDefault("val2"));
302301
futures.add(this.template.sendDefault("val3"));
303302
futures.add(this.template.sendDefault("val4"));
304-
for (ListenableFuture<SendResult<String, String>> future : futures) {
303+
for (var future : futures) {
305304
future.get();
306305
}
307306
ExecutionContext executionContext = new ExecutionContext();
@@ -331,7 +330,7 @@ void testReadFromSinglePartitionAfterRestart() throws ExecutionException, Interr
331330

332331
@Test
333332
void testReadFromMultiplePartitionsAfterRestart() throws ExecutionException, InterruptedException {
334-
List<ListenableFuture<SendResult<String, String>>> futures = new ArrayList<>();
333+
var futures = new ArrayList<CompletableFuture<?>>();
335334
futures.add(this.template.send("topic4", 0, null, "val0"));
336335
futures.add(this.template.send("topic4", 0, null, "val2"));
337336
futures.add(this.template.send("topic4", 0, null, "val4"));
@@ -341,7 +340,7 @@ void testReadFromMultiplePartitionsAfterRestart() throws ExecutionException, Int
341340
futures.add(this.template.send("topic4", 1, null, "val5"));
342341
futures.add(this.template.send("topic4", 1, null, "val7"));
343342

344-
for (ListenableFuture<?> future : futures) {
343+
for (var future : futures) {
345344
future.get();
346345
}
347346

spring-batch-infrastructure/src/test/java/org/springframework/batch/item/kafka/KafkaItemWriterTests.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2019-2021 the original author or authors.
2+
* Copyright 2019-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -17,6 +17,7 @@
1717

1818
import java.util.Arrays;
1919
import java.util.List;
20+
import java.util.concurrent.CompletableFuture;
2021
import java.util.concurrent.TimeUnit;
2122

2223
import org.junit.jupiter.api.BeforeEach;
@@ -29,7 +30,6 @@
2930
import org.springframework.core.convert.converter.Converter;
3031
import org.springframework.kafka.core.KafkaTemplate;
3132
import org.springframework.kafka.support.SendResult;
32-
import org.springframework.util.concurrent.ListenableFuture;
3333

3434
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
3535
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -46,7 +46,7 @@ class KafkaItemWriterTests {
4646
private KafkaTemplate<String, String> kafkaTemplate;
4747

4848
@Mock
49-
private ListenableFuture<SendResult<String, String>> future;
49+
private CompletableFuture<SendResult<String, String>> future;
5050

5151
private KafkaItemKeyMapper itemKeyMapper;
5252

0 commit comments

Comments
 (0)