Skip to content

Commit 20cbb8b

Browse files
garyrussellartembilan
authored andcommitted
Log TX Exceptions
- Log exceptions on commit/abort transaction - Don't attempt to abort if commit fails, otherwise we see `Abort failed` `org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state` **cherry-pick to 2.1.x** * Rename variable to detect whether the commit failed * Polishing; use try/catch around commit # Conflicts: # spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java
1 parent e8cafc1 commit 20cbb8b

File tree

2 files changed

+22
-1
lines changed

2 files changed

+22
-1
lines changed

spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -417,6 +417,7 @@ public void commitTransaction() throws ProducerFencedException {
417417
this.delegate.commitTransaction();
418418
}
419419
catch (RuntimeException e) {
420+
logger.error("Commit failed", e);
420421
this.txFailed = true;
421422
throw e;
422423
}
@@ -428,6 +429,7 @@ public void abortTransaction() throws ProducerFencedException {
428429
this.delegate.abortTransaction();
429430
}
430431
catch (RuntimeException e) {
432+
logger.error("Abort failed", e);
431433
this.txFailed = true;
432434
throw e;
433435
}

spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -271,7 +271,17 @@ public <T> T executeInTransaction(OperationsCallback<K, V, T> callback) {
271271
this.producers.set(producer);
272272
T result = null;
273273
try {
274-
result = callback.doInOperations(this);
274+
T result = callback.doInOperations(this);
275+
try {
276+
producer.commitTransaction();
277+
}
278+
catch (Exception e) {
279+
throw new SkipAbortException(e);
280+
}
281+
return result;
282+
}
283+
catch (SkipAbortException e) {
284+
throw ((RuntimeException) e.getCause());
275285
}
276286
catch (Exception e) {
277287
try {
@@ -415,4 +425,13 @@ private Producer<K, V> getTheProducer() {
415425
}
416426
}
417427

428+
@SuppressWarnings("serial")
429+
private static final class SkipAbortException extends RuntimeException {
430+
431+
SkipAbortException(Throwable cause) {
432+
super(cause);
433+
}
434+
435+
}
436+
418437
}

0 commit comments

Comments
 (0)