Skip to content

Commit e6e0702

Browse files
zustonJunfan Zhang
andauthored
[#2354] feat(client): Explicitly setting grpc netty based event loop threads to avoid too much threads (#2355)
### What changes were proposed in this pull request? Introduing the config option to set the grpc netty based network impl event loop threads ### Why are the changes needed? Fix: #2354 ### Does this PR introduce _any_ user-facing change? Yes. ### How was this patch tested? Tests in the spark jobs Co-authored-by: Junfan Zhang <zhangjunfan@qiyi.com>
1 parent 3590940 commit e6e0702

File tree

4 files changed

+42
-7
lines changed

4 files changed

+42
-7
lines changed

common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -317,4 +317,10 @@ public class RssClientConf {
317317
.asList()
318318
.noDefaultValue()
319319
.withDescription("the report include properties could be configured by this option");
320+
321+
public static final ConfigOption<Integer> RSS_CLIENT_GRPC_EVENT_LOOP_THREADS =
322+
ConfigOptions.key("rss.client.grpc.nettyEventLoopThreads")
323+
.intType()
324+
.defaultValue(-1)
325+
.withDescription("the event loop threads of netty impl for grpc");
320326
}

internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/GrpcClient.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ public abstract class GrpcClient {
3737
protected ManagedChannel channel;
3838

3939
protected GrpcClient(String host, int port, int maxRetryAttempts, boolean usePlaintext) {
40-
this(host, port, maxRetryAttempts, usePlaintext, 0, 0, 0);
40+
this(host, port, maxRetryAttempts, usePlaintext, 0, 0, 0, -1);
4141
}
4242

4343
protected GrpcClient(
@@ -47,12 +47,18 @@ protected GrpcClient(
4747
boolean usePlaintext,
4848
int pageSize,
4949
int maxOrder,
50-
int smallCacheSize) {
50+
int smallCacheSize,
51+
int nettyEventLoopThreads) {
5152
this.host = host;
5253
this.port = port;
5354
this.maxRetryAttempts = maxRetryAttempts;
5455
this.usePlaintext = usePlaintext;
5556

57+
if (nettyEventLoopThreads > 0) {
58+
System.setProperty(
59+
"io.grpc.netty.shaded.io.netty.eventLoopThreads", String.valueOf(nettyEventLoopThreads));
60+
}
61+
5662
NettyChannelBuilder channelBuilder =
5763
NettyChannelBuilder.forAddress(host, port)
5864
.withOption(

internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,7 @@
118118
import org.apache.uniffle.proto.ShuffleServerGrpc;
119119
import org.apache.uniffle.proto.ShuffleServerGrpc.ShuffleServerBlockingStub;
120120

121+
import static org.apache.uniffle.common.config.RssClientConf.RSS_CLIENT_GRPC_EVENT_LOOP_THREADS;
121122
import static org.apache.uniffle.proto.RssProtos.StatusCode.NO_BUFFER;
122123

123124
public class ShuffleServerGrpcClient extends GrpcClient implements ShuffleServerClient {
@@ -155,7 +156,8 @@ public ShuffleServerGrpcClient(String host, int port) {
155156
true,
156157
0,
157158
0,
158-
0);
159+
0,
160+
-1);
159161
}
160162

161163
public ShuffleServerGrpcClient(RssConf rssConf, String host, int port) {
@@ -171,7 +173,8 @@ public ShuffleServerGrpcClient(RssConf rssConf, String host, int port) {
171173
true,
172174
0,
173175
0,
174-
0);
176+
0,
177+
rssConf == null ? -1 : rssConf.get(RSS_CLIENT_GRPC_EVENT_LOOP_THREADS));
175178
}
176179

177180
public ShuffleServerGrpcClient(
@@ -182,8 +185,17 @@ public ShuffleServerGrpcClient(
182185
boolean usePlaintext,
183186
int pageSize,
184187
int maxOrder,
185-
int smallCacheSize) {
186-
super(host, port, maxRetryAttempts, usePlaintext, pageSize, maxOrder, smallCacheSize);
188+
int smallCacheSize,
189+
int nettyEventLoopThreads) {
190+
super(
191+
host,
192+
port,
193+
maxRetryAttempts,
194+
usePlaintext,
195+
pageSize,
196+
maxOrder,
197+
smallCacheSize,
198+
nettyEventLoopThreads);
187199
blockingStub = ShuffleServerGrpc.newBlockingStub(channel);
188200
rpcTimeout = rpcTimeoutMs;
189201
}

internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,8 @@
6464
import org.apache.uniffle.common.rpc.StatusCode;
6565
import org.apache.uniffle.common.util.RetryUtils;
6666

67+
import static org.apache.uniffle.common.config.RssClientConf.RSS_CLIENT_GRPC_EVENT_LOOP_THREADS;
68+
6769
public class ShuffleServerGrpcNettyClient extends ShuffleServerGrpcClient {
6870
private static final Logger LOG = LoggerFactory.getLogger(ShuffleServerGrpcNettyClient.class);
6971
private int nettyPort;
@@ -107,7 +109,16 @@ public ShuffleServerGrpcNettyClient(
107109
int pageSize,
108110
int maxOrder,
109111
int smallCacheSize) {
110-
super(host, grpcPort, maxRetryAttempts, rpcTimeoutMs, true, pageSize, maxOrder, smallCacheSize);
112+
super(
113+
host,
114+
grpcPort,
115+
maxRetryAttempts,
116+
rpcTimeoutMs,
117+
true,
118+
pageSize,
119+
maxOrder,
120+
smallCacheSize,
121+
rssConf.get(RSS_CLIENT_GRPC_EVENT_LOOP_THREADS));
111122
this.nettyPort = nettyPort;
112123
TransportContext transportContext = new TransportContext(new TransportConf(rssConf));
113124
this.clientFactory = new TransportClientFactory(transportContext);

0 commit comments

Comments
 (0)