diff --git a/pom.xml b/pom.xml index d8b4655..0bbcd0c 100644 --- a/pom.xml +++ b/pom.xml @@ -147,6 +147,13 @@ linux-x86_64 + + io.netty + netty-transport-native-io_uring + ${netty.version} + linux-x86_64 + + org.jgroups jgroups diff --git a/src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java b/src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java index a1d6f34..846ec97 100644 --- a/src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java +++ b/src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java @@ -50,6 +50,8 @@ import io.netty.channel.epoll.EpollIoHandler; import io.netty.channel.epoll.EpollSocketChannel; import io.netty.channel.nio.NioIoHandler; +import io.netty.channel.uring.IoUringIoHandler; +import io.netty.channel.uring.IoUringSocketChannel; import io.netty.handler.ssl.SslContextBuilder; import io.netty.handler.ssl.SslHandler; import io.netty.util.internal.PlatformDependent; @@ -498,6 +500,18 @@ void setNativeEpoll(String input) throws Exception { volatile boolean nativeEpoll; + @CommandLine.Option( + names = {"--native-io-uring", "-niu"}, + description = "use Netty's native io_uring transport (Linux x86-64 only)", + arity = "0..1", + fallbackValue = "true", + defaultValue = "false") + void setNativeIoUring(String input) throws Exception { + this.nativeIoUring = Converters.BOOLEAN_TYPE_CONVERTER.convert(input); + } + + volatile boolean nativeIoUring; + @ArgGroup(exclusive = false, multiplicity = "0..1") InstanceSyncOptions instanceSyncOptions; @@ -932,10 +946,17 @@ public Integer call() throws Exception { } } + if (this.nativeEpoll && this.nativeIoUring) { + throw new IllegalArgumentException("Cannot use both native epoll and io_uring"); + } + java.util.function.Consumer bootstrapCustomizer; if (this.nativeEpoll) { this.eventLoopGroup = new MultiThreadIoEventLoopGroup(EpollIoHandler.newFactory()); bootstrapCustomizer = b -> b.channel(EpollSocketChannel.class); + } else if (this.nativeIoUring) { + this.eventLoopGroup = new MultiThreadIoEventLoopGroup(IoUringIoHandler.newFactory()); + bootstrapCustomizer = b -> b.channel(IoUringSocketChannel.class); } else { this.eventLoopGroup = new MultiThreadIoEventLoopGroup(NioIoHandler.newFactory()); bootstrapCustomizer = b -> {}; diff --git a/src/test/java/com/rabbitmq/stream/perf/StreamPerfTestTest.java b/src/test/java/com/rabbitmq/stream/perf/StreamPerfTestTest.java index d54d61a..a8c2724 100644 --- a/src/test/java/com/rabbitmq/stream/perf/StreamPerfTestTest.java +++ b/src/test/java/com/rabbitmq/stream/perf/StreamPerfTestTest.java @@ -521,6 +521,18 @@ void nativeEpollWorksOnLinux() throws Exception { assertThat(streamExists(s)).isTrue(); } + @Test + @EnabledOnOs(OS.LINUX) + @EnabledIfSystemProperty(named = "os.arch", matches = "amd64") + void nativeIoUringWorksOnLinux() throws Exception { + Future run = run(builder().nativeEpoll()); + waitUntilStreamExists(s); + waitOneSecond(); + run.cancel(true); + waitRunEnds(); + assertThat(streamExists(s)).isTrue(); + } + @Test @BrokerVersionAtLeast(BrokerVersion.RABBITMQ_3_13_0) void shouldNotFailWhenFilteringIsActivated() throws Exception { @@ -780,6 +792,11 @@ ArgumentsBuilder nativeEpoll() { return this; } + ArgumentsBuilder nativeIoUring() { + arguments.put("native-io-uring", ""); + return this; + } + ArgumentsBuilder filterValueSet(String... values) { arguments.put("filter-value-set", String.join(",", values)); return this;