Description
Search before asking
- I had searched in the issues and found no similar issues.
Environment
Linux
EventMesh version
master
What happened
In v1.11.0, with eventMesh.storage.plugin.type set to kafka, closing the consumer client through eventMeshClient.close() on an EventMeshTCPClient causes a Kafka exception to be reported.
How to reproduce
@PostConstruct
public void startListener() {
    try {
        UserAgent userAgent = MessageUtils.generateSubClient(EventMeshUtils.buildUserAgent(eventMeshConfig, Integer.parseInt(RandomUtil.randomNumbers(5)), eventMeshConfig.getClient().getConsumerGroup()));
        EventMeshTCPClientConfig config = EventMeshTCPClientConfig.builder()
            .host(eventMeshConfig.getServer().getHost())
            .port(eventMeshConfig.getServer().getTcpPort())
            .userAgent(userAgent)
            .build();
        eventMeshClient = EventMeshTCPClientFactory.createEventMeshTCPClient(config, EventMeshMessage.class);
        eventMeshClient.init();
        // Subscribe to each topic separately
        List<String> topicList = Arrays.asList(eventMeshConfig.getClient().getTopics().split(","));
        for (String topic : topicList) {
            String trimmedTopic = topic.trim();
            if (!trimmedTopic.isEmpty()) {
                eventMeshClient.subscribe(trimmedTopic, SubscriptionMode.CLUSTERING, SubscriptionType.ASYNC);
                log.info("Subscribed to topic: {}", trimmedTopic);
            }
        }
        eventMeshClient.registerSubBusiHandler(this::processMessage);
        eventMeshClient.listen();
    } catch (Exception e) {
        log.error("Failed to start EventMesh TCP listener", e);
    }
}

@PreDestroy
public void stopListener() {
    try {
        if (eventMeshClient != null) {
            try {
                // Then close the client
                eventMeshClient.close();
                log.info("EventMesh TCP Listener stopped successfully");
            } catch (Exception e) {
                log.warn("Error during close: {}", e.getMessage());
            }
        }
    } catch (Exception e) {
        log.error("Error stopping EventMesh TCP listener", e);
    }
}
The exception is reported when the client is closed.
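For context, the trace under Debug logs suggests that the TCP worker thread calls unsubscribe() while another thread (id 55) is still inside the same KafkaConsumer, presumably the poll loop; KafkaConsumer rejects that kind of concurrent access by design. The sketch below is not EventMesh or Kafka code. GuardedConsumer is a hypothetical stand-in that imitates a single-thread ownership check, and ConsumerThreadingDemo shows one possible mitigation, assuming all consumer calls can be routed through a single-threaded executor. Every class and method name here is an assumption made for illustration.

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical stand-in: rejects a second thread while one is inside, as KafkaConsumer does. */
final class GuardedConsumer {
    private final AtomicReference<Thread> owner = new AtomicReference<>();

    private void acquire() {
        if (!owner.compareAndSet(null, Thread.currentThread())) {
            throw new ConcurrentModificationException("consumer is not safe for multi-threaded access");
        }
    }

    private void release() {
        owner.set(null);
    }

    /** Simulates a blocking poll; counts down the latch once the consumer is owned. */
    void poll(CountDownLatch entered, long millis) {
        acquire();
        try {
            entered.countDown();
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            release();
        }
    }

    void unsubscribe() {
        acquire();
        release();
    }
}

final class ConsumerThreadingDemo {
    /** Reproduces the race: one thread polls while the caller unsubscribes directly. */
    static boolean directAccessFails() {
        GuardedConsumer consumer = new GuardedConsumer();
        CountDownLatch polling = new CountDownLatch(1);
        Thread poller = new Thread(() -> consumer.poll(polling, 200));
        poller.start();
        boolean rejected = false;
        try {
            polling.await();        // wait until the poller owns the consumer
            consumer.unsubscribe(); // second thread enters, so the guard trips
        } catch (ConcurrentModificationException expected) {
            rejected = true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        try {
            poller.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return rejected;
    }

    /** Possible mitigation: every consumer call runs on one dedicated thread. */
    static boolean serializedAccessSucceeds() {
        GuardedConsumer consumer = new GuardedConsumer();
        ExecutorService consumerThread = Executors.newSingleThreadExecutor();
        try {
            CountDownLatch polling = new CountDownLatch(1);
            consumerThread.submit(() -> consumer.poll(polling, 200));
            polling.await();
            // Queued behind the poll, so it runs only after the poll has released the consumer.
            consumerThread.submit(() -> consumer.unsubscribe()).get();
            return true;
        } catch (ExecutionException e) {
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            consumerThread.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("direct access rejected: " + directAccessFails());
        System.out.println("serialized access succeeded: " + serializedAccessSucceeds());
    }
}
```

Whether the Kafka storage plugin should serialize calls this way, or instead signal its poll thread and let that thread perform the unsubscribe, is an open design question for the fix.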
Debug logs
2025-10-11 17:15:52,628 ERROR [eventMesh-tcp-worker-2] ConsumerImpl(ConsumerImpl.java:126) - Error while unsubscribing the Kafka consumer:
java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access. currentThread(name: eventMesh-tcp-worker-2, id: 50) otherThread(id: 55)
at org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer.acquire(ClassicKafkaConsumer.java:1232) ~[kafka-clients-3.9.0.jar:?]
at org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer.acquireAndEnsureOpen(ClassicKafkaConsumer.java:1213) ~[kafka-clients-3.9.0.jar:?]
at org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer.unsubscribe(ClassicKafkaConsumer.java:544) ~[kafka-clients-3.9.0.jar:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.unsubscribe(KafkaConsumer.java:764) ~[kafka-clients-3.9.0.jar:?]
at org.apache.eventmesh.storage.kafka.consumer.ConsumerImpl.unsubscribe(ConsumerImpl.java:121) [eventmesh-storage-kafka-1.11.0-release.jar:1.11.0-release]
at org.apache.eventmesh.storage.kafka.consumer.KafkaConsumerImpl.unsubscribe(KafkaConsumerImpl.java:81) [eventmesh-storage-kafka-1.11.0-release.jar:1.11.0-release]
at org.apache.eventmesh.runtime.core.plugin.MQConsumerWrapper.unsubscribe(MQConsumerWrapper.java:50) [eventmesh-runtime-1.11.0-release.jar:1.11.0-release]
at org.apache.eventmesh.runtime.core.protocol.tcp.client.group.ClientGroupWrapper.unsubscribe(ClientGroupWrapper.java:623) [eventmesh-runtime-1.11.0-release.jar:1.11.0-release]
at org.apache.eventmesh.runtime.core.protocol.tcp.client.group.ClientSessionGroupMapping.cleanSubscriptionInSession(ClientSessionGroupMapping.java:304) [eventmesh-runtime-1.11.0-release.jar:1.11.0-release]
at org.apache.eventmesh.runtime.core.protocol.tcp.client.group.ClientSessionGroupMapping.cleanClientGroupWrapperByCloseSub(ClientSessionGroupMapping.java:281) [eventmesh-runtime-1.11.0-release.jar:1.11.0-release]
at org.apache.eventmesh.runtime.core.protocol.tcp.client.group.ClientSessionGroupMapping.closeSession(ClientSessionGroupMapping.java:175) [eventmesh-runtime-1.11.0-release.jar:1.11.0-release]
at org.apache.eventmesh.runtime.core.protocol.tcp.client.group.ClientSessionGroupMapping.closeSession(ClientSessionGroupMapping.java:144) [eventmesh-runtime-1.11.0-release.jar:1.11.0-release]
at org.apache.eventmesh.runtime.boot.AbstractTCPServer$TcpConnectionHandler.channelInactive(AbstractTCPServer.java:436) [eventmesh-runtime-1.11.0-release.jar:1.11.0-release]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:303) [netty-transport-4.1.112.Final.jar:4.1.112.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281) [netty-transport-4.1.112.Final.jar:4.1.112.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274) [netty-transport-4.1.112.Final.jar:4.1.112.Final]
at io.netty.handler.codec.ByteToMessageDecoder.channelInputClosed(ByteToMessageDecoder.java:412) [netty-codec-4.1.112.Final.jar:4.1.112.Final]
at io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:377) [netty-codec-4.1.112.Final.jar:4.1.112.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:303) [netty-transport-4.1.112.Final.jar:4.1.112.Final]
at io.netty.channel.AbstractChannelHandlerContext.access$300(AbstractChannelHandlerContext.java:61) [netty-transport-4.1.112.Final.jar:4.1.112.Final]
at io.netty.channel.AbstractChannelHandlerContext$4.run(AbstractChannelHandlerContext.java:286) [netty-transport-4.1.112.Final.jar:4.1.112.Final]
at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173) [netty-common-4.1.112.Final.jar:4.1.112.Final]
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:166) [netty-common-4.1.112.Final.jar:4.1.112.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469) [netty-common-4.1.112.Final.jar:4.1.112.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:569) [netty-transport-4.1.112.Final.jar:4.1.112.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:994) [netty-common-4.1.112.Final.jar:4.1.112.Final]
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [netty-common-4.1.112.Final.jar:4.1.112.Final]
at java.base/java.lang.Thread.run(Thread.java:834) [?:?]
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct