|
1 | 1 | /* |
2 | | - * Copyright 2013-2022 the original author or authors. |
| 2 | + * Copyright 2013-2023 the original author or authors. |
3 | 3 | * |
4 | 4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | 5 | * you may not use this file except in compliance with the License. |
@@ -438,6 +438,7 @@ public String getComponentType() { |
438 | 438 | return this.isGateway ? "kafka:outbound-gateway" : "kafka:outbound-channel-adapter"; |
439 | 439 | } |
440 | 440 |
|
| 441 | + @Nullable |
441 | 442 | protected MessageChannel getSendFailureChannel() { |
442 | 443 | if (this.sendFailureChannel != null) { |
443 | 444 | return this.sendFailureChannel; |
@@ -515,19 +516,27 @@ protected Object handleRequestMessage(final Message<?> message) { |
515 | 516 | } |
516 | 517 | CompletableFuture<SendResult<K, V>> sendFuture; |
517 | 518 | RequestReplyFuture<K, V, Object> gatewayFuture = null; |
518 | | - if (this.isGateway && (!preBuilt || producerRecord.headers().lastHeader(KafkaHeaders.REPLY_TOPIC) == null)) { |
519 | | - producerRecord.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, getReplyTopic(message))); |
520 | | - gatewayFuture = ((ReplyingKafkaTemplate<K, V, Object>) this.kafkaTemplate).sendAndReceive(producerRecord); |
521 | | - sendFuture = gatewayFuture.getSendFuture(); |
522 | | - } |
523 | | - else { |
524 | | - if (this.transactional && !this.kafkaTemplate.inTransaction() && !this.allowNonTransactional) { |
525 | | - sendFuture = this.kafkaTemplate.executeInTransaction(template -> template.send(producerRecord)); |
| 519 | + try { |
| 520 | + if (this.isGateway |
| 521 | + && (!preBuilt || producerRecord.headers().lastHeader(KafkaHeaders.REPLY_TOPIC) == null)) { |
| 522 | + producerRecord.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, getReplyTopic(message))); |
| 523 | + gatewayFuture = ((ReplyingKafkaTemplate<K, V, Object>) this.kafkaTemplate) |
| 524 | + .sendAndReceive(producerRecord); |
| 525 | + sendFuture = gatewayFuture.getSendFuture(); |
526 | 526 | } |
527 | 527 | else { |
528 | | - sendFuture = this.kafkaTemplate.send(producerRecord); |
| 528 | + if (this.transactional && !this.kafkaTemplate.inTransaction() && !this.allowNonTransactional) { |
| 529 | + sendFuture = this.kafkaTemplate.executeInTransaction(template -> template.send(producerRecord)); |
| 530 | + } |
| 531 | + else { |
| 532 | + sendFuture = this.kafkaTemplate.send(producerRecord); |
| 533 | + } |
529 | 534 | } |
530 | 535 | } |
| 536 | + catch (RuntimeException rtex) { |
| 537 | + sendFailure(message, producerRecord, getSendFailureChannel(), rtex); |
| 538 | + throw rtex; |
| 539 | + } |
531 | 540 | sendFutureIfRequested(sendFuture, futureToken); |
532 | 541 | if (flush) { |
533 | 542 | this.kafkaTemplate.flush(); |
@@ -699,10 +708,8 @@ public void processSendResult(final Message<?> message, final ProducerRecord<K, |
699 | 708 | .build()); |
700 | 709 | } |
701 | 710 | } |
702 | | - else if (failureChannel != null) { |
703 | | - KafkaProducerMessageHandler.this.messagingTemplate.send(failureChannel, |
704 | | - KafkaProducerMessageHandler.this.errorMessageStrategy.buildErrorMessage( |
705 | | - new KafkaSendFailureException(message, producerRecord, exception), null)); |
| 711 | + else { |
| 712 | + sendFailure(message, producerRecord, failureChannel, exception); |
706 | 713 | } |
707 | 714 | }); |
708 | 715 | } |
@@ -730,6 +737,16 @@ else if (failureChannel != null) { |
730 | 737 | } |
731 | 738 | } |
732 | 739 |
|
| 740 | + private void sendFailure(final Message<?> message, final ProducerRecord<K, V> producerRecord, |
| 741 | + @Nullable MessageChannel failureChannel, Throwable exception) { |
| 742 | + |
| 743 | + if (failureChannel != null) { |
| 744 | + KafkaProducerMessageHandler.this.messagingTemplate.send(failureChannel, |
| 745 | + KafkaProducerMessageHandler.this.errorMessageStrategy.buildErrorMessage( |
| 746 | + new KafkaSendFailureException(message, producerRecord, exception), null)); |
| 747 | + } |
| 748 | + } |
| 749 | + |
733 | 750 | private Future<?> processReplyFuture(@Nullable RequestReplyFuture<?, ?, Object> future) { |
734 | 751 | if (future == null) { |
735 | 752 | return null; |
|
0 commit comments