Skip to content

Commit 4c0c22e

Browse files
committed
Added option to prefer ready channels to GrpcRequestSettings
1 parent 87981c5 commit 4c0c22e

File tree

6 files changed

+243
-126
lines changed

6 files changed

+243
-126
lines changed

core/src/main/java/tech/ydb/core/grpc/GrpcRequestSettings.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ public class GrpcRequestSettings {
1717
private final long deadlineAfter;
1818
private final Integer preferredNodeID;
1919
private final boolean directMode;
20+
private final boolean prefferReadyChannel;
2021
private final boolean deadlineDisabled;
2122
private final String traceId;
2223
private final List<String> clientCapabilities;
@@ -28,6 +29,7 @@ private GrpcRequestSettings(Builder builder) {
2829
this.deadlineAfter = builder.deadlineAfter;
2930
this.preferredNodeID = builder.preferredNodeID;
3031
this.directMode = builder.directMode;
32+
this.prefferReadyChannel = builder.preferReadyChannel;
3133
this.deadlineDisabled = builder.deadlineDisabled;
3234
this.traceId = builder.traceId;
3335
this.clientCapabilities = builder.clientCapabilities;
@@ -56,6 +58,10 @@ public boolean isDirectMode() {
5658
return directMode;
5759
}
5860

61+
public boolean isPreferReadyChannel() {
62+
return prefferReadyChannel;
63+
}
64+
5965
public String getTraceId() {
6066
return traceId;
6167
}
@@ -78,6 +84,7 @@ public GrpcFlowControl getFlowControl() {
7884

7985
public static final class Builder {
8086
private long deadlineAfter = 0L;
87+
private boolean preferReadyChannel = false;
8188
private boolean deadlineDisabled = false;
8289
private Integer preferredNodeID = null;
8390
private boolean directMode = false;
@@ -138,6 +145,11 @@ public Builder withDirectMode(boolean directMode) {
138145
return this;
139146
}
140147

148+
public Builder withPreferReadyChannel(boolean preferReady) {
149+
this.preferReadyChannel = preferReady;
150+
return this;
151+
}
152+
141153
public Builder withClientCapabilities(List<String> clientCapabilities) {
142154
this.clientCapabilities = clientCapabilities;
143155
return this;

core/src/main/java/tech/ydb/core/impl/MultiChannelTransport.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package tech.ydb.core.impl;
22

3+
import java.util.Collections;
34
import java.util.List;
45
import java.util.Objects;
56
import java.util.concurrent.ScheduledExecutorService;
@@ -77,7 +78,7 @@ public AuthCallOptions getAuthCallOptions() {
7778

7879
@Override
7980
protected GrpcChannel getChannel(GrpcRequestSettings settings) {
80-
EndpointRecord endpoint = endpointPool.getEndpoint(null);
81+
EndpointRecord endpoint = endpointPool.getEndpoint(Collections.emptySet(), settings);
8182
return channelPool.getChannel(endpoint);
8283
}
8384

core/src/main/java/tech/ydb/core/impl/YdbTransportImpl.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -165,14 +165,14 @@ public AuthCallOptions getAuthCallOptions() {
165165

166166
@Override
167167
protected GrpcChannel getChannel(GrpcRequestSettings settings) {
168-
EndpointRecord endpoint = endpointPool.getEndpoint(settings);
168+
EndpointRecord endpoint = endpointPool.getEndpoint(channelPool.getReadyEndpoints(), settings);
169169
if (endpoint == null) {
170170
long timeout = -1;
171171
if (settings.getDeadlineAfter() != 0) {
172172
timeout = settings.getDeadlineAfter() - System.nanoTime();
173173
}
174174
discovery.waitReady(timeout);
175-
endpoint = endpointPool.getEndpoint(settings);
175+
endpoint = endpointPool.getEndpoint(Collections.emptySet(), settings);
176176
}
177177
return channelPool.getChannel(endpoint);
178178
}

core/src/main/java/tech/ydb/core/impl/pool/EndpointPool.java

Lines changed: 27 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,11 @@
55
import java.util.HashMap;
66
import java.util.List;
77
import java.util.Map;
8+
import java.util.Set;
89
import java.util.concurrent.ThreadLocalRandom;
910
import java.util.concurrent.locks.ReadWriteLock;
1011
import java.util.concurrent.locks.ReentrantReadWriteLock;
12+
import java.util.stream.Collectors;
1113

1214
import javax.annotation.Nullable;
1315

@@ -50,12 +52,13 @@ public EndpointPool(BalancingSettings balancingSettings) {
5052
}
5153

5254
@Nullable
53-
public EndpointRecord getEndpoint(GrpcRequestSettings settings) {
55+
public EndpointRecord getEndpoint(Set<String> readyEnpoints, GrpcRequestSettings settings) {
56+
Integer nodeId = settings.getPreferredNodeID();
57+
boolean directMode = settings.isDirectMode();
58+
boolean prefferReady = settings.isPreferReadyChannel();
59+
5460
recordsLock.readLock().lock();
5561
try {
56-
Integer nodeId = settings.getPreferredNodeID();
57-
boolean directMode = settings.isDirectMode();
58-
5962
if (nodeId != null) {
6063
PriorityEndpoint knownEndpoint = recordsByNodeId.get(nodeId);
6164
if (knownEndpoint != null) {
@@ -65,17 +68,32 @@ public EndpointRecord getEndpoint(GrpcRequestSettings settings) {
6568
throw new UnexpectedResultException("Node " + nodeId + " not found", NODE_NOT_FOUND_ERROR_CODE);
6669
}
6770
}
71+
6872
if (directMode) {
6973
throw new UnexpectedResultException("Cannot use direct mode without NodeId", DIRECT_REQUEST_ERROR_CODE);
7074
}
7175

72-
if (bestEndpointsCount > 0) {
73-
// returns value in range [0, n)
74-
int idx = ThreadLocalRandom.current().nextInt(bestEndpointsCount);
75-
return records.get(idx).record;
76-
} else {
76+
if (bestEndpointsCount <= 0) {
77+
// pool is not ready
7778
return null;
7879
}
80+
81+
if (prefferReady && !readyEnpoints.isEmpty()) {
82+
List<PriorityEndpoint> ready = readyEnpoints.stream()
83+
.map(recordsByEndpoint::get)
84+
.filter(pr -> pr != null && !pr.isPessimized())
85+
.collect(Collectors.toList());
86+
87+
if (!ready.isEmpty()) {
88+
// returns value in range [0, n)
89+
int idx = ThreadLocalRandom.current().nextInt(ready.size());
90+
return ready.get(idx).record;
91+
}
92+
}
93+
94+
// returns value in range [0, n)
95+
int idx = ThreadLocalRandom.current().nextInt(bestEndpointsCount);
96+
return records.get(idx).record;
7997
} finally {
8098
recordsLock.readLock().unlock();
8199
}

core/src/main/java/tech/ydb/core/impl/pool/GrpcChannelPool.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import java.util.List;
55
import java.util.Map;
66
import java.util.Objects;
7+
import java.util.Set;
78
import java.util.concurrent.CompletableFuture;
89
import java.util.concurrent.ConcurrentHashMap;
910
import java.util.concurrent.ScheduledExecutorService;
@@ -29,6 +30,10 @@ public GrpcChannelPool(ManagedChannelFactory channelFactory, ScheduledExecutorSe
2930
this.executor = executor;
3031
}
3132

33+
public Set<String> getReadyEndpoints() {
34+
return channels.keySet();
35+
}
36+
3237
public GrpcChannel getChannel(EndpointRecord endpoint) {
3338
// Workaround for https://bugs.openjdk.java.net/browse/JDK-8161372 to prevent unnecessary locks in Java 8
3439
// Was fixed in Java 9+

0 commit comments

Comments
 (0)