Skip to content

Commit b89318f

Browse files
Remove dn_rpc_min_concurrent_client_num to fix minWorker > maxWorker bugs apache#15296
Signed-off-by: OneSizeFitQuorum <[email protected]>
1 parent b8bba8f commit b89318f

File tree

7 files changed

+4
-78
lines changed

7 files changed

+4
-78
lines changed

iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/PipeConsensusConfig.java

Lines changed: 0 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -81,39 +81,25 @@ public PipeConsensusConfig build() {
8181
}
8282

8383
public static class RPC {
84-
private final int rpcSelectorThreadNum;
85-
private final int rpcMinConcurrentClientNum;
8684
private final int rpcMaxConcurrentClientNum;
8785
private final int thriftServerAwaitTimeForStopService;
8886
private final boolean isRpcThriftCompressionEnabled;
8987
private final int connectionTimeoutInMs;
9088
private final int thriftMaxFrameSize;
9189

9290
public RPC(
93-
int rpcSelectorThreadNum,
94-
int rpcMinConcurrentClientNum,
9591
int rpcMaxConcurrentClientNum,
9692
int thriftServerAwaitTimeForStopService,
9793
boolean isRpcThriftCompressionEnabled,
9894
int connectionTimeoutInMs,
9995
int thriftMaxFrameSize) {
100-
this.rpcSelectorThreadNum = rpcSelectorThreadNum;
101-
this.rpcMinConcurrentClientNum = rpcMinConcurrentClientNum;
10296
this.rpcMaxConcurrentClientNum = rpcMaxConcurrentClientNum;
10397
this.thriftServerAwaitTimeForStopService = thriftServerAwaitTimeForStopService;
10498
this.isRpcThriftCompressionEnabled = isRpcThriftCompressionEnabled;
10599
this.connectionTimeoutInMs = connectionTimeoutInMs;
106100
this.thriftMaxFrameSize = thriftMaxFrameSize;
107101
}
108102

109-
public int getRpcSelectorThreadNum() {
110-
return rpcSelectorThreadNum;
111-
}
112-
113-
public int getRpcMinConcurrentClientNum() {
114-
return rpcMinConcurrentClientNum;
115-
}
116-
117103
public int getRpcMaxConcurrentClientNum() {
118104
return rpcMaxConcurrentClientNum;
119105
}
@@ -139,24 +125,12 @@ public static RPC.Builder newBuilder() {
139125
}
140126

141127
public static class Builder {
142-
private int rpcSelectorThreadNum = 1;
143-
private int rpcMinConcurrentClientNum = Runtime.getRuntime().availableProcessors();
144128
private int rpcMaxConcurrentClientNum = 65535;
145129
private int thriftServerAwaitTimeForStopService = 60;
146130
private boolean isRpcThriftCompressionEnabled = false;
147131
private int connectionTimeoutInMs = (int) TimeUnit.SECONDS.toMillis(60);
148132
private int thriftMaxFrameSize = 536870912;
149133

150-
public RPC.Builder setRpcSelectorThreadNum(int rpcSelectorThreadNum) {
151-
this.rpcSelectorThreadNum = rpcSelectorThreadNum;
152-
return this;
153-
}
154-
155-
public RPC.Builder setRpcMinConcurrentClientNum(int rpcMinConcurrentClientNum) {
156-
this.rpcMinConcurrentClientNum = rpcMinConcurrentClientNum;
157-
return this;
158-
}
159-
160134
public RPC.Builder setRpcMaxConcurrentClientNum(int rpcMaxConcurrentClientNum) {
161135
this.rpcMaxConcurrentClientNum = rpcMaxConcurrentClientNum;
162136
return this;
@@ -185,8 +159,6 @@ public RPC.Builder setThriftMaxFrameSize(int thriftMaxFrameSize) {
185159

186160
public RPC build() {
187161
return new RPC(
188-
rpcSelectorThreadNum,
189-
rpcMinConcurrentClientNum,
190162
rpcMaxConcurrentClientNum,
191163
thriftServerAwaitTimeForStopService,
192164
isRpcThriftCompressionEnabled,

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -145,9 +145,6 @@ public class IoTDBConfig {
145145
/** Rpc Selector thread num */
146146
private int rpcSelectorThreadCount = 1;
147147

148-
/** Min concurrent client number */
149-
private int rpcMinConcurrentClientNum = Runtime.getRuntime().availableProcessors();
150-
151148
/** Max concurrent client number */
152149
private int rpcMaxConcurrentClientNum = 1000;
153150

@@ -1770,14 +1767,6 @@ public void setRpcSelectorThreadCount(int rpcSelectorThreadCount) {
17701767
this.rpcSelectorThreadCount = rpcSelectorThreadCount;
17711768
}
17721769

1773-
public int getRpcMinConcurrentClientNum() {
1774-
return rpcMinConcurrentClientNum;
1775-
}
1776-
1777-
public void setRpcMinConcurrentClientNum(int rpcMinConcurrentClientNum) {
1778-
this.rpcMinConcurrentClientNum = rpcMinConcurrentClientNum;
1779-
}
1780-
17811770
public int getRpcMaxConcurrentClientNum() {
17821771
return rpcMaxConcurrentClientNum;
17831772
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -728,18 +728,6 @@ public void loadProperties(TrimProperties properties) throws BadNodeUrlException
728728

729729
conf.setRpcSelectorThreadCount(rpcSelectorThreadNum);
730730

731-
int minConcurrentClientNum =
732-
Integer.parseInt(
733-
properties.getProperty(
734-
"dn_rpc_min_concurrent_client_num",
735-
Integer.toString(conf.getRpcMinConcurrentClientNum())));
736-
737-
if (minConcurrentClientNum <= 0) {
738-
minConcurrentClientNum = Runtime.getRuntime().availableProcessors();
739-
}
740-
741-
conf.setRpcMinConcurrentClientNum(minConcurrentClientNum);
742-
743731
int maxConcurrentClientNum =
744732
Integer.parseInt(
745733
properties.getProperty(

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -129,8 +129,6 @@ private static ConsensusConfig buildConsensusConfig() {
129129
.setRpc(
130130
RPC.newBuilder()
131131
.setConnectionTimeoutInMs(CONF.getConnectionTimeoutInMS())
132-
.setRpcSelectorThreadNum(CONF.getRpcSelectorThreadCount())
133-
.setRpcMinConcurrentClientNum(CONF.getRpcMinConcurrentClientNum())
134132
.setRpcMaxConcurrentClientNum(CONF.getRpcMaxConcurrentClientNum())
135133
.setRpcThriftCompressionEnabled(CONF.isRpcThriftCompressionEnable())
136134
.setSelectorNumOfClientManager(CONF.getSelectorNumOfClientManager())
@@ -157,8 +155,6 @@ private static ConsensusConfig buildConsensusConfig() {
157155
PipeConsensusConfig.RPC
158156
.newBuilder()
159157
.setConnectionTimeoutInMs(CONF.getConnectionTimeoutInMS())
160-
.setRpcSelectorThreadNum(CONF.getRpcSelectorThreadCount())
161-
.setRpcMinConcurrentClientNum(CONF.getRpcMinConcurrentClientNum())
162158
.setRpcMaxConcurrentClientNum(CONF.getRpcMaxConcurrentClientNum())
163159
.setIsRpcThriftCompressionEnabled(CONF.isRpcThriftCompressionEnable())
164160
.setThriftServerAwaitTimeForStopService(

iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -502,11 +502,6 @@ dn_rpc_advanced_compression_enable=false
502502
# Datatype: int
503503
dn_rpc_selector_thread_count=1
504504

505-
# The min number of concurrent clients that can be connected to the dataNode.
506-
# effectiveMode: restart
507-
# Datatype: int
508-
dn_rpc_min_concurrent_client_num=1
509-
510505
# The maximum number of concurrent clients that can be connected to the dataNode.
511506
# effectiveMode: restart
512507
# Datatype: int

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/AbstractThriftServiceThread.java

Lines changed: 4 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,6 @@ protected AbstractThriftServiceThread(
113113
String bindAddress,
114114
int port,
115115
int selectorThreads,
116-
int minWorkerThreads,
117116
int maxWorkerThreads,
118117
int timeoutSecond,
119118
TServerEventHandler serverEventHandler,
@@ -134,7 +133,6 @@ protected AbstractThriftServiceThread(
134133
processor,
135134
threadsName,
136135
selectorThreads,
137-
minWorkerThreads,
138136
maxWorkerThreads,
139137
timeoutSecond,
140138
maxReadBufferBytes);
@@ -143,12 +141,7 @@ protected AbstractThriftServiceThread(
143141
case HSHA:
144142
THsHaServer.Args poolArgs1 =
145143
initAsyncedHshaPoolArgs(
146-
processor,
147-
threadsName,
148-
minWorkerThreads,
149-
maxWorkerThreads,
150-
timeoutSecond,
151-
maxReadBufferBytes);
144+
processor, threadsName, maxWorkerThreads, timeoutSecond, maxReadBufferBytes);
152145
poolServer = new THsHaServer(poolArgs1);
153146
break;
154147
default:
@@ -228,10 +221,7 @@ protected AbstractThriftServiceThread(
228221
private TThreadPoolServer.Args initSyncedPoolArgs(
229222
TProcessor processor, String threadsName, int maxWorkerThreads, int timeoutSecond) {
230223
TThreadPoolServer.Args poolArgs = new TThreadPoolServer.Args(serverTransport);
231-
poolArgs
232-
.maxWorkerThreads(maxWorkerThreads)
233-
.minWorkerThreads(Runtime.getRuntime().availableProcessors())
234-
.stopTimeoutVal(timeoutSecond);
224+
poolArgs.maxWorkerThreads(maxWorkerThreads).minWorkerThreads(0).stopTimeoutVal(timeoutSecond);
235225
executorService = IoTDBThreadPoolFactory.createThriftRpcClientThreadPool(poolArgs, threadsName);
236226
poolArgs.executorService = executorService;
237227
poolArgs.processor(processor);
@@ -244,7 +234,6 @@ private TThreadedSelectorServer.Args initAsyncedSelectorPoolArgs(
244234
TBaseAsyncProcessor<?> processor,
245235
String threadsName,
246236
int selectorThreads,
247-
int minWorkerThreads,
248237
int maxWorkerThreads,
249238
int timeoutSecond,
250239
int maxReadBufferBytes) {
@@ -254,7 +243,7 @@ private TThreadedSelectorServer.Args initAsyncedSelectorPoolArgs(
254243
poolArgs.selectorThreads(selectorThreads);
255244
executorService =
256245
IoTDBThreadPoolFactory.createThriftRpcClientThreadPool(
257-
minWorkerThreads, maxWorkerThreads, timeoutSecond, TimeUnit.SECONDS, threadsName);
246+
0, maxWorkerThreads, timeoutSecond, TimeUnit.SECONDS, threadsName);
258247
poolArgs.executorService(executorService);
259248
poolArgs.processor(processor);
260249
poolArgs.protocolFactory(protocolFactory);
@@ -265,15 +254,14 @@ private TThreadedSelectorServer.Args initAsyncedSelectorPoolArgs(
265254
private THsHaServer.Args initAsyncedHshaPoolArgs(
266255
TBaseAsyncProcessor<?> processor,
267256
String threadsName,
268-
int minWorkerThreads,
269257
int maxWorkerThreads,
270258
int timeoutSecond,
271259
int maxReadBufferBytes) {
272260
THsHaServer.Args poolArgs = new THsHaServer.Args((TNonblockingServerTransport) serverTransport);
273261
poolArgs.maxReadBufferBytes = maxReadBufferBytes;
274262
executorService =
275263
IoTDBThreadPoolFactory.createThriftRpcClientThreadPool(
276-
minWorkerThreads, maxWorkerThreads, timeoutSecond, TimeUnit.SECONDS, threadsName);
264+
0, maxWorkerThreads, timeoutSecond, TimeUnit.SECONDS, threadsName);
277265
poolArgs.executorService(executorService);
278266
poolArgs.processor(processor);
279267
poolArgs.protocolFactory(protocolFactory);

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/ThriftServiceThread.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@ public ThriftServiceThread(
3535
String bindAddress,
3636
int port,
3737
int selectorThreads,
38-
int minWorkerThreads,
3938
int maxWorkerThreads,
4039
int timeoutSecond,
4140
TServerEventHandler serverEventHandler,
@@ -51,7 +50,6 @@ public ThriftServiceThread(
5150
bindAddress,
5251
port,
5352
selectorThreads,
54-
minWorkerThreads,
5553
maxWorkerThreads,
5654
timeoutSecond,
5755
serverEventHandler,

0 commit comments

Comments
 (0)