
Commit 82b816d

garyrussell authored and artembilan committed
@NonNullApi for core package
Also use a single `Duration` object for polling.
1 parent f15bf08 commit 82b816d

7 files changed: +36 -23 lines changed


spring-kafka/src/main/java/org/springframework/kafka/core/ConsumerFactory.java

Lines changed: 6 additions & 3 deletions
@@ -21,6 +21,8 @@
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.common.serialization.Deserializer;
 
+import org.springframework.lang.Nullable;
+
 /**
  * The strategy to produce a {@link Consumer} instance(s).
  *
@@ -47,7 +49,7 @@ default Consumer<K, V> createConsumer() {
     * @return the consumer.
     * @since 1.3
     */
-    default Consumer<K, V> createConsumer(String clientIdSuffix) {
+    default Consumer<K, V> createConsumer(@Nullable String clientIdSuffix) {
        return createConsumer(null, clientIdSuffix);
    }
 
@@ -60,7 +62,7 @@ default Consumer<K, V> createConsumer(String clientIdSuffix) {
     * @return the consumer.
     * @since 1.3
     */
-    default Consumer<K, V> createConsumer(String groupId, String clientIdSuffix) {
+    default Consumer<K, V> createConsumer(@Nullable String groupId, @Nullable String clientIdSuffix) {
        return createConsumer(groupId, null, clientIdSuffix);
    }
 
@@ -76,7 +78,8 @@ default Consumer<K, V> createConsumer(String groupId, String clientIdSuffix) {
     * @return the consumer.
     * @since 2.1.1
     */
-    Consumer<K, V> createConsumer(String groupId, String clientIdPrefix, String clientIdSuffix);
+    Consumer<K, V> createConsumer(@Nullable String groupId, @Nullable String clientIdPrefix,
+            @Nullable String clientIdSuffix);
 
    /**
     * Return true if consumers created by this factory use auto commit.
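With the core package now under @NonNullApi, null may only be passed where a parameter is explicitly marked @Nullable, as in the createConsumer overloads above. A minimal usage sketch, not part of this commit; the configuration values and topic-less consumer usage are assumptions:

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

public class NullableConsumerFactoryExample {

    public static void main(String[] args) {
        // Hypothetical configuration; adjust the bootstrap servers for a real cluster.
        Map<String, Object> configs = new HashMap<>();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        ConsumerFactory<String, String> factory = new DefaultKafkaConsumerFactory<>(configs);

        // Both parameters of this overload are @Nullable after this commit: passing null
        // falls back to the group.id and client.id taken from the configuration map.
        try (Consumer<String, String> consumer = factory.createConsumer(null, null)) {
            System.out.println("subscription: " + consumer.subscription());
        }
    }
}

Passing null for groupId and clientIdSuffix simply defers to the values in the configuration, which is why these parameters are annotated rather than guarded.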

spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaConsumerFactory.java

Lines changed: 7 additions & 5 deletions
@@ -71,11 +71,11 @@ public DefaultKafkaConsumerFactory(Map<String, Object> configs,
        this.valueDeserializer = valueDeserializer;
    }
 
-    public void setKeyDeserializer(Deserializer<K> keyDeserializer) {
+    public void setKeyDeserializer(@Nullable Deserializer<K> keyDeserializer) {
        this.keyDeserializer = keyDeserializer;
    }
 
-    public void setValueDeserializer(Deserializer<V> valueDeserializer) {
+    public void setValueDeserializer(@Nullable Deserializer<V> valueDeserializer) {
        this.valueDeserializer = valueDeserializer;
    }
 
@@ -95,12 +95,14 @@ public Deserializer<V> getValueDeserializer() {
    }
 
    @Override
-    public Consumer<K, V> createConsumer(String groupId, String clientIdPrefix, String clientIdSuffix) {
+    public Consumer<K, V> createConsumer(@Nullable String groupId, @Nullable String clientIdPrefix,
+            @Nullable String clientIdSuffix) {
+
        return createKafkaConsumer(groupId, clientIdPrefix, clientIdSuffix);
    }
 
-    protected KafkaConsumer<K, V> createKafkaConsumer(String groupId, String clientIdPrefix,
-            String clientIdSuffix) {
+    protected KafkaConsumer<K, V> createKafkaConsumer(@Nullable String groupId, @Nullable String clientIdPrefix,
+            @Nullable String clientIdSuffix) {
 
        boolean overrideClientIdPrefix = StringUtils.hasText(clientIdPrefix);
        if (clientIdSuffix == null) {

spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java

Lines changed: 2 additions & 2 deletions
@@ -116,11 +116,11 @@ public DefaultKafkaProducerFactory(Map<String, Object> configs,
        this.valueSerializer = valueSerializer;
    }
 
-    public void setKeySerializer(Serializer<K> keySerializer) {
+    public void setKeySerializer(@Nullable Serializer<K> keySerializer) {
        this.keySerializer = keySerializer;
    }
 
-    public void setValueSerializer(Serializer<V> valueSerializer) {
+    public void setValueSerializer(@Nullable Serializer<V> valueSerializer) {
        this.valueSerializer = valueSerializer;
    }
 

spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java

Lines changed: 15 additions & 10 deletions
@@ -38,6 +38,7 @@
 import org.springframework.kafka.support.converter.MessageConverter;
 import org.springframework.kafka.support.converter.MessagingMessageConverter;
 import org.springframework.kafka.support.converter.RecordMessageConverter;
+import org.springframework.lang.Nullable;
 import org.springframework.messaging.Message;
 import org.springframework.transaction.support.TransactionSynchronizationManager;
 import org.springframework.util.Assert;
@@ -126,7 +127,7 @@ public void setDefaultTopic(String defaultTopic) {
     * which logs errors only.
     * @param producerListener the listener; may be {@code null}.
     */
-    public void setProducerListener(ProducerListener<K, V> producerListener) {
+    public void setProducerListener(@Nullable ProducerListener<K, V> producerListener) {
        this.producerListener = producerListener;
    }
 
@@ -143,6 +144,7 @@ public MessageConverter getMessageConverter() {
     * @param messageConverter the message converter.
     */
    public void setMessageConverter(RecordMessageConverter messageConverter) {
+        Assert.notNull(messageConverter, "'messageConverter' cannot be null");
        this.messageConverter = messageConverter;
    }
 
@@ -157,50 +159,51 @@ public boolean isTransactional() {
    }
 
    @Override
-    public ListenableFuture<SendResult<K, V>> sendDefault(V data) {
+    public ListenableFuture<SendResult<K, V>> sendDefault(@Nullable V data) {
        return send(this.defaultTopic, data);
    }
 
    @Override
-    public ListenableFuture<SendResult<K, V>> sendDefault(K key, V data) {
+    public ListenableFuture<SendResult<K, V>> sendDefault(K key, @Nullable V data) {
        return send(this.defaultTopic, key, data);
    }
 
    @Override
-    public ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data) {
+    public ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, @Nullable V data) {
        return send(this.defaultTopic, partition, key, data);
    }
 
    @Override
-    public ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data) {
+    public ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, @Nullable V data) {
        return send(this.defaultTopic, partition, timestamp, key, data);
    }
 
    @Override
-    public ListenableFuture<SendResult<K, V>> send(String topic, V data) {
+    public ListenableFuture<SendResult<K, V>> send(String topic, @Nullable V data) {
        ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, data);
        return doSend(producerRecord);
    }
 
    @Override
-    public ListenableFuture<SendResult<K, V>> send(String topic, K key, V data) {
+    public ListenableFuture<SendResult<K, V>> send(String topic, K key, @Nullable V data) {
        ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, key, data);
        return doSend(producerRecord);
    }
 
    @Override
-    public ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data) {
+    public ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, @Nullable V data) {
        ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, partition, key, data);
        return doSend(producerRecord);
    }
 
    @Override
-    public ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data) {
+    public ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key,
+            @Nullable V data) {
+
        ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, partition, timestamp, key, data);
        return doSend(producerRecord);
    }
 
-
    @Override
    public ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record) {
        return doSend(record);
@@ -244,6 +247,7 @@ public List<PartitionInfo> partitionsFor(String topic) {
 
    @Override
    public <T> T execute(ProducerCallback<K, V, T> callback) {
+        Assert.notNull(callback, "'callback' cannot be null");
        Producer<K, V> producer = getTheProducer();
        try {
            return callback.doInKafka(producer);
@@ -255,6 +259,7 @@ public <T> T execute(ProducerCallback<K, V, T> callback) {
 
    @Override
    public <T> T executeInTransaction(OperationsCallback<K, V, T> callback) {
+        Assert.notNull(callback, "'callback' cannot be null");
        Assert.state(this.transactional, "Producer factory does not support transactions");
        Producer<K, V> producer = this.producers.get();
        Assert.state(producer == null, "Nested calls to 'executeInTransaction' are not allowed");
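The send and sendDefault overloads above now mark the record value as @Nullable, which lines up with Kafka itself treating a null value as a tombstone (for example, to delete a key from a compacted topic). A short usage sketch, not part of this commit; the producer settings, topic, and key are assumptions:

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;

public class NullableValueSendExample {

    public static void main(String[] args) {
        // Hypothetical producer settings; adjust for a real cluster.
        Map<String, Object> configs = new HashMap<>();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        KafkaTemplate<String, String> template =
                new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(configs));

        // The value parameter is @Nullable: a null value is a legitimate tombstone
        // record, e.g. to remove "some-key" from a compacted topic.
        template.send("some-topic", "some-key", null);
        template.flush();
    }
}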

spring-kafka/src/main/java/org/springframework/kafka/core/ProducerFactoryUtils.java

Lines changed: 2 additions & 1 deletion
@@ -18,6 +18,7 @@
 
 import org.apache.kafka.clients.producer.Producer;
 
+import org.springframework.lang.Nullable;
 import org.springframework.transaction.support.ResourceHolderSynchronization;
 import org.springframework.transaction.support.TransactionSynchronization;
 import org.springframework.transaction.support.TransactionSynchronizationManager;
@@ -72,7 +73,7 @@ public static <K, V> KafkaResourceHolder<K, V> getTransactionalResourceHolder(
        return resourceHolder;
    }
 
-    public static <K, V> void releaseResources(KafkaResourceHolder<K, V> resourceHolder) {
+    public static <K, V> void releaseResources(@Nullable KafkaResourceHolder<K, V> resourceHolder) {
        if (resourceHolder != null) {
            resourceHolder.getProducer().close();
        }
spring-kafka/src/main/java/org/springframework/kafka/core/package-info.java

Lines changed: 1 addition & 0 deletions
@@ -1,4 +1,5 @@
 /**
  * Package for kafka core components
  */
+@org.springframework.lang.NonNullApi
 package org.springframework.kafka.core;

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 3 additions & 2 deletions
@@ -406,6 +406,8 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
        private final LogIfLevelEnabled commitLogger = new LogIfLevelEnabled(this.logger,
                this.containerProperties.getCommitLogLevel());
 
+        private final Duration pollTimeout = Duration.ofMillis(this.containerProperties.getPollTimeout());
+
        private volatile Map<TopicPartition, OffsetMetadata> definedPartitions;
 
        private volatile Collection<TopicPartition> assignedPartitions;
@@ -705,8 +707,7 @@ public void run() {
                    }
                    publishConsumerPausedEvent(this.consumer.assignment());
                }
-                ConsumerRecords<K, V> records = this.consumer
-                        .poll(Duration.ofMillis(this.containerProperties.getPollTimeout()));
+                ConsumerRecords<K, V> records = this.consumer.poll(this.pollTimeout);
                this.lastPoll = System.currentTimeMillis();
                if (this.consumerPaused && !isPaused()) {
                    if (this.logger.isDebugEnabled()) {
