@@ -142,17 +142,17 @@ public class DefaultKafkaProducerFactory<K, V> extends KafkaResourceFactory
142142
143143 private TransactionIdSuffixStrategy transactionIdSuffixStrategy = new DefaultTransactionIdSuffixStrategy (0 );
144144
145- private Supplier <Serializer <K >> keySerializerSupplier ;
145+ private @ Nullable Supplier <Serializer <K >> keySerializerSupplier ;
146146
147- private Supplier <Serializer <V >> valueSerializerSupplier ;
147+ private @ Nullable Supplier <Serializer <V >> valueSerializerSupplier ;
148148
149- private Supplier <Serializer <K >> rawKeySerializerSupplier ;
149+ private @ Nullable Supplier <Serializer <K >> rawKeySerializerSupplier ;
150150
151- private Supplier <Serializer <V >> rawValueSerializerSupplier ;
151+ private @ Nullable Supplier <Serializer <V >> rawValueSerializerSupplier ;
152152
153153 private Duration physicalCloseTimeout = DEFAULT_PHYSICAL_CLOSE_TIMEOUT ;
154154
155- private ApplicationContext applicationContext ;
155+ private @ Nullable ApplicationContext applicationContext ;
156156
157157 private String beanName = "not.managed.by.Spring" ;
158158
@@ -162,11 +162,11 @@ public class DefaultKafkaProducerFactory<K, V> extends KafkaResourceFactory
162162
163163 private boolean configureSerializers = true ;
164164
165- private volatile String transactionIdPrefix ;
165+ private volatile @ Nullable String transactionIdPrefix ;
166166
167- private volatile String clientIdPrefix ;
167+ private volatile @ Nullable String clientIdPrefix ;
168168
169- private volatile CloseSafeProducer <K , V > producer ;
169+ private volatile @ Nullable CloseSafeProducer <K , V > producer ;
170170
171171 /**
172172 * Construct a factory with the provided configuration.
@@ -267,7 +267,7 @@ public DefaultKafkaProducerFactory(Map<String, Object> configs,
267267 }
268268 }
269269
270- private Supplier <Serializer <K >> keySerializerSupplier (@ Nullable Supplier <Serializer <K >> keySerializerSupplier ) {
270+ private @ Nullable Supplier <Serializer <K >> keySerializerSupplier (@ Nullable Supplier <Serializer <K >> keySerializerSupplier ) {
271271 this .rawKeySerializerSupplier = keySerializerSupplier ;
272272 if (!this .configureSerializers ) {
273273 return keySerializerSupplier ;
@@ -283,7 +283,7 @@ private Supplier<Serializer<K>> keySerializerSupplier(@Nullable Supplier<Seriali
283283 };
284284 }
285285
286- private Supplier <Serializer <V >> valueSerializerSupplier (@ Nullable Supplier <Serializer <V >> valueSerializerSupplier ) {
286+ private @ Nullable Supplier <Serializer <V >> valueSerializerSupplier (@ Nullable Supplier <Serializer <V >> valueSerializerSupplier ) {
287287 this .rawValueSerializerSupplier = valueSerializerSupplier ;
288288 if (!this .configureSerializers ) {
289289 return valueSerializerSupplier ;
@@ -456,21 +456,23 @@ public boolean isProducerPerThread() {
456456 @ Override
457457 @ Nullable
458458 public Serializer <K > getKeySerializer () {
459- return this .keySerializerSupplier .get ();
459+ return this .keySerializerSupplier == null ? null : this . keySerializerSupplier .get ();
460460 }
461461
462462 @ Override
463463 @ Nullable
464464 public Serializer <V > getValueSerializer () {
465- return this .valueSerializerSupplier .get ();
465+ return this .valueSerializerSupplier == null ? null : this . valueSerializerSupplier .get ();
466466 }
467467
468468 @ Override
469+ @ Nullable
469470 public Supplier <Serializer <K >> getKeySerializerSupplier () {
470471 return this .rawKeySerializerSupplier ;
471472 }
472473
473474 @ Override
475+ @ Nullable
474476 public Supplier <Serializer <V >> getValueSerializerSupplier () {
475477 return this .rawValueSerializerSupplier ;
476478 }
@@ -546,9 +548,11 @@ public int getPhase() {
546548 * properties applied
547549 */
548550 @ Override
549- public ProducerFactory <K , V > copyWithConfigurationOverride (Map <String , Object > overrideProperties ) {
551+ public ProducerFactory <K , V > copyWithConfigurationOverride (@ Nullable Map <String , Object > overrideProperties ) {
550552 Map <String , Object > producerProperties = new HashMap <>(getConfigurationProperties ());
551- producerProperties .putAll (overrideProperties );
553+ if (overrideProperties != null ) {
554+ producerProperties .putAll (overrideProperties );
555+ }
552556 producerProperties = ensureExistingTransactionIdPrefixInProperties (producerProperties );
553557 DefaultKafkaProducerFactory <K , V > newFactory = new DefaultKafkaProducerFactory <>(producerProperties ,
554558 getKeySerializerSupplier (),
@@ -850,7 +854,7 @@ protected Producer<K, V> createTransactionalProducer() {
850854 return createTransactionalProducer (this .transactionIdPrefix );
851855 }
852856
853- protected Producer <K , V > createTransactionalProducer (String txIdPrefix ) {
857+ protected Producer <K , V > createTransactionalProducer (@ Nullable String txIdPrefix ) {
854858 BlockingQueue <CloseSafeProducer <K , V >> queue = getCache (txIdPrefix );
855859 Assert .notNull (queue , () -> "No cache found for " + txIdPrefix );
856860 CloseSafeProducer <K , V > cachedProducer = queue .poll ();
@@ -912,7 +916,7 @@ private void removeTransactionProducer(CloseSafeProducer<K, V> producer, Duratio
912916 listeners .forEach (listener -> listener .producerRemoved (producer .clientId , producer ));
913917 }
914918
915- private CloseSafeProducer <K , V > doCreateTxProducer (String prefix , String suffix ,
919+ private CloseSafeProducer <K , V > doCreateTxProducer (@ Nullable String prefix , String suffix ,
916920 BiPredicate <CloseSafeProducer <K , V >, Duration > remover ) {
917921 Producer <K , V > newProducer = createRawProducer (getTxProducerConfigs (prefix + suffix ));
918922 try {
@@ -941,7 +945,8 @@ private CloseSafeProducer<K, V> doCreateTxProducer(String prefix, String suffix,
941945
942946 protected Producer <K , V > createRawProducer (Map <String , Object > rawConfigs ) {
943947 Producer <K , V > kafkaProducer =
944- new KafkaProducer <>(rawConfigs , this .keySerializerSupplier .get (), this .valueSerializerSupplier .get ());
948+ new KafkaProducer <>(rawConfigs , this .keySerializerSupplier == null ? null : this .keySerializerSupplier .get (),
949+ this .valueSerializerSupplier == null ? null : this .valueSerializerSupplier .get ());
945950 for (ProducerPostProcessor <K , V > pp : this .postProcessors ) {
946951 kafkaProducer = pp .apply (kafkaProducer );
947952 }
@@ -1033,9 +1038,9 @@ protected static class CloseSafeProducer<K, V> implements Producer<K, V> {
10331038
10341039 private final BiPredicate <CloseSafeProducer <K , V >, Duration > removeProducer ;
10351040
1036- final String txIdPrefix ; // NOSONAR
1041+ final @ Nullable String txIdPrefix ; // NOSONAR
10371042
1038- final String txIdSuffix ; // NOSONAR
1043+ final @ Nullable String txIdSuffix ; // NOSONAR
10391044
10401045 final long created ; // NOSONAR
10411046
@@ -1045,7 +1050,7 @@ protected static class CloseSafeProducer<K, V> implements Producer<K, V> {
10451050
10461051 final int epoch ; // NOSONAR
10471052
1048- private volatile Exception producerFailed ;
1053+ private volatile @ Nullable Exception producerFailed ;
10491054
10501055 volatile boolean closed ; // NOSONAR
10511056
@@ -1186,8 +1191,11 @@ public void commitTransaction() throws ProducerFencedException {
11861191 public void abortTransaction () throws ProducerFencedException {
11871192 LOGGER .debug (() -> toString () + " abortTransaction()" );
11881193 if (this .producerFailed != null ) {
1189- LOGGER .debug (() -> "abortTransaction ignored - previous txFailed: " + this .producerFailed .getMessage ()
1190- + ": " + this );
1194+ LOGGER .debug (() -> {
1195+ String message = this .producerFailed == null ? "" : this .producerFailed .getMessage ();
1196+ return "abortTransaction ignored - previous txFailed: " + message
1197+ + ": " + this ;
1198+ });
11911199 }
11921200 else {
11931201 try {
0 commit comments