Skip to content

Commit d11604c

Browse files
authored
RATIS-2325. Create GrpcStubPool for GrpcServerProtocolClient (#1283)
1 parent a915428 commit d11604c

File tree

4 files changed

+179
-4
lines changed

4 files changed

+179
-4
lines changed

ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,15 @@ static GrpcTlsConfig tlsConf(Parameters parameters) {
282282
static void setTlsConf(Parameters parameters, GrpcTlsConfig conf) {
283283
parameters.put(TLS_CONF_PARAMETER, conf, TLS_CONF_CLASS);
284284
}
285+
286+
String STUB_POOL_SIZE_KEY = PREFIX + ".stub.pool.size";
287+
int STUB_POOL_SIZE_DEFAULT = 10;
288+
static int stubPoolSize(RaftProperties properties) {
289+
return get(properties::getInt, STUB_POOL_SIZE_KEY, STUB_POOL_SIZE_DEFAULT, getDefaultLog());
290+
}
291+
static void setStubPoolSize(RaftProperties properties, int size) {
292+
setInt(properties::setInt, STUB_POOL_SIZE_KEY, size);
293+
}
285294
}
286295

287296
String MESSAGE_SIZE_MAX_KEY = PREFIX + ".message.size.max";

ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
public class GrpcServerProtocolClient implements Closeable {
4848
// Common channel
4949
private final ManagedChannel channel;
50+
private final GrpcStubPool<RaftServerProtocolServiceStub> pool;
5051
// Channel and stub for heartbeat
5152
private ManagedChannel hbChannel;
5253
private RaftServerProtocolServiceStub hbAsyncStub;
@@ -59,7 +60,7 @@ public class GrpcServerProtocolClient implements Closeable {
5960
//visible for using in log / error messages AND to use in instrumented tests
6061
private final RaftPeerId raftPeerId;
6162

62-
public GrpcServerProtocolClient(RaftPeer target, int flowControlWindow,
63+
public GrpcServerProtocolClient(RaftPeer target, int connections, int flowControlWindow,
6364
TimeDuration requestTimeout, GrpcTlsConfig tlsConfig, boolean separateHBChannel) {
6465
raftPeerId = target.getId();
6566
LOG.info("Build channel for {}", target);
@@ -72,6 +73,8 @@ public GrpcServerProtocolClient(RaftPeer target, int flowControlWindow,
7273
hbAsyncStub = RaftServerProtocolServiceGrpc.newStub(hbChannel);
7374
}
7475
requestTimeoutDuration = requestTimeout;
76+
this.pool = new GrpcStubPool<RaftServerProtocolServiceStub>(target, connections,
77+
ch -> RaftServerProtocolServiceGrpc.newStub(ch), tlsConfig);
7578
}
7679

7780
private ManagedChannel buildChannel(RaftPeer target, int flowControlWindow,
@@ -107,6 +110,7 @@ public void close() {
107110
GrpcUtil.shutdownManagedChannel(hbChannel);
108111
}
109112
GrpcUtil.shutdownManagedChannel(channel);
113+
pool.close();
110114
}
111115

112116
public RequestVoteReplyProto requestVote(RequestVoteRequestProto request) {
@@ -125,8 +129,36 @@ public StartLeaderElectionReplyProto startLeaderElection(StartLeaderElectionRequ
125129
}
126130

127131
void readIndex(ReadIndexRequestProto request, StreamObserver<ReadIndexReplyProto> s) {
128-
asyncStub.withDeadlineAfter(requestTimeoutDuration.getDuration(), requestTimeoutDuration.getUnit())
129-
.readIndex(request, s);
132+
GrpcStubPool.PooledStub<RaftServerProtocolServiceStub> p;
133+
try {
134+
p = pool.acquire();
135+
} catch (InterruptedException e) {
136+
Thread.currentThread().interrupt();
137+
s.onError(e); return;
138+
}
139+
p.getStub().withDeadlineAfter(requestTimeoutDuration.getDuration(), requestTimeoutDuration.getUnit())
140+
.readIndex(request, new StreamObserver<ReadIndexReplyProto>() {
141+
@Override
142+
public void onNext(ReadIndexReplyProto v) {
143+
s.onNext(v);
144+
}
145+
@Override
146+
public void onError(Throwable t) {
147+
try {
148+
s.onError(t);
149+
} finally {
150+
p.release();
151+
}
152+
}
153+
@Override
154+
public void onCompleted() {
155+
try {
156+
s.onCompleted();
157+
} finally {
158+
p.release();
159+
}
160+
}
161+
});
130162
}
131163

132164
CallStreamObserver<AppendEntriesRequestProto> appendEntries(

ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServicesImpl.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@ public static final class Builder {
113113
private String serverHost;
114114
private int serverPort;
115115
private GrpcTlsConfig serverTlsConfig;
116+
private int serverStubPoolSize;
116117

117118
private SizeInBytes messageSizeMax;
118119
private SizeInBytes flowControlWindow;
@@ -135,6 +136,7 @@ public Builder setServer(RaftServer raftServer) {
135136
this.flowControlWindow = GrpcConfigKeys.flowControlWindow(properties, LOG::info);
136137
this.requestTimeoutDuration = RaftServerConfigKeys.Rpc.requestTimeout(properties);
137138
this.separateHeartbeatChannel = GrpcConfigKeys.Server.heartbeatChannel(properties);
139+
this.serverStubPoolSize = GrpcConfigKeys.Server.stubPoolSize(properties);
138140

139141
final SizeInBytes appenderBufferSize = RaftServerConfigKeys.Log.Appender.bufferByteLimit(properties);
140142
final SizeInBytes gap = SizeInBytes.ONE_MB;
@@ -155,7 +157,7 @@ public Builder setCustomizer(Customizer customizer) {
155157
}
156158

157159
private GrpcServerProtocolClient newGrpcServerProtocolClient(RaftPeer target) {
158-
return new GrpcServerProtocolClient(target, flowControlWindow.getSizeInt(),
160+
return new GrpcServerProtocolClient(target, serverStubPoolSize, flowControlWindow.getSizeInt(),
159161
requestTimeoutDuration, serverTlsConfig, separateHeartbeatChannel);
160162
}
161163

Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.ratis.grpc.server;
19+
20+
import org.apache.ratis.grpc.GrpcTlsConfig;
21+
import org.apache.ratis.grpc.GrpcUtil;
22+
import org.apache.ratis.protocol.RaftPeer;
23+
import org.apache.ratis.thirdparty.io.grpc.ManagedChannel;
24+
import org.apache.ratis.thirdparty.io.grpc.netty.GrpcSslContexts;
25+
import org.apache.ratis.thirdparty.io.grpc.netty.NegotiationType;
26+
import org.apache.ratis.thirdparty.io.grpc.netty.NettyChannelBuilder;
27+
import org.apache.ratis.thirdparty.io.grpc.stub.AbstractStub;
28+
import org.apache.ratis.thirdparty.io.netty.channel.ChannelOption;
29+
import org.apache.ratis.thirdparty.io.netty.channel.WriteBufferWaterMark;
30+
import org.apache.ratis.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
31+
import org.apache.ratis.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;
32+
import org.apache.ratis.thirdparty.io.netty.handler.ssl.SslContextBuilder;
33+
import org.slf4j.Logger;
34+
import org.slf4j.LoggerFactory;
35+
36+
import java.util.ArrayList;
37+
import java.util.Collections;
38+
import java.util.List;
39+
import java.util.concurrent.Semaphore;
40+
import java.util.concurrent.ThreadLocalRandom;
41+
import java.util.concurrent.TimeUnit;
42+
import java.util.concurrent.atomic.AtomicInteger;
43+
import java.util.function.Function;
44+
45+
final class GrpcStubPool<S extends AbstractStub<S>> {
46+
public static final Logger LOG = LoggerFactory.getLogger(GrpcStubPool.class);
47+
48+
static final class PooledStub<S extends AbstractStub<S>> {
49+
private final ManagedChannel ch;
50+
private final S stub;
51+
private final Semaphore permits;
52+
53+
PooledStub(ManagedChannel ch, S stub, int maxInflight) {
54+
this.ch = ch;
55+
this.stub = stub;
56+
this.permits = new Semaphore(maxInflight);
57+
}
58+
59+
S getStub() {
60+
return stub;
61+
}
62+
63+
void release() {
64+
permits.release();
65+
}
66+
}
67+
68+
private final List<PooledStub<S>> pool;
69+
private final AtomicInteger rr = new AtomicInteger();
70+
private final NioEventLoopGroup elg;
71+
private final int size;
72+
73+
GrpcStubPool(RaftPeer target, int n, Function<ManagedChannel, S> stubFactory, GrpcTlsConfig tlsConfig) {
74+
this(target, n, stubFactory, tlsConfig, Math.max(2, Runtime.getRuntime().availableProcessors() / 2), 16);
75+
}
76+
77+
GrpcStubPool(RaftPeer target, int n,
78+
Function<ManagedChannel, S> stubFactory, GrpcTlsConfig tlsConf,
79+
int elgThreads, int maxInflightPerConn) {
80+
this.elg = new NioEventLoopGroup(elgThreads);
81+
ArrayList<PooledStub<S>> tmp = new ArrayList<>(n);
82+
for (int i = 0; i < n; i++) {
83+
NettyChannelBuilder channelBuilder = NettyChannelBuilder.forTarget(target.getAddress())
84+
.eventLoopGroup(elg)
85+
.channelType(NioSocketChannel.class)
86+
.keepAliveTime(30, TimeUnit.SECONDS)
87+
.keepAliveWithoutCalls(true)
88+
.idleTimeout(24, TimeUnit.HOURS)
89+
.withOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(64 << 10, 128 << 10));
90+
if (tlsConf != null) {
91+
LOG.debug("Setting TLS for {}", target.getAddress());
92+
SslContextBuilder sslContextBuilder = GrpcSslContexts.forClient();
93+
GrpcUtil.setTrustManager(sslContextBuilder, tlsConf.getTrustManager());
94+
if (tlsConf.getMtlsEnabled()) {
95+
GrpcUtil.setKeyManager(sslContextBuilder, tlsConf.getKeyManager());
96+
}
97+
try {
98+
channelBuilder.useTransportSecurity().sslContext(sslContextBuilder.build());
99+
} catch (Exception ex) {
100+
throw new RuntimeException(ex);
101+
}
102+
} else {
103+
channelBuilder.negotiationType(NegotiationType.PLAINTEXT);
104+
}
105+
ManagedChannel ch = channelBuilder.build();
106+
tmp.add(new PooledStub<>(ch, stubFactory.apply(ch), maxInflightPerConn));
107+
ch.getState(true);
108+
}
109+
this.pool = Collections.unmodifiableList(tmp);
110+
this.size = n;
111+
}
112+
113+
PooledStub<S> acquire() throws InterruptedException {
114+
final int start = ThreadLocalRandom.current().nextInt(size);
115+
for (int k = 0; k < size; k++) {
116+
PooledStub<S> p = pool.get((start + k) % size);
117+
if (p.permits.tryAcquire()) {
118+
return p;
119+
}
120+
}
121+
final PooledStub<S> p = pool.get(start);
122+
p.permits.acquire();
123+
return p;
124+
}
125+
126+
public void close() {
127+
for (PooledStub p : pool) {
128+
p.ch.shutdown();
129+
}
130+
elg.shutdownGracefully();
131+
}
132+
}

0 commit comments

Comments
 (0)