Skip to content

Commit 56f2e5f

Browse files
authored
Migrate to Netty 4.2 (kroxylicious#1467)
* Migrate to Netty 4.2 --------- Signed-off-by: Sam Barker <[email protected]>
1 parent 225f2df commit 56f2e5f

File tree

11 files changed

+388
-121
lines changed

11 files changed

+388
-121
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ Format `<github issue/pr number>: <short description>`.
1313
* [#2681](https://github.com/kroxylicious/kroxylicious/pull/2681): Create a maven archetype for filter development io.kroxylicious:kroxylicious-filter-archetype
1414
* [#2778](https://github.com/kroxylicious/kroxylicious/pull/2778): The proxy now allocates a sessionId to connect client and server channels for logging purposes. Allowing users to track activity between downstream and upstream channels.
1515
* [#143](https://github.com/kroxylicious/kroxylicious/issues/143): Add support for Netty metrics
16+
* [#2809](https://github.com/kroxylicious/kroxylicious/pull/2809): Add optional configuration parameter to control the number of worker threads used by Netty.
17+
* [#1467](https://github.com/kroxylicious/kroxylicious/pull/1467): Migrate to Netty 4.2
1618

1719
## 0.16.0
1820

DEV_GUIDE.md

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -380,6 +380,29 @@ You'll see an API response. If the service_timeout change is effective, the soc
380380
will continue for 3 minutes. If `socat` terminates after about 10 seconds, the workaround
381381
has been applied ineffectively.
382382
383+
## Running integration tests with IO_Uring
384+
385+
THe integration test suite enables IO_Uring un-conditionally which may trigger issues with memory limits. Certain platforms e.g. Fedora default to running with `RLIMIT_MEMLOCK` set.
386+
387+
If you see test failures such as
388+
```shell
389+
[ERROR] Errors:
390+
[ERROR] MockServerTest.testClientCanSendAndReceiveRPCToMock:47 » IllegalState failed to create a child event loop
391+
```
392+
or
393+
```shell
394+
java.lang.IllegalStateException: failed to create a child event loop
395+
...
396+
Caused by: java.lang.RuntimeException: failed to allocate memory for io_uring ring; try raising memlock limit (see getrlimit(RLIMIT_MEMLOCK, ...) or ulimit -l): Cannot allocate memory
397+
```
398+
399+
Raise the `RLIMIT_MEMLOCK` (see https://lwn.net/Articles/876288/ for a discussion on the merits or otherwise of the default) by adding entries to `/etc/security/limits.conf` (see https://access.redhat.com/solutions/61334 for details on the file) the updates will take effect in the next login shell.
400+
example config entry:
401+
```text
402+
* hard memlock unlimited
403+
* soft memlock unlimited
404+
```
405+
383406
## Running system tests locally
384407
385408
### Prerequisites

kroxylicious-integration-test-support/pom.xml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@
8888
</dependency>
8989
<dependency>
9090
<groupId>io.netty</groupId>
91-
<artifactId>netty-codec</artifactId>
91+
<artifactId>netty-codec-base</artifactId>
9292
</dependency>
9393
<dependency>
9494
<groupId>io.netty</groupId>
@@ -102,6 +102,10 @@
102102
<groupId>io.netty</groupId>
103103
<artifactId>netty-transport-classes-kqueue</artifactId>
104104
</dependency>
105+
<dependency>
106+
<groupId>io.netty</groupId>
107+
<artifactId>netty-transport-classes-io_uring</artifactId>
108+
</dependency>
105109
<dependency>
106110
<groupId>org.slf4j</groupId>
107111
<artifactId>slf4j-api</artifactId>

kroxylicious-integration-test-support/src/main/java/io/kroxylicious/test/client/EventGroupConfig.java

Lines changed: 26 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -7,19 +7,25 @@
77
package io.kroxylicious.test.client;
88

99
import io.netty.channel.EventLoopGroup;
10+
import io.netty.channel.IoHandlerFactory;
11+
import io.netty.channel.MultiThreadIoEventLoopGroup;
1012
import io.netty.channel.epoll.Epoll;
11-
import io.netty.channel.epoll.EpollEventLoopGroup;
13+
import io.netty.channel.epoll.EpollIoHandler;
1214
import io.netty.channel.epoll.EpollServerSocketChannel;
1315
import io.netty.channel.epoll.EpollSocketChannel;
1416
import io.netty.channel.kqueue.KQueue;
15-
import io.netty.channel.kqueue.KQueueEventLoopGroup;
17+
import io.netty.channel.kqueue.KQueueIoHandler;
1618
import io.netty.channel.kqueue.KQueueServerSocketChannel;
1719
import io.netty.channel.kqueue.KQueueSocketChannel;
18-
import io.netty.channel.nio.NioEventLoopGroup;
20+
import io.netty.channel.nio.NioIoHandler;
1921
import io.netty.channel.socket.ServerSocketChannel;
2022
import io.netty.channel.socket.SocketChannel;
2123
import io.netty.channel.socket.nio.NioServerSocketChannel;
2224
import io.netty.channel.socket.nio.NioSocketChannel;
25+
import io.netty.channel.uring.IoUring;
26+
import io.netty.channel.uring.IoUringIoHandler;
27+
import io.netty.channel.uring.IoUringServerSocketChannel;
28+
import io.netty.channel.uring.IoUringSocketChannel;
2329

2430
public record EventGroupConfig(
2531
Class<? extends SocketChannel> clientChannelClass,
@@ -29,7 +35,11 @@ public static EventGroupConfig create() {
2935
final Class<? extends SocketChannel> clientChannelClass;
3036
final Class<? extends ServerSocketChannel> serverChannelClass;
3137

32-
if (Epoll.isAvailable()) {
38+
if (IoUring.isAvailable()) {
39+
clientChannelClass = IoUringSocketChannel.class;
40+
serverChannelClass = IoUringServerSocketChannel.class;
41+
}
42+
else if (Epoll.isAvailable()) {
3343
clientChannelClass = EpollSocketChannel.class;
3444
serverChannelClass = EpollServerSocketChannel.class;
3545
}
@@ -44,23 +54,28 @@ else if (KQueue.isAvailable()) {
4454
return new EventGroupConfig(clientChannelClass, serverChannelClass);
4555
}
4656

47-
private static EventLoopGroup newGroup(int nThreads) {
48-
if (Epoll.isAvailable()) {
49-
return new EpollEventLoopGroup(nThreads);
57+
private static EventLoopGroup newGroup() {
58+
final IoHandlerFactory ioHandlerFactory;
59+
if (IoUring.isAvailable()) {
60+
ioHandlerFactory = IoUringIoHandler.newFactory();
61+
}
62+
else if (Epoll.isAvailable()) {
63+
ioHandlerFactory = EpollIoHandler.newFactory();
5064
}
5165
else if (KQueue.isAvailable()) {
52-
return new KQueueEventLoopGroup(nThreads);
66+
ioHandlerFactory = KQueueIoHandler.newFactory();
5367
}
5468
else {
55-
return new NioEventLoopGroup(nThreads);
69+
ioHandlerFactory = NioIoHandler.newFactory();
5670
}
71+
return new MultiThreadIoEventLoopGroup(1, ioHandlerFactory);
5772
}
5873

5974
public EventLoopGroup newWorkerGroup() {
60-
return newGroup(1);
75+
return newGroup();
6176
}
6277

6378
public EventLoopGroup newBossGroup() {
64-
return newGroup(1);
79+
return newGroup();
6580
}
6681
}

kroxylicious-integration-test-support/src/main/java/io/kroxylicious/test/server/MockServer.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ public int start(int port, ResponsePayload response, SslContext serverSslContext
125125
final EventGroupConfig eventGroupConfig = EventGroupConfig.create();
126126
bossGroup = eventGroupConfig.newBossGroup();
127127
workerGroup = eventGroupConfig.newWorkerGroup();
128-
serverHandler = new MockHandler(response);
128+
serverHandler = new MockHandler(null);
129129
ServerBootstrap b = new ServerBootstrap();
130130
b.group(bossGroup, workerGroup)
131131
.channel(eventGroupConfig.serverChannelClass())
@@ -143,6 +143,10 @@ public void initChannel(SocketChannel ch) {
143143
}
144144
});
145145

146+
if (response != null) {
147+
addMockResponseForApiKey(response);
148+
}
149+
146150
// Start the server.
147151
ChannelFuture f;
148152
try {

kroxylicious-integration-test-support/src/test/java/io/kroxylicious/mock/MockServerTest.java

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@
1414

1515
import org.apache.kafka.common.message.ApiMessageType;
1616
import org.apache.kafka.common.protocol.ApiMessage;
17+
import org.junit.jupiter.api.AfterAll;
18+
import org.junit.jupiter.api.BeforeAll;
19+
import org.junit.jupiter.api.BeforeEach;
1720
import org.junit.jupiter.params.ParameterizedTest;
1821
import org.junit.jupiter.params.provider.MethodSource;
1922

@@ -31,6 +34,19 @@
3134
class MockServerTest {
3235
private static final Map<ApiAndVersion, ApiMessage> responseSamples = ApiMessageSampleGenerator.createResponseSamples();
3336
private static final Map<ApiAndVersion, ApiMessage> requestSamples = ApiMessageSampleGenerator.createRequestSamples();
37+
private static MockServer mockServer;
38+
private static KafkaClient kafkaClient;
39+
40+
@BeforeAll
41+
static void beforeAll() {
42+
mockServer = MockServer.startOnRandomPort(null);
43+
kafkaClient = new KafkaClient("127.0.0.1", mockServer.port());
44+
}
45+
46+
@AfterAll
47+
static void afterAll() {
48+
mockServer.close();
49+
}
3450

3551
public static Stream<ApiAndVersion> allSupportedApiVersions() {
3652
return DataClasses.getRequestClasses().keySet().stream().flatMap(apiKeys -> {
@@ -40,16 +56,20 @@ public static Stream<ApiAndVersion> allSupportedApiVersions() {
4056
});
4157
}
4258

59+
@BeforeEach
60+
void setUp() {
61+
mockServer.clear();
62+
63+
}
64+
4365
@ParameterizedTest
4466
@MethodSource("allSupportedApiVersions")
4567
void testClientCanSendAndReceiveRPCToMock(ApiAndVersion apiKey) throws Exception {
4668
ResponsePayload mockResponse = getResponse(apiKey);
47-
try (var mockServer = MockServer.startOnRandomPort(mockResponse);
48-
var kafkaClient = new KafkaClient("127.0.0.1", mockServer.port())) {
49-
CompletableFuture<Response> future = kafkaClient.get(getRequest(apiKey));
50-
Response clientResponse = future.get(10, TimeUnit.SECONDS);
51-
assertEquals(mockResponse, clientResponse.payload());
52-
}
69+
mockServer.addMockResponseForApiKey(mockResponse);
70+
CompletableFuture<Response> future = kafkaClient.get(getRequest(apiKey));
71+
Response clientResponse = future.get(10, TimeUnit.SECONDS);
72+
assertEquals(mockResponse, clientResponse.payload());
5373
}
5474

5575
private ResponsePayload getResponse(ApiAndVersion apiAndVersion) {

kroxylicious-integration-tests/src/test/java/io/kroxylicious/proxy/IOUringIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
* Integration test that tests Kroxylicious ability to utilise Linux io_uring.
2929
*/
3030
@ExtendWith(KafkaClusterExtension.class)
31-
@EnabledIf(value = "io.netty.incubator.channel.uring.IOUring#isAvailable", disabledReason = "IOUring is not available")
31+
@EnabledIf(value = "io.netty.channel.uring.IoUring#isAvailable", disabledReason = "IOUring is not available")
3232
class IOUringIT extends BaseIT {
3333

3434
private static final String HELLO_WORLD = "helloworld";

kroxylicious-runtime/pom.xml

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@
9595
</dependency>
9696
<dependency>
9797
<groupId>io.netty</groupId>
98-
<artifactId>netty-codec</artifactId>
98+
<artifactId>netty-codec-base</artifactId>
9999
</dependency>
100100
<dependency>
101101
<groupId>io.netty</groupId>
@@ -120,7 +120,7 @@
120120
<dependency>
121121
<groupId>io.netty</groupId>
122122
<artifactId>netty-transport-native-epoll</artifactId>
123-
<classifier>${netty.epoll.classifier}</classifier>
123+
<classifier>${netty.epoll.architecture}</classifier>
124124
<scope>runtime</scope>
125125
</dependency>
126126
<dependency>
@@ -130,21 +130,21 @@
130130
<dependency>
131131
<groupId>io.netty</groupId>
132132
<artifactId>netty-transport-native-kqueue</artifactId>
133-
<classifier>${netty.kqueue.classifier}</classifier>
133+
<classifier>${netty.kqueue.architecture}</classifier>
134134
<scope>runtime</scope>
135135
</dependency>
136136
<dependency>
137137
<groupId>io.netty</groupId>
138138
<artifactId>netty-transport-native-unix-common</artifactId>
139139
</dependency>
140140
<dependency>
141-
<groupId>io.netty.incubator</groupId>
142-
<artifactId>netty-incubator-transport-classes-io_uring</artifactId>
141+
<groupId>io.netty</groupId>
142+
<artifactId>netty-transport-classes-io_uring</artifactId>
143143
</dependency>
144144
<dependency>
145-
<groupId>io.netty.incubator</groupId>
146-
<artifactId>netty-incubator-transport-native-io_uring</artifactId>
147-
<classifier>${netty.io_uring.classifier}</classifier>
145+
<groupId>io.netty</groupId>
146+
<artifactId>netty-transport-native-io_uring</artifactId>
147+
<classifier>${netty.io_uring.architecture}</classifier>
148148
<scope>runtime</scope>
149149
</dependency>
150150
<dependency>
@@ -186,6 +186,11 @@
186186
<artifactId>junit-jupiter-params</artifactId>
187187
<scope>test</scope>
188188
</dependency>
189+
<dependency>
190+
<groupId>org.junit-pioneer</groupId>
191+
<artifactId>junit-pioneer</artifactId>
192+
<scope>test</scope>
193+
</dependency>
189194
<dependency>
190195
<groupId>org.mockito</groupId>
191196
<artifactId>mockito-core</artifactId>

0 commit comments

Comments
 (0)