Skip to content

Commit f73dc7a

Browse files
author
Junfan Zhang
committed
BP: [apache#2354] feat(client): Explicitly setting grpc netty based event loop threads to avoid too much threads apache#2355
1 parent d2c9b00 commit f73dc7a

File tree

6 files changed

+22
-8
lines changed

6 files changed

+22
-8
lines changed

common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,4 +219,10 @@ public class RssClientConf {
219219
.asList()
220220
.noDefaultValue()
221221
.withDescription("the extra java properties could be configured by this option");
222+
223+
public static final ConfigOption<Integer> RSS_CLIENT_GRPC_EVENT_LOOP_THREADS =
224+
ConfigOptions.key("rss.client.grpc.nettyEventLoopThreads")
225+
.intType()
226+
.defaultValue(-1)
227+
.withDescription("the event loop threads of netty impl for grpc");
222228
}

internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ public CoordinatorGrpcClient(String host, int port, int maxRetryAttempts) {
9090
}
9191

9292
public CoordinatorGrpcClient(String host, int port, int maxRetryAttempts, boolean usePlaintext) {
93-
super(host, port, maxRetryAttempts, usePlaintext);
93+
super(host, port, maxRetryAttempts, usePlaintext, -1);
9494
blockingStub = CoordinatorServerGrpc.newBlockingStub(channel);
9595
LOG.info(
9696
"Created CoordinatorGrpcClient, host:{}, port:{}, maxRetryAttempts:{}, usePlaintext:{}",

internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/GrpcClient.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,17 @@ public abstract class GrpcClient {
3333
protected int maxRetryAttempts;
3434
protected ManagedChannel channel;
3535

36-
protected GrpcClient(String host, int port, int maxRetryAttempts, boolean usePlaintext) {
36+
protected GrpcClient(String host, int port, int maxRetryAttempts, boolean usePlaintext, int nettyEventLoopThreads) {
3737
this.host = host;
3838
this.port = port;
3939
this.maxRetryAttempts = maxRetryAttempts;
4040
this.usePlaintext = usePlaintext;
4141

42+
if (nettyEventLoopThreads > 0) {
43+
System.setProperty(
44+
"io.grpc.netty.shaded.io.netty.eventLoopThreads", String.valueOf(nettyEventLoopThreads));
45+
}
46+
4247
// build channel
4348
ManagedChannelBuilder<?> channelBuilder = ManagedChannelBuilder.forAddress(host, port);
4449

internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleManagerGrpcClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ public ShuffleManagerGrpcClient(String host, int port, int maxRetryAttempts) {
6262

6363
public ShuffleManagerGrpcClient(
6464
String host, int port, int maxRetryAttempts, boolean usePlaintext) {
65-
super(host, port, maxRetryAttempts, usePlaintext);
65+
super(host, port, maxRetryAttempts, usePlaintext, -1);
6666
blockingStub = ShuffleManagerGrpc.newBlockingStub(channel);
6767
}
6868

internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@
110110
import org.apache.uniffle.proto.ShuffleServerGrpc;
111111
import org.apache.uniffle.proto.ShuffleServerGrpc.ShuffleServerBlockingStub;
112112

113+
import static org.apache.uniffle.common.config.RssClientConf.RSS_CLIENT_GRPC_EVENT_LOOP_THREADS;
113114
import static org.apache.uniffle.proto.RssProtos.StatusCode.NO_BUFFER;
114115

115116
public class ShuffleServerGrpcClient extends GrpcClient implements ShuffleServerClient {
@@ -149,16 +150,18 @@ public ShuffleServerGrpcClient(RssConf rssConf, String host, int port) {
149150
: rssConf.getInteger(RssClientConf.RPC_MAX_ATTEMPTS),
150151
rssConf == null
151152
? RssClientConf.RPC_TIMEOUT_MS.defaultValue()
152-
: rssConf.getLong(RssClientConf.RPC_TIMEOUT_MS));
153+
: rssConf.getLong(RssClientConf.RPC_TIMEOUT_MS),
154+
true,
155+
rssConf.get(RSS_CLIENT_GRPC_EVENT_LOOP_THREADS));
153156
}
154157

155158
public ShuffleServerGrpcClient(String host, int port, int maxRetryAttempts, long rpcTimeoutMs) {
156-
this(host, port, maxRetryAttempts, rpcTimeoutMs, true);
159+
this(host, port, maxRetryAttempts, rpcTimeoutMs, true, -1);
157160
}
158161

159162
public ShuffleServerGrpcClient(
160-
String host, int port, int maxRetryAttempts, long rpcTimeoutMs, boolean usePlaintext) {
161-
super(host, port, maxRetryAttempts, usePlaintext);
163+
String host, int port, int maxRetryAttempts, long rpcTimeoutMs, boolean usePlaintext, int nettyEventLoopThreads) {
164+
super(host, port, maxRetryAttempts, usePlaintext, nettyEventLoopThreads);
162165
blockingStub = ShuffleServerGrpc.newBlockingStub(channel);
163166
rpcTimeout = rpcTimeoutMs;
164167
}

internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerInternalGrpcClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ public ShuffleServerInternalGrpcClient(String host, int port, int maxRetryAttemp
4949

5050
public ShuffleServerInternalGrpcClient(
5151
String host, int port, int maxRetryAttempts, boolean usePlaintext) {
52-
super(host, port, maxRetryAttempts, usePlaintext);
52+
super(host, port, maxRetryAttempts, usePlaintext, -1);
5353
// todo Add ClientInterceptor for authentication
5454
blockingStub = ShuffleServerInternalGrpc.newBlockingStub(channel);
5555
}

0 commit comments

Comments
 (0)