Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelFactory;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
Expand Down Expand Up @@ -93,6 +94,7 @@
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -129,6 +131,9 @@ public class ChannelManager {
private final boolean allowReleaseEventLoopGroup;
private final Bootstrap httpBootstrap;
private final Bootstrap wsBootstrap;
// Channel options, resolved from config once at construction, applied to each channel from the channel
// initializer instead of via Bootstrap#option to avoid Netty's synchronized per-connect options map (issue #2218).
private final Map.Entry<ChannelOption<?>, Object>[] channelOptions;
private final long handshakeTimeout;
private final @Nullable AddressResolverGroup<InetSocketAddress> addressResolverGroup;

Expand Down Expand Up @@ -200,8 +205,9 @@ public ChannelManager(final AsyncHttpClientConfig config, Timer nettyTimer) {
}
}

httpBootstrap = newBootstrap(transportFactory, eventLoopGroup, config);
wsBootstrap = newBootstrap(transportFactory, eventLoopGroup, config);
channelOptions = buildChannelOptions(config);
httpBootstrap = newBootstrap(transportFactory, eventLoopGroup);
wsBootstrap = newBootstrap(transportFactory, eventLoopGroup);

// Use the address resolver group from config if provided; otherwise null (legacy per-request resolution)
addressResolverGroup = config.getAddressResolverGroup();
Expand Down Expand Up @@ -243,37 +249,64 @@ public static boolean isSslHandlerConfigured(ChannelPipeline pipeline) {
return pipeline.get(SSL_HANDLER) != null;
}

private static Bootstrap newBootstrap(ChannelFactory<? extends Channel> channelFactory, EventLoopGroup eventLoopGroup, AsyncHttpClientConfig config) {
Bootstrap bootstrap = new Bootstrap().channelFactory(channelFactory).group(eventLoopGroup)
.option(ChannelOption.ALLOCATOR, config.getAllocator() != null ? config.getAllocator() : ByteBufAllocator.DEFAULT)
.option(ChannelOption.TCP_NODELAY, config.isTcpNoDelay())
.option(ChannelOption.SO_REUSEADDR, config.isSoReuseAddress())
.option(ChannelOption.SO_KEEPALIVE, config.isSoKeepAlive())
.option(ChannelOption.AUTO_CLOSE, false);
private static Bootstrap newBootstrap(ChannelFactory<? extends Channel> channelFactory, EventLoopGroup eventLoopGroup) {
// Channel options are intentionally NOT set on the Bootstrap. Netty's AbstractBootstrap applies them
// per-connect by copying the shared options map under "synchronized (options)" in newOptionsArray(),
// which serializes every outbound connection on a single monitor (issue #2218). Instead, we apply the
// pre-resolved options to each Channel from the channel initializer via Channel.config(), keeping the
// Bootstrap options map empty and removing that global lock from the connect path.
return new Bootstrap().channelFactory(channelFactory).group(eventLoopGroup);
}

/**
* Resolves the configured {@link ChannelOption}s from the client config exactly once. Values and conditional
* options (connect timeout, SO_LINGER, buffer sizes) are computed here so the per-connection path only iterates
* a fixed array and never re-reads the config.
*/
@SuppressWarnings("unchecked")
private static Map.Entry<ChannelOption<?>, Object>[] buildChannelOptions(AsyncHttpClientConfig config) {
Map<ChannelOption<?>, Object> options = new LinkedHashMap<>();
options.put(ChannelOption.ALLOCATOR, config.getAllocator() != null ? config.getAllocator() : ByteBufAllocator.DEFAULT);
options.put(ChannelOption.TCP_NODELAY, config.isTcpNoDelay());
options.put(ChannelOption.SO_REUSEADDR, config.isSoReuseAddress());
options.put(ChannelOption.SO_KEEPALIVE, config.isSoKeepAlive());
options.put(ChannelOption.AUTO_CLOSE, false);

long connectTimeout = config.getConnectTimeout().toMillis();
if (connectTimeout > 0) {
connectTimeout = Math.min(connectTimeout, Integer.MAX_VALUE);
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, (int) connectTimeout);
options.put(ChannelOption.CONNECT_TIMEOUT_MILLIS, (int) connectTimeout);
}

if (config.getSoLinger() >= 0) {
bootstrap.option(ChannelOption.SO_LINGER, config.getSoLinger());
options.put(ChannelOption.SO_LINGER, config.getSoLinger());
}

if (config.getSoSndBuf() >= 0) {
bootstrap.option(ChannelOption.SO_SNDBUF, config.getSoSndBuf());
options.put(ChannelOption.SO_SNDBUF, config.getSoSndBuf());
}

if (config.getSoRcvBuf() >= 0) {
bootstrap.option(ChannelOption.SO_RCVBUF, config.getSoRcvBuf());
options.put(ChannelOption.SO_RCVBUF, config.getSoRcvBuf());
}

for (Entry<ChannelOption<Object>, Object> entry : config.getChannelOptions().entrySet()) {
bootstrap.option(entry.getKey(), entry.getValue());
}
// User-supplied options last so they can override the defaults above, matching the previous Bootstrap order.
options.putAll(config.getChannelOptions());

return options.entrySet().toArray(new Map.Entry[0]);
}

return bootstrap;
/**
* Applies the pre-resolved channel options to a freshly created channel. Invoked from the channel initializer
* (once per connection, on the channel's event loop, before the channel is connected), mirroring what
* {@link Bootstrap#option} would otherwise do but without the shared, synchronized options map.
*/
@SuppressWarnings("unchecked")
private void applyChannelOptions(Channel channel) {
ChannelConfig channelConfig = channel.config();
for (Map.Entry<ChannelOption<?>, Object> option : channelOptions) {
channelConfig.setOption((ChannelOption<Object>) option.getKey(), option.getValue());
}
}

public void configureBootstraps(NettyRequestSender requestSender) {
Expand All @@ -284,6 +317,8 @@ public void configureBootstraps(NettyRequestSender requestSender) {
httpBootstrap.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) {
applyChannelOptions(ch);

ChannelPipeline pipeline = ch.pipeline()
.addLast(HTTP_CLIENT_CODEC, newHttpClientCodec());

Expand All @@ -309,6 +344,8 @@ protected void initChannel(Channel ch) {
wsBootstrap.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) {
applyChannelOptions(ch);

ChannelPipeline pipeline = ch.pipeline()
.addLast(HTTP_CLIENT_CODEC, newHttpClientCodec())
.addLast(AHC_WS_HANDLER, wsHandler);
Expand Down
Loading