Skip to content

Commit c1b2359

Browse files
committed
Add --native-io-uring option
1 parent 606bd9a commit c1b2359

File tree

3 files changed

+45
-0
lines changed

3 files changed

+45
-0
lines changed

pom.xml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,13 @@
147147
<classifier>linux-x86_64</classifier>
148148
</dependency>
149149

150+
<dependency>
151+
<groupId>io.netty</groupId>
152+
<artifactId>netty-transport-native-io_uring</artifactId>
153+
<version>${netty.version}</version>
154+
<classifier>linux-x86_64</classifier>
155+
</dependency>
156+
150157
<dependency>
151158
<groupId>org.jgroups</groupId>
152159
<artifactId>jgroups</artifactId>

src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@
5050
import io.netty.channel.epoll.EpollIoHandler;
5151
import io.netty.channel.epoll.EpollSocketChannel;
5252
import io.netty.channel.nio.NioIoHandler;
53+
import io.netty.channel.uring.IoUringIoHandler;
54+
import io.netty.channel.uring.IoUringSocketChannel;
5355
import io.netty.handler.ssl.SslContextBuilder;
5456
import io.netty.handler.ssl.SslHandler;
5557
import io.netty.util.internal.PlatformDependent;
@@ -498,6 +500,18 @@ void setNativeEpoll(String input) throws Exception {
498500

499501
volatile boolean nativeEpoll;
500502

503+
@CommandLine.Option(
504+
names = {"--native-io-uring", "-niu"},
505+
description = "use Netty's native io_uring transport (Linux x86-64 only)",
506+
arity = "0..1",
507+
fallbackValue = "true",
508+
defaultValue = "false")
509+
void setNativeIoUring(String input) throws Exception {
510+
this.nativeIoUring = Converters.BOOLEAN_TYPE_CONVERTER.convert(input);
511+
}
512+
513+
volatile boolean nativeIoUring;
514+
501515
@ArgGroup(exclusive = false, multiplicity = "0..1")
502516
InstanceSyncOptions instanceSyncOptions;
503517

@@ -932,10 +946,17 @@ public Integer call() throws Exception {
932946
}
933947
}
934948

949+
if (this.nativeEpoll && this.nativeIoUring) {
950+
throw new IllegalArgumentException("Cannot use both native epoll and io_uring");
951+
}
952+
935953
java.util.function.Consumer<Bootstrap> bootstrapCustomizer;
936954
if (this.nativeEpoll) {
937955
this.eventLoopGroup = new MultiThreadIoEventLoopGroup(EpollIoHandler.newFactory());
938956
bootstrapCustomizer = b -> b.channel(EpollSocketChannel.class);
957+
} else if (this.nativeIoUring) {
958+
this.eventLoopGroup = new MultiThreadIoEventLoopGroup(IoUringIoHandler.newFactory());
959+
bootstrapCustomizer = b -> b.channel(IoUringSocketChannel.class);
939960
} else {
940961
this.eventLoopGroup = new MultiThreadIoEventLoopGroup(NioIoHandler.newFactory());
941962
bootstrapCustomizer = b -> {};

src/test/java/com/rabbitmq/stream/perf/StreamPerfTestTest.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -521,6 +521,18 @@ void nativeEpollWorksOnLinux() throws Exception {
521521
assertThat(streamExists(s)).isTrue();
522522
}
523523

524+
@Test
525+
@EnabledOnOs(OS.LINUX)
526+
@EnabledIfSystemProperty(named = "os.arch", matches = "amd64")
527+
void nativeIoUringWorksOnLinux() throws Exception {
528+
Future<?> run = run(builder().nativeEpoll());
529+
waitUntilStreamExists(s);
530+
waitOneSecond();
531+
run.cancel(true);
532+
waitRunEnds();
533+
assertThat(streamExists(s)).isTrue();
534+
}
535+
524536
@Test
525537
@BrokerVersionAtLeast(BrokerVersion.RABBITMQ_3_13_0)
526538
void shouldNotFailWhenFilteringIsActivated() throws Exception {
@@ -780,6 +792,11 @@ ArgumentsBuilder nativeEpoll() {
780792
return this;
781793
}
782794

795+
ArgumentsBuilder nativeIoUring() {
796+
arguments.put("native-io-uring", "");
797+
return this;
798+
}
799+
783800
ArgumentsBuilder filterValueSet(String... values) {
784801
arguments.put("filter-value-set", String.join(",", values));
785802
return this;

0 commit comments

Comments
 (0)