diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java index 973d229befe..7ed977d99ce 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java @@ -40,6 +40,7 @@ import io.netty.handler.timeout.IdleStateEvent; import io.netty.handler.timeout.IdleStateHandler; import io.netty.resolver.NoopAddressResolverGroup; +import io.netty.util.AttributeKey; import io.netty.util.HashedWheelTimer; import io.netty.util.Timeout; import io.netty.util.TimerTask; @@ -96,6 +97,9 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements RemotingClient { private static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.ROCKETMQ_REMOTING_NAME); + private static final AttributeKey CHANNEL_WRAPPER_ATTRIBUTE_KEY = AttributeKey.valueOf( + "channelWrapper"); + private static final long LOCK_TIMEOUT_MILLIS = 3000; private static final long MIN_CLOSE_TIMEOUT_MILLIS = 100; @@ -106,7 +110,6 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti private final Map proxyMap = new HashMap<>(); private final ConcurrentHashMap bootstrapMap = new ConcurrentHashMap<>(); private final ConcurrentMap channelTables = new ConcurrentHashMap<>(); - private final ConcurrentMap channelWrapperTables = new ConcurrentHashMap<>(); private final HashedWheelTimer timer = new HashedWheelTimer(r -> new Thread(r, "ClientHouseKeepingService")); @@ -381,7 +384,6 @@ public void shutdown() { channel.getValue().close(); } - this.channelWrapperTables.clear(); this.channelTables.clear(); this.eventLoopGroupWorker.shutdownGracefully(); @@ -439,7 +441,8 @@ public void closeChannel(final String addr, final Channel channel) { } if (removeItemFromTable) { - ChannelWrapper channelWrapper = this.channelWrapperTables.remove(channel); + ChannelWrapper channelWrapper = + RemotingHelper.getAttributeValue(CHANNEL_WRAPPER_ATTRIBUTE_KEY, channel); if (channelWrapper != null && channelWrapper.tryClose(channel)) { this.channelTables.remove(addrRemote); } @@ -487,7 +490,8 @@ public void closeChannel(final Channel channel) { } if (removeItemFromTable) { - ChannelWrapper channelWrapper = this.channelWrapperTables.remove(channel); + ChannelWrapper channelWrapper = + RemotingHelper.getAttributeValue(CHANNEL_WRAPPER_ATTRIBUTE_KEY, channel); if (channelWrapper != null && channelWrapper.tryClose(channel)) { this.channelTables.remove(addrRemote); } @@ -724,7 +728,6 @@ private ChannelWrapper createChannel(String addr) { LOGGER.info("createChannel: begin to connect remote host[{}] asynchronously", addr); ChannelWrapper cw = new ChannelWrapper(addr, channelFuture); this.channelTables.put(addr, cw); - this.channelWrapperTables.put(channelFuture.channel(), cw); return cw; } @@ -831,17 +834,12 @@ public CompletableFuture invokeImpl(final Channel channel, final if (response.getCode() == ResponseCode.GO_AWAY) { if (nettyClientConfig.isEnableReconnectForGoAway()) { LOGGER.info("Receive go away from channelId={}, channel={}", channel.id(), channel); - ChannelWrapper channelWrapper = channelWrapperTables.computeIfPresent(channel, (channel0, channelWrapper0) -> { - try { - if (channelWrapper0.reconnect(channel0)) { - LOGGER.info("Receive go away from channelId={}, channel={}, recreate the channelId={}", channel0.id(), channel0, channelWrapper0.getChannel().id()); - channelWrapperTables.put(channelWrapper0.getChannel(), channelWrapper0); - } - } catch (Throwable t) { - LOGGER.error("Channel {} reconnect error", channelWrapper0, t); - } - return channelWrapper0; - }); + ChannelWrapper channelWrapper = RemotingHelper.getAttributeValue(CHANNEL_WRAPPER_ATTRIBUTE_KEY, + channel); + if (channelWrapper != null && channelWrapper.reconnect(channel)) { + LOGGER.info("Receive go away from channelId={}, channel={}, recreate the channelId={}", + channel.id(), channel, channelWrapper.getChannel().id()); + } if (channelWrapper != null && !channelWrapper.isWrapperOf(channel)) { RemotingCommand retryRequest = RemotingCommand.createRequestCommand(request.getCode(), request.readCustomHeader()); retryRequest.setBody(request.getBody()); @@ -1006,6 +1004,7 @@ public ChannelWrapper(String address, ChannelFuture channelFuture) { this.channelFuture = channelFuture; this.lastResponseTime = System.currentTimeMillis(); this.channelAddress = address; + RemotingHelper.setPropertyToAttr(channelFuture.channel(), CHANNEL_WRAPPER_ATTRIBUTE_KEY, this); } public boolean isOK() { @@ -1055,10 +1054,13 @@ public boolean reconnect(Channel channel) { if (isWrapperOf(channel)) { channelToClose = channelFuture; channelFuture = doConnect(channelAddress); + RemotingHelper.setPropertyToAttr(channelFuture.channel(), CHANNEL_WRAPPER_ATTRIBUTE_KEY, this); return true; } else { LOGGER.warn("channelWrapper has reconnect, so do nothing, now channelId={}, input channelId={}",getChannel().id(), channel.id()); } + } catch (Throwable t) { + LOGGER.error("ChannelWrapper {} reconnect error", this, t); } finally { lock.writeLock().unlock(); }