From 1afcf843e5604722dbe6b94c4f57acb95d542374 Mon Sep 17 00:00:00 2001 From: Lukasz Antoniak Date: Mon, 14 Apr 2025 16:40:53 +0200 Subject: [PATCH 1/6] CASSJAVA-92: Local DC provided for nodetool clientstats --- .../core/context/StartupOptionsBuilder.java | 19 +++++++ .../context/DseStartupOptionsBuilderTest.java | 52 ++++++++++++------- 2 files changed, 51 insertions(+), 20 deletions(-) diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/context/StartupOptionsBuilder.java b/core/src/main/java/com/datastax/oss/driver/internal/core/context/StartupOptionsBuilder.java index 684d6b01b9c..7a9907b73f2 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/context/StartupOptionsBuilder.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/context/StartupOptionsBuilder.java @@ -18,6 +18,7 @@ package com.datastax.oss.driver.internal.core.context; import com.datastax.dse.driver.api.core.config.DseDriverOption; +import com.datastax.oss.driver.api.core.config.DefaultDriverOption; import com.datastax.oss.driver.api.core.config.DriverExecutionProfile; import com.datastax.oss.driver.api.core.session.Session; import com.datastax.oss.driver.api.core.uuid.Uuids; @@ -33,6 +34,7 @@ public class StartupOptionsBuilder { public static final String DRIVER_NAME_KEY = "DRIVER_NAME"; public static final String DRIVER_VERSION_KEY = "DRIVER_VERSION"; + public static final String DRIVER_LOCAL_DC = "DRIVER_LOCAL_DC"; public static final String APPLICATION_NAME_KEY = "APPLICATION_NAME"; public static final String APPLICATION_VERSION_KEY = "APPLICATION_VERSION"; public static final String CLIENT_ID_KEY = "CLIENT_ID"; @@ -41,6 +43,7 @@ public class StartupOptionsBuilder { private UUID clientId; private String applicationName; private String applicationVersion; + private String applicationLocalDc; public StartupOptionsBuilder(InternalDriverContext context) { this.context = context; @@ -119,6 +122,12 @@ public Map build() { if (applicationVersion != null) { builder.put(APPLICATION_VERSION_KEY, applicationVersion); } + if (applicationLocalDc == null) { + applicationLocalDc = localDc(config); + } + if (applicationLocalDc != null) { + builder.put(DRIVER_LOCAL_DC, applicationLocalDc); + } return builder.build(); } @@ -142,4 +151,14 @@ protected String getDriverName() { protected String getDriverVersion() { return Session.OSS_DRIVER_COORDINATES.getVersion().toString(); } + + private String localDc(DriverExecutionProfile profile) { + String dc = context.getLocalDatacenter(profile.getName()); // DC set programmatically + if (dc == null && profile.isDefined(DefaultDriverOption.LOAD_BALANCING_LOCAL_DATACENTER)) { + dc = + profile.getString( + DefaultDriverOption.LOAD_BALANCING_LOCAL_DATACENTER); // DC from configuration + } + return dc; + } } diff --git a/core/src/test/java/com/datastax/dse/driver/internal/core/context/DseStartupOptionsBuilderTest.java b/core/src/test/java/com/datastax/dse/driver/internal/core/context/DseStartupOptionsBuilderTest.java index 9e4556e528d..f64b3a77a34 100644 --- a/core/src/test/java/com/datastax/dse/driver/internal/core/context/DseStartupOptionsBuilderTest.java +++ b/core/src/test/java/com/datastax/dse/driver/internal/core/context/DseStartupOptionsBuilderTest.java @@ -60,15 +60,17 @@ public void before() { when(defaultProfile.isDefined(DseDriverOption.CONTINUOUS_PAGING_PAGE_SIZE)).thenReturn(true); } - private void buildContext(UUID clientId, String applicationName, String applicationVersion) { - this.driverContext = - new DefaultDriverContext( - configLoader, - ProgrammaticArguments.builder() - .withStartupClientId(clientId) - .withStartupApplicationName(applicationName) - .withStartupApplicationVersion(applicationVersion) - .build()); + private void buildContext( + UUID clientId, String applicationName, String applicationVersion, String localDc) { + ProgrammaticArguments.Builder builder = + ProgrammaticArguments.builder() + .withStartupClientId(clientId) + .withStartupApplicationName(applicationName) + .withStartupApplicationVersion(applicationVersion); + if (localDc != null) { + builder.withLocalDatacenter(DriverExecutionProfile.DEFAULT_NAME, localDc); + } + this.driverContext = new DefaultDriverContext(configLoader, builder.build()); } private void assertDefaultStartupOptions(Startup startup) { @@ -86,7 +88,7 @@ private void assertDefaultStartupOptions(Startup startup) { public void should_build_startup_options_with_no_compression_if_undefined() { when(defaultProfile.getString(DefaultDriverOption.PROTOCOL_COMPRESSION, "none")) .thenReturn("none"); - buildContext(null, null, null); + buildContext(null, null, null, null); Startup startup = new Startup(driverContext.getStartupOptions()); assertThat(startup.options).doesNotContainKey(Startup.COMPRESSION_KEY); assertDefaultStartupOptions(startup); @@ -97,7 +99,7 @@ public void should_build_startup_options_with_no_compression_if_undefined() { public void should_build_startup_options_with_compression(String compression) { when(defaultProfile.getString(DefaultDriverOption.PROTOCOL_COMPRESSION, "none")) .thenReturn(compression); - buildContext(null, null, null); + buildContext(null, null, null, null); Startup startup = new Startup(driverContext.getStartupOptions()); // assert the compression option is present assertThat(startup.options).containsEntry(Startup.COMPRESSION_KEY, compression); @@ -110,7 +112,7 @@ public void should_build_startup_options_with_compression(String compression) { public void should_fail_to_build_startup_options_with_invalid_compression() { when(defaultProfile.getString(DefaultDriverOption.PROTOCOL_COMPRESSION, "none")) .thenReturn("foobar"); - buildContext(null, null, null); + buildContext(null, null, null, null); assertThatIllegalArgumentException() .isThrownBy(() -> new Startup(driverContext.getStartupOptions())); } @@ -120,7 +122,7 @@ public void should_build_startup_options_with_client_id() { when(defaultProfile.getString(DefaultDriverOption.PROTOCOL_COMPRESSION, "none")) .thenReturn("none"); UUID customClientId = Uuids.random(); - buildContext(customClientId, null, null); + buildContext(customClientId, null, null, null); Startup startup = new Startup(driverContext.getStartupOptions()); // assert the client id is present assertThat(startup.options) @@ -135,7 +137,7 @@ public void should_build_startup_options_with_client_id() { public void should_build_startup_options_with_application_version_and_name() { when(defaultProfile.getString(DefaultDriverOption.PROTOCOL_COMPRESSION, "none")) .thenReturn("none"); - buildContext(null, "Custom_App_Name", "Custom_App_Version"); + buildContext(null, "Custom_App_Name", "Custom_App_Version", null); Startup startup = new Startup(driverContext.getStartupOptions()); // assert the app name and version are present assertThat(startup.options) @@ -151,15 +153,17 @@ public void should_build_startup_options_with_all_options() { // mock config to specify "snappy" compression when(defaultProfile.getString(DefaultDriverOption.PROTOCOL_COMPRESSION, "none")) .thenReturn("snappy"); + when(defaultProfile.getName()).thenReturn(DriverExecutionProfile.DEFAULT_NAME); UUID customClientId = Uuids.random(); - buildContext(customClientId, "Custom_App_Name", "Custom_App_Version"); + buildContext(customClientId, "Custom_App_Name", "Custom_App_Version", "dc6"); Startup startup = new Startup(driverContext.getStartupOptions()); assertThat(startup.options) .containsEntry(StartupOptionsBuilder.CLIENT_ID_KEY, customClientId.toString()) .containsEntry(StartupOptionsBuilder.APPLICATION_NAME_KEY, "Custom_App_Name") - .containsEntry(StartupOptionsBuilder.APPLICATION_VERSION_KEY, "Custom_App_Version"); + .containsEntry(StartupOptionsBuilder.APPLICATION_VERSION_KEY, "Custom_App_Version") + .containsEntry(StartupOptionsBuilder.DRIVER_LOCAL_DC, "dc6"); assertThat(startup.options).containsEntry(Startup.COMPRESSION_KEY, "snappy"); assertDefaultStartupOptions(startup); } @@ -172,25 +176,33 @@ public void should_use_configuration_when_no_programmatic_values_provided() { .thenReturn("Config_App_Version"); when(defaultProfile.getString(DefaultDriverOption.PROTOCOL_COMPRESSION, "none")) .thenReturn("none"); + when(defaultProfile.getString(DefaultDriverOption.LOAD_BALANCING_LOCAL_DATACENTER)) + .thenReturn("Config_DC_1"); + when(defaultProfile.isDefined(DefaultDriverOption.LOAD_BALANCING_LOCAL_DATACENTER)) + .thenReturn(true); + when(defaultProfile.getName()).thenReturn(DriverExecutionProfile.DEFAULT_NAME); - buildContext(null, null, null); + buildContext(null, null, null, null); Startup startup = new Startup(driverContext.getStartupOptions()); assertThat(startup.options) .containsEntry(StartupOptionsBuilder.APPLICATION_NAME_KEY, "Config_App_Name") - .containsEntry(StartupOptionsBuilder.APPLICATION_VERSION_KEY, "Config_App_Version"); + .containsEntry(StartupOptionsBuilder.APPLICATION_VERSION_KEY, "Config_App_Version") + .containsEntry(StartupOptionsBuilder.DRIVER_LOCAL_DC, "Config_DC_1"); } @Test public void should_ignore_configuration_when_programmatic_values_provided() { when(defaultProfile.getString(DefaultDriverOption.PROTOCOL_COMPRESSION, "none")) .thenReturn("none"); + when(defaultProfile.getName()).thenReturn(DriverExecutionProfile.DEFAULT_NAME); - buildContext(null, "Custom_App_Name", "Custom_App_Version"); + buildContext(null, "Custom_App_Name", "Custom_App_Version", "us-west-2"); Startup startup = new Startup(driverContext.getStartupOptions()); assertThat(startup.options) .containsEntry(StartupOptionsBuilder.APPLICATION_NAME_KEY, "Custom_App_Name") - .containsEntry(StartupOptionsBuilder.APPLICATION_VERSION_KEY, "Custom_App_Version"); + .containsEntry(StartupOptionsBuilder.APPLICATION_VERSION_KEY, "Custom_App_Version") + .containsEntry(StartupOptionsBuilder.DRIVER_LOCAL_DC, "us-west-2"); } } From 101621a9623cb9d8bcfd9c4065563e2959edcc53 Mon Sep 17 00:00:00 2001 From: Lukasz Antoniak Date: Thu, 17 Apr 2025 11:48:38 +0200 Subject: [PATCH 2/6] Lookup data center from LBP --- .../LocalDcAwareLoadBalancingPolicy.java | 28 +++++++++++++++++++ .../core/context/StartupOptionsBuilder.java | 20 ++++++++----- .../BasicLoadBalancingPolicy.java | 6 ++-- 3 files changed, 45 insertions(+), 9 deletions(-) create mode 100644 core/src/main/java/com/datastax/oss/driver/api/core/loadbalancing/LocalDcAwareLoadBalancingPolicy.java diff --git a/core/src/main/java/com/datastax/oss/driver/api/core/loadbalancing/LocalDcAwareLoadBalancingPolicy.java b/core/src/main/java/com/datastax/oss/driver/api/core/loadbalancing/LocalDcAwareLoadBalancingPolicy.java new file mode 100644 index 00000000000..f69d35f182d --- /dev/null +++ b/core/src/main/java/com/datastax/oss/driver/api/core/loadbalancing/LocalDcAwareLoadBalancingPolicy.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datastax.oss.driver.api.core.loadbalancing; + +import edu.umd.cs.findbugs.annotations.Nullable; + +/** Load balancing policy taking into account local datacenter of the application. */ +public interface LocalDcAwareLoadBalancingPolicy extends LoadBalancingPolicy { + + /** Returns the local datacenter name, if known; empty otherwise. */ + @Nullable + String getLocalDatacenter(); +} diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/context/StartupOptionsBuilder.java b/core/src/main/java/com/datastax/oss/driver/internal/core/context/StartupOptionsBuilder.java index 7a9907b73f2..f0e1a98bd60 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/context/StartupOptionsBuilder.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/context/StartupOptionsBuilder.java @@ -20,6 +20,8 @@ import com.datastax.dse.driver.api.core.config.DseDriverOption; import com.datastax.oss.driver.api.core.config.DefaultDriverOption; import com.datastax.oss.driver.api.core.config.DriverExecutionProfile; +import com.datastax.oss.driver.api.core.loadbalancing.LoadBalancingPolicy; +import com.datastax.oss.driver.api.core.loadbalancing.LocalDcAwareLoadBalancingPolicy; import com.datastax.oss.driver.api.core.session.Session; import com.datastax.oss.driver.api.core.uuid.Uuids; import com.datastax.oss.protocol.internal.request.Startup; @@ -43,7 +45,6 @@ public class StartupOptionsBuilder { private UUID clientId; private String applicationName; private String applicationVersion; - private String applicationLocalDc; public StartupOptionsBuilder(InternalDriverContext context) { this.context = context; @@ -122,9 +123,8 @@ public Map build() { if (applicationVersion != null) { builder.put(APPLICATION_VERSION_KEY, applicationVersion); } - if (applicationLocalDc == null) { - applicationLocalDc = localDc(config); - } + // do not cache local DC as it can change within LBP implementation + String applicationLocalDc = localDc(config); if (applicationLocalDc != null) { builder.put(DRIVER_LOCAL_DC, applicationLocalDc); } @@ -154,10 +154,16 @@ protected String getDriverVersion() { private String localDc(DriverExecutionProfile profile) { String dc = context.getLocalDatacenter(profile.getName()); // DC set programmatically + if (dc == null) { + // DC from load balancing policy + LoadBalancingPolicy lbp = context.getLoadBalancingPolicy(DriverExecutionProfile.DEFAULT_NAME); + if (lbp instanceof LocalDcAwareLoadBalancingPolicy) { + dc = ((LocalDcAwareLoadBalancingPolicy) lbp).getLocalDatacenter(); + } + } if (dc == null && profile.isDefined(DefaultDriverOption.LOAD_BALANCING_LOCAL_DATACENTER)) { - dc = - profile.getString( - DefaultDriverOption.LOAD_BALANCING_LOCAL_DATACENTER); // DC from configuration + // DC from configuration + dc = profile.getString(DefaultDriverOption.LOAD_BALANCING_LOCAL_DATACENTER); } return dc; } diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/BasicLoadBalancingPolicy.java b/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/BasicLoadBalancingPolicy.java index 587ef4183bd..04a031f11ac 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/BasicLoadBalancingPolicy.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/BasicLoadBalancingPolicy.java @@ -24,6 +24,7 @@ import com.datastax.oss.driver.api.core.context.DriverContext; import com.datastax.oss.driver.api.core.cql.Statement; import com.datastax.oss.driver.api.core.loadbalancing.LoadBalancingPolicy; +import com.datastax.oss.driver.api.core.loadbalancing.LocalDcAwareLoadBalancingPolicy; import com.datastax.oss.driver.api.core.loadbalancing.NodeDistance; import com.datastax.oss.driver.api.core.loadbalancing.NodeDistanceEvaluator; import com.datastax.oss.driver.api.core.metadata.Node; @@ -99,7 +100,7 @@ * DefaultLoadBalancingPolicy}. */ @ThreadSafe -public class BasicLoadBalancingPolicy implements LoadBalancingPolicy { +public class BasicLoadBalancingPolicy implements LocalDcAwareLoadBalancingPolicy { private static final Logger LOG = LoggerFactory.getLogger(BasicLoadBalancingPolicy.class); @@ -155,7 +156,8 @@ public BasicLoadBalancingPolicy(@NonNull DriverContext context, @NonNull String * Before initialization, this method always returns null. */ @Nullable - protected String getLocalDatacenter() { + @Override + public String getLocalDatacenter() { return localDc; } From 382905305c6cdf82df4b775b290c6bbfea67b770 Mon Sep 17 00:00:00 2001 From: Lukasz Antoniak Date: Thu, 22 May 2025 11:53:02 +0200 Subject: [PATCH 3/6] Return local DC for all profiles --- .../core/context/StartupOptionsBuilder.java | 38 ++++++---- .../context/DseStartupOptionsBuilderTest.java | 70 +++++++++++++++---- 2 files changed, 80 insertions(+), 28 deletions(-) diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/context/StartupOptionsBuilder.java b/core/src/main/java/com/datastax/oss/driver/internal/core/context/StartupOptionsBuilder.java index f0e1a98bd60..b5f5b9774c6 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/context/StartupOptionsBuilder.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/context/StartupOptionsBuilder.java @@ -18,7 +18,6 @@ package com.datastax.oss.driver.internal.core.context; import com.datastax.dse.driver.api.core.config.DseDriverOption; -import com.datastax.oss.driver.api.core.config.DefaultDriverOption; import com.datastax.oss.driver.api.core.config.DriverExecutionProfile; import com.datastax.oss.driver.api.core.loadbalancing.LoadBalancingPolicy; import com.datastax.oss.driver.api.core.loadbalancing.LocalDcAwareLoadBalancingPolicy; @@ -124,9 +123,9 @@ public Map build() { builder.put(APPLICATION_VERSION_KEY, applicationVersion); } // do not cache local DC as it can change within LBP implementation - String applicationLocalDc = localDc(config); - if (applicationLocalDc != null) { - builder.put(DRIVER_LOCAL_DC, applicationLocalDc); + String applicationLocalDcs = localDcs(); + if (applicationLocalDcs != null) { + builder.put(DRIVER_LOCAL_DC, applicationLocalDcs); } return builder.build(); @@ -152,19 +151,28 @@ protected String getDriverVersion() { return Session.OSS_DRIVER_COORDINATES.getVersion().toString(); } - private String localDc(DriverExecutionProfile profile) { - String dc = context.getLocalDatacenter(profile.getName()); // DC set programmatically - if (dc == null) { - // DC from load balancing policy - LoadBalancingPolicy lbp = context.getLoadBalancingPolicy(DriverExecutionProfile.DEFAULT_NAME); - if (lbp instanceof LocalDcAwareLoadBalancingPolicy) { - dc = ((LocalDcAwareLoadBalancingPolicy) lbp).getLocalDatacenter(); + private String localDcs() { + StringBuilder result = new StringBuilder(); + boolean first = true; + for (Map.Entry entry : + context.getLoadBalancingPolicies().entrySet()) { + String dc = getLocalDc(entry.getValue()); + if (dc != null) { + if (!first) { + result.append(", "); + } else { + first = false; + } + result.append(entry.getKey()).append(": ").append(dc); } } - if (dc == null && profile.isDefined(DefaultDriverOption.LOAD_BALANCING_LOCAL_DATACENTER)) { - // DC from configuration - dc = profile.getString(DefaultDriverOption.LOAD_BALANCING_LOCAL_DATACENTER); + return first ? null : result.toString(); + } + + private String getLocalDc(LoadBalancingPolicy loadBalancingPolicy) { + if (loadBalancingPolicy instanceof LocalDcAwareLoadBalancingPolicy) { + return ((LocalDcAwareLoadBalancingPolicy) loadBalancingPolicy).getLocalDatacenter(); } - return dc; + return null; } } diff --git a/core/src/test/java/com/datastax/dse/driver/internal/core/context/DseStartupOptionsBuilderTest.java b/core/src/test/java/com/datastax/dse/driver/internal/core/context/DseStartupOptionsBuilderTest.java index f64b3a77a34..e296924375a 100644 --- a/core/src/test/java/com/datastax/dse/driver/internal/core/context/DseStartupOptionsBuilderTest.java +++ b/core/src/test/java/com/datastax/dse/driver/internal/core/context/DseStartupOptionsBuilderTest.java @@ -19,6 +19,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.mockito.MockitoAnnotations.initMocks; @@ -28,14 +29,19 @@ import com.datastax.oss.driver.api.core.config.DriverConfig; import com.datastax.oss.driver.api.core.config.DriverConfigLoader; import com.datastax.oss.driver.api.core.config.DriverExecutionProfile; +import com.datastax.oss.driver.api.core.loadbalancing.LoadBalancingPolicy; +import com.datastax.oss.driver.api.core.loadbalancing.LocalDcAwareLoadBalancingPolicy; import com.datastax.oss.driver.api.core.session.ProgrammaticArguments; import com.datastax.oss.driver.api.core.session.Session; import com.datastax.oss.driver.api.core.uuid.Uuids; import com.datastax.oss.driver.internal.core.context.DefaultDriverContext; import com.datastax.oss.driver.internal.core.context.StartupOptionsBuilder; +import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableMap; import com.datastax.oss.protocol.internal.request.Startup; import com.tngtech.java.junit.dataprovider.DataProvider; import com.tngtech.java.junit.dataprovider.DataProviderRunner; +import edu.umd.cs.findbugs.annotations.NonNull; +import java.util.Map; import java.util.UUID; import org.junit.Before; import org.junit.Test; @@ -61,16 +67,34 @@ public void before() { } private void buildContext( - UUID clientId, String applicationName, String applicationVersion, String localDc) { + UUID clientId, + String applicationName, + String applicationVersion, + Map localDcPerProfile) { ProgrammaticArguments.Builder builder = ProgrammaticArguments.builder() .withStartupClientId(clientId) .withStartupApplicationName(applicationName) .withStartupApplicationVersion(applicationVersion); - if (localDc != null) { - builder.withLocalDatacenter(DriverExecutionProfile.DEFAULT_NAME, localDc); - } - this.driverContext = new DefaultDriverContext(configLoader, builder.build()); + this.driverContext = + new DefaultDriverContext(configLoader, builder.build()) { + @NonNull + @Override + public Map getLoadBalancingPolicies() { + if (localDcPerProfile != null) { + ImmutableMap.Builder map = ImmutableMap.builder(); + localDcPerProfile.forEach( + (profile, dc) -> { + LocalDcAwareLoadBalancingPolicy loadBalancingPolicy = + mock(LocalDcAwareLoadBalancingPolicy.class); + when(loadBalancingPolicy.getLocalDatacenter()).thenReturn(dc); + map.put(profile, loadBalancingPolicy); + }); + return map.build(); + } + return super.getLoadBalancingPolicies(); + } + }; } private void assertDefaultStartupOptions(Startup startup) { @@ -157,13 +181,14 @@ public void should_build_startup_options_with_all_options() { UUID customClientId = Uuids.random(); - buildContext(customClientId, "Custom_App_Name", "Custom_App_Version", "dc6"); + buildContext( + customClientId, "Custom_App_Name", "Custom_App_Version", ImmutableMap.of("default", "dc6")); Startup startup = new Startup(driverContext.getStartupOptions()); assertThat(startup.options) .containsEntry(StartupOptionsBuilder.CLIENT_ID_KEY, customClientId.toString()) .containsEntry(StartupOptionsBuilder.APPLICATION_NAME_KEY, "Custom_App_Name") .containsEntry(StartupOptionsBuilder.APPLICATION_VERSION_KEY, "Custom_App_Version") - .containsEntry(StartupOptionsBuilder.DRIVER_LOCAL_DC, "dc6"); + .containsEntry(StartupOptionsBuilder.DRIVER_LOCAL_DC, "default: dc6"); assertThat(startup.options).containsEntry(Startup.COMPRESSION_KEY, "snappy"); assertDefaultStartupOptions(startup); } @@ -176,8 +201,6 @@ public void should_use_configuration_when_no_programmatic_values_provided() { .thenReturn("Config_App_Version"); when(defaultProfile.getString(DefaultDriverOption.PROTOCOL_COMPRESSION, "none")) .thenReturn("none"); - when(defaultProfile.getString(DefaultDriverOption.LOAD_BALANCING_LOCAL_DATACENTER)) - .thenReturn("Config_DC_1"); when(defaultProfile.isDefined(DefaultDriverOption.LOAD_BALANCING_LOCAL_DATACENTER)) .thenReturn(true); when(defaultProfile.getName()).thenReturn(DriverExecutionProfile.DEFAULT_NAME); @@ -187,8 +210,7 @@ public void should_use_configuration_when_no_programmatic_values_provided() { assertThat(startup.options) .containsEntry(StartupOptionsBuilder.APPLICATION_NAME_KEY, "Config_App_Name") - .containsEntry(StartupOptionsBuilder.APPLICATION_VERSION_KEY, "Config_App_Version") - .containsEntry(StartupOptionsBuilder.DRIVER_LOCAL_DC, "Config_DC_1"); + .containsEntry(StartupOptionsBuilder.APPLICATION_VERSION_KEY, "Config_App_Version"); } @Test @@ -197,12 +219,34 @@ public void should_ignore_configuration_when_programmatic_values_provided() { .thenReturn("none"); when(defaultProfile.getName()).thenReturn(DriverExecutionProfile.DEFAULT_NAME); - buildContext(null, "Custom_App_Name", "Custom_App_Version", "us-west-2"); + buildContext( + null, "Custom_App_Name", "Custom_App_Version", ImmutableMap.of("default", "us-west-2")); Startup startup = new Startup(driverContext.getStartupOptions()); assertThat(startup.options) .containsEntry(StartupOptionsBuilder.APPLICATION_NAME_KEY, "Custom_App_Name") .containsEntry(StartupOptionsBuilder.APPLICATION_VERSION_KEY, "Custom_App_Version") - .containsEntry(StartupOptionsBuilder.DRIVER_LOCAL_DC, "us-west-2"); + .containsEntry(StartupOptionsBuilder.DRIVER_LOCAL_DC, "default: us-west-2"); + } + + @Test + public void should_include_all_local_dc_in_startup_message() { + when(defaultProfile.getString(DefaultDriverOption.PROTOCOL_COMPRESSION, "none")) + .thenReturn("none"); + when(defaultProfile.getName()).thenReturn(DriverExecutionProfile.DEFAULT_NAME); + + buildContext( + null, + "Custom_App_Name", + "Custom_App_Version", + ImmutableMap.of("default", "us-west-2", "oltp", "us-east-2", "olap", "eu-central-1")); + Startup startup = new Startup(driverContext.getStartupOptions()); + + assertThat(startup.options) + .containsEntry(StartupOptionsBuilder.APPLICATION_NAME_KEY, "Custom_App_Name") + .containsEntry(StartupOptionsBuilder.APPLICATION_VERSION_KEY, "Custom_App_Version") + .containsEntry( + StartupOptionsBuilder.DRIVER_LOCAL_DC, + "default: us-west-2, oltp: us-east-2, olap: eu-central-1"); } } From 9b11d53bdf509bce13054926f2e8b1da24e77516 Mon Sep 17 00:00:00 2001 From: Lukasz Antoniak Date: Wed, 4 Jun 2025 11:38:50 +0200 Subject: [PATCH 4/6] Apply review comments --- .../core/context/StartupOptionsBuilder.java | 44 +++++++++---------- .../context/DseStartupOptionsBuilderTest.java | 26 +++++++++++ 2 files changed, 48 insertions(+), 22 deletions(-) diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/context/StartupOptionsBuilder.java b/core/src/main/java/com/datastax/oss/driver/internal/core/context/StartupOptionsBuilder.java index b5f5b9774c6..544849c3070 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/context/StartupOptionsBuilder.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/context/StartupOptionsBuilder.java @@ -23,11 +23,15 @@ import com.datastax.oss.driver.api.core.loadbalancing.LocalDcAwareLoadBalancingPolicy; import com.datastax.oss.driver.api.core.session.Session; import com.datastax.oss.driver.api.core.uuid.Uuids; +import com.datastax.oss.driver.shaded.guava.common.base.Joiner; +import com.datastax.oss.driver.shaded.guava.common.collect.Maps; import com.datastax.oss.protocol.internal.request.Startup; import com.datastax.oss.protocol.internal.util.collection.NullAllowingImmutableMap; import edu.umd.cs.findbugs.annotations.Nullable; import java.util.Map; +import java.util.Optional; import java.util.UUID; +import java.util.stream.Collectors; import net.jcip.annotations.Immutable; @Immutable @@ -123,10 +127,7 @@ public Map build() { builder.put(APPLICATION_VERSION_KEY, applicationVersion); } // do not cache local DC as it can change within LBP implementation - String applicationLocalDcs = localDcs(); - if (applicationLocalDcs != null) { - builder.put(DRIVER_LOCAL_DC, applicationLocalDcs); - } + localDcs().ifPresent(s -> builder.put(DRIVER_LOCAL_DC, s)); return builder.build(); } @@ -151,28 +152,27 @@ protected String getDriverVersion() { return Session.OSS_DRIVER_COORDINATES.getVersion().toString(); } - private String localDcs() { - StringBuilder result = new StringBuilder(); - boolean first = true; - for (Map.Entry entry : - context.getLoadBalancingPolicies().entrySet()) { - String dc = getLocalDc(entry.getValue()); - if (dc != null) { - if (!first) { - result.append(", "); - } else { - first = false; - } - result.append(entry.getKey()).append(": ").append(dc); - } + private Optional localDcs() { + Joiner joiner = Joiner.on(": "); + Map> lbpToDc = + Maps.transformValues(context.getLoadBalancingPolicies(), this::getLocalDc); + if (lbpToDc.isEmpty()) { + return Optional.empty(); } - return first ? null : result.toString(); + return Optional.of( + lbpToDc.entrySet().stream() + .filter(e -> e.getValue().isPresent()) + .map(entry -> joiner.join(entry.getKey(), entry.getValue().get())) + .collect(Collectors.joining(", "))); } - private String getLocalDc(LoadBalancingPolicy loadBalancingPolicy) { + private Optional getLocalDc(LoadBalancingPolicy loadBalancingPolicy) { if (loadBalancingPolicy instanceof LocalDcAwareLoadBalancingPolicy) { - return ((LocalDcAwareLoadBalancingPolicy) loadBalancingPolicy).getLocalDatacenter(); + String dc = ((LocalDcAwareLoadBalancingPolicy) loadBalancingPolicy).getLocalDatacenter(); + if (dc != null) { + return Optional.of(dc); + } } - return null; + return Optional.empty(); } } diff --git a/core/src/test/java/com/datastax/dse/driver/internal/core/context/DseStartupOptionsBuilderTest.java b/core/src/test/java/com/datastax/dse/driver/internal/core/context/DseStartupOptionsBuilderTest.java index e296924375a..b7260266611 100644 --- a/core/src/test/java/com/datastax/dse/driver/internal/core/context/DseStartupOptionsBuilderTest.java +++ b/core/src/test/java/com/datastax/dse/driver/internal/core/context/DseStartupOptionsBuilderTest.java @@ -249,4 +249,30 @@ public void should_include_all_local_dc_in_startup_message() { StartupOptionsBuilder.DRIVER_LOCAL_DC, "default: us-west-2, oltp: us-east-2, olap: eu-central-1"); } + + @Test + public void should_skip_non_local_dc_lbp_in_startup_message() { + when(defaultProfile.getString(DefaultDriverOption.PROTOCOL_COMPRESSION, "none")) + .thenReturn("none"); + when(defaultProfile.getName()).thenReturn(DriverExecutionProfile.DEFAULT_NAME); + + ProgrammaticArguments.Builder builder = ProgrammaticArguments.builder(); + DefaultDriverContext driverContext = + new DefaultDriverContext(configLoader, builder.build()) { + @NonNull + @Override + public Map getLoadBalancingPolicies() { + ImmutableMap.Builder map = ImmutableMap.builder(); + LocalDcAwareLoadBalancingPolicy loadBalancingPolicy = + mock(LocalDcAwareLoadBalancingPolicy.class); + when(loadBalancingPolicy.getLocalDatacenter()).thenReturn("dc1"); + map.put("oltp", loadBalancingPolicy); + map.put("default", mock(LoadBalancingPolicy.class)); + return map.build(); + } + }; + Startup startup = new Startup(driverContext.getStartupOptions()); + + assertThat(startup.options).containsEntry(StartupOptionsBuilder.DRIVER_LOCAL_DC, "oltp: dc1"); + } } From 3ce91e36bba88462027d111a9ac74d5ad87dfe9e Mon Sep 17 00:00:00 2001 From: Lukasz Antoniak Date: Thu, 17 Jul 2025 16:31:21 +0200 Subject: [PATCH 5/6] Refactor local DC to driver baggage --- .../LocalDcAwareLoadBalancingPolicy.java | 5 + .../core/context/StartupOptionsBuilder.java | 35 +++---- .../BasicLoadBalancingPolicy.java | 43 +++++++++ .../DefaultLoadBalancingPolicy.java | 11 +++ .../driver/internal/core/util/Strings.java | 12 +++ .../context/DseStartupOptionsBuilderTest.java | 94 ++++++++++++++++--- 6 files changed, 171 insertions(+), 29 deletions(-) diff --git a/core/src/main/java/com/datastax/oss/driver/api/core/loadbalancing/LocalDcAwareLoadBalancingPolicy.java b/core/src/main/java/com/datastax/oss/driver/api/core/loadbalancing/LocalDcAwareLoadBalancingPolicy.java index f69d35f182d..d5604b4bf53 100644 --- a/core/src/main/java/com/datastax/oss/driver/api/core/loadbalancing/LocalDcAwareLoadBalancingPolicy.java +++ b/core/src/main/java/com/datastax/oss/driver/api/core/loadbalancing/LocalDcAwareLoadBalancingPolicy.java @@ -17,6 +17,7 @@ */ package com.datastax.oss.driver.api.core.loadbalancing; +import edu.umd.cs.findbugs.annotations.NonNull; import edu.umd.cs.findbugs.annotations.Nullable; /** Load balancing policy taking into account local datacenter of the application. */ @@ -25,4 +26,8 @@ public interface LocalDcAwareLoadBalancingPolicy extends LoadBalancingPolicy { /** Returns the local datacenter name, if known; empty otherwise. */ @Nullable String getLocalDatacenter(); + + /** Returns JSON string containing all properties that impact C* node connectivity. */ + @NonNull + String getStartupConfiguration(); } diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/context/StartupOptionsBuilder.java b/core/src/main/java/com/datastax/oss/driver/internal/core/context/StartupOptionsBuilder.java index 544849c3070..f64472761a8 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/context/StartupOptionsBuilder.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/context/StartupOptionsBuilder.java @@ -23,6 +23,7 @@ import com.datastax.oss.driver.api.core.loadbalancing.LocalDcAwareLoadBalancingPolicy; import com.datastax.oss.driver.api.core.session.Session; import com.datastax.oss.driver.api.core.uuid.Uuids; +import com.datastax.oss.driver.internal.core.util.Strings; import com.datastax.oss.driver.shaded.guava.common.base.Joiner; import com.datastax.oss.driver.shaded.guava.common.collect.Maps; import com.datastax.oss.protocol.internal.request.Startup; @@ -39,7 +40,7 @@ public class StartupOptionsBuilder { public static final String DRIVER_NAME_KEY = "DRIVER_NAME"; public static final String DRIVER_VERSION_KEY = "DRIVER_VERSION"; - public static final String DRIVER_LOCAL_DC = "DRIVER_LOCAL_DC"; + public static final String DRIVER_BAGGAGE = "DRIVER_BAGGAGE"; public static final String APPLICATION_NAME_KEY = "APPLICATION_NAME"; public static final String APPLICATION_VERSION_KEY = "APPLICATION_VERSION"; public static final String CLIENT_ID_KEY = "CLIENT_ID"; @@ -127,7 +128,7 @@ public Map build() { builder.put(APPLICATION_VERSION_KEY, applicationVersion); } // do not cache local DC as it can change within LBP implementation - localDcs().ifPresent(s -> builder.put(DRIVER_LOCAL_DC, s)); + driverBaggage().ifPresent(s -> builder.put(DRIVER_BAGGAGE, s)); return builder.build(); } @@ -152,26 +153,26 @@ protected String getDriverVersion() { return Session.OSS_DRIVER_COORDINATES.getVersion().toString(); } - private Optional localDcs() { + private Optional driverBaggage() { Joiner joiner = Joiner.on(": "); - Map> lbpToDc = - Maps.transformValues(context.getLoadBalancingPolicies(), this::getLocalDc); - if (lbpToDc.isEmpty()) { - return Optional.empty(); - } + Map> lbpToBag = + Maps.transformValues(context.getLoadBalancingPolicies(), this::getDriverBaggage); return Optional.of( - lbpToDc.entrySet().stream() - .filter(e -> e.getValue().isPresent()) - .map(entry -> joiner.join(entry.getKey(), entry.getValue().get())) - .collect(Collectors.joining(", "))); + "{" + + lbpToBag.entrySet().stream() + .filter(e -> e.getValue().isPresent()) + .map( + entry -> + joiner.join(Strings.doubleQuote(entry.getKey()), entry.getValue().get())) + .collect(Collectors.joining(", ")) + + "}"); } - private Optional getLocalDc(LoadBalancingPolicy loadBalancingPolicy) { + private Optional getDriverBaggage(LoadBalancingPolicy loadBalancingPolicy) { if (loadBalancingPolicy instanceof LocalDcAwareLoadBalancingPolicy) { - String dc = ((LocalDcAwareLoadBalancingPolicy) loadBalancingPolicy).getLocalDatacenter(); - if (dc != null) { - return Optional.of(dc); - } + LocalDcAwareLoadBalancingPolicy dcAwareLbp = + (LocalDcAwareLoadBalancingPolicy) loadBalancingPolicy; + return Optional.of(dcAwareLbp.getStartupConfiguration()); } return Optional.empty(); } diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/BasicLoadBalancingPolicy.java b/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/BasicLoadBalancingPolicy.java index 04a031f11ac..777fa66ce3e 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/BasicLoadBalancingPolicy.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/BasicLoadBalancingPolicy.java @@ -41,6 +41,7 @@ import com.datastax.oss.driver.internal.core.loadbalancing.nodeset.NodeSet; import com.datastax.oss.driver.internal.core.loadbalancing.nodeset.SingleDcNodeSet; import com.datastax.oss.driver.internal.core.util.ArrayUtils; +import com.datastax.oss.driver.internal.core.util.Strings; import com.datastax.oss.driver.internal.core.util.collection.CompositeQueryPlan; import com.datastax.oss.driver.internal.core.util.collection.LazyQueryPlan; import com.datastax.oss.driver.internal.core.util.collection.QueryPlan; @@ -62,6 +63,7 @@ import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.IntUnaryOperator; +import java.util.stream.Collectors; import net.jcip.annotations.ThreadSafe; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -161,6 +163,47 @@ public String getLocalDatacenter() { return localDc; } + @NonNull + @Override + public String getStartupConfiguration() { + StringBuilder builder = new StringBuilder(); + builder + .append("{") + .append(Strings.doubleQuote(BasicLoadBalancingPolicy.class.getSimpleName())) + .append(":") + .append("{") + .append(Strings.doubleQuote("localDc")) + .append(":") + .append(Strings.doubleQuoteNullable(localDc)); + if (!preferredRemoteDcs.isEmpty()) { + builder + .append(",") + .append(Strings.doubleQuote("preferredRemoteDcs")) + .append(":[") + .append( + preferredRemoteDcs.stream() + .map(Strings::doubleQuote) + .collect(Collectors.joining(", "))) + .append("]"); + } + if (allowDcFailoverForLocalCl) { + builder + .append(",") + .append(Strings.doubleQuote("allowDcFailoverForLocalCl")) + .append(":") + .append(allowDcFailoverForLocalCl); + } + if (maxNodesPerRemoteDc > 0) { + builder + .append(",") + .append(Strings.doubleQuote("maxNodesPerRemoteDc")) + .append(":") + .append(maxNodesPerRemoteDc); + } + builder.append("}}"); + return builder.toString(); + } + /** @return The nodes currently considered as live. */ protected NodeSet getLiveNodes() { return liveNodes; diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicy.java b/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicy.java index 0f03cbb3643..533239a6d4e 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicy.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicy.java @@ -350,4 +350,15 @@ private boolean hasSufficientResponses(long now) { return this.oldest - threshold >= 0; } } + + @NonNull + @Override + public String getStartupConfiguration() { + String result = super.getStartupConfiguration(); + result = + result.replaceFirst( + BasicLoadBalancingPolicy.class.getSimpleName(), + DefaultLoadBalancingPolicy.class.getSimpleName()); + return result; + } } diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/util/Strings.java b/core/src/main/java/com/datastax/oss/driver/internal/core/util/Strings.java index 2e85b451c75..d6d833d983f 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/util/Strings.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/util/Strings.java @@ -81,6 +81,18 @@ public static String doubleQuote(String value) { return quote(value, '"'); } + /** + * Double quote the given string; double quotes are escaped. If the given string is null, this + * method returns ({@code null}). + * + * @param value The value to double quote. + * @return The double quoted string. + */ + public static String doubleQuoteNullable(String value) { + if (value == null) return null; + return quote(value, '"'); + } + /** * Unquote the given string if it is double quoted; double quotes are unescaped. If the given * string is not double quoted, it is returned without any modification. diff --git a/core/src/test/java/com/datastax/dse/driver/internal/core/context/DseStartupOptionsBuilderTest.java b/core/src/test/java/com/datastax/dse/driver/internal/core/context/DseStartupOptionsBuilderTest.java index b7260266611..15e4bcbfc60 100644 --- a/core/src/test/java/com/datastax/dse/driver/internal/core/context/DseStartupOptionsBuilderTest.java +++ b/core/src/test/java/com/datastax/dse/driver/internal/core/context/DseStartupOptionsBuilderTest.java @@ -31,17 +31,27 @@ import com.datastax.oss.driver.api.core.config.DriverExecutionProfile; import com.datastax.oss.driver.api.core.loadbalancing.LoadBalancingPolicy; import com.datastax.oss.driver.api.core.loadbalancing.LocalDcAwareLoadBalancingPolicy; +import com.datastax.oss.driver.api.core.loadbalancing.NodeDistanceEvaluator; +import com.datastax.oss.driver.api.core.metadata.Node; import com.datastax.oss.driver.api.core.session.ProgrammaticArguments; import com.datastax.oss.driver.api.core.session.Session; import com.datastax.oss.driver.api.core.uuid.Uuids; +import com.datastax.oss.driver.internal.core.ConsistencyLevelRegistry; import com.datastax.oss.driver.internal.core.context.DefaultDriverContext; +import com.datastax.oss.driver.internal.core.context.InternalDriverContext; import com.datastax.oss.driver.internal.core.context.StartupOptionsBuilder; +import com.datastax.oss.driver.internal.core.loadbalancing.DefaultLoadBalancingPolicy; +import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableList; import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableMap; import com.datastax.oss.protocol.internal.request.Startup; import com.tngtech.java.junit.dataprovider.DataProvider; import com.tngtech.java.junit.dataprovider.DataProviderRunner; import edu.umd.cs.findbugs.annotations.NonNull; +import edu.umd.cs.findbugs.annotations.Nullable; +import java.util.Collections; +import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.UUID; import org.junit.Before; import org.junit.Test; @@ -86,8 +96,7 @@ public Map getLoadBalancingPolicies() { localDcPerProfile.forEach( (profile, dc) -> { LocalDcAwareLoadBalancingPolicy loadBalancingPolicy = - mock(LocalDcAwareLoadBalancingPolicy.class); - when(loadBalancingPolicy.getLocalDatacenter()).thenReturn(dc); + mockLoadBalancingPolicy(profile, dc, 0, false, Collections.emptyList()); map.put(profile, loadBalancingPolicy); }); return map.build(); @@ -97,6 +106,55 @@ public Map getLoadBalancingPolicies() { }; } + private LocalDcAwareLoadBalancingPolicy mockLoadBalancingPolicy( + String profile, + String localDc, + int maxNodesPerRemoteDc, + boolean allowRemoteSatisfyLocalDc, + List preferredRemoteDcs) { + // mock execution profile + DriverExecutionProfile executionProfile = mock(DriverExecutionProfile.class); + when(executionProfile.getInt( + DefaultDriverOption.LOAD_BALANCING_DC_FAILOVER_MAX_NODES_PER_REMOTE_DC)) + .thenReturn(maxNodesPerRemoteDc); + when(executionProfile.getBoolean( + DefaultDriverOption.LOAD_BALANCING_DC_FAILOVER_ALLOW_FOR_LOCAL_CONSISTENCY_LEVELS)) + .thenReturn(allowRemoteSatisfyLocalDc); + when(executionProfile.getStringList( + DefaultDriverOption.LOAD_BALANCING_DC_FAILOVER_PREFERRED_REMOTE_DCS)) + .thenReturn(preferredRemoteDcs); + + // mock driver config + DriverConfig driverConfig = mock(DriverConfig.class); + when(driverConfig.getProfile(profile)).thenReturn(executionProfile); + + // mock driver context + InternalDriverContext driverContext = mock(InternalDriverContext.class); + when(driverContext.getConfig()).thenReturn(driverConfig); + when(driverContext.getConsistencyLevelRegistry()) + .thenReturn(mock(ConsistencyLevelRegistry.class)); + + // mock load balancing policy + LocalDcAwareLoadBalancingPolicy loadBalancingPolicy = + new DefaultLoadBalancingPolicy(driverContext, profile) { + @NonNull + @Override + protected Optional discoverLocalDc(@NonNull Map nodes) { + return Optional.of(localDc); + } + + @NonNull + @Override + protected NodeDistanceEvaluator createNodeDistanceEvaluator( + @Nullable String localDc, @NonNull Map nodes) { + return mock(NodeDistanceEvaluator.class); + } + }; + loadBalancingPolicy.init( + Collections.emptyMap(), mock(LoadBalancingPolicy.DistanceReporter.class)); + return loadBalancingPolicy; + } + private void assertDefaultStartupOptions(Startup startup) { assertThat(startup.options).containsEntry(Startup.CQL_VERSION_KEY, "3.0.0"); assertThat(startup.options) @@ -188,7 +246,9 @@ public void should_build_startup_options_with_all_options() { .containsEntry(StartupOptionsBuilder.CLIENT_ID_KEY, customClientId.toString()) .containsEntry(StartupOptionsBuilder.APPLICATION_NAME_KEY, "Custom_App_Name") .containsEntry(StartupOptionsBuilder.APPLICATION_VERSION_KEY, "Custom_App_Version") - .containsEntry(StartupOptionsBuilder.DRIVER_LOCAL_DC, "default: dc6"); + .containsEntry( + StartupOptionsBuilder.DRIVER_BAGGAGE, + "{\"default\": {\"DefaultLoadBalancingPolicy\":{\"localDc\":\"dc6\"}}}"); assertThat(startup.options).containsEntry(Startup.COMPRESSION_KEY, "snappy"); assertDefaultStartupOptions(startup); } @@ -226,7 +286,9 @@ public void should_ignore_configuration_when_programmatic_values_provided() { assertThat(startup.options) .containsEntry(StartupOptionsBuilder.APPLICATION_NAME_KEY, "Custom_App_Name") .containsEntry(StartupOptionsBuilder.APPLICATION_VERSION_KEY, "Custom_App_Version") - .containsEntry(StartupOptionsBuilder.DRIVER_LOCAL_DC, "default: us-west-2"); + .containsEntry( + StartupOptionsBuilder.DRIVER_BAGGAGE, + "{\"default\": {\"DefaultLoadBalancingPolicy\":{\"localDc\":\"us-west-2\"}}}"); } @Test @@ -246,12 +308,14 @@ public void should_include_all_local_dc_in_startup_message() { .containsEntry(StartupOptionsBuilder.APPLICATION_NAME_KEY, "Custom_App_Name") .containsEntry(StartupOptionsBuilder.APPLICATION_VERSION_KEY, "Custom_App_Version") .containsEntry( - StartupOptionsBuilder.DRIVER_LOCAL_DC, - "default: us-west-2, oltp: us-east-2, olap: eu-central-1"); + StartupOptionsBuilder.DRIVER_BAGGAGE, + "{\"default\": {\"DefaultLoadBalancingPolicy\":{\"localDc\":\"us-west-2\"}}, " + + "\"oltp\": {\"DefaultLoadBalancingPolicy\":{\"localDc\":\"us-east-2\"}}, " + + "\"olap\": {\"DefaultLoadBalancingPolicy\":{\"localDc\":\"eu-central-1\"}}}"); } @Test - public void should_skip_non_local_dc_lbp_in_startup_message() { + public void should_include_all_lbp_details_in_startup_message() { when(defaultProfile.getString(DefaultDriverOption.PROTOCOL_COMPRESSION, "none")) .thenReturn("none"); when(defaultProfile.getName()).thenReturn(DriverExecutionProfile.DEFAULT_NAME); @@ -263,16 +327,22 @@ public void should_skip_non_local_dc_lbp_in_startup_message() { @Override public Map getLoadBalancingPolicies() { ImmutableMap.Builder map = ImmutableMap.builder(); - LocalDcAwareLoadBalancingPolicy loadBalancingPolicy = - mock(LocalDcAwareLoadBalancingPolicy.class); - when(loadBalancingPolicy.getLocalDatacenter()).thenReturn("dc1"); - map.put("oltp", loadBalancingPolicy); + map.put( + "oltp", + mockLoadBalancingPolicy("oltp", "dc1", 2, true, ImmutableList.of("dc2", "dc3"))); map.put("default", mock(LoadBalancingPolicy.class)); return map.build(); } }; Startup startup = new Startup(driverContext.getStartupOptions()); - assertThat(startup.options).containsEntry(StartupOptionsBuilder.DRIVER_LOCAL_DC, "oltp: dc1"); + assertThat(startup.options) + .containsEntry( + StartupOptionsBuilder.DRIVER_BAGGAGE, + "{\"oltp\": {\"DefaultLoadBalancingPolicy\":{" + + "\"localDc\":\"dc1\"," + + "\"preferredRemoteDcs\":[\"dc2\", \"dc3\"]," + + "\"allowDcFailoverForLocalCl\":true," + + "\"maxNodesPerRemoteDc\":2}}}"); } } From 95aa6980f959ac7147cbf392c117b0855842ab5a Mon Sep 17 00:00:00 2001 From: Lukasz Antoniak Date: Mon, 11 Aug 2025 14:42:16 +0200 Subject: [PATCH 6/6] Improve JSON serialization --- .../LocalDcAwareLoadBalancingPolicy.java | 5 ++- .../core/context/StartupOptionsBuilder.java | 36 ++++++++-------- .../BasicLoadBalancingPolicy.java | 41 ++++--------------- .../DefaultLoadBalancingPolicy.java | 13 +++--- .../context/DseStartupOptionsBuilderTest.java | 14 +++---- 5 files changed, 43 insertions(+), 66 deletions(-) diff --git a/core/src/main/java/com/datastax/oss/driver/api/core/loadbalancing/LocalDcAwareLoadBalancingPolicy.java b/core/src/main/java/com/datastax/oss/driver/api/core/loadbalancing/LocalDcAwareLoadBalancingPolicy.java index d5604b4bf53..a94c9cc439b 100644 --- a/core/src/main/java/com/datastax/oss/driver/api/core/loadbalancing/LocalDcAwareLoadBalancingPolicy.java +++ b/core/src/main/java/com/datastax/oss/driver/api/core/loadbalancing/LocalDcAwareLoadBalancingPolicy.java @@ -19,6 +19,7 @@ import edu.umd.cs.findbugs.annotations.NonNull; import edu.umd.cs.findbugs.annotations.Nullable; +import java.util.Map; /** Load balancing policy taking into account local datacenter of the application. */ public interface LocalDcAwareLoadBalancingPolicy extends LoadBalancingPolicy { @@ -27,7 +28,7 @@ public interface LocalDcAwareLoadBalancingPolicy extends LoadBalancingPolicy { @Nullable String getLocalDatacenter(); - /** Returns JSON string containing all properties that impact C* node connectivity. */ + /** Returns map containing details that impact C* node connectivity. */ @NonNull - String getStartupConfiguration(); + Map getStartupConfiguration(); } diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/context/StartupOptionsBuilder.java b/core/src/main/java/com/datastax/oss/driver/internal/core/context/StartupOptionsBuilder.java index f64472761a8..aadfd2187e4 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/context/StartupOptionsBuilder.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/context/StartupOptionsBuilder.java @@ -23,17 +23,17 @@ import com.datastax.oss.driver.api.core.loadbalancing.LocalDcAwareLoadBalancingPolicy; import com.datastax.oss.driver.api.core.session.Session; import com.datastax.oss.driver.api.core.uuid.Uuids; -import com.datastax.oss.driver.internal.core.util.Strings; -import com.datastax.oss.driver.shaded.guava.common.base.Joiner; -import com.datastax.oss.driver.shaded.guava.common.collect.Maps; +import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableMap; import com.datastax.oss.protocol.internal.request.Startup; import com.datastax.oss.protocol.internal.util.collection.NullAllowingImmutableMap; +import com.fasterxml.jackson.databind.ObjectMapper; import edu.umd.cs.findbugs.annotations.Nullable; import java.util.Map; import java.util.Optional; import java.util.UUID; -import java.util.stream.Collectors; import net.jcip.annotations.Immutable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; @Immutable public class StartupOptionsBuilder { @@ -45,6 +45,9 @@ public class StartupOptionsBuilder { public static final String APPLICATION_VERSION_KEY = "APPLICATION_VERSION"; public static final String CLIENT_ID_KEY = "CLIENT_ID"; + private static final Logger LOG = LoggerFactory.getLogger(StartupOptionsBuilder.class); + private static final ObjectMapper mapper = new ObjectMapper(); + protected final InternalDriverContext context; private UUID clientId; private String applicationName; @@ -154,21 +157,20 @@ protected String getDriverVersion() { } private Optional driverBaggage() { - Joiner joiner = Joiner.on(": "); - Map> lbpToBag = - Maps.transformValues(context.getLoadBalancingPolicies(), this::getDriverBaggage); - return Optional.of( - "{" - + lbpToBag.entrySet().stream() - .filter(e -> e.getValue().isPresent()) - .map( - entry -> - joiner.join(Strings.doubleQuote(entry.getKey()), entry.getValue().get())) - .collect(Collectors.joining(", ")) - + "}"); + ImmutableMap.Builder builder = new ImmutableMap.Builder<>(); + for (Map.Entry entry : + context.getLoadBalancingPolicies().entrySet()) { + getDriverBaggage(entry.getValue()).ifPresent(baggage -> builder.put(entry.getKey(), baggage)); + } + try { + return Optional.of(mapper.writeValueAsString(builder.build())); + } catch (Exception e) { + LOG.warn("Failed to construct startup driver baggage", e); + return Optional.empty(); + } } - private Optional getDriverBaggage(LoadBalancingPolicy loadBalancingPolicy) { + private Optional> getDriverBaggage(LoadBalancingPolicy loadBalancingPolicy) { if (loadBalancingPolicy instanceof LocalDcAwareLoadBalancingPolicy) { LocalDcAwareLoadBalancingPolicy dcAwareLbp = (LocalDcAwareLoadBalancingPolicy) loadBalancingPolicy; diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/BasicLoadBalancingPolicy.java b/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/BasicLoadBalancingPolicy.java index 777fa66ce3e..3819c6c6b37 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/BasicLoadBalancingPolicy.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/BasicLoadBalancingPolicy.java @@ -41,12 +41,12 @@ import com.datastax.oss.driver.internal.core.loadbalancing.nodeset.NodeSet; import com.datastax.oss.driver.internal.core.loadbalancing.nodeset.SingleDcNodeSet; import com.datastax.oss.driver.internal.core.util.ArrayUtils; -import com.datastax.oss.driver.internal.core.util.Strings; import com.datastax.oss.driver.internal.core.util.collection.CompositeQueryPlan; import com.datastax.oss.driver.internal.core.util.collection.LazyQueryPlan; import com.datastax.oss.driver.internal.core.util.collection.QueryPlan; import com.datastax.oss.driver.internal.core.util.collection.SimpleQueryPlan; import com.datastax.oss.driver.shaded.guava.common.base.Predicates; +import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableMap; import com.datastax.oss.driver.shaded.guava.common.collect.Lists; import com.datastax.oss.driver.shaded.guava.common.collect.Sets; import edu.umd.cs.findbugs.annotations.NonNull; @@ -63,7 +63,6 @@ import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.IntUnaryOperator; -import java.util.stream.Collectors; import net.jcip.annotations.ThreadSafe; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -165,43 +164,19 @@ public String getLocalDatacenter() { @NonNull @Override - public String getStartupConfiguration() { - StringBuilder builder = new StringBuilder(); - builder - .append("{") - .append(Strings.doubleQuote(BasicLoadBalancingPolicy.class.getSimpleName())) - .append(":") - .append("{") - .append(Strings.doubleQuote("localDc")) - .append(":") - .append(Strings.doubleQuoteNullable(localDc)); + public Map getStartupConfiguration() { + ImmutableMap.Builder builder = new ImmutableMap.Builder<>(); + builder.put("localDc", localDc); if (!preferredRemoteDcs.isEmpty()) { - builder - .append(",") - .append(Strings.doubleQuote("preferredRemoteDcs")) - .append(":[") - .append( - preferredRemoteDcs.stream() - .map(Strings::doubleQuote) - .collect(Collectors.joining(", "))) - .append("]"); + builder.put("preferredRemoteDcs", preferredRemoteDcs); } if (allowDcFailoverForLocalCl) { - builder - .append(",") - .append(Strings.doubleQuote("allowDcFailoverForLocalCl")) - .append(":") - .append(allowDcFailoverForLocalCl); + builder.put("allowDcFailoverForLocalCl", allowDcFailoverForLocalCl); } if (maxNodesPerRemoteDc > 0) { - builder - .append(",") - .append(Strings.doubleQuote("maxNodesPerRemoteDc")) - .append(":") - .append(maxNodesPerRemoteDc); + builder.put("maxNodesPerRemoteDc", maxNodesPerRemoteDc); } - builder.append("}}"); - return builder.toString(); + return ImmutableMap.of(BasicLoadBalancingPolicy.class.getSimpleName(), builder.build()); } /** @return The nodes currently considered as live. */ diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicy.java b/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicy.java index 533239a6d4e..9c31b606f18 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicy.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicy.java @@ -34,6 +34,7 @@ import com.datastax.oss.driver.internal.core.util.collection.QueryPlan; import com.datastax.oss.driver.internal.core.util.collection.SimpleQueryPlan; import com.datastax.oss.driver.shaded.guava.common.annotations.VisibleForTesting; +import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableMap; import com.datastax.oss.driver.shaded.guava.common.collect.MapMaker; import edu.umd.cs.findbugs.annotations.NonNull; import edu.umd.cs.findbugs.annotations.Nullable; @@ -353,12 +354,10 @@ private boolean hasSufficientResponses(long now) { @NonNull @Override - public String getStartupConfiguration() { - String result = super.getStartupConfiguration(); - result = - result.replaceFirst( - BasicLoadBalancingPolicy.class.getSimpleName(), - DefaultLoadBalancingPolicy.class.getSimpleName()); - return result; + public Map getStartupConfiguration() { + Map parent = super.getStartupConfiguration(); + return ImmutableMap.of( + DefaultLoadBalancingPolicy.class.getSimpleName(), + parent.get(BasicLoadBalancingPolicy.class.getSimpleName())); } } diff --git a/core/src/test/java/com/datastax/dse/driver/internal/core/context/DseStartupOptionsBuilderTest.java b/core/src/test/java/com/datastax/dse/driver/internal/core/context/DseStartupOptionsBuilderTest.java index 15e4bcbfc60..4e885a77149 100644 --- a/core/src/test/java/com/datastax/dse/driver/internal/core/context/DseStartupOptionsBuilderTest.java +++ b/core/src/test/java/com/datastax/dse/driver/internal/core/context/DseStartupOptionsBuilderTest.java @@ -248,7 +248,7 @@ public void should_build_startup_options_with_all_options() { .containsEntry(StartupOptionsBuilder.APPLICATION_VERSION_KEY, "Custom_App_Version") .containsEntry( StartupOptionsBuilder.DRIVER_BAGGAGE, - "{\"default\": {\"DefaultLoadBalancingPolicy\":{\"localDc\":\"dc6\"}}}"); + "{\"default\":{\"DefaultLoadBalancingPolicy\":{\"localDc\":\"dc6\"}}}"); assertThat(startup.options).containsEntry(Startup.COMPRESSION_KEY, "snappy"); assertDefaultStartupOptions(startup); } @@ -288,7 +288,7 @@ public void should_ignore_configuration_when_programmatic_values_provided() { .containsEntry(StartupOptionsBuilder.APPLICATION_VERSION_KEY, "Custom_App_Version") .containsEntry( StartupOptionsBuilder.DRIVER_BAGGAGE, - "{\"default\": {\"DefaultLoadBalancingPolicy\":{\"localDc\":\"us-west-2\"}}}"); + "{\"default\":{\"DefaultLoadBalancingPolicy\":{\"localDc\":\"us-west-2\"}}}"); } @Test @@ -309,9 +309,9 @@ public void should_include_all_local_dc_in_startup_message() { .containsEntry(StartupOptionsBuilder.APPLICATION_VERSION_KEY, "Custom_App_Version") .containsEntry( StartupOptionsBuilder.DRIVER_BAGGAGE, - "{\"default\": {\"DefaultLoadBalancingPolicy\":{\"localDc\":\"us-west-2\"}}, " - + "\"oltp\": {\"DefaultLoadBalancingPolicy\":{\"localDc\":\"us-east-2\"}}, " - + "\"olap\": {\"DefaultLoadBalancingPolicy\":{\"localDc\":\"eu-central-1\"}}}"); + "{\"default\":{\"DefaultLoadBalancingPolicy\":{\"localDc\":\"us-west-2\"}}," + + "\"oltp\":{\"DefaultLoadBalancingPolicy\":{\"localDc\":\"us-east-2\"}}," + + "\"olap\":{\"DefaultLoadBalancingPolicy\":{\"localDc\":\"eu-central-1\"}}}"); } @Test @@ -339,9 +339,9 @@ public Map getLoadBalancingPolicies() { assertThat(startup.options) .containsEntry( StartupOptionsBuilder.DRIVER_BAGGAGE, - "{\"oltp\": {\"DefaultLoadBalancingPolicy\":{" + "{\"oltp\":{\"DefaultLoadBalancingPolicy\":{" + "\"localDc\":\"dc1\"," - + "\"preferredRemoteDcs\":[\"dc2\", \"dc3\"]," + + "\"preferredRemoteDcs\":[\"dc2\",\"dc3\"]," + "\"allowDcFailoverForLocalCl\":true," + "\"maxNodesPerRemoteDc\":2}}}"); }