Skip to content

Commit f517822

Browse files
Refactor local DC do driver baggage
1 parent 9b11d53 commit f517822

File tree

6 files changed

+171
-29
lines changed

6 files changed

+171
-29
lines changed

core/src/main/java/com/datastax/oss/driver/api/core/loadbalancing/LocalDcAwareLoadBalancingPolicy.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
*/
1818
package com.datastax.oss.driver.api.core.loadbalancing;
1919

20+
import edu.umd.cs.findbugs.annotations.NonNull;
2021
import edu.umd.cs.findbugs.annotations.Nullable;
2122

2223
/** Load balancing policy taking into account local datacenter of the application. */
@@ -25,4 +26,8 @@ public interface LocalDcAwareLoadBalancingPolicy extends LoadBalancingPolicy {
2526
/** Returns the local datacenter name, if known; empty otherwise. */
2627
@Nullable
2728
String getLocalDatacenter();
29+
30+
/** Returns JSON string containing all properties that impact C* node connectivity. */
31+
@NonNull
32+
String getStartupConfiguration();
2833
}

core/src/main/java/com/datastax/oss/driver/internal/core/context/StartupOptionsBuilder.java

Lines changed: 18 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.datastax.oss.driver.api.core.loadbalancing.LocalDcAwareLoadBalancingPolicy;
2424
import com.datastax.oss.driver.api.core.session.Session;
2525
import com.datastax.oss.driver.api.core.uuid.Uuids;
26+
import com.datastax.oss.driver.internal.core.util.Strings;
2627
import com.datastax.oss.driver.shaded.guava.common.base.Joiner;
2728
import com.datastax.oss.driver.shaded.guava.common.collect.Maps;
2829
import com.datastax.oss.protocol.internal.request.Startup;
@@ -39,7 +40,7 @@ public class StartupOptionsBuilder {
3940

4041
public static final String DRIVER_NAME_KEY = "DRIVER_NAME";
4142
public static final String DRIVER_VERSION_KEY = "DRIVER_VERSION";
42-
public static final String DRIVER_LOCAL_DC = "DRIVER_LOCAL_DC";
43+
public static final String DRIVER_BAGGAGE = "DRIVER_BAGGAGE";
4344
public static final String APPLICATION_NAME_KEY = "APPLICATION_NAME";
4445
public static final String APPLICATION_VERSION_KEY = "APPLICATION_VERSION";
4546
public static final String CLIENT_ID_KEY = "CLIENT_ID";
@@ -127,7 +128,7 @@ public Map<String, String> build() {
127128
builder.put(APPLICATION_VERSION_KEY, applicationVersion);
128129
}
129130
// do not cache local DC as it can change within LBP implementation
130-
localDcs().ifPresent(s -> builder.put(DRIVER_LOCAL_DC, s));
131+
driverBaggage().ifPresent(s -> builder.put(DRIVER_BAGGAGE, s));
131132

132133
return builder.build();
133134
}
@@ -152,26 +153,26 @@ protected String getDriverVersion() {
152153
return Session.OSS_DRIVER_COORDINATES.getVersion().toString();
153154
}
154155

155-
private Optional<String> localDcs() {
156+
private Optional<String> driverBaggage() {
156157
Joiner joiner = Joiner.on(": ");
157-
Map<String, Optional<String>> lbpToDc =
158-
Maps.transformValues(context.getLoadBalancingPolicies(), this::getLocalDc);
159-
if (lbpToDc.isEmpty()) {
160-
return Optional.empty();
161-
}
158+
Map<String, Optional<String>> lbpToBag =
159+
Maps.transformValues(context.getLoadBalancingPolicies(), this::getDriverBaggage);
162160
return Optional.of(
163-
lbpToDc.entrySet().stream()
164-
.filter(e -> e.getValue().isPresent())
165-
.map(entry -> joiner.join(entry.getKey(), entry.getValue().get()))
166-
.collect(Collectors.joining(", ")));
161+
"{"
162+
+ lbpToBag.entrySet().stream()
163+
.filter(e -> e.getValue().isPresent())
164+
.map(
165+
entry ->
166+
joiner.join(Strings.doubleQuote(entry.getKey()), entry.getValue().get()))
167+
.collect(Collectors.joining(", "))
168+
+ "}");
167169
}
168170

169-
private Optional<String> getLocalDc(LoadBalancingPolicy loadBalancingPolicy) {
171+
private Optional<String> getDriverBaggage(LoadBalancingPolicy loadBalancingPolicy) {
170172
if (loadBalancingPolicy instanceof LocalDcAwareLoadBalancingPolicy) {
171-
String dc = ((LocalDcAwareLoadBalancingPolicy) loadBalancingPolicy).getLocalDatacenter();
172-
if (dc != null) {
173-
return Optional.of(dc);
174-
}
173+
LocalDcAwareLoadBalancingPolicy dcAwareLbp =
174+
(LocalDcAwareLoadBalancingPolicy) loadBalancingPolicy;
175+
return Optional.of(dcAwareLbp.getStartupConfiguration());
175176
}
176177
return Optional.empty();
177178
}

core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/BasicLoadBalancingPolicy.java

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import com.datastax.oss.driver.internal.core.loadbalancing.nodeset.NodeSet;
4242
import com.datastax.oss.driver.internal.core.loadbalancing.nodeset.SingleDcNodeSet;
4343
import com.datastax.oss.driver.internal.core.util.ArrayUtils;
44+
import com.datastax.oss.driver.internal.core.util.Strings;
4445
import com.datastax.oss.driver.internal.core.util.collection.CompositeQueryPlan;
4546
import com.datastax.oss.driver.internal.core.util.collection.LazyQueryPlan;
4647
import com.datastax.oss.driver.internal.core.util.collection.QueryPlan;
@@ -62,6 +63,7 @@
6263
import java.util.UUID;
6364
import java.util.concurrent.atomic.AtomicInteger;
6465
import java.util.function.IntUnaryOperator;
66+
import java.util.stream.Collectors;
6567
import net.jcip.annotations.ThreadSafe;
6668
import org.slf4j.Logger;
6769
import org.slf4j.LoggerFactory;
@@ -161,6 +163,47 @@ public String getLocalDatacenter() {
161163
return localDc;
162164
}
163165

166+
@NonNull
167+
@Override
168+
public String getStartupConfiguration() {
169+
StringBuilder builder = new StringBuilder();
170+
builder
171+
.append("{")
172+
.append(Strings.doubleQuote(BasicLoadBalancingPolicy.class.getSimpleName()))
173+
.append(":")
174+
.append("{")
175+
.append(Strings.doubleQuote("localDc"))
176+
.append(":")
177+
.append(Strings.doubleQuoteNullable(localDc));
178+
if (!preferredRemoteDcs.isEmpty()) {
179+
builder
180+
.append(",")
181+
.append(Strings.doubleQuote("preferredRemoteDcs"))
182+
.append(":[")
183+
.append(
184+
preferredRemoteDcs.stream()
185+
.map(Strings::doubleQuote)
186+
.collect(Collectors.joining(", ")))
187+
.append("]");
188+
}
189+
if (allowDcFailoverForLocalCl) {
190+
builder
191+
.append(",")
192+
.append(Strings.doubleQuote("allowDcFailoverForLocalCl"))
193+
.append(":")
194+
.append(allowDcFailoverForLocalCl);
195+
}
196+
if (maxNodesPerRemoteDc > 0) {
197+
builder
198+
.append(",")
199+
.append(Strings.doubleQuote("maxNodesPerRemoteDc"))
200+
.append(":")
201+
.append(maxNodesPerRemoteDc);
202+
}
203+
builder.append("}}");
204+
return builder.toString();
205+
}
206+
164207
/** @return The nodes currently considered as live. */
165208
protected NodeSet getLiveNodes() {
166209
return liveNodes;

core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicy.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -350,4 +350,15 @@ private boolean hasSufficientResponses(long now) {
350350
return this.oldest - threshold >= 0;
351351
}
352352
}
353+
354+
@NonNull
355+
@Override
356+
public String getStartupConfiguration() {
357+
String result = super.getStartupConfiguration();
358+
result =
359+
result.replaceFirst(
360+
BasicLoadBalancingPolicy.class.getSimpleName(),
361+
DefaultLoadBalancingPolicy.class.getSimpleName());
362+
return result;
363+
}
353364
}

core/src/main/java/com/datastax/oss/driver/internal/core/util/Strings.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,18 @@ public static String doubleQuote(String value) {
8181
return quote(value, '"');
8282
}
8383

84+
/**
85+
* Double quote the given string; double quotes are escaped. If the given string is null, this
86+
* method returns ({@code null}).
87+
*
88+
* @param value The value to double quote.
89+
* @return The double quoted string.
90+
*/
91+
public static String doubleQuoteNullable(String value) {
92+
if (value == null) return null;
93+
return quote(value, '"');
94+
}
95+
8496
/**
8597
* Unquote the given string if it is double quoted; double quotes are unescaped. If the given
8698
* string is not double quoted, it is returned without any modification.

core/src/test/java/com/datastax/dse/driver/internal/core/context/DseStartupOptionsBuilderTest.java

Lines changed: 82 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -31,17 +31,27 @@
3131
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
3232
import com.datastax.oss.driver.api.core.loadbalancing.LoadBalancingPolicy;
3333
import com.datastax.oss.driver.api.core.loadbalancing.LocalDcAwareLoadBalancingPolicy;
34+
import com.datastax.oss.driver.api.core.loadbalancing.NodeDistanceEvaluator;
35+
import com.datastax.oss.driver.api.core.metadata.Node;
3436
import com.datastax.oss.driver.api.core.session.ProgrammaticArguments;
3537
import com.datastax.oss.driver.api.core.session.Session;
3638
import com.datastax.oss.driver.api.core.uuid.Uuids;
39+
import com.datastax.oss.driver.internal.core.ConsistencyLevelRegistry;
3740
import com.datastax.oss.driver.internal.core.context.DefaultDriverContext;
41+
import com.datastax.oss.driver.internal.core.context.InternalDriverContext;
3842
import com.datastax.oss.driver.internal.core.context.StartupOptionsBuilder;
43+
import com.datastax.oss.driver.internal.core.loadbalancing.DefaultLoadBalancingPolicy;
44+
import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableList;
3945
import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableMap;
4046
import com.datastax.oss.protocol.internal.request.Startup;
4147
import com.tngtech.java.junit.dataprovider.DataProvider;
4248
import com.tngtech.java.junit.dataprovider.DataProviderRunner;
4349
import edu.umd.cs.findbugs.annotations.NonNull;
50+
import edu.umd.cs.findbugs.annotations.Nullable;
51+
import java.util.Collections;
52+
import java.util.List;
4453
import java.util.Map;
54+
import java.util.Optional;
4555
import java.util.UUID;
4656
import org.junit.Before;
4757
import org.junit.Test;
@@ -86,8 +96,7 @@ public Map<String, LoadBalancingPolicy> getLoadBalancingPolicies() {
8696
localDcPerProfile.forEach(
8797
(profile, dc) -> {
8898
LocalDcAwareLoadBalancingPolicy loadBalancingPolicy =
89-
mock(LocalDcAwareLoadBalancingPolicy.class);
90-
when(loadBalancingPolicy.getLocalDatacenter()).thenReturn(dc);
99+
mockLoadBalancingPolicy(profile, dc, 0, false, Collections.emptyList());
91100
map.put(profile, loadBalancingPolicy);
92101
});
93102
return map.build();
@@ -97,6 +106,55 @@ public Map<String, LoadBalancingPolicy> getLoadBalancingPolicies() {
97106
};
98107
}
99108

109+
private LocalDcAwareLoadBalancingPolicy mockLoadBalancingPolicy(
110+
String profile,
111+
String localDc,
112+
int maxNodesPerRemoteDc,
113+
boolean allowRemoteSatisfyLocalDc,
114+
List<String> preferredRemoteDcs) {
115+
// mock execution profile
116+
DriverExecutionProfile executionProfile = mock(DriverExecutionProfile.class);
117+
when(executionProfile.getInt(
118+
DefaultDriverOption.LOAD_BALANCING_DC_FAILOVER_MAX_NODES_PER_REMOTE_DC))
119+
.thenReturn(maxNodesPerRemoteDc);
120+
when(executionProfile.getBoolean(
121+
DefaultDriverOption.LOAD_BALANCING_DC_FAILOVER_ALLOW_FOR_LOCAL_CONSISTENCY_LEVELS))
122+
.thenReturn(allowRemoteSatisfyLocalDc);
123+
when(executionProfile.getStringList(
124+
DefaultDriverOption.LOAD_BALANCING_DC_FAILOVER_PREFERRED_REMOTE_DCS))
125+
.thenReturn(preferredRemoteDcs);
126+
127+
// mock driver config
128+
DriverConfig driverConfig = mock(DriverConfig.class);
129+
when(driverConfig.getProfile(profile)).thenReturn(executionProfile);
130+
131+
// mock driver context
132+
InternalDriverContext driverContext = mock(InternalDriverContext.class);
133+
when(driverContext.getConfig()).thenReturn(driverConfig);
134+
when(driverContext.getConsistencyLevelRegistry())
135+
.thenReturn(mock(ConsistencyLevelRegistry.class));
136+
137+
// mock load balancing policy
138+
LocalDcAwareLoadBalancingPolicy loadBalancingPolicy =
139+
new DefaultLoadBalancingPolicy(driverContext, profile) {
140+
@NonNull
141+
@Override
142+
protected Optional<String> discoverLocalDc(@NonNull Map<UUID, Node> nodes) {
143+
return Optional.of(localDc);
144+
}
145+
146+
@NonNull
147+
@Override
148+
protected NodeDistanceEvaluator createNodeDistanceEvaluator(
149+
@Nullable String localDc, @NonNull Map<UUID, Node> nodes) {
150+
return mock(NodeDistanceEvaluator.class);
151+
}
152+
};
153+
loadBalancingPolicy.init(
154+
Collections.emptyMap(), mock(LoadBalancingPolicy.DistanceReporter.class));
155+
return loadBalancingPolicy;
156+
}
157+
100158
private void assertDefaultStartupOptions(Startup startup) {
101159
assertThat(startup.options).containsEntry(Startup.CQL_VERSION_KEY, "3.0.0");
102160
assertThat(startup.options)
@@ -188,7 +246,9 @@ public void should_build_startup_options_with_all_options() {
188246
.containsEntry(StartupOptionsBuilder.CLIENT_ID_KEY, customClientId.toString())
189247
.containsEntry(StartupOptionsBuilder.APPLICATION_NAME_KEY, "Custom_App_Name")
190248
.containsEntry(StartupOptionsBuilder.APPLICATION_VERSION_KEY, "Custom_App_Version")
191-
.containsEntry(StartupOptionsBuilder.DRIVER_LOCAL_DC, "default: dc6");
249+
.containsEntry(
250+
StartupOptionsBuilder.DRIVER_BAGGAGE,
251+
"{\"default\": {\"DefaultLoadBalancingPolicy\":{\"localDc\":\"dc6\"}}}");
192252
assertThat(startup.options).containsEntry(Startup.COMPRESSION_KEY, "snappy");
193253
assertDefaultStartupOptions(startup);
194254
}
@@ -226,7 +286,9 @@ public void should_ignore_configuration_when_programmatic_values_provided() {
226286
assertThat(startup.options)
227287
.containsEntry(StartupOptionsBuilder.APPLICATION_NAME_KEY, "Custom_App_Name")
228288
.containsEntry(StartupOptionsBuilder.APPLICATION_VERSION_KEY, "Custom_App_Version")
229-
.containsEntry(StartupOptionsBuilder.DRIVER_LOCAL_DC, "default: us-west-2");
289+
.containsEntry(
290+
StartupOptionsBuilder.DRIVER_BAGGAGE,
291+
"{\"default\": {\"DefaultLoadBalancingPolicy\":{\"localDc\":\"us-west-2\"}}}");
230292
}
231293

232294
@Test
@@ -246,12 +308,14 @@ public void should_include_all_local_dc_in_startup_message() {
246308
.containsEntry(StartupOptionsBuilder.APPLICATION_NAME_KEY, "Custom_App_Name")
247309
.containsEntry(StartupOptionsBuilder.APPLICATION_VERSION_KEY, "Custom_App_Version")
248310
.containsEntry(
249-
StartupOptionsBuilder.DRIVER_LOCAL_DC,
250-
"default: us-west-2, oltp: us-east-2, olap: eu-central-1");
311+
StartupOptionsBuilder.DRIVER_BAGGAGE,
312+
"{\"default\": {\"DefaultLoadBalancingPolicy\":{\"localDc\":\"us-west-2\"}}, "
313+
+ "\"oltp\": {\"DefaultLoadBalancingPolicy\":{\"localDc\":\"us-east-2\"}}, "
314+
+ "\"olap\": {\"DefaultLoadBalancingPolicy\":{\"localDc\":\"eu-central-1\"}}}");
251315
}
252316

253317
@Test
254-
public void should_skip_non_local_dc_lbp_in_startup_message() {
318+
public void should_include_all_lbp_details_in_startup_message() {
255319
when(defaultProfile.getString(DefaultDriverOption.PROTOCOL_COMPRESSION, "none"))
256320
.thenReturn("none");
257321
when(defaultProfile.getName()).thenReturn(DriverExecutionProfile.DEFAULT_NAME);
@@ -263,16 +327,22 @@ public void should_skip_non_local_dc_lbp_in_startup_message() {
263327
@Override
264328
public Map<String, LoadBalancingPolicy> getLoadBalancingPolicies() {
265329
ImmutableMap.Builder<String, LoadBalancingPolicy> map = ImmutableMap.builder();
266-
LocalDcAwareLoadBalancingPolicy loadBalancingPolicy =
267-
mock(LocalDcAwareLoadBalancingPolicy.class);
268-
when(loadBalancingPolicy.getLocalDatacenter()).thenReturn("dc1");
269-
map.put("oltp", loadBalancingPolicy);
330+
map.put(
331+
"oltp",
332+
mockLoadBalancingPolicy("oltp", "dc1", 2, true, ImmutableList.of("dc2", "dc3")));
270333
map.put("default", mock(LoadBalancingPolicy.class));
271334
return map.build();
272335
}
273336
};
274337
Startup startup = new Startup(driverContext.getStartupOptions());
275338

276-
assertThat(startup.options).containsEntry(StartupOptionsBuilder.DRIVER_LOCAL_DC, "oltp: dc1");
339+
assertThat(startup.options)
340+
.containsEntry(
341+
StartupOptionsBuilder.DRIVER_BAGGAGE,
342+
"{\"oltp\": {\"DefaultLoadBalancingPolicy\":{"
343+
+ "\"localDc\":\"dc1\","
344+
+ "\"preferredRemoteDcs\":[\"dc2\", \"dc3\"],"
345+
+ "\"allowDcFailoverForLocalCl\":true,"
346+
+ "\"maxNodesPerRemoteDc\":2}}}");
277347
}
278348
}

0 commit comments

Comments
 (0)