1919import java .time .Duration ;
2020import java .util .function .Function ;
2121
22+ import org .jspecify .annotations .Nullable ;
23+
2224import org .springframework .boot .context .properties .PropertyMapper ;
2325import org .springframework .boot .kafka .autoconfigure .KafkaProperties .Listener ;
2426import org .springframework .core .task .SimpleAsyncTaskExecutor ;
3638import org .springframework .kafka .support .converter .BatchMessageConverter ;
3739import org .springframework .kafka .support .converter .RecordMessageConverter ;
3840import org .springframework .kafka .transaction .KafkaAwareTransactionManager ;
41+ import org .springframework .util .Assert ;
3942
4043/**
4144 * Configure {@link ConcurrentKafkaListenerContainerFactory} with sensible defaults tuned
5356 */
5457public class ConcurrentKafkaListenerContainerFactoryConfigurer {
5558
56- private KafkaProperties properties ;
59+ private @ Nullable KafkaProperties properties ;
5760
58- private BatchMessageConverter batchMessageConverter ;
61+ private @ Nullable BatchMessageConverter batchMessageConverter ;
5962
60- private RecordMessageConverter recordMessageConverter ;
63+ private @ Nullable RecordMessageConverter recordMessageConverter ;
6164
62- private RecordFilterStrategy <Object , Object > recordFilterStrategy ;
65+ private @ Nullable RecordFilterStrategy <Object , Object > recordFilterStrategy ;
6366
64- private KafkaTemplate <Object , Object > replyTemplate ;
67+ private @ Nullable KafkaTemplate <Object , Object > replyTemplate ;
6568
66- private KafkaAwareTransactionManager <Object , Object > transactionManager ;
69+ private @ Nullable KafkaAwareTransactionManager <Object , Object > transactionManager ;
6770
68- private ConsumerAwareRebalanceListener rebalanceListener ;
71+ private @ Nullable ConsumerAwareRebalanceListener rebalanceListener ;
6972
70- private CommonErrorHandler commonErrorHandler ;
73+ private @ Nullable CommonErrorHandler commonErrorHandler ;
7174
72- private AfterRollbackProcessor <Object , Object > afterRollbackProcessor ;
75+ private @ Nullable AfterRollbackProcessor <Object , Object > afterRollbackProcessor ;
7376
74- private RecordInterceptor <Object , Object > recordInterceptor ;
77+ private @ Nullable RecordInterceptor <Object , Object > recordInterceptor ;
7578
76- private BatchInterceptor <Object , Object > batchInterceptor ;
79+ private @ Nullable BatchInterceptor <Object , Object > batchInterceptor ;
7780
78- private Function <MessageListenerContainer , String > threadNameSupplier ;
81+ private @ Nullable Function <MessageListenerContainer , String > threadNameSupplier ;
7982
80- private SimpleAsyncTaskExecutor listenerTaskExecutor ;
83+ private @ Nullable SimpleAsyncTaskExecutor listenerTaskExecutor ;
8184
8285 /**
8386 * Set the {@link KafkaProperties} to use.
8487 * @param properties the properties
8588 */
86- void setKafkaProperties (KafkaProperties properties ) {
89+ void setKafkaProperties (@ Nullable KafkaProperties properties ) {
8790 this .properties = properties ;
8891 }
8992
9093 /**
9194 * Set the {@link BatchMessageConverter} to use.
9295 * @param batchMessageConverter the message converter
9396 */
94- void setBatchMessageConverter (BatchMessageConverter batchMessageConverter ) {
97+ void setBatchMessageConverter (@ Nullable BatchMessageConverter batchMessageConverter ) {
9598 this .batchMessageConverter = batchMessageConverter ;
9699 }
97100
98101 /**
99102 * Set the {@link RecordMessageConverter} to use.
100103 * @param recordMessageConverter the message converter
101104 */
102- void setRecordMessageConverter (RecordMessageConverter recordMessageConverter ) {
105+ void setRecordMessageConverter (@ Nullable RecordMessageConverter recordMessageConverter ) {
103106 this .recordMessageConverter = recordMessageConverter ;
104107 }
105108
106109 /**
107110 * Set the {@link RecordFilterStrategy} to use to filter incoming records.
108111 * @param recordFilterStrategy the record filter strategy
109112 */
110- void setRecordFilterStrategy (RecordFilterStrategy <Object , Object > recordFilterStrategy ) {
113+ void setRecordFilterStrategy (@ Nullable RecordFilterStrategy <Object , Object > recordFilterStrategy ) {
111114 this .recordFilterStrategy = recordFilterStrategy ;
112115 }
113116
114117 /**
115118 * Set the {@link KafkaTemplate} to use to send replies.
116119 * @param replyTemplate the reply template
117120 */
118- void setReplyTemplate (KafkaTemplate <Object , Object > replyTemplate ) {
121+ void setReplyTemplate (@ Nullable KafkaTemplate <Object , Object > replyTemplate ) {
119122 this .replyTemplate = replyTemplate ;
120123 }
121124
122125 /**
123126 * Set the {@link KafkaAwareTransactionManager} to use.
124127 * @param transactionManager the transaction manager
125128 */
126- void setTransactionManager (KafkaAwareTransactionManager <Object , Object > transactionManager ) {
129+ void setTransactionManager (@ Nullable KafkaAwareTransactionManager <Object , Object > transactionManager ) {
127130 this .transactionManager = transactionManager ;
128131 }
129132
130133 /**
131134 * Set the {@link ConsumerAwareRebalanceListener} to use.
132135 * @param rebalanceListener the rebalance listener.
133136 */
134- void setRebalanceListener (ConsumerAwareRebalanceListener rebalanceListener ) {
137+ void setRebalanceListener (@ Nullable ConsumerAwareRebalanceListener rebalanceListener ) {
135138 this .rebalanceListener = rebalanceListener ;
136139 }
137140
138141 /**
139142 * Set the {@link CommonErrorHandler} to use.
140143 * @param commonErrorHandler the error handler.
141144 */
142- public void setCommonErrorHandler (CommonErrorHandler commonErrorHandler ) {
145+ public void setCommonErrorHandler (@ Nullable CommonErrorHandler commonErrorHandler ) {
143146 this .commonErrorHandler = commonErrorHandler ;
144147 }
145148
146149 /**
147150 * Set the {@link AfterRollbackProcessor} to use.
148151 * @param afterRollbackProcessor the after rollback processor
149152 */
150- void setAfterRollbackProcessor (AfterRollbackProcessor <Object , Object > afterRollbackProcessor ) {
153+ void setAfterRollbackProcessor (@ Nullable AfterRollbackProcessor <Object , Object > afterRollbackProcessor ) {
151154 this .afterRollbackProcessor = afterRollbackProcessor ;
152155 }
153156
154157 /**
155158 * Set the {@link RecordInterceptor} to use.
156159 * @param recordInterceptor the record interceptor.
157160 */
158- void setRecordInterceptor (RecordInterceptor <Object , Object > recordInterceptor ) {
161+ void setRecordInterceptor (@ Nullable RecordInterceptor <Object , Object > recordInterceptor ) {
159162 this .recordInterceptor = recordInterceptor ;
160163 }
161164
162165 /**
163166 * Set the {@link BatchInterceptor} to use.
164167 * @param batchInterceptor the batch interceptor.
165168 */
166- void setBatchInterceptor (BatchInterceptor <Object , Object > batchInterceptor ) {
169+ void setBatchInterceptor (@ Nullable BatchInterceptor <Object , Object > batchInterceptor ) {
167170 this .batchInterceptor = batchInterceptor ;
168171 }
169172
170173 /**
171174 * Set the thread name supplier to use.
172175 * @param threadNameSupplier the thread name supplier to use
173176 */
174- void setThreadNameSupplier (Function <MessageListenerContainer , String > threadNameSupplier ) {
177+ void setThreadNameSupplier (@ Nullable Function <MessageListenerContainer , String > threadNameSupplier ) {
175178 this .threadNameSupplier = threadNameSupplier ;
176179 }
177180
178181 /**
179182 * Set the executor for threads that poll the consumer.
180183 * @param listenerTaskExecutor task executor
181184 */
182- void setListenerTaskExecutor (SimpleAsyncTaskExecutor listenerTaskExecutor ) {
185+ void setListenerTaskExecutor (@ Nullable SimpleAsyncTaskExecutor listenerTaskExecutor ) {
183186 this .listenerTaskExecutor = listenerTaskExecutor ;
184187 }
185188
@@ -199,6 +202,7 @@ public void configure(ConcurrentKafkaListenerContainerFactory<Object, Object> li
199202
200203 private void configureListenerFactory (ConcurrentKafkaListenerContainerFactory <Object , Object > factory ) {
201204 PropertyMapper map = PropertyMapper .get ().alwaysApplyingWhenNonNull ();
205+ Assert .state (this .properties != null , "'properties' must not be null" );
202206 Listener properties = this .properties .getListener ();
203207 map .from (properties ::getConcurrency ).to (factory ::setConcurrency );
204208 map .from (properties ::isAutoStartup ).to (factory ::setAutoStartup );
@@ -219,6 +223,7 @@ private void configureListenerFactory(ConcurrentKafkaListenerContainerFactory<Ob
219223
220224 private void configureContainer (ContainerProperties container ) {
221225 PropertyMapper map = PropertyMapper .get ().alwaysApplyingWhenNonNull ();
226+ Assert .state (this .properties != null , "'properties' must not be null" );
222227 Listener properties = this .properties .getListener ();
223228 map .from (properties ::getAckMode ).to (container ::setAckMode );
224229 map .from (properties ::getAsyncAcks ).to (container ::setAsyncAcks );
0 commit comments