Skip to content

Commit c89ae9d

Browse files
committed
Add support for connecting with Scylla Cloud clusters
Allows driver to build sessions with Scylla Cloud yaml configuration files and to establish connection with Scylla Cloud clusters. Takes care of control connection random node issue by replacing the endpoint after the first connection with one that points to a specific node in a slightly hacky way.
1 parent c549c25 commit c89ae9d

File tree

6 files changed

+166
-45
lines changed

6 files changed

+166
-45
lines changed

core/src/main/java/com/datastax/oss/driver/api/core/session/ProgrammaticArguments.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ public static Builder builder() {
6464
private final AuthProvider authProvider;
6565
private final SslEngineFactory sslEngineFactory;
6666
private final InetSocketAddress cloudProxyAddress;
67+
private final String scyllaCloudNodeDomain;
6768
private final UUID startupClientId;
6869
private final String startupApplicationName;
6970
private final String startupApplicationVersion;
@@ -82,6 +83,7 @@ private ProgrammaticArguments(
8283
@Nullable AuthProvider authProvider,
8384
@Nullable SslEngineFactory sslEngineFactory,
8485
@Nullable InetSocketAddress cloudProxyAddress,
86+
@Nullable String scyllaCloudNodeDomain,
8587
@Nullable UUID startupClientId,
8688
@Nullable String startupApplicationName,
8789
@Nullable String startupApplicationVersion,
@@ -99,6 +101,7 @@ private ProgrammaticArguments(
99101
this.authProvider = authProvider;
100102
this.sslEngineFactory = sslEngineFactory;
101103
this.cloudProxyAddress = cloudProxyAddress;
104+
this.scyllaCloudNodeDomain = scyllaCloudNodeDomain;
102105
this.startupClientId = startupClientId;
103106
this.startupApplicationName = startupApplicationName;
104107
this.startupApplicationVersion = startupApplicationVersion;
@@ -163,6 +166,11 @@ public InetSocketAddress getCloudProxyAddress() {
163166
return cloudProxyAddress;
164167
}
165168

169+
@Nullable
170+
public String getScyllaCloudNodeDomain() {
171+
return scyllaCloudNodeDomain;
172+
}
173+
166174
@Nullable
167175
public UUID getStartupClientId() {
168176
return startupClientId;
@@ -203,6 +211,7 @@ public static class Builder {
203211
private AuthProvider authProvider;
204212
private SslEngineFactory sslEngineFactory;
205213
private InetSocketAddress cloudProxyAddress;
214+
private String scyllaCloudNodeDomain;
206215
private UUID startupClientId;
207216
private String startupApplicationName;
208217
private String startupApplicationVersion;
@@ -366,6 +375,14 @@ public Builder withCloudProxyAddress(@Nullable InetSocketAddress cloudAddress) {
366375
return this;
367376
}
368377

378+
@NonNull
379+
public Builder withScyllaCloudProxyAddress(
380+
@Nullable InetSocketAddress cloudAddress, String scyllaCloudNodeDomain) {
381+
this.cloudProxyAddress = cloudAddress;
382+
this.scyllaCloudNodeDomain = scyllaCloudNodeDomain;
383+
return this;
384+
}
385+
369386
@NonNull
370387
public Builder withAuthProvider(@Nullable AuthProvider authProvider) {
371388
this.authProvider = authProvider;
@@ -422,6 +439,7 @@ public ProgrammaticArguments build() {
422439
authProvider,
423440
sslEngineFactory,
424441
cloudProxyAddress,
442+
scyllaCloudNodeDomain,
425443
startupClientId,
426444
startupApplicationName,
427445
startupApplicationVersion,

core/src/main/java/com/datastax/oss/driver/api/core/session/SessionBuilder.java

Lines changed: 50 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -44,23 +44,24 @@
4444
import com.datastax.oss.driver.api.core.type.codec.registry.MutableCodecRegistry;
4545
import com.datastax.oss.driver.api.core.uuid.Uuids;
4646
import com.datastax.oss.driver.internal.core.ContactPoints;
47-
import com.datastax.oss.driver.internal.core.config.cloud.CloudConfig;
48-
import com.datastax.oss.driver.internal.core.config.cloud.CloudConfigFactory;
47+
import com.datastax.oss.driver.internal.core.config.scyllacloud.ConfigurationBundle;
48+
import com.datastax.oss.driver.internal.core.config.scyllacloud.ScyllaCloudConnectionConfig;
4949
import com.datastax.oss.driver.internal.core.config.typesafe.DefaultDriverConfigLoader;
5050
import com.datastax.oss.driver.internal.core.context.DefaultDriverContext;
5151
import com.datastax.oss.driver.internal.core.context.InternalDriverContext;
5252
import com.datastax.oss.driver.internal.core.metadata.DefaultEndPoint;
53+
import com.datastax.oss.driver.internal.core.metadata.SniEndPoint;
5354
import com.datastax.oss.driver.internal.core.session.DefaultSession;
5455
import com.datastax.oss.driver.internal.core.util.concurrent.BlockingOperation;
5556
import com.datastax.oss.driver.internal.core.util.concurrent.CompletableFutures;
57+
import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableList;
5658
import edu.umd.cs.findbugs.annotations.NonNull;
5759
import edu.umd.cs.findbugs.annotations.Nullable;
5860
import java.io.InputStream;
5961
import java.net.InetSocketAddress;
6062
import java.net.MalformedURLException;
6163
import java.net.URL;
6264
import java.nio.file.Path;
63-
import java.nio.file.Paths;
6465
import java.util.Collection;
6566
import java.util.Collections;
6667
import java.util.HashSet;
@@ -96,6 +97,7 @@ public abstract class SessionBuilder<SelfT extends SessionBuilder, SessionT> {
9697
protected Set<EndPoint> programmaticContactPoints = new HashSet<>();
9798
protected CqlIdentifier keyspace;
9899
protected Callable<InputStream> cloudConfigInputStream;
100+
protected Callable<InputStream> scyllaCloudConfigInputStream;
99101

100102
protected ProgrammaticArguments.Builder programmaticArgumentsBuilder =
101103
ProgrammaticArguments.builder();
@@ -655,6 +657,17 @@ public SelfT withCloudSecureConnectBundle(@NonNull Path cloudConfigPath) {
655657
return self;
656658
}
657659

660+
@NonNull
661+
public SelfT withScyllaCloudSecureConnectBundle(@NonNull Path cloudConfigPath) {
662+
try {
663+
URL cloudConfigUrl = cloudConfigPath.toAbsolutePath().normalize().toUri().toURL();
664+
this.scyllaCloudConfigInputStream = cloudConfigUrl::openStream;
665+
} catch (MalformedURLException e) {
666+
throw new IllegalArgumentException("Incorrect format of cloudConfigPath", e);
667+
}
668+
return self;
669+
}
670+
658671
/**
659672
* Registers a CodecRegistry to use for the session.
660673
*
@@ -687,6 +700,12 @@ public SelfT withCloudSecureConnectBundle(@NonNull URL cloudConfigUrl) {
687700
return self;
688701
}
689702

703+
@NonNull
704+
public SelfT withScyllaCloudSecureConnectBundle(@NonNull URL cloudConfigUrl) {
705+
this.scyllaCloudConfigInputStream = cloudConfigUrl::openStream;
706+
return self;
707+
}
708+
690709
/**
691710
* Configures this SessionBuilder for Cloud deployments by retrieving connection information from
692711
* the provided {@link InputStream}.
@@ -711,6 +730,12 @@ public SelfT withCloudSecureConnectBundle(@NonNull InputStream cloudConfigInputS
711730
return self;
712731
}
713732

733+
@NonNull
734+
public SelfT withScyllaCloudSecureConnectBundle(@NonNull InputStream cloudConfigInputStream) {
735+
this.scyllaCloudConfigInputStream = () -> cloudConfigInputStream;
736+
return self;
737+
}
738+
714739
/**
715740
* Configures this SessionBuilder to use the provided Cloud proxy endpoint.
716741
*
@@ -733,6 +758,13 @@ public SelfT withCloudProxyAddress(@Nullable InetSocketAddress cloudProxyAddress
733758
return self;
734759
}
735760

761+
@NonNull
762+
public SelfT withScyllaCloudProxyAddress(
763+
@Nullable InetSocketAddress cloudProxyAddress, String nodeDomain) {
764+
this.programmaticArgumentsBuilder.withScyllaCloudProxyAddress(cloudProxyAddress, nodeDomain);
765+
return self;
766+
}
767+
736768
/**
737769
* A unique identifier for the created session.
738770
*
@@ -857,16 +889,9 @@ protected final CompletionStage<CqlSession> buildDefaultSessionAsync() {
857889
: defaultConfigLoader(programmaticArguments.getClassLoader());
858890

859891
DriverExecutionProfile defaultConfig = configLoader.getInitialConfig().getDefaultProfile();
860-
if (cloudConfigInputStream == null) {
861-
String configUrlString =
862-
defaultConfig.getString(DefaultDriverOption.CLOUD_SECURE_CONNECT_BUNDLE, null);
863-
if (configUrlString != null) {
864-
cloudConfigInputStream = () -> getURL(configUrlString).openStream();
865-
}
866-
}
867892
List<String> configContactPoints =
868893
defaultConfig.getStringList(DefaultDriverOption.CONTACT_POINTS, Collections.emptyList());
869-
if (cloudConfigInputStream != null) {
894+
if (scyllaCloudConfigInputStream != null) {
870895
if (!programmaticContactPoints.isEmpty() || !configContactPoints.isEmpty()) {
871896
LOG.info(
872897
"Both a secure connect bundle and contact points were provided. These are mutually exclusive. The contact points from the secure bundle will have priority.");
@@ -880,20 +905,27 @@ protected final CompletionStage<CqlSession> buildDefaultSessionAsync() {
880905
LOG.info(
881906
"Both a secure connect bundle and SSL options were provided. They are mutually exclusive. The SSL options from the secure bundle will have priority.");
882907
}
883-
CloudConfig cloudConfig =
884-
new CloudConfigFactory().createCloudConfig(cloudConfigInputStream.call());
885-
addContactEndPoints(cloudConfig.getEndPoints());
908+
ScyllaCloudConnectionConfig cloudConfig =
909+
ScyllaCloudConnectionConfig.fromInputStream(scyllaCloudConfigInputStream.call());
910+
InetSocketAddress proxyAddress = cloudConfig.getCurrentDatacenter().getServer();
911+
addContactEndPoints(
912+
ImmutableList.of(
913+
new SniEndPoint(proxyAddress, cloudConfig.getCurrentDatacenter().getNodeDomain())));
886914

887915
boolean localDataCenterDefined =
888916
anyProfileHasDatacenterDefined(configLoader.getInitialConfig());
889917
if (programmaticLocalDatacenter || localDataCenterDefined) {
890918
LOG.info(
891-
"Both a secure connect bundle and a local datacenter were provided. They are mutually exclusive. The local datacenter from the secure bundle will have priority.");
919+
"Both a secure connect bundle and a local datacenter were provided. They are mutually exclusive. The currentContext datacenter name from the secure bundle will be ignored.");
920+
} else {
892921
programmaticArgumentsBuilder.clearDatacenters();
922+
withLocalDatacenter(cloudConfig.getCurrentContext().getDatacenterName());
893923
}
894-
withLocalDatacenter(cloudConfig.getLocalDatacenter());
895-
withSslEngineFactory(cloudConfig.getSslEngineFactory());
896-
withCloudProxyAddress(cloudConfig.getProxyAddress());
924+
ConfigurationBundle bundle = cloudConfig.createBundle();
925+
withSslEngineFactory(bundle.getSSLEngineFactory());
926+
withScyllaCloudProxyAddress(
927+
proxyAddress, cloudConfig.getCurrentDatacenter().getNodeDomain());
928+
897929
programmaticArguments = programmaticArgumentsBuilder.build();
898930
}
899931

@@ -912,7 +944,6 @@ protected final CompletionStage<CqlSession> buildDefaultSessionAsync() {
912944
(InternalDriverContext) buildContext(configLoader, programmaticArguments),
913945
contactPoints,
914946
keyspace);
915-
916947
} catch (Throwable t) {
917948
// We construct the session synchronously (until the init() call), but async clients expect a
918949
// failed future if anything goes wrong. So wrap any error from that synchronous part.
@@ -929,27 +960,6 @@ private boolean anyProfileHasDatacenterDefined(DriverConfig driverConfig) {
929960
return false;
930961
}
931962

932-
/**
933-
* Returns URL based on the configUrl setting. If the configUrl has no protocol provided, the
934-
* method will fallback to file:// protocol and return URL that has file protocol specified.
935-
*
936-
* @param configUrl url to config secure bundle
937-
* @return URL with file protocol if there was not explicit protocol provided in the configUrl
938-
* setting
939-
*/
940-
private URL getURL(String configUrl) throws MalformedURLException {
941-
try {
942-
return new URL(configUrl);
943-
} catch (MalformedURLException e1) {
944-
try {
945-
return Paths.get(configUrl).toAbsolutePath().normalize().toUri().toURL();
946-
} catch (MalformedURLException e2) {
947-
e2.addSuppressed(e1);
948-
throw e2;
949-
}
950-
}
951-
}
952-
953963
/**
954964
* This <b>must</b> return an instance of {@code InternalDriverContext} (it's not expressed
955965
* directly in the signature to avoid leaking that type through the protected API).

core/src/main/java/com/datastax/oss/driver/internal/core/channel/DriverChannel.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ public class DriverChannel {
6969
@SuppressWarnings("RedundantStringConstructorCall")
7070
static final Object FORCEFUL_CLOSE_MESSAGE = new String("FORCEFUL_CLOSE_MESSAGE");
7171

72-
private final EndPoint endPoint;
72+
private EndPoint endPoint;
7373
private final Channel channel;
7474
private final InFlightHandler inFlightHandler;
7575
private final WriteCoalescer writeCoalescer;
@@ -326,4 +326,9 @@ public SetKeyspaceEvent(CqlIdentifier keyspaceName, Promise<Void> promise) {
326326
this.promise = promise;
327327
}
328328
}
329+
330+
// Necessary for swapping ControlConnection endpoint when connecting with serverless clusters
331+
public void setEndPoint(EndPoint endPoint) {
332+
this.endPoint = endPoint;
333+
}
329334
}

core/src/main/java/com/datastax/oss/driver/internal/core/context/DefaultDriverContext.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
import com.datastax.oss.driver.internal.core.metadata.MetadataManager;
6161
import com.datastax.oss.driver.internal.core.metadata.MultiplexingNodeStateListener;
6262
import com.datastax.oss.driver.internal.core.metadata.NoopNodeStateListener;
63+
import com.datastax.oss.driver.internal.core.metadata.ScyllaCloudTopologyMonitor;
6364
import com.datastax.oss.driver.internal.core.metadata.TopologyMonitor;
6465
import com.datastax.oss.driver.internal.core.metadata.schema.MultiplexingSchemaChangeListener;
6566
import com.datastax.oss.driver.internal.core.metadata.schema.NoopSchemaChangeListener;
@@ -236,6 +237,7 @@ public class DefaultDriverContext implements InternalDriverContext {
236237
private final Map<String, NodeDistanceEvaluator> nodeDistanceEvaluatorsFromBuilder;
237238
private final ClassLoader classLoader;
238239
private final InetSocketAddress cloudProxyAddress;
240+
private final String scyllaCloudNodeDomain;
239241
private final LazyReference<RequestLogFormatter> requestLogFormatterRef =
240242
new LazyReference<>("requestLogFormatter", this::buildRequestLogFormatter, cycleDetector);
241243
private final UUID startupClientId;
@@ -291,6 +293,7 @@ public DefaultDriverContext(
291293
this.nodeDistanceEvaluatorsFromBuilder = programmaticArguments.getNodeDistanceEvaluators();
292294
this.classLoader = programmaticArguments.getClassLoader();
293295
this.cloudProxyAddress = programmaticArguments.getCloudProxyAddress();
296+
this.scyllaCloudNodeDomain = programmaticArguments.getScyllaCloudNodeDomain();
294297
this.startupClientId = programmaticArguments.getStartupClientId();
295298
this.startupApplicationName = programmaticArguments.getStartupApplicationName();
296299
this.startupApplicationVersion = programmaticArguments.getStartupApplicationVersion();
@@ -492,6 +495,9 @@ protected TopologyMonitor buildTopologyMonitor() {
492495
if (cloudProxyAddress == null) {
493496
return new DefaultTopologyMonitor(this);
494497
}
498+
if (scyllaCloudNodeDomain != null) {
499+
return new ScyllaCloudTopologyMonitor(this, cloudProxyAddress, scyllaCloudNodeDomain);
500+
}
495501
return new CloudTopologyMonitor(this, cloudProxyAddress);
496502
}
497503

core/src/main/java/com/datastax/oss/driver/internal/core/metadata/DefaultTopologyMonitor.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -68,11 +68,11 @@ public class DefaultTopologyMonitor implements TopologyMonitor {
6868
private static final int INFINITE_PAGE_SIZE = -1;
6969

7070
private final String logPrefix;
71-
private final InternalDriverContext context;
72-
private final ControlConnection controlConnection;
71+
protected final InternalDriverContext context;
72+
protected final ControlConnection controlConnection;
7373
private final Duration timeout;
74-
private final boolean reconnectOnInit;
75-
private final CompletableFuture<Void> closeFuture;
74+
protected final boolean reconnectOnInit;
75+
protected final CompletableFuture<Void> closeFuture;
7676

7777
@VisibleForTesting volatile boolean isSchemaV2;
7878
@VisibleForTesting volatile int port = -1;

0 commit comments

Comments
 (0)