diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractShareKafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractShareKafkaMessageListenerContainer.java
new file mode 100644
index 0000000000..fbf2944a68
--- /dev/null
+++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractShareKafkaMessageListenerContainer.java
@@ -0,0 +1,247 @@
+/*
+ * Copyright 2025-2025 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.kafka.listener;
+
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.regex.Pattern;
+
+import org.apache.commons.logging.LogFactory;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.jspecify.annotations.Nullable;
+
+import org.springframework.beans.BeanUtils;
+import org.springframework.beans.BeansException;
+import org.springframework.beans.factory.BeanNameAware;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
+import org.springframework.context.ApplicationEventPublisher;
+import org.springframework.context.ApplicationEventPublisherAware;
+import org.springframework.core.log.LogAccessor;
+import org.springframework.kafka.core.ShareConsumerFactory;
+import org.springframework.kafka.support.TopicPartitionOffset;
+import org.springframework.util.Assert;
+
+/**
+ * Abstract base class for share consumer message listener containers.
+ *
+ * Handles common lifecycle, configuration, and event publishing for containers using a
+ * {@link org.springframework.kafka.core.ShareConsumerFactory}.
+ *
+ * Subclasses are responsible for implementing the actual consumer loop and message dispatch logic.
+ *
+ * @param the key type
+ * @param the value type
+ *
+ * @author Soby Chacko
+ * @since 4.0
+ */
+public abstract class AbstractShareKafkaMessageListenerContainer
+ implements GenericMessageListenerContainer, BeanNameAware, ApplicationEventPublisherAware,
+ ApplicationContextAware {
+
+ /**
+ * The default {@link org.springframework.context.SmartLifecycle} phase for listener containers.
+ */
+ public static final int DEFAULT_PHASE = Integer.MAX_VALUE - 100;
+
+ protected final ShareConsumerFactory shareConsumerFactory;
+
+ protected final LogAccessor logger = new LogAccessor(LogFactory.getLog(this.getClass()));
+
+ private final ContainerProperties containerProperties;
+
+ protected final ReentrantLock lifecycleLock = new ReentrantLock();
+
+ private String beanName = "noBeanNameSet";
+
+ @Nullable
+ private ApplicationEventPublisher applicationEventPublisher;
+
+ private boolean autoStartup = true;
+
+ private int phase = DEFAULT_PHASE;
+
+ @Nullable
+ private ApplicationContext applicationContext;
+
+ private volatile boolean running = false;
+
+ /**
+ * Construct an instance with the provided factory and properties.
+ * @param shareConsumerFactory the factory.
+ * @param containerProperties the properties.
+ */
+ @SuppressWarnings("unchecked")
+ protected AbstractShareKafkaMessageListenerContainer(@Nullable ShareConsumerFactory super K, ? super V> shareConsumerFactory,
+ ContainerProperties containerProperties) {
+ Assert.notNull(containerProperties, "'containerProperties' cannot be null");
+ Assert.notNull(shareConsumerFactory, "'shareConsumerFactory' cannot be null");
+ this.shareConsumerFactory = (ShareConsumerFactory) shareConsumerFactory;
+ @Nullable String @Nullable [] topics = containerProperties.getTopics();
+ if (topics != null) {
+ this.containerProperties = new ContainerProperties(topics);
+ }
+ else {
+ Pattern topicPattern = containerProperties.getTopicPattern();
+ if (topicPattern != null) {
+ this.containerProperties = new ContainerProperties(topicPattern);
+ }
+ else {
+ @Nullable TopicPartitionOffset @Nullable [] topicPartitions = containerProperties.getTopicPartitions();
+ if (topicPartitions != null) {
+ this.containerProperties = new ContainerProperties(topicPartitions);
+ }
+ else {
+ throw new IllegalStateException("topics, topicPattern, or topicPartitions must be provided");
+ }
+ }
+ }
+ BeanUtils.copyProperties(containerProperties, this.containerProperties,
+ "topics", "topicPartitions", "topicPattern");
+ }
+
+ @Override
+ public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
+ this.applicationContext = applicationContext;
+ }
+
+ @Nullable
+ public ApplicationContext getApplicationContext() {
+ return this.applicationContext;
+ }
+
+ @Override
+ public void setBeanName(String name) {
+ this.beanName = name;
+ }
+
+ /**
+ * Return the bean name.
+ * @return the bean name
+ */
+ public String getBeanName() {
+ return this.beanName;
+ }
+
+ @Override
+ public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
+ this.applicationEventPublisher = applicationEventPublisher;
+ }
+
+ /**
+ * Get the event publisher.
+ * @return the publisher
+ */
+ @Nullable
+ public ApplicationEventPublisher getApplicationEventPublisher() {
+ return this.applicationEventPublisher;
+ }
+
+ @Override
+ public boolean isAutoStartup() {
+ return this.autoStartup;
+ }
+
+ @Override
+ public void setAutoStartup(boolean autoStartup) {
+ this.autoStartup = autoStartup;
+ }
+
+ @Override
+ public int getPhase() {
+ return this.phase;
+ }
+
+ public void setPhase(int phase) {
+ this.phase = phase;
+ }
+
+ @Override
+ public void stop(Runnable callback) {
+ stop();
+ callback.run();
+ }
+
+ @Override
+ public void start() {
+ this.lifecycleLock.lock();
+ try {
+ if (!isRunning()) {
+ Assert.state(this.containerProperties.getMessageListener() instanceof GenericMessageListener,
+ () -> "A " + GenericMessageListener.class.getName() + " implementation must be provided");
+ doStart();
+ }
+ }
+ finally {
+ this.lifecycleLock.unlock();
+ }
+ }
+
+ @Override
+ public void stop() {
+ this.lifecycleLock.lock();
+ try {
+ if (isRunning()) {
+ doStop();
+ }
+ }
+ finally {
+ this.lifecycleLock.unlock();
+ }
+ }
+
+ @Override
+ public boolean isRunning() {
+ return this.running;
+ }
+
+ protected void setRunning(boolean running) {
+ this.running = running;
+ }
+
+ @Override
+ public ContainerProperties getContainerProperties() {
+ return this.containerProperties;
+ }
+
+ @Override
+ @Nullable
+ public String getGroupId() {
+ return this.containerProperties.getGroupId() == null
+ ? (String) this.shareConsumerFactory.getConfigurationProperties().get(ConsumerConfig.GROUP_ID_CONFIG)
+ : this.containerProperties.getGroupId();
+ }
+
+ @Override
+ public String getListenerId() {
+ return this.beanName; // the container factory sets the bean name to the id attribute
+ }
+
+ @Override
+ public void setupMessageListener(Object messageListener) {
+ this.containerProperties.setMessageListener(messageListener);
+ }
+
+ protected abstract void doStart();
+
+ protected abstract void doStop();
+
+ @Override
+ public void destroy() {
+ stop();
+ }
+}
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainer.java
new file mode 100644
index 0000000000..ae26d7b982
--- /dev/null
+++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainer.java
@@ -0,0 +1,243 @@
+/*
+ * Copyright 2016-2025 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.kafka.listener;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.kafka.clients.consumer.AcknowledgeType;
+import org.apache.kafka.clients.consumer.ShareConsumer;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.jspecify.annotations.Nullable;
+
+import org.springframework.context.ApplicationEventPublisher;
+import org.springframework.core.log.LogAccessor;
+import org.springframework.core.task.AsyncTaskExecutor;
+import org.springframework.core.task.SimpleAsyncTaskExecutor;
+import org.springframework.kafka.core.ShareConsumerFactory;
+import org.springframework.kafka.event.ConsumerStartedEvent;
+import org.springframework.kafka.event.ConsumerStartingEvent;
+import org.springframework.util.Assert;
+
+/**
+ * {@code ShareKafkaMessageListenerContainer} is a message listener container for Kafka's share consumer model.
+ *
+ * This container manages a single-threaded consumer loop using a {@link org.springframework.kafka.core.ShareConsumerFactory}.
+ * It is designed for use cases where Kafka's cooperative sharing protocol is desired, and provides a simple polling loop
+ * with per-record dispatch and acknowledgement.
+ *
+ * Lifecycle events are published for consumer starting and started. The container supports direct setting of the client.id.
+ *
+ * @param the key type
+ * @param the value type
+ *
+ * @author Soby Chacko
+ * @since 4.0
+ */
+public class ShareKafkaMessageListenerContainer
+ extends AbstractShareKafkaMessageListenerContainer {
+
+ private static final int POLL_TIMEOUT = 1000;
+
+ @Nullable
+ private String clientId;
+
+ @SuppressWarnings("NullAway.Init")
+ private volatile ShareListenerConsumer listenerConsumer;
+
+ @SuppressWarnings("NullAway.Init")
+ private volatile CompletableFuture listenerConsumerFuture;
+
+ private volatile CountDownLatch startLatch = new CountDownLatch(1);
+
+ /**
+ * Construct an instance with the supplied configuration properties.
+ * @param shareConsumerFactory the share consumer factory
+ * @param containerProperties the container properties
+ */
+ public ShareKafkaMessageListenerContainer(ShareConsumerFactory super K, ? super V> shareConsumerFactory,
+ ContainerProperties containerProperties) {
+ super(shareConsumerFactory, containerProperties);
+ Assert.notNull(shareConsumerFactory, "A ShareConsumerFactory must be provided");
+ }
+
+ /**
+ * Set the {@code client.id} to use for the consumer.
+ * @param clientId the client id to set
+ */
+ public void setClientId(String clientId) {
+ this.clientId = clientId;
+ }
+
+ /**
+ * Get the {@code client.id} for the consumer.
+ * @return the client id, or null if not set
+ */
+ @Nullable
+ public String getClientId() {
+ return this.clientId;
+ }
+
+ @Override
+ public boolean isInExpectedState() {
+ return isRunning();
+ }
+
+ @Override
+ public Map> metrics() {
+ ShareListenerConsumer listenerConsumerForMetrics = this.listenerConsumer;
+ if (listenerConsumerForMetrics != null) {
+ Map metrics = listenerConsumerForMetrics.consumer.metrics();
+ return Collections.singletonMap(listenerConsumerForMetrics.getClientId(), metrics);
+ }
+ return Collections.emptyMap();
+ }
+
+ @Override
+ protected void doStart() {
+ if (isRunning()) {
+ return;
+ }
+ ContainerProperties containerProperties = getContainerProperties();
+ Object messageListener = containerProperties.getMessageListener();
+ AsyncTaskExecutor consumerExecutor = containerProperties.getListenerTaskExecutor();
+ if (consumerExecutor == null) {
+ consumerExecutor = new SimpleAsyncTaskExecutor((getBeanName() == null ? "" : getBeanName()) + "-C-");
+ containerProperties.setListenerTaskExecutor(consumerExecutor);
+ }
+ GenericMessageListener> listener = (GenericMessageListener>) messageListener;
+ Assert.state(listener != null, "'messageListener' cannot be null");
+ this.listenerConsumer = new ShareListenerConsumer(listener);
+ setRunning(true);
+ this.listenerConsumerFuture = CompletableFuture.runAsync(this.listenerConsumer, consumerExecutor);
+ }
+
+ @Override
+ protected void doStop() {
+ setRunning(false);
+ // The consumer will exit its loop naturally when running becomes false.
+ }
+
+ private void publishConsumerStartingEvent() {
+ this.startLatch.countDown();
+ ApplicationEventPublisher publisher = getApplicationEventPublisher();
+ if (publisher != null) {
+ publisher.publishEvent(new ConsumerStartingEvent(this, this));
+ }
+ }
+
+ private void publishConsumerStartedEvent() {
+ ApplicationEventPublisher publisher = getApplicationEventPublisher();
+ if (publisher != null) {
+ publisher.publishEvent(new ConsumerStartedEvent(this, this));
+ }
+ }
+
+ /**
+ * The inner share consumer thread: polls for records and dispatches to the listener.
+ */
+ private class ShareListenerConsumer implements Runnable {
+
+ private final LogAccessor logger = ShareKafkaMessageListenerContainer.this.logger;
+
+ private final ShareConsumer consumer;
+
+ private final GenericMessageListener> genericListener;
+
+ private final @Nullable String consumerGroupId = ShareKafkaMessageListenerContainer.this.getGroupId();
+
+ private final @Nullable String clientId;
+
+ ShareListenerConsumer(GenericMessageListener> listener) {
+ this.consumer = ShareKafkaMessageListenerContainer.this.shareConsumerFactory.createShareConsumer(
+ ShareKafkaMessageListenerContainer.this.getGroupId(),
+ ShareKafkaMessageListenerContainer.this.getClientId());
+
+ this.genericListener = listener;
+ this.clientId = ShareKafkaMessageListenerContainer.this.getClientId();
+ // Subscribe to topics, just like in the test
+ ContainerProperties containerProperties = getContainerProperties();
+ this.consumer.subscribe(Arrays.asList(containerProperties.getTopics()));
+ }
+
+ @Nullable
+ String getClientId() {
+ return this.clientId;
+ }
+
+ @Override
+ public void run() {
+ initialize();
+ Throwable exitThrowable = null;
+ while (isRunning()) {
+ try {
+ var records = this.consumer.poll(java.time.Duration.ofMillis(POLL_TIMEOUT));
+ if (records != null && records.count() > 0) {
+ for (var record : records) {
+ @SuppressWarnings("unchecked")
+ GenericMessageListener