Skip to content

Commit b2d6518

Browse files
committed
Upgrade to 2.4 clients; bump to 2.4.0.B-S
- Remove references to ZkClient - deprecation of `Consumer.committed(TopicPartition tp)` - Addition of `onPartitionsLost` to `ConsumerRebalanceListener` - since `onPartitionsLost` calls `onPartitionsRevoked` by default with a possible partial list, remove the revoked partitions from the assignments - fix some reflection uses in tests (`groupId` field and `NewTopic`) - fix `EKIT` `listen12` listener - we now always get the full batch - `ConsumerRebalanceListener.onPartitionsRevoked` is no longer called with an empty collection - deprecation of some `TopologyDriver` helper classes - update what's new and change log appendix - add `unregisterSeekCallback` to `ConsumerSeekAware` - `AbstractCSA` now removes TL there since we may have only a partial revoke * Fix what's new * GH-1277: isAckAfterHandle default true Resolves: #1277 * Fix SBFB Javadocs * Update version * Fix test for isAckAfterHandle default true * Upgrade to 2.4.0 Clients * Remove mavenLocal() repo
1 parent a34f48d commit b2d6518

File tree

20 files changed

+351
-253
lines changed

20 files changed

+351
-253
lines changed

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ ext {
4545
jaywayJsonPathVersion = '2.4.0'
4646
junit4Version = '4.12'
4747
junitJupiterVersion = '5.5.2'
48-
kafkaVersion = '2.3.1'
48+
kafkaVersion = '2.4.0'
4949
log4jVersion = '2.12.1'
5050
micrometerVersion = '1.3.2'
5151
mockitoVersion = '3.0.0'

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
version=2.3.5.BUILD-SNAPSHOT
1+
version=2.4.0.BUILD-SNAPSHOT

spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaBroker.java

Lines changed: 5 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,6 @@
3434
import java.util.concurrent.atomic.AtomicBoolean;
3535
import java.util.stream.Collectors;
3636

37-
import org.I0Itec.zkclient.ZkClient;
38-
import org.I0Itec.zkclient.exception.ZkInterruptedException;
3937
import org.apache.commons.logging.LogFactory;
4038
import org.apache.kafka.clients.admin.AdminClient;
4139
import org.apache.kafka.clients.admin.AdminClientConfig;
@@ -66,7 +64,6 @@
6664
import kafka.server.NotRunning;
6765
import kafka.utils.CoreUtils;
6866
import kafka.utils.TestUtils;
69-
import kafka.utils.ZKStringSerializer$;
7067
import kafka.zk.ZkFourLetterWords;
7168
import kafka.zookeeper.ZooKeeperClient;
7269

@@ -133,8 +130,6 @@ public class EmbeddedKafkaBroker implements InitializingBean, DisposableBean {
133130

134131
private volatile ZooKeeperClient zooKeeperClient;
135132

136-
private volatile ZkClient zkClient;
137-
138133
public EmbeddedKafkaBroker(int count) {
139134
this(count, false);
140135
}
@@ -298,7 +293,8 @@ private Properties createBrokerProperties(int i) {
298293
scala.Option.apply(null),
299294
scala.Option.apply(null),
300295
scala.Option.apply(null),
301-
true, false, 0, false, 0, false, 0, scala.Option.apply(null), 1, false);
296+
true, false, 0, false, 0, false, 0, scala.Option.apply(null), 1, false,
297+
this.partitionsPerTopic, (short) this.count);
302298
}
303299

304300
/**
@@ -395,19 +391,11 @@ public void destroy() {
395391
// do nothing
396392
}
397393
}
398-
try {
399-
synchronized (this) {
400-
if (this.zooKeeperClient != null) {
401-
this.zooKeeperClient.close();
402-
}
403-
if (this.zkClient != null) {
404-
this.zkClient.close();
405-
}
394+
synchronized (this) {
395+
if (this.zooKeeperClient != null) {
396+
this.zooKeeperClient.close();
406397
}
407398
}
408-
catch (ZkInterruptedException e) {
409-
// do nothing
410-
}
411399
try {
412400
this.zookeeper.shutdown();
413401
this.zkConnect = null;
@@ -433,20 +421,6 @@ public EmbeddedZookeeper getZookeeper() {
433421
return this.zookeeper;
434422
}
435423

436-
/**
437-
* Return the ZkClient.
438-
* @return the client.
439-
* @deprecated in favor of {@link #getZooKeeperClient()}.
440-
*/
441-
@Deprecated
442-
public synchronized ZkClient getZkClient() {
443-
if (this.zkClient == null) {
444-
this.zkClient = new ZkClient(this.zkConnect, ZK_SESSION_TIMEOUT, ZK_CONNECTION_TIMEOUT,
445-
ZKStringSerializer$.MODULE$);
446-
}
447-
return this.zkClient;
448-
}
449-
450424
/**
451425
* Return the ZooKeeperClient.
452426
* @return the client.

spring-kafka/src/main/java/org/springframework/kafka/config/StreamsBuilderFactoryBean.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ public StreamsBuilderFactoryBean(KafkaStreamsConfiguration streamsConfig) {
126126
}
127127

128128
/**
129-
* Set {@link StreamsConfig} on this factory.
129+
* Set the streams configuration {@link Properties} on this factory.
130130
* @param streamsConfig the streams configuration.
131131
* @since 2.2
132132
*/

spring-kafka/src/main/java/org/springframework/kafka/core/reactive/ReactiveKafkaConsumerTemplate.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,10 +146,22 @@ public Mono<Long> position(TopicPartition partition) {
146146
return doOnConsumer(consumer -> consumer.position(partition));
147147
}
148148

149+
/**
150+
* Get the committed {@link OffsetAndMetadata} for the partition.
151+
* @param partition the partition.
152+
* @return the {@link OffsetAndMetadata}.
153+
* @deprecated in favor of {@link #committed(Set)}.
154+
*/
155+
@Deprecated
156+
@SuppressWarnings("deprecation")
149157
public Mono<OffsetAndMetadata> committed(TopicPartition partition) {
150158
return doOnConsumer(consumer -> consumer.committed(partition));
151159
}
152160

161+
public Mono<Map<TopicPartition, OffsetAndMetadata>> committed(Set<TopicPartition> partitions) {
162+
return doOnConsumer(consumer -> consumer.committed(partitions));
163+
}
164+
153165
public Flux<PartitionInfo> partitionsFromConsumerFor(String topic) {
154166
Mono<List<PartitionInfo>> partitions = doOnConsumer(c -> c.partitionsFor(topic));
155167
return partitions.flatMapIterable(Function.identity());

spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractConsumerSeekAware.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,10 @@ public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, Consumer
5656
@Override
5757
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
5858
partitions.forEach(tp -> this.callbacks.remove(tp));
59+
}
60+
61+
@Override
62+
public void unregisterSeekCallback() {
5963
this.callbackForThread.remove();
6064
}
6165

spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -465,6 +465,12 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
465465
getGroupId() + ": partitions assigned: " + partitions);
466466
}
467467

468+
@Override
469+
public void onPartitionsLost(Collection<TopicPartition> partitions) {
470+
AbstractMessageListenerContainer.this.logger.info(() ->
471+
getGroupId() + ": partitions lost: " + partitions);
472+
}
473+
468474
};
469475
}
470476

spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerAwareRebalanceListener.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,16 @@ default void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<
5353
// do nothing
5454
}
5555

56+
/**
57+
* The same as {@link #onPartitionsLost(Collection)} with an additional consumer parameter.
58+
* @param consumer the consumer.
59+
* @param partitions the partitions.
60+
* @since 2.4
61+
*/
62+
default void onPartitionsLost(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
63+
// do nothing
64+
}
65+
5666
/**
5767
* The same as {@link #onPartitionsAssigned(Collection)} with the additional consumer
5868
* parameter.
@@ -73,4 +83,9 @@ default void onPartitionsAssigned(Collection<TopicPartition> partitions) {
7383
throw new UnsupportedOperationException("Listener container should never call this");
7484
}
7585

86+
@Override
87+
default void onPartitionsLost(Collection<TopicPartition> partitions) {
88+
throw new UnsupportedOperationException("Listener container should never call this");
89+
}
90+
7691
}

spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerSeekAware.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,15 @@ default void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeek
7272
// do nothing
7373
}
7474

75+
/**
76+
* Called when the listener consumer terminates allowing implementations to clean up
77+
* state, such as thread locals.
78+
* @since 2.4
79+
*/
80+
default void unregisterSeekCallback() {
81+
// do nothing
82+
}
83+
7584
/**
7685
* A callback that a listener can invoke to seek to a specific offset.
7786
*/

spring-kafka/src/main/java/org/springframework/kafka/listener/GenericErrorHandler.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,7 @@ default void clearThreadState() {
6363
* @since 2.3.2
6464
*/
6565
default boolean isAckAfterHandle() {
66-
// TODO: Default true in the next release.
67-
return false;
66+
return true;
6867
}
6968

7069
}

0 commit comments

Comments
 (0)