|
1 | 1 | /* |
2 | | - * Copyright 2016-2020 the original author or authors. |
| 2 | + * Copyright 2016-2021 the original author or authors. |
3 | 3 | * |
4 | 4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | 5 | * you may not use this file except in compliance with the License. |
|
57 | 57 | import org.springframework.context.ApplicationListener; |
58 | 58 | import org.springframework.context.event.ContextStoppedEvent; |
59 | 59 | import org.springframework.core.log.LogAccessor; |
| 60 | +import org.springframework.kafka.KafkaException; |
60 | 61 | import org.springframework.kafka.support.TransactionSupport; |
61 | 62 | import org.springframework.lang.Nullable; |
62 | 63 | import org.springframework.util.Assert; |
@@ -508,7 +509,20 @@ private CloseSafeProducer<K, V> doCreateTxProducer(String prefix, String suffix, |
508 | 509 | this.clientIdPrefix + "-" + this.clientIdCounter.incrementAndGet()); |
509 | 510 | } |
510 | 511 | newProducer = createRawProducer(newProducerConfigs); |
511 | | - newProducer.initTransactions(); |
| 512 | + try { |
| 513 | + newProducer.initTransactions(); |
| 514 | + } |
| 515 | + catch (RuntimeException ex) { |
| 516 | + try { |
| 517 | + newProducer.close(this.physicalCloseTimeout); |
| 518 | + } |
| 519 | + catch (RuntimeException ex2) { |
| 520 | + KafkaException newEx = new KafkaException("initTransactions() failed and then close() failed", ex); |
| 521 | + newEx.addSuppressed(ex2); |
| 522 | + throw newEx; // NOSONAR - lost stack trace |
| 523 | + } |
| 524 | + throw new KafkaException("initTransactions() failed", ex); |
| 525 | + } |
512 | 526 | return new CloseSafeProducer<>(newProducer, getCache(prefix), remover, |
513 | 527 | (String) newProducerConfigs.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG), this.physicalCloseTimeout, |
514 | 528 | this.epoch); |
|
0 commit comments