Skip to content

Commit 7a21a2c

Browse files
committed
Flush Netty buffer more regurlaly to keep it writable
And add bootstrap customizer.
1 parent cd845bf commit 7a21a2c

File tree

6 files changed

+166
-90
lines changed

6 files changed

+166
-90
lines changed

pom.xml

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -525,6 +525,20 @@
525525
<scope>test</scope>
526526
<optional>true</optional>
527527
</dependency>
528+
<dependency>
529+
<groupId>io.netty</groupId>
530+
<artifactId>netty-transport-native-kqueue</artifactId>
531+
<version>${netty.version}</version>
532+
<classifier>osx-aarch_64</classifier>
533+
<scope>test</scope>
534+
</dependency>
535+
<dependency>
536+
<groupId>io.netty</groupId>
537+
<artifactId>netty-transport-native-epoll</artifactId>
538+
<version>${netty.version}</version>
539+
<classifier>linux-x86_64</classifier>
540+
<scope>test</scope>
541+
</dependency>
528542

529543
</dependencies>
530544

@@ -787,6 +801,7 @@
787801
<include>src/main/java/com/rabbitmq/client/impl/NettyFrameHandlerFactory.java</include>
788802
<include>src/main/java/com/rabbitmq/client/observation/**/*.java</include>
789803
<include>src/test/java/com/rabbitmq/client/test/functional/MicrometerObservationCollectorMetrics.java</include>
804+
<include>src/test/java/com/rabbitmq/client/test/NettyTest.java</include>
790805
</includes>
791806
<googleJavaFormat>
792807
<version>${google-java-format.version}</version>

src/main/java/com/rabbitmq/client/ConnectionFactory.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.rabbitmq.client.impl.recovery.RetryHandler;
2424
import com.rabbitmq.client.impl.recovery.TopologyRecoveryFilter;
2525
import com.rabbitmq.client.observation.ObservationCollector;
26+
import io.netty.bootstrap.Bootstrap;
2627
import io.netty.channel.EventLoopGroup;
2728
import io.netty.handler.ssl.SslContext;
2829
import io.netty.handler.ssl.SslContextBuilder;
@@ -1031,6 +1032,7 @@ connectionTimeout, nioParams, isSSL(), sslContextFactory,
10311032
this.frameHandlerFactory = new NettyFrameHandlerFactory(
10321033
this.nettyConf.eventLoopGroup,
10331034
this.nettyConf.channelCustomizer,
1035+
this.nettyConf.bootstrapCustomizer,
10341036
this.nettyConf.sslContextFactory,
10351037
connectionTimeout, socketConf, maxInboundMessageBodySize);
10361038
}
@@ -1796,7 +1798,7 @@ public static final class NettyConfiguration {
17961798
private final ConnectionFactory cf;
17971799
private EventLoopGroup eventLoopGroup;
17981800
private Consumer<io.netty.channel.Channel> channelCustomizer = ch -> { };
1799-
private SslContext sslContext;
1801+
private Consumer<Bootstrap> bootstrapCustomizer = b -> { };
18001802
private Function<String, SslContext> sslContextFactory;
18011803

18021804
public NettyConfiguration(ConnectionFactory cf) {
@@ -1813,8 +1815,12 @@ public NettyConfiguration channelCustomizer(Consumer<io.netty.channel.Channel> c
18131815
return this;
18141816
}
18151817

1818+
public NettyConfiguration bootstrapCustomizer(Consumer<Bootstrap> bootstrapCustomizer) {
1819+
this.bootstrapCustomizer = bootstrapCustomizer;
1820+
return this;
1821+
}
1822+
18161823
public NettyConfiguration sslContext(SslContext sslContext) {
1817-
this.sslContext = sslContext;
18181824
this.sslContextFactory = name -> sslContext;
18191825
return this;
18201826
}

src/main/java/com/rabbitmq/client/impl/NettyFrameHandlerFactory.java

Lines changed: 32 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -67,18 +67,22 @@ public final class NettyFrameHandlerFactory extends AbstractFrameHandlerFactory
6767
private final EventLoopGroup eventLoopGroup;
6868
private final Function<String, SslContext> sslContextFactory;
6969
private final Consumer<Channel> channelCustomizer;
70+
private final Consumer<Bootstrap> bootstrapCustomizer;
7071

7172
public NettyFrameHandlerFactory(
7273
EventLoopGroup eventLoopGroup,
7374
Consumer<Channel> channelCustomizer,
75+
Consumer<Bootstrap> bootstrapCustomizer,
7476
Function<String, SslContext> sslContextFactory,
7577
int connectionTimeout,
7678
SocketConfigurator configurator,
7779
int maxInboundMessageBodySize) {
7880
super(connectionTimeout, configurator, sslContextFactory != null, maxInboundMessageBodySize);
7981
this.eventLoopGroup = eventLoopGroup;
8082
this.sslContextFactory = sslContextFactory == null ? ignored -> null : sslContextFactory;
81-
this.channelCustomizer = channelCustomizer;
83+
this.channelCustomizer = channelCustomizer == null ? Utils.noOpConsumer() : channelCustomizer;
84+
this.bootstrapCustomizer =
85+
bootstrapCustomizer == null ? Utils.noOpConsumer() : bootstrapCustomizer;
8286
}
8387

8488
private static void closeNettyState(Channel channel, EventLoopGroup eventLoopGroup) {
@@ -121,7 +125,8 @@ public FrameHandler create(Address addr, String connectionName) throws IOExcepti
121125
addr,
122126
sslContext,
123127
this.eventLoopGroup,
124-
this.channelCustomizer);
128+
this.channelCustomizer,
129+
this.bootstrapCustomizer);
125130
}
126131

127132
private static final class NettyFrameHandler implements FrameHandler {
@@ -146,18 +151,33 @@ private NettyFrameHandler(
146151
Address addr,
147152
SslContext sslContext,
148153
EventLoopGroup elg,
149-
Consumer<Channel> channelCustomizer)
154+
Consumer<Channel> channelCustomizer,
155+
Consumer<Bootstrap> bootstrapCustomizer)
150156
throws IOException {
151157
Bootstrap b = new Bootstrap();
152-
if (elg == null) {
153-
elg = Utils.eventLoopGroup();
154-
this.eventLoopGroup = elg;
158+
bootstrapCustomizer.accept(b);
159+
if (b.config().group() == null) {
160+
EventLoopGroup eventLoopGroup;
161+
if (elg == null) {
162+
elg = Utils.eventLoopGroup();
163+
this.eventLoopGroup = elg;
164+
} else {
165+
this.eventLoopGroup = null;
166+
}
167+
b.group(elg);
155168
} else {
156169
this.eventLoopGroup = null;
157170
}
158-
b.group(elg);
159-
b.channel(NioSocketChannel.class);
160-
b.option(ChannelOption.ALLOCATOR, Utils.byteBufAllocator());
171+
if (b.config().channelFactory() == null) {
172+
b.channel(NioSocketChannel.class);
173+
}
174+
if (!b.config().options().containsKey(ChannelOption.SO_KEEPALIVE)) {
175+
b.option(ChannelOption.SO_KEEPALIVE, true);
176+
}
177+
if (!b.config().options().containsKey(ChannelOption.ALLOCATOR)) {
178+
b.option(ChannelOption.ALLOCATOR, Utils.byteBufAllocator());
179+
}
180+
161181
// type + channel + payload size + payload + frame end marker
162182
int maxFrameLength = 1 + 2 + 4 + maxInboundMessageBodySize + 1;
163183
int lengthFieldOffset = 3;
@@ -296,7 +316,7 @@ public void writeFrame(Frame frame) throws IOException {
296316
private void doWriteFrame(Frame frame) throws IOException {
297317
ByteBuf bb = this.channel.alloc().buffer(frame.size());
298318
frame.writeToByteBuf(bb);
299-
this.channel.write(bb);
319+
this.channel.writeAndFlush(bb);
300320
}
301321

302322
@Override
@@ -411,6 +431,8 @@ public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exceptio
411431
if (canWrite) {
412432
CountDownLatch latch = writableLatch.getAndSet(new CountDownLatch(1));
413433
latch.countDown();
434+
} else {
435+
ctx.channel().flush();
414436
}
415437
}
416438
super.channelWritabilityChanged(ctx);

src/main/java/com/rabbitmq/client/impl/Utils.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,13 @@
2222
import io.netty.channel.nio.NioEventLoopGroup;
2323
import io.netty.channel.nio.NioIoHandler;
2424

25+
import java.util.function.Consumer;
26+
2527
final class Utils {
2628

29+
@SuppressWarnings("rawtypes")
30+
private static final Consumer NO_OP_CONSUMER = o -> {};
31+
2732
static final boolean IS_NETTY_4_2;
2833

2934
private static final int AVAILABLE_PROCESSORS =
@@ -60,4 +65,9 @@ static EventLoopGroup eventLoopGroup() {
6065
static ByteBufAllocator byteBufAllocator() {
6166
return ByteBufAllocator.DEFAULT;
6267
}
68+
69+
@SuppressWarnings("unchecked")
70+
static <T> Consumer<T> noOpConsumer() {
71+
return (Consumer<T>) NO_OP_CONSUMER;
72+
}
6373
}
Lines changed: 93 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,48 @@
1+
// Copyright (c) 2025 Broadcom. All Rights Reserved.
2+
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
3+
//
4+
// This software, the RabbitMQ Java client library, is triple-licensed under the
5+
// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
6+
// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
7+
// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
8+
// please see LICENSE-APACHE2.
9+
//
10+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
11+
// either express or implied. See the LICENSE file for specific language governing
12+
// rights and limitations of this software.
13+
//
14+
// If you have any questions regarding licensing, please contact us at
15+
116
package com.rabbitmq.client.test;
217

18+
import static com.rabbitmq.client.test.TestUtils.LatchConditions.completed;
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
321
import com.rabbitmq.client.AMQP;
422
import com.rabbitmq.client.Channel;
5-
import com.rabbitmq.client.ConfirmCallback;
6-
import com.rabbitmq.client.ConfirmListener;
723
import com.rabbitmq.client.Connection;
824
import com.rabbitmq.client.ConnectionFactory;
925
import com.rabbitmq.client.DefaultConsumer;
1026
import com.rabbitmq.client.Envelope;
11-
import org.assertj.core.api.Assertions;
12-
import org.junit.jupiter.api.BeforeEach;
13-
import org.junit.jupiter.api.Test;
14-
27+
import io.netty.channel.EventLoopGroup;
28+
import io.netty.channel.IoHandlerFactory;
29+
import io.netty.channel.MultiThreadIoEventLoopGroup;
30+
import io.netty.channel.epoll.EpollIoHandler;
31+
import io.netty.channel.epoll.EpollSocketChannel;
32+
import io.netty.channel.kqueue.KQueueIoHandler;
33+
import io.netty.channel.kqueue.KQueueSocketChannel;
1534
import java.io.IOException;
1635
import java.nio.charset.StandardCharsets;
36+
import java.util.Set;
37+
import java.util.concurrent.ConcurrentHashMap;
1738
import java.util.concurrent.CountDownLatch;
18-
19-
import static com.rabbitmq.client.test.TestUtils.LatchConditions.completed;
20-
import static org.assertj.core.api.Assertions.assertThat;
39+
import java.util.concurrent.TimeUnit;
40+
import java.util.concurrent.TimeoutException;
41+
import org.junit.jupiter.api.BeforeEach;
42+
import org.junit.jupiter.api.Test;
43+
import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
44+
import org.junit.jupiter.api.condition.EnabledOnOs;
45+
import org.junit.jupiter.api.condition.OS;
2146

2247
public class NettyTest {
2348

@@ -26,34 +51,79 @@ public class NettyTest {
2651
@BeforeEach
2752
void init() {
2853
cf = TestUtils.connectionFactory();
54+
cf.useNetty();
55+
}
56+
57+
@Test
58+
void publishConsumeDefaults() throws Exception {
59+
publishConsume(cf);
60+
}
61+
62+
@Test
63+
@EnabledOnOs(OS.MAC)
64+
@EnabledIfSystemProperty(named = "os.arch", matches = "aarch64")
65+
void kqueue() throws Exception {
66+
nativeIoTest(KQueueIoHandler.newFactory(), KQueueSocketChannel.class);
2967
}
3068

3169
@Test
32-
void test() throws Exception {
70+
@EnabledOnOs(OS.LINUX)
71+
@EnabledIfSystemProperty(named = "os.arch", matches = "amd64")
72+
void epoll() throws Exception {
73+
nativeIoTest(EpollIoHandler.newFactory(), EpollSocketChannel.class);
74+
}
75+
76+
private static void nativeIoTest(
77+
IoHandlerFactory ioHandlerFactory, Class<? extends io.netty.channel.Channel> channelClass)
78+
throws IOException, TimeoutException {
79+
ConnectionFactory cf = TestUtils.connectionFactory();
80+
EventLoopGroup eventLoopGroup = new MultiThreadIoEventLoopGroup(ioHandlerFactory);
81+
Set<io.netty.channel.Channel> channels = ConcurrentHashMap.newKeySet();
82+
cf.netty()
83+
.eventLoopGroup(eventLoopGroup)
84+
.bootstrapCustomizer(b -> b.channel(channelClass))
85+
.channelCustomizer(channels::add);
86+
try {
87+
publishConsume(cf);
88+
assertThat(channels)
89+
.isNotEmpty()
90+
.allMatch(ch -> ch.getClass().isAssignableFrom(channelClass));
91+
} finally {
92+
eventLoopGroup.shutdownGracefully(0, 0, TimeUnit.SECONDS);
93+
}
94+
}
95+
96+
private static void publishConsume(ConnectionFactory cf) throws IOException, TimeoutException {
3397
try (Connection c = cf.newConnection()) {
3498
Channel ch1 = c.createChannel();
3599
String q = ch1.queueDeclare().getQueue();
36100

37101
Channel ch2 = c.createChannel();
38102
CountDownLatch consumeLatch = new CountDownLatch(1);
39103
CountDownLatch cancelLatch = new CountDownLatch(1);
40-
String ctag = ch2.basicConsume(q, new DefaultConsumer(ch2) {
41-
@Override
42-
public void handleDelivery(String ctg, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
43-
ch2.basicAck(envelope.getDeliveryTag(), false);
44-
consumeLatch.countDown();
45-
}
46-
47-
@Override
48-
public void handleCancelOk(String consumerTag) {
49-
cancelLatch.countDown();
50-
}
51-
});
104+
String ctag =
105+
ch2.basicConsume(
106+
q,
107+
new DefaultConsumer(ch2) {
108+
@Override
109+
public void handleDelivery(
110+
String ctg, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
111+
throws IOException {
112+
ch2.basicAck(envelope.getDeliveryTag(), false);
113+
consumeLatch.countDown();
114+
}
115+
116+
@Override
117+
public void handleCancelOk(String consumerTag) {
118+
cancelLatch.countDown();
119+
}
120+
});
52121

53122
Channel ch3 = c.createChannel();
54123
ch3.confirmSelect();
55124
CountDownLatch confirmLatch = new CountDownLatch(1);
56-
ch3.addConfirmListener((deliveryTag, multiple) -> confirmLatch.countDown(), (dtag, multiple) -> {});
125+
ch3.addConfirmListener(
126+
(deliveryTag, multiple) -> confirmLatch.countDown(), (dtag, multiple) -> {});
57127
ch3.basicPublish("", q, null, "hello".getBytes(StandardCharsets.UTF_8));
58128
assertThat(confirmLatch).is(completed());
59129
assertThat(consumeLatch).is(completed());
@@ -70,5 +140,4 @@ public void handleCancelOk(String consumerTag) {
70140
ch1.close();
71141
}
72142
}
73-
74143
}

0 commit comments

Comments
 (0)