Skip to content

Commit 4764775

Browse files
authored
Make the proxy adapter worker thread configrable (#1549)
1 parent 7d76b42 commit 4764775

File tree

2 files changed

+9
-2
lines changed

2 files changed

+9
-2
lines changed

mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/MQTTProxyConfiguration.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,13 @@ public class MQTTProxyConfiguration extends MQTTCommonConfiguration {
6969
)
7070
private int maxNoOfChannels = 1;
7171

72+
@FieldContext(
73+
category = CATEGORY_MQTT,
74+
doc = "Number of threads to use for Netty IO for proxy to broker. "
75+
+ "Default is set to `Runtime.getRuntime().availableProcessors()`"
76+
)
77+
private int proxyAdapterNumIOThreads = Runtime.getRuntime().availableProcessors();
78+
7279
@FieldContext(
7380
category = CATEGORY_MQTT,
7481
doc = "Proxy connect to broker timeout (ms)"

mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/channel/MQTTProxyAdapter.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,14 +71,14 @@ public class MQTTProxyAdapter {
7171
private final AtomicLong counter = new AtomicLong(0);
7272
@Getter
7373
private final ConcurrentMap<InetSocketAddress, Map<Integer, CompletableFuture<Channel>>> pool;
74-
private final int workerThread = Runtime.getRuntime().availableProcessors();
7574
private final int maxNoOfChannels;
7675

7776
public MQTTProxyAdapter(MQTTProxyService proxyService) {
7877
this.proxyService = proxyService;
7978
this.pool = new ConcurrentHashMap<>();
8079
this.bootstrap = new Bootstrap();
81-
this.eventLoopGroup = EventLoopUtil.newEventLoopGroup(workerThread, false, threadFactory);
80+
this.eventLoopGroup = EventLoopUtil.newEventLoopGroup(
81+
proxyService.getProxyConfig().getProxyAdapterNumIOThreads(), false, threadFactory);
8282
this.maxNoOfChannels = proxyService.getProxyConfig().getMaxNoOfChannels();
8383
this.bootstrap.group(eventLoopGroup)
8484
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, proxyService.getProxyConfig().getConnectTimeoutMs())

0 commit comments

Comments
 (0)