Skip to content

Commit 9c7051c

Browse files
authored
Merge pull request #535 from alex268/master
Added endpoint pessimization when CreateSession returns OVERLOADED
2 parents 8f86524 + aff5950 commit 9c7051c

File tree

14 files changed

+255
-168
lines changed

14 files changed

+255
-168
lines changed

core/src/main/java/tech/ydb/core/grpc/GrpcRequestSettings.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import java.time.Duration;
44
import java.util.ArrayList;
55
import java.util.List;
6+
import java.util.function.BooleanSupplier;
67
import java.util.function.Consumer;
78

89
import io.grpc.Metadata;
@@ -15,17 +16,21 @@
1516
public class GrpcRequestSettings {
1617
private final long deadlineAfter;
1718
private final Integer preferredNodeID;
19+
private final boolean directMode;
1820
private final String traceId;
1921
private final List<String> clientCapabilities;
2022
private final Consumer<Metadata> trailersHandler;
23+
private final BooleanSupplier pessimizationHook;
2124
private final GrpcFlowControl flowControl;
2225

2326
private GrpcRequestSettings(Builder builder) {
2427
this.deadlineAfter = builder.deadlineAfter;
2528
this.preferredNodeID = builder.preferredNodeID;
29+
this.directMode = builder.directMode;
2630
this.traceId = builder.traceId;
2731
this.clientCapabilities = builder.clientCapabilities;
2832
this.trailersHandler = builder.trailersHandler;
33+
this.pessimizationHook = builder.pessimizationHook;
2934
this.flowControl = builder.flowControl;
3035
}
3136

@@ -41,6 +46,10 @@ public Integer getPreferredNodeID() {
4146
return preferredNodeID;
4247
}
4348

49+
public boolean isDirectMode() {
50+
return directMode;
51+
}
52+
4453
public String getTraceId() {
4554
return traceId;
4655
}
@@ -53,16 +62,22 @@ public Consumer<Metadata> getTrailersHandler() {
5362
return trailersHandler;
5463
}
5564

65+
public BooleanSupplier getPessimizationHook() {
66+
return pessimizationHook;
67+
}
68+
5669
public GrpcFlowControl getFlowControl() {
5770
return flowControl;
5871
}
5972

6073
public static final class Builder {
6174
private long deadlineAfter = 0L;
6275
private Integer preferredNodeID = null;
76+
private boolean directMode = false;
6377
private String traceId = null;
6478
private List<String> clientCapabilities = null;
6579
private Consumer<Metadata> trailersHandler = null;
80+
private BooleanSupplier pessimizationHook = null;
6681
private GrpcFlowControl flowControl = GrpcFlows.SIMPLE_FLOW;
6782

6883
/**
@@ -109,6 +124,11 @@ public Builder withFlowControl(GrpcFlowControl flowCtrl) {
109124
return this;
110125
}
111126

127+
public Builder withDirectMode(boolean directMode) {
128+
this.directMode = directMode;
129+
return this;
130+
}
131+
112132
public Builder withClientCapabilities(List<String> clientCapabilities) {
113133
this.clientCapabilities = clientCapabilities;
114134
return this;
@@ -127,6 +147,11 @@ public Builder withTrailersHandler(Consumer<Metadata> handler) {
127147
return this;
128148
}
129149

150+
public Builder withPessimizationHook(BooleanSupplier pessimizationHook) {
151+
this.pessimizationHook = pessimizationHook;
152+
return this;
153+
}
154+
130155
public GrpcRequestSettings build() {
131156
return new GrpcRequestSettings(this);
132157
}

core/src/main/java/tech/ydb/core/impl/BaseGrpcTransport.java

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import java.util.concurrent.CompletableFuture;
44
import java.util.concurrent.TimeUnit;
55
import java.util.concurrent.atomic.AtomicBoolean;
6+
import java.util.function.BooleanSupplier;
67

78
import io.grpc.CallOptions;
89
import io.grpc.ClientCall;
@@ -29,6 +30,7 @@
2930
import tech.ydb.core.impl.call.ReadStreamCall;
3031
import tech.ydb.core.impl.call.ReadWriteStreamCall;
3132
import tech.ydb.core.impl.call.UnaryCall;
33+
import tech.ydb.core.impl.pool.EndpointRecord;
3234
import tech.ydb.core.impl.pool.GrpcChannel;
3335

3436
/**
@@ -47,7 +49,10 @@ public abstract class BaseGrpcTransport implements GrpcTransport {
4749

4850
protected abstract AuthCallOptions getAuthCallOptions();
4951
protected abstract GrpcChannel getChannel(GrpcRequestSettings settings);
50-
protected abstract void updateChannelStatus(GrpcChannel channel, io.grpc.Status status);
52+
53+
protected void pessimizeEndpoint(EndpointRecord endpoint, String reason) {
54+
// nothing to pessimize
55+
}
5156

5257
protected void shutdown() {
5358
// nothing to shutdown
@@ -226,10 +231,22 @@ private class ChannelStatusHandler implements GrpcStatusHandler {
226231

227232
@Override
228233
public void accept(io.grpc.Status status, Metadata trailers) {
229-
updateChannelStatus(channel, status);
234+
// Usually CANCELLED is received when ClientCall is canceled on client side
235+
if (!status.isOk() && status.getCode() != io.grpc.Status.Code.CANCELLED) {
236+
pessimizeEndpoint(channel.getEndpoint(), "by grpc code " + status.getCode());
237+
}
238+
230239
if (settings.getTrailersHandler() != null && trailers != null) {
231240
settings.getTrailersHandler().accept(trailers);
232241
}
233242
}
243+
244+
@Override
245+
public void postComplete() {
246+
BooleanSupplier hook = settings.getPessimizationHook();
247+
if (hook != null && hook.getAsBoolean()) {
248+
pessimizeEndpoint(channel.getEndpoint(), "by pessimization hook");
249+
}
250+
}
234251
}
235252
}

core/src/main/java/tech/ydb/core/impl/FixedCallOptionsTransport.java

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
import java.util.concurrent.ScheduledExecutorService;
44

5-
import io.grpc.Status;
65
import org.slf4j.Logger;
76
import org.slf4j.LoggerFactory;
87

@@ -60,15 +59,4 @@ public AuthCallOptions getAuthCallOptions() {
6059
protected GrpcChannel getChannel(GrpcRequestSettings settings) {
6160
return channel;
6261
}
63-
64-
@Override
65-
protected void updateChannelStatus(GrpcChannel channel, Status status) {
66-
if (!status.isOk()) {
67-
logger.warn("grpc error {}[{}] on fixed channel {}",
68-
status.getCode(),
69-
status.getDescription(),
70-
channel.getEndpoint());
71-
}
72-
}
73-
7462
}

core/src/main/java/tech/ydb/core/impl/MultiChannelTransport.java

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77

88
import com.google.common.base.Strings;
99
import com.google.common.net.HostAndPort;
10-
import io.grpc.Status;
1110
import org.slf4j.Logger;
1211
import org.slf4j.LoggerFactory;
1312

@@ -83,13 +82,11 @@ protected GrpcChannel getChannel(GrpcRequestSettings settings) {
8382
}
8483

8584
@Override
86-
protected void updateChannelStatus(GrpcChannel channel, Status status) {
87-
if (!status.isOk()) {
88-
endpointPool.pessimizeEndpoint(channel.getEndpoint());
85+
protected void pessimizeEndpoint(EndpointRecord endpoint, String reason) {
86+
endpointPool.pessimizeEndpoint(endpoint, reason);
8987

90-
if (endpointPool.needToRunDiscovery()) {
91-
endpointPool.setNewState(null, endpoints);
92-
}
88+
if (endpointPool.needToRunDiscovery()) {
89+
endpointPool.setNewState(null, endpoints);
9390
}
9491
}
9592
}

core/src/main/java/tech/ydb/core/impl/SingleChannelTransport.java

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
import java.util.concurrent.ScheduledExecutorService;
55

66
import com.google.common.base.Strings;
7-
import io.grpc.Status;
87
import org.slf4j.Logger;
98
import org.slf4j.LoggerFactory;
109

@@ -65,14 +64,4 @@ public AuthCallOptions getAuthCallOptions() {
6564
protected GrpcChannel getChannel(GrpcRequestSettings settings) {
6665
return channel;
6766
}
68-
69-
@Override
70-
protected void updateChannelStatus(GrpcChannel channel, Status status) {
71-
if (!status.isOk()) {
72-
logger.warn("grpc error {}[{}] on single channel {}",
73-
status.getCode(),
74-
status.getDescription(),
75-
channel.getEndpoint());
76-
}
77-
}
7867
}

core/src/main/java/tech/ydb/core/impl/YdbTransportImpl.java

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -160,24 +160,21 @@ public AuthCallOptions getAuthCallOptions() {
160160

161161
@Override
162162
protected GrpcChannel getChannel(GrpcRequestSettings settings) {
163-
EndpointRecord endpoint = endpointPool.getEndpoint(settings.getPreferredNodeID());
163+
EndpointRecord endpoint = endpointPool.getEndpoint(settings);
164164
if (endpoint == null) {
165165
long timeout = -1;
166166
if (settings.getDeadlineAfter() != 0) {
167167
timeout = settings.getDeadlineAfter() - System.nanoTime();
168168
}
169169
discovery.waitReady(timeout);
170-
endpoint = endpointPool.getEndpoint(settings.getPreferredNodeID());
170+
endpoint = endpointPool.getEndpoint(settings);
171171
}
172172
return channelPool.getChannel(endpoint);
173173
}
174174

175175
@Override
176-
protected void updateChannelStatus(GrpcChannel channel, io.grpc.Status status) {
177-
// Usually CANCELLED is received when ClientCall is canceled on client side
178-
if (!status.isOk() && status.getCode() != io.grpc.Status.Code.CANCELLED) {
179-
endpointPool.pessimizeEndpoint(channel.getEndpoint());
180-
}
176+
protected void pessimizeEndpoint(EndpointRecord endpoint, String reason) {
177+
endpointPool.pessimizeEndpoint(endpoint, reason);
181178
}
182179

183180
private class DiscoveryHandler implements YdbDiscovery.Handler {

core/src/main/java/tech/ydb/core/impl/call/GrpcStatusHandler.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,4 +9,6 @@
99
*/
1010
public interface GrpcStatusHandler {
1111
void accept(Status status, Metadata trailers);
12+
13+
void postComplete();
1214
}

core/src/main/java/tech/ydb/core/impl/call/ReadStreamCall.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,5 +144,7 @@ public void onClose(io.grpc.Status status, @Nullable Metadata trailers) {
144144
} else {
145145
statusFuture.complete(GrpcStatuses.toStatus(status));
146146
}
147+
148+
statusConsumer.postComplete();
147149
}
148150
}

core/src/main/java/tech/ydb/core/impl/call/ReadWriteStreamCall.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,5 +206,7 @@ public void onClose(io.grpc.Status status, @Nullable Metadata trailers) {
206206
} else {
207207
statusFuture.complete(GrpcStatuses.toStatus(status));
208208
}
209+
210+
statusConsumer.postComplete();
209211
}
210212
}

core/src/main/java/tech/ydb/core/impl/call/UnaryCall.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,5 +105,7 @@ public void onClose(io.grpc.Status status, @Nullable Metadata trailers) {
105105
} else {
106106
future.complete(GrpcStatuses.toResult(status));
107107
}
108+
109+
statusConsumer.postComplete();
108110
}
109111
}

0 commit comments

Comments
 (0)