Skip to content

Commit 225f2df

Browse files
SamBarkerrobobario
andauthored
Feat: Allow the configuration of the number of thread to use for the event loop
* Make the thread count used by the proxy an optional configuration property --------- Signed-off-by: Sam Barker <[email protected]> Co-authored-by: Robert Young <[email protected]>
1 parent 4c0d655 commit 225f2df

File tree

13 files changed

+237
-46
lines changed

13 files changed

+237
-46
lines changed

kroxylicious-integration-test-support/src/main/java/io/kroxylicious/proxy/config/BuilderConfig.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,9 @@
2929
"io.kroxylicious.proxy.config.tls.ServerOptions",
3030
"io.kroxylicious.proxy.config.tls.InsecureTls",
3131
"io.kroxylicious.proxy.config.tls.KeyStore",
32-
"io.kroxylicious.proxy.config.tls.KeyPair"
32+
"io.kroxylicious.proxy.config.tls.KeyPair",
33+
"io.kroxylicious.proxy.config.NetworkDefinition",
34+
"io.kroxylicious.proxy.config.NettySettings"
3335
})
3436
public final class BuilderConfig {
3537
public static final String TARGET_CONFIG_PACKAGE = "io.kroxylicious.proxy.config.model";

kroxylicious-integration-test-support/src/test/java/io/kroxylicious/proxy/config/ConfigurationTest.java

Lines changed: 54 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ void shouldRejectSniGatewayWithNoAdvertisedBrokerAddressPattern() {
117117
.hasMessageContaining("Missing required creator property 'advertisedBrokerAddressPattern'");
118118
}
119119

120+
@SuppressWarnings("deprecation")
120121
static Stream<Arguments> fluentApiConfigYamlFidelity() {
121122
NamedFilterDefinition filter = new NamedFilterDefinitionBuilder("filter-1", ExampleFilterFactory.class.getSimpleName())
122123
.withConfig("examplePlugin", "ExamplePluginInstance",
@@ -436,6 +437,38 @@ static Stream<Arguments> fluentApiConfigYamlFidelity() {
436437
- name: default
437438
portIdentifiesNode:
438439
bootstrapAddress: cluster1:9192
440+
"""),
441+
argumentSet("Proxy worker thread count",
442+
new ConfigurationBuilder().addToVirtualClusters(VIRTUAL_CLUSTER).withNewNetwork().withNewProxy().withWorkerThreadCount(5).endProxy().endNetwork()
443+
.build(),
444+
"""
445+
network:
446+
proxy:
447+
workerThreadCount: 5
448+
virtualClusters:
449+
- name: demo
450+
targetCluster:
451+
bootstrapServers: kafka.example:1234
452+
gateways:
453+
- name: default
454+
portIdentifiesNode:
455+
bootstrapAddress: example.com:1234
456+
"""),
457+
argumentSet("Manaegment worker thread count",
458+
new ConfigurationBuilder().addToVirtualClusters(VIRTUAL_CLUSTER).withNewNetwork().withNewManagement().withWorkerThreadCount(2).endManagement()
459+
.endNetwork().build(),
460+
"""
461+
network:
462+
management:
463+
workerThreadCount: 2
464+
virtualClusters:
465+
- name: demo
466+
targetCluster:
467+
bootstrapServers: kafka.example:1234
468+
gateways:
469+
- name: default
470+
portIdentifiesNode:
471+
bootstrapAddress: example.com:1234
439472
""")
440473

441474
);
@@ -458,13 +491,15 @@ void shouldRejectFilterDefinitionsWithSameName() {
458491
new NamedFilterDefinition("foo", "", ""));
459492
Optional<Map<String, Object>> development = Optional.empty();
460493
var virtualCluster = List.of(VIRTUAL_CLUSTER);
494+
NetworkDefinition network = null;
461495
assertThatThrownBy(() -> new Configuration(null,
462496
filterDefinitions,
463497
null,
464498
virtualCluster,
465499
null,
466500
false,
467-
development))
501+
development,
502+
network))
468503
.isInstanceOf(IllegalConfigurationException.class)
469504
.hasMessage("'filterDefinitions' contains multiple items with the same names: [foo]");
470505
}
@@ -480,7 +515,8 @@ void shouldRejectMissingDefaultFilter() {
480515
virtualCluster,
481516
null,
482517
false,
483-
development))
518+
development,
519+
null))
484520
.isInstanceOf(IllegalConfigurationException.class)
485521
.hasMessage("'defaultFilters' references filters not defined in 'filterDefinitions': [missing]");
486522
}
@@ -494,11 +530,13 @@ void shouldRejectMissingClusterFilter() {
494530
List<VirtualCluster> virtualClusters = List
495531
.of(new VirtualCluster("vc1", targetCluster, defaultGateway, false, false, List.of("missing")));
496532
assertThatThrownBy(() -> new Configuration(
497-
null, filterDefinitions,
533+
null,
534+
filterDefinitions,
498535
null,
499536
virtualClusters,
500537
null, false,
501-
development))
538+
development,
539+
null))
502540
.isInstanceOf(IllegalConfigurationException.class)
503541
.hasMessage("'virtualClusters.vc1.filters' references filters not defined in 'filterDefinitions': [missing]");
504542
}
@@ -517,11 +555,14 @@ void shouldRejectUnusedFilterDefinition() {
517555
List<VirtualClusterGateway> defaultGateway = List.of(VIRTUAL_CLUSTER_GATEWAY);
518556
TargetCluster targetCluster = new TargetCluster("unused:9082", Optional.empty());
519557
List<VirtualCluster> virtualClusters = List.of(new VirtualCluster("vc1", targetCluster, defaultGateway, false, false, List.of("used2")));
520-
assertThatThrownBy(() -> new Configuration(null, filterDefinitions,
558+
assertThatThrownBy(() -> new Configuration(null,
559+
filterDefinitions,
521560
defaultFilters,
522561
virtualClusters,
523-
null, false,
524-
development))
562+
null,
563+
false,
564+
development,
565+
null))
525566
.isInstanceOf(IllegalConfigurationException.class)
526567
.hasMessage("'filterDefinitions' defines filters which are not used in 'defaultFilters' or in any virtual cluster's 'filters': [unused]");
527568
}
@@ -551,11 +592,14 @@ void virtualClusterModelShouldUseCorrectFilters() {
551592
null); // filters not defined => should default to the top level
552593

553594
Configuration configuration = new Configuration(
554-
null, filterDefinitions,
595+
null,
596+
filterDefinitions,
555597
List.of("bar"),
556598
List.of(direct, defaulted),
557-
null, false,
558-
Optional.empty());
599+
null,
600+
false,
601+
Optional.empty(),
602+
null);
559603

560604
// When
561605
var model = configuration.virtualClusterModel(new ServiceBasedPluginFactoryRegistry());

kroxylicious-integration-tests/src/test/java/io/kroxylicious/proxy/IOUringIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ class IOUringIT extends BaseIT {
3636
@Test
3737
void proxyUsingIOUring(KafkaCluster cluster, Topic topic) throws Exception {
3838

39-
var proxy = proxy(cluster).withUseIoUring();
39+
var proxy = proxy(cluster).withUseIoUring().withNewNetwork().withNewProxy().withWorkerThreadCount(2).endProxy().endNetwork();
4040

4141
try (var tester = kroxyliciousTester(proxy);
4242
var producer = tester.producer();

kroxylicious-operator-test-support/src/test/java/io/kroxylicious/kubernetes/operator/assertj/OperatorAssertionsTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,8 @@ void shouldReturnConfigurationAssert() {
105105
List.of())),
106106
List.of(),
107107
false,
108-
Optional.empty());
108+
Optional.empty(),
109+
null);
109110

110111
// When
111112
Assert<?, ?> actualAssertion = OperatorAssertions.assertThat(configurations);

kroxylicious-operator-test-support/src/test/java/io/kroxylicious/kubernetes/operator/assertj/ProxyConfigAssertTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ void virtualClusterWhenNotContainedInConfig() {
2929
VirtualCluster virtualCluster = new VirtualCluster("cluster", new TargetCluster("localhost:9092", Optional.empty()),
3030
List.of(virtualClusterGateway), false,
3131
false, List.of());
32-
Configuration config = new Configuration(null, null, null, List.of(virtualCluster), List.of(), false, Optional.empty());
32+
Configuration config = new Configuration(null, null, null, List.of(virtualCluster), List.of(), false, Optional.empty(), null);
3333

3434
Assertions.assertThatThrownBy(() -> {
3535
OperatorAssertions.assertThat(config).cluster("arbitrary");
@@ -44,7 +44,7 @@ void virtualClusterWhenContainedInConfig() {
4444
VirtualCluster virtualCluster = new VirtualCluster(clusterName, new TargetCluster("localhost:9092", Optional.empty()),
4545
List.of(virtualClusterGateway), false,
4646
false, List.of());
47-
Configuration config = new Configuration(null, null, null, List.of(virtualCluster), List.of(), false, Optional.empty());
47+
Configuration config = new Configuration(null, null, null, List.of(virtualCluster), List.of(), false, Optional.empty(), null);
4848

4949
ProxyConfigAssert.ProxyConfigClusterAssert cluster = OperatorAssertions.assertThat(config).cluster(clusterName);
5050
Assertions.assertThat(cluster.actual()).isNotNull().isSameAs(virtualCluster);

kroxylicious-operator/src/main/java/io/kroxylicious/kubernetes/operator/KafkaProxyReconciler.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,8 @@ private ConfigurationFragment<Configuration> generateProxyConfig(ProxyModel mode
198198
List.of(),
199199
false,
200200
// micrometer
201-
Optional.empty()),
201+
Optional.empty(),
202+
null),
202203
allVolumes,
203204
allMounts);
204205
}

kroxylicious-runtime/src/main/java/io/kroxylicious/proxy/KafkaProxy.java

Lines changed: 34 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import java.util.concurrent.CompletableFuture;
1313
import java.util.concurrent.ForkJoinPool;
1414
import java.util.concurrent.atomic.AtomicBoolean;
15+
import java.util.function.Function;
1516
import java.util.stream.Collectors;
1617
import java.util.stream.Stream;
1718

@@ -42,6 +43,8 @@
4243
import io.kroxylicious.proxy.config.Configuration;
4344
import io.kroxylicious.proxy.config.IllegalConfigurationException;
4445
import io.kroxylicious.proxy.config.MicrometerDefinition;
46+
import io.kroxylicious.proxy.config.NettySettings;
47+
import io.kroxylicious.proxy.config.NetworkDefinition;
4548
import io.kroxylicious.proxy.config.PluginFactoryRegistry;
4649
import io.kroxylicious.proxy.config.admin.ManagementConfiguration;
4750
import io.kroxylicious.proxy.internal.ApiVersionsServiceImpl;
@@ -74,20 +77,32 @@ public List<Future<?>> shutdownGracefully() {
7477
return List.of(bossGroup.shutdownGracefully(), workerGroup.shutdownGracefully());
7578
}
7679

77-
public static EventGroupConfig build(String name, int availableCores, boolean useIoUring) {
80+
public static EventGroupConfig build(String name, Configuration configuration, Function<NetworkDefinition, NettySettings> settingsSupplier, boolean useIoUring) {
81+
int workerThreadCount = resolveThreadCount(configuration, settingsSupplier);
7882
if (useIoUring) {
7983
if (!IOUring.isAvailable()) {
8084
throw new IllegalStateException("io_uring not available due to: " + IOUring.unavailabilityCause());
8185
}
82-
return new EventGroupConfig(name, new IOUringEventLoopGroup(1), new IOUringEventLoopGroup(availableCores), IOUringServerSocketChannel.class);
86+
LOGGER.info("Using IOUring with {} event loop threads", workerThreadCount);
87+
return new EventGroupConfig(name, new IOUringEventLoopGroup(1), new IOUringEventLoopGroup(workerThreadCount), IOUringServerSocketChannel.class);
8388
}
8489
if (Epoll.isAvailable()) {
85-
return new EventGroupConfig(name, new EpollEventLoopGroup(1), new EpollEventLoopGroup(availableCores), EpollServerSocketChannel.class);
90+
LOGGER.info("Using EPOll with {} event loop threads", workerThreadCount);
91+
return new EventGroupConfig(name, new EpollEventLoopGroup(1), new EpollEventLoopGroup(workerThreadCount), EpollServerSocketChannel.class);
8692
}
8793
if (KQueue.isAvailable()) {
88-
return new EventGroupConfig(name, new KQueueEventLoopGroup(1), new KQueueEventLoopGroup(availableCores), KQueueServerSocketChannel.class);
94+
LOGGER.info("Using KQueue with {} event loop threads", workerThreadCount);
95+
return new EventGroupConfig(name, new KQueueEventLoopGroup(1), new KQueueEventLoopGroup(workerThreadCount), KQueueServerSocketChannel.class);
8996
}
90-
return new EventGroupConfig(name, new NioEventLoopGroup(1), new NioEventLoopGroup(availableCores), NioServerSocketChannel.class);
97+
LOGGER.info("Falling back to NIO with {} event loop threads", workerThreadCount);
98+
return new EventGroupConfig(name, new NioEventLoopGroup(1), new NioEventLoopGroup(workerThreadCount), NioServerSocketChannel.class);
99+
}
100+
101+
private static int resolveThreadCount(Configuration configuration, Function<NetworkDefinition, NettySettings> settingsSupplier) {
102+
return Optional.ofNullable(configuration.network())
103+
.map(settingsSupplier)
104+
.flatMap(NettySettings::workerThreadCount)
105+
.orElse(Runtime.getRuntime().availableProcessors());
91106
}
92107
}
93108

@@ -124,6 +139,18 @@ static Configuration validate(Configuration config, Features features) {
124139
return config;
125140
}
126141

142+
@VisibleForTesting
143+
@Nullable
144+
EventGroupConfig managementEventGroup() {
145+
return managementEventGroup;
146+
}
147+
148+
@VisibleForTesting
149+
@Nullable
150+
EventGroupConfig serverEventGroup() {
151+
return serverEventGroup;
152+
}
153+
127154
/**
128155
* Starts this proxy.
129156
* @return This proxy.
@@ -143,10 +170,8 @@ public KafkaProxy startup() {
143170
.map(c -> new HostPort(c.getEffectiveBindAddress(), c.getEffectivePort()));
144171
portConflictDefector.validate(virtualClusterModels, managementHostPort);
145172

146-
var availableCores = Runtime.getRuntime().availableProcessors();
147-
148-
this.managementEventGroup = EventGroupConfig.build("management", availableCores, config.isUseIoUring());
149-
this.serverEventGroup = EventGroupConfig.build("server", availableCores, config.isUseIoUring());
173+
this.managementEventGroup = EventGroupConfig.build("management", config, NetworkDefinition::management, config.isUseIoUring());
174+
this.serverEventGroup = EventGroupConfig.build("proxy", config, NetworkDefinition::proxy, config.isUseIoUring());
150175

151176
enableNettyMetrics(managementEventGroup, serverEventGroup);
152177

kroxylicious-runtime/src/main/java/io/kroxylicious/proxy/config/Configuration.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,16 +36,18 @@
3636
* @param micrometer The micrometer config
3737
* @param useIoUring true to use iouring
3838
* @param development Development options
39+
* @param network Controls aspects of network configuration for the proxy.
3940
*/
40-
@JsonPropertyOrder({ "management", "filterDefinitions", "defaultFilters", "virtualClusters", "micrometer", "useIoUring", "development" })
41+
@JsonPropertyOrder({ "management", "filterDefinitions", "defaultFilters", "virtualClusters", "micrometer", "useIoUring", "development", "network" })
4142
public record Configuration(
4243
@Nullable ManagementConfiguration management,
4344
@Nullable List<NamedFilterDefinition> filterDefinitions,
4445
@Nullable List<String> defaultFilters,
4546
@JsonProperty(required = true) List<VirtualCluster> virtualClusters,
4647
@Nullable List<MicrometerDefinition> micrometer,
4748
boolean useIoUring,
48-
Optional<Map<String, Object>> development) {
49+
Optional<Map<String, Object>> development,
50+
@Nullable NetworkDefinition network) {
4951

5052
/**
5153
* Creates an instance of configuration.
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
/*
2+
* Copyright Kroxylicious Authors.
3+
*
4+
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
5+
*/
6+
7+
package io.kroxylicious.proxy.config;
8+
9+
import java.util.Optional;
10+
11+
public record NettySettings(Optional<Integer> workerThreadCount) {
12+
13+
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
/*
2+
* Copyright Kroxylicious Authors.
3+
*
4+
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
5+
*/
6+
7+
package io.kroxylicious.proxy.config;
8+
9+
import edu.umd.cs.findbugs.annotations.Nullable;
10+
11+
public record NetworkDefinition(@Nullable NettySettings management, @Nullable NettySettings proxy) {
12+
13+
}

0 commit comments

Comments
 (0)