@@ -310,7 +310,18 @@ public void setFailIfSendResultIsError(boolean failIfSendResultIsError) {
310310 }
311311
312312 /**
313- * Set the minumum time to wait for message sending. Default is the producer
313+ * If true, wait for the send result and throw an exception if it fails.
314+ * It will wait for the milliseconds specified in waitForSendResultTimeout for the result.
315+ * @return true to wait.
316+ * @since 2.7.14
317+ * @see #setWaitForSendResultTimeout(Duration)
318+ */
319+ protected boolean isFailIfSendResultIsError () {
320+ return this .failIfSendResultIsError ;
321+ }
322+
323+ /**
324+ * Set the minimum time to wait for message sending. Default is the producer
314325 * configuration {@code delivery.timeout.ms} plus the {@link #setTimeoutBuffer(long)}.
315326 * @param waitForSendResultTimeout the timeout.
316327 * @since 2.7
@@ -322,8 +333,9 @@ public void setWaitForSendResultTimeout(Duration waitForSendResultTimeout) {
322333 }
323334
324335 /**
325- * Set the number of milliseconds to add to the producer configuration {@code delivery.timeout.ms}
326- * property to avoid timing out before the Kafka producer. Default 5000.
336+ * Set the number of milliseconds to add to the producer configuration
337+ * {@code delivery.timeout.ms} property to avoid timing out before the Kafka producer.
338+ * Default 5000.
327339 * @param buffer the buffer.
328340 * @since 2.7
329341 * @see #setWaitForSendResultTimeout(Duration)
@@ -332,6 +344,16 @@ public void setTimeoutBuffer(long buffer) {
332344 this .timeoutBuffer = buffer ;
333345 }
334346
347+ /**
348+ * The number of milliseconds to add to the producer configuration
349+ * {@code delivery.timeout.ms} property to avoid timing out before the Kafka producer.
350+ * @return the buffer.
351+ * @since 2.7.14
352+ */
353+ protected long getTimeoutBuffer () {
354+ return this .timeoutBuffer ;
355+ }
356+
335357 /**
336358 * Set to false to retain previous exception headers as well as headers for the
337359 * current exception. Default is true, which means only the current headers are
@@ -366,6 +388,15 @@ public void setExceptionHeadersCreator(ExceptionHeadersCreator headersCreator) {
366388 this .exceptionHeadersCreator = headersCreator ;
367389 }
368390
391+ /**
392+ * True if publishing should run in a transaction.
393+ * @return true for transactional.
394+ * @since 2.7.14
395+ */
396+ protected boolean isTransactional () {
397+ return this .transactional ;
398+ }
399+
369400 /**
370401 * Clear the header inclusion bit for the header name.
371402 * @param headers the headers to clear.
@@ -629,7 +660,14 @@ protected void publish(ProducerRecord<Object, Object> outRecord, KafkaOperations
629660 }
630661 }
631662
632- private void verifySendResult (KafkaOperations <Object , Object > kafkaTemplate ,
663+ /**
664+ * Wait for the send future to complete.
665+ * @param kafkaTemplate the template used to send the record.
666+ * @param outRecord the record.
667+ * @param sendResult the future.
668+ * @param inRecord the original consumer record.
669+ */
670+ protected void verifySendResult (KafkaOperations <Object , Object > kafkaTemplate ,
633671 ProducerRecord <Object , Object > outRecord ,
634672 @ Nullable ListenableFuture <SendResult <Object , Object >> sendResult , ConsumerRecord <?, ?> inRecord ) {
635673
@@ -655,7 +693,14 @@ private String pubFailMessage(ProducerRecord<Object, Object> outRecord, Consumer
655693 + outRecord .topic () + "failed for: " + ListenerUtils .recordToString (inRecord , true );
656694 }
657695
658- private Duration determineSendTimeout (KafkaOperations <?, ?> template ) {
696+ /**
697+ * Determine the send timeout based on the template's producer factory and
698+ * {@link #setWaitForSendResultTimeout(Duration)}.
699+ * @param template the template.
700+ * @return the timeout.
701+ * @since 2.7.14
702+ */
703+ protected Duration determineSendTimeout (KafkaOperations <?, ?> template ) {
659704 ProducerFactory <? extends Object , ? extends Object > producerFactory = template .getProducerFactory ();
660705 if (producerFactory != null ) { // NOSONAR - will only occur in mock tests
661706 Map <String , Object > props = producerFactory .getConfigurationProperties ();
0 commit comments