Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -467,9 +467,10 @@ This feature helps developers quickly identify when acknowledgment calls are mis
else {
acknowledgment.release(); // Temporary failure - retry later
}
else {
acknowledgment.reject(); // Invalid order - don't retry
}
}
else {
acknowledgment.reject(); // Invalid order - don't retry
}
}
catch (Exception e) {
// Exception automatically triggers REJECT
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -653,7 +653,7 @@ protected void processListener(MethodKafkaListenerEndpoint<?, ?> endpoint, Kafka
containerFactory, beanName);

if (listenerContainerFactory instanceof ShareKafkaListenerContainerFactory<?, ?>) {
endpoint.setShareConsumer(Boolean.TRUE);
endpoint.setShareConsumer(true);
}

this.registrar.registerEndpoint(endpoint, listenerContainerFactory);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import java.util.regex.Pattern;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.jspecify.annotations.Nullable;
import org.apache.kafka.clients.consumer.internals.ShareAcknowledgementMode;

import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
Expand Down Expand Up @@ -54,17 +54,20 @@
* @since 4.0
*/
public class ShareKafkaListenerContainerFactory<K, V>
implements KafkaListenerContainerFactory<ShareKafkaMessageListenerContainer<K, V>>, ApplicationEventPublisherAware, ApplicationContextAware {
implements KafkaListenerContainerFactory<ShareKafkaMessageListenerContainer<K, V>>,
ApplicationEventPublisherAware, ApplicationContextAware {

private final ShareConsumerFactory<? super K, ? super V> shareConsumerFactory;

private @Nullable Boolean autoStartup;
private boolean autoStartup = true;

private @Nullable Integer phase;
private int phase = 0;

private @Nullable ApplicationEventPublisher applicationEventPublisher;
@SuppressWarnings("NullAway.Init")
private ApplicationEventPublisher applicationEventPublisher;

private @Nullable ApplicationContext applicationContext;
@SuppressWarnings("NullAway.Init")
private ApplicationContext applicationContext;

/**
* Construct an instance with the provided consumer factory.
Expand All @@ -83,15 +86,15 @@ public void setApplicationContext(ApplicationContext applicationContext) {
* Set whether containers created by this factory should auto-start.
* @param autoStartup true to auto-start
*/
public void setAutoStartup(Boolean autoStartup) {
public void setAutoStartup(boolean autoStartup) {
this.autoStartup = autoStartup;
}

/**
* Set the phase in which containers created by this factory should start and stop.
* @param phase the phase
*/
public void setPhase(Integer phase) {
public void setPhase(int phase) {
this.phase = phase;
}

Expand Down Expand Up @@ -126,7 +129,7 @@ private void configureEndpoint(AbstractKafkaListenerEndpoint<K, V> endpoint) {
*/
protected void initializeContainer(ShareKafkaMessageListenerContainer<K, V> instance, KafkaListenerEndpoint endpoint) {
ContainerProperties properties = instance.getContainerProperties();
Boolean effectiveAutoStartup = endpoint.getAutoStartup() != null ? endpoint.getAutoStartup() : this.autoStartup;
boolean effectiveAutoStartup = endpoint.getAutoStartup() != null ? endpoint.getAutoStartup() : this.autoStartup;

// Validate share group configuration
validateShareConfiguration(endpoint);
Expand All @@ -135,11 +138,12 @@ protected void initializeContainer(ShareKafkaMessageListenerContainer<K, V> inst
boolean explicitAck = determineExplicitAcknowledgment(properties);
properties.setExplicitShareAcknowledgment(explicitAck);

instance.setAutoStartup(effectiveAutoStartup);
instance.setPhase(this.phase);
instance.setApplicationContext(this.applicationContext);
instance.setApplicationEventPublisher(this.applicationEventPublisher);

JavaUtils.INSTANCE
.acceptIfNotNull(effectiveAutoStartup, instance::setAutoStartup)
.acceptIfNotNull(this.phase, instance::setPhase)
.acceptIfNotNull(this.applicationContext, instance::setApplicationContext)
.acceptIfNotNull(this.applicationEventPublisher, instance::setApplicationEventPublisher)
.acceptIfNotNull(endpoint.getGroupId(), properties::setGroupId)
.acceptIfNotNull(endpoint.getClientIdPrefix(), properties::setClientId);
}
Expand All @@ -163,18 +167,8 @@ private boolean determineExplicitAcknowledgment(ContainerProperties containerPro
.get(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG);

if (clientAckMode != null) {
String mode = clientAckMode.toString().toLowerCase();
if ("explicit".equals(mode)) {
return true;
}
else if ("implicit".equals(mode)) {
return false;
}
else {
throw new IllegalArgumentException(
"Invalid " + ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG + ": " + mode +
". Must be 'implicit' or 'explicit'");
}
ShareAcknowledgementMode mode = ShareAcknowledgementMode.fromString(clientAckMode.toString());
return mode == ShareAcknowledgementMode.EXPLICIT;
}
// Default to implicit acknowledgment (false)
return containerProperties.isExplicitShareAcknowledgment();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@
import org.springframework.kafka.listener.AcknowledgingShareConsumerAwareMessageListener;
import org.springframework.kafka.listener.KafkaListenerErrorHandler;
import org.springframework.kafka.support.ShareAcknowledgment;
import org.springframework.kafka.support.converter.JacksonProjectingMessageConverter;
import org.springframework.kafka.support.converter.ProjectingMessageConverter;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.messaging.Message;

Expand All @@ -50,10 +48,6 @@
public class ShareRecordMessagingMessageListenerAdapter<K, V> extends MessagingMessageListenerAdapter<K, V>
implements AcknowledgingShareConsumerAwareMessageListener<K, V> {

public ShareRecordMessagingMessageListenerAdapter(@Nullable Object bean, @Nullable Method method) {
this(bean, method, null);
}

public ShareRecordMessagingMessageListenerAdapter(@Nullable Object bean, @Nullable Method method,
@Nullable KafkaListenerErrorHandler errorHandler) {
super(bean, method, errorHandler);
Expand Down Expand Up @@ -81,8 +75,8 @@ public void onShareRecord(ConsumerRecord<K, V> record, @Nullable ShareAcknowledg
}
if (logger.isDebugEnabled()) {
RecordMessageConverter messageConverter = getMessageConverter();
if (!(messageConverter instanceof JacksonProjectingMessageConverter
|| messageConverter instanceof ProjectingMessageConverter)) {
if (!(messageConverter instanceof org.springframework.kafka.support.converter.JacksonProjectingMessageConverter
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an overreacting.
I have talked only about a deprecated ProjectingMessageConverter class.

|| messageConverter instanceof org.springframework.kafka.support.converter.ProjectingMessageConverter)) {
this.logger.debug("Processing [" + message + "]");
}
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.awaitility.Awaitility;
import org.jspecify.annotations.Nullable;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;

import org.springframework.beans.DirectFieldAccessor;
import org.springframework.core.log.LogAccessor;
Expand All @@ -58,7 +59,6 @@
import org.springframework.kafka.test.utils.KafkaTestUtils;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
Expand Down Expand Up @@ -150,7 +150,8 @@ void shouldSupportExplicitAcknowledgmentMode(EmbeddedKafkaBroker broker) throws
List<String> received = Collections.synchronizedList(new ArrayList<>());
List<ShareAcknowledgment> acknowledgments = Collections.synchronizedList(new ArrayList<>());

containerProps.setMessageListener((AcknowledgingShareConsumerAwareMessageListener<String, String>) (record, acknowledgment, consumer) -> {
containerProps.setMessageListener((AcknowledgingShareConsumerAwareMessageListener<String, String>) (
record, acknowledgment, consumer) -> {
received.add(record.value());
acknowledgments.add(acknowledgment);

Expand Down Expand Up @@ -264,7 +265,8 @@ void shouldEnforceExplicitAcknowledgmentConstraints(EmbeddedKafkaBroker broker)
container.setBeanName("constraintTestContainer");
container.start();

LogAccessor logAccessor = spy(KafkaTestUtils.getPropertyValue(container, "listenerConsumer.logger", LogAccessor.class));
LogAccessor logAccessor = spy(KafkaTestUtils.getPropertyValue(container, "listenerConsumer.logger",
LogAccessor.class));

DirectFieldAccessor accessor = new DirectFieldAccessor(container);
accessor.setPropertyValue("listenerConsumer.logger", logAccessor);
Expand All @@ -280,8 +282,9 @@ void shouldEnforceExplicitAcknowledgmentConstraints(EmbeddedKafkaBroker broker)
// Wait for the next poll to be blocked since no explicit acknowledgment has been made yet.
// this.logger.trace(() -> "Poll blocked waiting for " + this.pendingAcknowledgments.size() +
// " acknowledgments");
Awaitility.await().atMost(15, TimeUnit.SECONDS).untilAsserted(
() -> verify(logAccessor, atLeastOnce()).trace(any(Supplier.class)));
Awaitility.await().atMost(15, TimeUnit.SECONDS).untilAsserted(() ->
verify(logAccessor, atLeastOnce()).trace(ArgumentMatchers.<Supplier<CharSequence>>any())
);

assertThat(processedCount.get()).isEqualTo(3);

Expand Down Expand Up @@ -340,7 +343,8 @@ void shouldHandlePartialAcknowledgmentCorrectly(EmbeddedKafkaBroker broker) thro
container.setBeanName("partialAckTestContainer");
container.start();

LogAccessor logAccessor = spy(KafkaTestUtils.getPropertyValue(container, "listenerConsumer.logger", LogAccessor.class));
LogAccessor logAccessor = spy(KafkaTestUtils.getPropertyValue(container, "listenerConsumer.logger",
LogAccessor.class));

DirectFieldAccessor accessor = new DirectFieldAccessor(container);
accessor.setPropertyValue("listenerConsumer.logger", logAccessor);
Expand All @@ -363,8 +367,9 @@ void shouldHandlePartialAcknowledgmentCorrectly(EmbeddedKafkaBroker broker) thro
// Wait for the next poll to be blocked since one acknowledgment is still pending.
// this.logger.trace(() -> "Poll blocked waiting for " + this.pendingAcknowledgments.size() +
// " acknowledgments");
Awaitility.await().atMost(15, TimeUnit.SECONDS).untilAsserted(
() -> verify(logAccessor, atLeastOnce()).trace(any(Supplier.class)));
Awaitility.await().atMost(15, TimeUnit.SECONDS).untilAsserted(() ->
verify(logAccessor, atLeastOnce()).trace(ArgumentMatchers.<Supplier<CharSequence>>any())
);

assertThat(totalProcessed.get()).isEqualTo(4);

Expand Down