Skip to content

Commit d3413d2

Browse files
committed
Add Netty-based frame handler
1 parent d4bb4e5 commit d3413d2

29 files changed

+1186
-330
lines changed

.github/workflows/test.yml

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -39,23 +39,14 @@ jobs:
3939
run: ci/start-cluster.sh
4040
- name: Get dependencies
4141
run: make deps
42-
- name: Test with NIO
42+
- name: Test with Netty
4343
run: |
44-
./mvnw verify -P use-nio -Drabbitmqctl.bin=DOCKER:rabbitmq0 \
44+
./mvnw verify -Dio.layer=netty -Drabbitmqctl.bin=DOCKER:rabbitmq0 \
4545
-Dtest-broker.A.nodename=rabbit@node0 -Dtest-broker.B.nodename=rabbit@node1 \
4646
-Dca.certificate=./tls-gen/basic/result/ca_certificate.pem \
4747
-Dclient.certificate=./tls-gen/basic/result/client_$(hostname)_certificate.pem \
4848
-Dmaven.javadoc.skip=true \
4949
--no-transfer-progress
50-
- name: Test with blocking IO
51-
run: |
52-
./mvnw verify -Drabbitmqctl.bin=DOCKER:rabbitmq0 \
53-
-Dtest-broker.A.nodename=rabbit@node0 -Dtest-broker.B.nodename=rabbit@node1 \
54-
-Dca.certificate=./tls-gen/basic/result/ca_certificate.pem \
55-
-Dclient.certificate=./tls-gen/basic/result/client_$(hostname)_certificate.pem \
56-
-Dmaven.javadoc.skip=true \
57-
-Dtest-client-cert.password= -Dtest-tls-certs.dir=rabbitmq-configuration/tls \
58-
--no-transfer-progress
5950
- name: Stop cluster
6051
run: docker compose --file ci/cluster/docker-compose.yml down
6152
- name: Publish snapshot

pom.xml

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656

5757
<spotless.check.skip>true</spotless.check.skip>
5858
<slf4j.version>1.7.36</slf4j.version>
59+
<netty.version>4.2.3.Final</netty.version>
5960
<metrics.version>4.2.33</metrics.version>
6061
<micrometer.version>1.15.2</micrometer.version>
6162
<opentelemetry.version>1.52.0</opentelemetry.version>
@@ -401,6 +402,21 @@
401402
<artifactId>slf4j-api</artifactId>
402403
<version>${slf4j.version}</version>
403404
</dependency>
405+
<dependency>
406+
<groupId>io.netty</groupId>
407+
<artifactId>netty-transport</artifactId>
408+
<version>${netty.version}</version>
409+
</dependency>
410+
<dependency>
411+
<groupId>io.netty</groupId>
412+
<artifactId>netty-codec</artifactId>
413+
<version>${netty.version}</version>
414+
</dependency>
415+
<dependency>
416+
<groupId>io.netty</groupId>
417+
<artifactId>netty-handler</artifactId>
418+
<version>${netty.version}</version>
419+
</dependency>
404420
<dependency>
405421
<groupId>io.dropwizard.metrics</groupId>
406422
<artifactId>metrics-core</artifactId>
@@ -768,6 +784,7 @@
768784
<configuration>
769785
<java>
770786
<includes>
787+
<include>src/main/java/com/rabbitmq/client/impl/NettyFrameHandlerFactory.java</include>
771788
<include>src/main/java/com/rabbitmq/client/observation/**/*.java</include>
772789
<include>src/test/java/com/rabbitmq/client/test/functional/MicrometerObservationCollectorMetrics.java</include>
773790
</includes>

src/main/java/com/rabbitmq/client/ConnectionFactory.java

Lines changed: 69 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,9 @@
2323
import com.rabbitmq.client.impl.recovery.RetryHandler;
2424
import com.rabbitmq.client.impl.recovery.TopologyRecoveryFilter;
2525
import com.rabbitmq.client.observation.ObservationCollector;
26-
import org.slf4j.Logger;
27-
import org.slf4j.LoggerFactory;
26+
import io.netty.channel.EventLoopGroup;
27+
import io.netty.handler.ssl.SslContext;
28+
import io.netty.handler.ssl.SslContextBuilder;
2829

2930
import javax.net.SocketFactory;
3031
import javax.net.ssl.SSLContext;
@@ -40,6 +41,8 @@
4041
import java.util.Map.Entry;
4142
import java.util.concurrent.*;
4243
import java.util.function.BiConsumer;
44+
import java.util.function.Consumer;
45+
import java.util.function.Function;
4346
import java.util.function.Predicate;
4447

4548
import static java.util.concurrent.TimeUnit.MINUTES;
@@ -141,6 +144,7 @@ public class ConnectionFactory implements Cloneable {
141144
private ObservationCollector observationCollector = ObservationCollector.NO_OP;
142145

143146
private boolean nio = false;
147+
private boolean netty = true;
144148
private FrameHandlerFactory frameHandlerFactory;
145149
private NioParams nioParams = new NioParams();
146150

@@ -1022,6 +1026,15 @@ connectionTimeout, nioParams, isSSL(), sslContextFactory,
10221026
this.maxInboundMessageBodySize);
10231027
}
10241028
return this.frameHandlerFactory;
1029+
} else if (netty) {
1030+
if (this.frameHandlerFactory == null) {
1031+
this.frameHandlerFactory = new NettyFrameHandlerFactory(
1032+
this.nettyConf.eventLoopGroup,
1033+
this.nettyConf.channelCustomizer,
1034+
this.nettyConf.sslContextFactory,
1035+
connectionTimeout, socketConf, maxInboundMessageBodySize);
1036+
}
1037+
return this.frameHandlerFactory;
10251038
} else {
10261039
return new SocketFrameHandlerFactory(connectionTimeout, socketFactory,
10271040
socketConf, isSSL(), this.shutdownExecutor, sslContextFactory,
@@ -1584,6 +1597,7 @@ public NioParams getNioParams() {
15841597
*/
15851598
public ConnectionFactory useNio() {
15861599
this.nio = true;
1600+
this.netty = false;
15871601
return this;
15881602
}
15891603

@@ -1594,6 +1608,7 @@ public ConnectionFactory useNio() {
15941608
*/
15951609
public ConnectionFactory useBlockingIo() {
15961610
this.nio = false;
1611+
this.netty = false;
15971612
return this;
15981613
}
15991614

@@ -1762,4 +1777,56 @@ public ConnectionFactory setTrafficListener(TrafficListener trafficListener) {
17621777
this.trafficListener = trafficListener;
17631778
return this;
17641779
}
1780+
1781+
public ConnectionFactory useNetty() {
1782+
this.netty = true;
1783+
this.nio = false;
1784+
return this;
1785+
}
1786+
1787+
private final NettyConfiguration nettyConf = new NettyConfiguration(this);
1788+
1789+
public NettyConfiguration netty() {
1790+
useNetty();
1791+
return this.nettyConf;
1792+
}
1793+
1794+
public static final class NettyConfiguration {
1795+
1796+
private final ConnectionFactory cf;
1797+
private EventLoopGroup eventLoopGroup;
1798+
private Consumer<io.netty.channel.Channel> channelCustomizer = ch -> { };
1799+
private SslContext sslContext;
1800+
private Function<String, SslContext> sslContextFactory;
1801+
1802+
public NettyConfiguration(ConnectionFactory cf) {
1803+
this.cf = cf;
1804+
}
1805+
1806+
public NettyConfiguration eventLoopGroup(EventLoopGroup eventLoopGroup) {
1807+
this.eventLoopGroup = eventLoopGroup;
1808+
return this;
1809+
}
1810+
1811+
public NettyConfiguration channelCustomizer(Consumer<io.netty.channel.Channel> channelCustomizer) {
1812+
this.channelCustomizer = channelCustomizer;
1813+
return this;
1814+
}
1815+
1816+
public NettyConfiguration sslContext(SslContext sslContext) {
1817+
this.sslContext = sslContext;
1818+
this.sslContextFactory = name -> sslContext;
1819+
return this;
1820+
}
1821+
1822+
public NettyConfiguration sslContextFactory(Function<String, SslContext> sslContextFactory) {
1823+
this.sslContextFactory = sslContextFactory;
1824+
return this;
1825+
}
1826+
1827+
public ConnectionFactory connectionFactory() {
1828+
return this.cf;
1829+
}
1830+
1831+
}
17651832
}

src/main/java/com/rabbitmq/client/impl/AMQConnection.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.slf4j.Logger;
2626
import org.slf4j.LoggerFactory;
2727

28+
import javax.net.ssl.SSLHandshakeException;
2829
import java.io.EOFException;
2930
import java.io.IOException;
3031
import java.net.InetAddress;
@@ -278,11 +279,15 @@ AMQChannel createChannel0() {
278279
}
279280

280281
private void initializeConsumerWorkService() {
281-
this._workService = new ConsumerWorkService(consumerWorkServiceExecutor, threadFactory, workPoolTimeout, shutdownTimeout);
282+
this._workService = new ConsumerWorkService(consumerWorkServiceExecutor, threadFactory, workPoolTimeout, shutdownTimeout);
282283
}
283284

284285
private void initializeHeartbeatSender() {
285-
this._heartbeatSender = new HeartbeatSender(_frameHandler, heartbeatExecutor, threadFactory);
286+
if (_frameHandler.internalHearbeat()) {
287+
this._heartbeatSender = HeartbeatSender.NO_OP;
288+
} else {
289+
this._heartbeatSender = new DefaultHeartbeatSender(_frameHandler, heartbeatExecutor, threadFactory);
290+
}
286291
}
287292

288293
/**
@@ -403,7 +408,11 @@ public void start()
403408
throw te;
404409
} catch (ShutdownSignalException sse) {
405410
_frameHandler.close();
406-
throw AMQChannel.wrap(sse);
411+
if (sse.getCause() instanceof SSLHandshakeException) {
412+
throw (SSLHandshakeException) sse.getCause();
413+
} else {
414+
throw AMQChannel.wrap(sse);
415+
}
407416
}
408417

409418
try {
Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
// Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
2+
//
3+
// This software, the RabbitMQ Java client library, is triple-licensed under the
4+
// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
5+
// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
6+
// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
7+
// please see LICENSE-APACHE2.
8+
//
9+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
10+
// either express or implied. See the LICENSE file for specific language governing
11+
// rights and limitations of this software.
12+
//
13+
// If you have any questions regarding licensing, please contact us at
14+
15+
16+
17+
package com.rabbitmq.client.impl;
18+
19+
import com.rabbitmq.client.AMQP;
20+
21+
import java.util.concurrent.ExecutorService;
22+
import java.util.concurrent.ScheduledExecutorService;
23+
import java.util.concurrent.Executors;
24+
import java.util.concurrent.ThreadFactory;
25+
import java.util.concurrent.TimeUnit;
26+
import java.util.concurrent.ScheduledFuture;
27+
import java.io.IOException;
28+
29+
import static java.util.concurrent.TimeUnit.SECONDS;
30+
31+
/**
32+
* Manages heartbeat sending for a {@link AMQConnection}.
33+
* <p/>
34+
* Heartbeats are sent in a dedicated thread that is separate
35+
* from the main loop thread used for the connection.
36+
*/
37+
final class DefaultHeartbeatSender implements HeartbeatSender {
38+
39+
private final Object monitor = new Object();
40+
41+
private final FrameHandler frameHandler;
42+
private final ThreadFactory threadFactory;
43+
44+
private ScheduledExecutorService executor;
45+
private final boolean privateExecutor;
46+
47+
private ScheduledFuture<?> future;
48+
49+
private boolean shutdown = false;
50+
51+
private volatile long lastActivityTime;
52+
53+
DefaultHeartbeatSender(FrameHandler frameHandler, ScheduledExecutorService heartbeatExecutor, ThreadFactory threadFactory) {
54+
this.frameHandler = frameHandler;
55+
this.privateExecutor = (heartbeatExecutor == null);
56+
this.executor = heartbeatExecutor;
57+
this.threadFactory = threadFactory;
58+
}
59+
60+
public void signalActivity() {
61+
this.lastActivityTime = System.nanoTime();
62+
}
63+
64+
/**
65+
* Sets the heartbeat in seconds.
66+
*/
67+
public void setHeartbeat(int heartbeatSeconds) {
68+
synchronized(this.monitor) {
69+
if(this.shutdown) {
70+
return;
71+
}
72+
73+
// cancel any existing heartbeat task
74+
if(this.future != null) {
75+
this.future.cancel(true);
76+
this.future = null;
77+
}
78+
79+
if (heartbeatSeconds > 0) {
80+
// wake every heartbeatSeconds / 2 to avoid the worst case
81+
// where the last activity comes just after the last heartbeat
82+
long interval = SECONDS.toNanos(heartbeatSeconds) / 2;
83+
ScheduledExecutorService executor = createExecutorIfNecessary();
84+
Runnable task = new HeartbeatRunnable(interval);
85+
this.future = executor.scheduleAtFixedRate(
86+
task, interval, interval, TimeUnit.NANOSECONDS);
87+
}
88+
}
89+
}
90+
91+
private ScheduledExecutorService createExecutorIfNecessary() {
92+
synchronized (this.monitor) {
93+
if (this.executor == null) {
94+
this.executor = Executors.newSingleThreadScheduledExecutor(threadFactory);
95+
}
96+
return this.executor;
97+
}
98+
}
99+
100+
/**
101+
* Shutdown the heartbeat process, if any.
102+
*/
103+
public void shutdown() {
104+
ExecutorService executorToShutdown = null;
105+
synchronized (this.monitor) {
106+
if (this.future != null) {
107+
this.future.cancel(true);
108+
this.future = null;
109+
}
110+
111+
if (this.privateExecutor) {
112+
// to be safe, we shouldn't call shutdown holding the
113+
// monitor.
114+
executorToShutdown = this.executor;
115+
}
116+
117+
this.executor = null;
118+
this.shutdown = true;
119+
}
120+
if(executorToShutdown != null) {
121+
executorToShutdown.shutdown();
122+
}
123+
}
124+
125+
private final class HeartbeatRunnable implements Runnable {
126+
127+
private final long heartbeatNanos;
128+
129+
private HeartbeatRunnable(long heartbeatNanos) {
130+
this.heartbeatNanos = heartbeatNanos;
131+
}
132+
133+
@Override
134+
public void run() {
135+
try {
136+
long now = System.nanoTime();
137+
138+
if (now > (lastActivityTime + this.heartbeatNanos)) {
139+
frameHandler.writeFrame(new Frame(AMQP.FRAME_HEARTBEAT, 0));
140+
frameHandler.flush();
141+
}
142+
} catch (IOException e) {
143+
// ignore
144+
}
145+
}
146+
}
147+
}

0 commit comments

Comments
 (0)