Skip to content

Upgraded to cassandra driver 4.0 #41

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,16 @@
<artifactId>cassandra-driver-core</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>com.datastax.oss</groupId>
<artifactId>java-driver-query-builder</artifactId>
<version>4.14.0</version>
</dependency>
<dependency>
<groupId>com.datastax.oss</groupId>
<artifactId>java-driver-core</artifactId>
<version>4.14.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
Expand Down Expand Up @@ -175,6 +185,14 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
</plugins>
</build>
<profiles>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,20 @@
import com.contrastsecurity.cassandra.migration.config.ScriptsLocations;
import com.contrastsecurity.cassandra.migration.dao.SchemaVersionDAO;
import com.contrastsecurity.cassandra.migration.info.MigrationInfoService;
import com.contrastsecurity.cassandra.migration.info.MigrationVersion;
import com.contrastsecurity.cassandra.migration.logging.Log;
import com.contrastsecurity.cassandra.migration.logging.LogFactory;
import com.contrastsecurity.cassandra.migration.resolver.CompositeMigrationResolver;
import com.contrastsecurity.cassandra.migration.resolver.MigrationResolver;
import com.contrastsecurity.cassandra.migration.utils.VersionPrinter;
import com.datastax.driver.core.Host;
import com.datastax.driver.core.KeyspaceMetadata;
import com.datastax.driver.core.Metadata;
import com.datastax.driver.core.Session;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.CqlSessionBuilder;
import com.datastax.oss.driver.api.core.metadata.Metadata;
import com.datastax.oss.driver.api.core.metadata.Node;
import com.datastax.oss.driver.api.core.metadata.schema.KeyspaceMetadata;
import sun.reflect.generics.reflectiveObjects.NotImplementedException;

import java.util.List;
import java.net.InetSocketAddress;
import java.util.Collection;

public class CassandraMigration {

Expand All @@ -29,12 +30,19 @@ public class CassandraMigration {
private ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
private Keyspace keyspace;
private MigrationConfigs configs;
private CqlSession session;

public CassandraMigration() {
this.keyspace = new Keyspace();
this.configs = new MigrationConfigs();
}

public CassandraMigration(CqlSession session, MigrationConfigs configs) {
this.session = session;
this.keyspace = configs.getKeyspace();
this.configs = configs;
}

public ClassLoader getClassLoader() {
return classLoader;
}
Expand Down Expand Up @@ -66,11 +74,11 @@ private MigrationResolver createMigrationResolver() {

public int migrate() {
return execute(new Action<Integer>() {
public Integer execute(Session session) {
new Initialize().run(session, keyspace, MigrationVersion.CURRENT.getTable());
public Integer execute(CqlSession session) {
new Initialize().run(session, keyspace);

MigrationResolver migrationResolver = createMigrationResolver();
SchemaVersionDAO schemaVersionDAO = new SchemaVersionDAO(session, keyspace, MigrationVersion.CURRENT.getTable());
SchemaVersionDAO schemaVersionDAO = new SchemaVersionDAO(session, keyspace);
Migrate migrate = new Migrate(migrationResolver, configs.getTarget(), schemaVersionDAO, session,
keyspace.getCluster().getUsername(), configs.isAllowOutOfOrder());

Expand All @@ -81,9 +89,9 @@ public Integer execute(Session session) {

public MigrationInfoService info() {
return execute(new Action<MigrationInfoService>() {
public MigrationInfoService execute(Session session) {
public MigrationInfoService execute(CqlSession session) {
MigrationResolver migrationResolver = createMigrationResolver();
SchemaVersionDAO schemaVersionDAO = new SchemaVersionDAO(session, keyspace, MigrationVersion.CURRENT.getTable());
SchemaVersionDAO schemaVersionDAO = new SchemaVersionDAO(session, keyspace);
MigrationInfoService migrationInfoService =
new MigrationInfoService(migrationResolver, schemaVersionDAO, configs.getTarget(), false, true);
migrationInfoService.refresh();
Expand All @@ -94,21 +102,21 @@ public MigrationInfoService execute(Session session) {
}

public void validate() {
String validationError = execute(new Action<String>() {
@Override
public String execute(Session session) {
MigrationResolver migrationResolver = createMigrationResolver();
SchemaVersionDAO schemaVersionDao = new SchemaVersionDAO(session, keyspace, MigrationVersion.CURRENT.getTable());
Validate validate = new Validate(migrationResolver, schemaVersionDao, configs.getTarget(), true, false);
return validate.run();
}
});
if (validationError != null) {
throw new CassandraMigrationException("Validation failed. " + validationError);
}
String validationError = execute(new Action<String>() {
@Override
public String execute(CqlSession session) {
MigrationResolver migrationResolver = createMigrationResolver();
SchemaVersionDAO schemaVersionDao = new SchemaVersionDAO(session, keyspace);
Validate validate = new Validate(migrationResolver, schemaVersionDao, configs.getTarget(), true, false);
return validate.run();
}
});

if (validationError != null) {
throw new CassandraMigrationException("Validation failed. " + validationError);
}
}

public void baseline() {
//TODO
throw new NotImplementedException();
Expand All @@ -119,11 +127,11 @@ private String getConnectionInfo(Metadata metadata) {
sb.append("Connected to cluster: ");
sb.append(metadata.getClusterName());
sb.append("\n");
for (Host host : metadata.getAllHosts()) {
for (Node node : metadata.getNodes().values()) {
sb.append("Data center: ");
sb.append(host.getDatacenter());
sb.append(node.getDatacenter());
sb.append("; Host: ");
sb.append(host.getAddress());
sb.append(node.getBroadcastAddress());
}
return sb.toString();
}
Expand All @@ -132,38 +140,46 @@ <T> T execute(Action<T> action) {
T result;

VersionPrinter.printVersion(classLoader);

com.datastax.driver.core.Cluster cluster = null;
Session session = null;
try {
if (null == keyspace)
throw new IllegalArgumentException("Unable to establish Cassandra session. Keyspace is not configured.");

if (null == keyspace.getCluster())
throw new IllegalArgumentException("Unable to establish Cassandra session. Cluster is not configured.");

com.datastax.driver.core.Cluster.Builder builder = new com.datastax.driver.core.Cluster.Builder();
builder.addContactPoints(keyspace.getCluster().getContactpoints()).withPort(keyspace.getCluster().getPort());
if (null != keyspace.getCluster().getUsername() && !keyspace.getCluster().getUsername().trim().isEmpty()) {
if (null != keyspace.getCluster().getPassword() && !keyspace.getCluster().getPassword().trim().isEmpty()) {
builder.withCredentials(keyspace.getCluster().getUsername(),
keyspace.getCluster().getPassword());
} else {
throw new IllegalArgumentException("Password must be provided with username.");

if (session == null) {

if (null == keyspace.getCluster())
throw new IllegalArgumentException("Unable to establish Cassandra session. Cluster is not configured.");

com.datastax.driver.core.Cluster.Builder builder = new com.datastax.driver.core.Cluster.Builder();
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm performing an upgrage to cassandra 4 on my project too.
Do you have any reason to use com.datastax.driver.core.Cluster.Builder and not com.datastax.oss.driver.api.core.CqlSessionBuilder ?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right. There were more changes I just checked in all those files. Pls. take a look.

builder.addContactPoints(keyspace.getCluster().getContactpoints()).withPort(keyspace.getCluster().getPort());
if (null != keyspace.getCluster().getUsername() && !keyspace.getCluster().getUsername().trim().isEmpty()) {
if (null != keyspace.getCluster().getPassword() && !keyspace.getCluster().getPassword().trim().isEmpty()) {
builder.withCredentials(keyspace.getCluster().getUsername(),
keyspace.getCluster().getPassword());
} else {
throw new IllegalArgumentException("Password must be provided with username.");
}
}


CqlSessionBuilder cqlSessionBuilder = new CqlSessionBuilder()
.withKeyspace(keyspace.getName())
.withLocalDatacenter("datacenter1");
for (String contactPoint : keyspace.getCluster().getContactpoints()) {
cqlSessionBuilder.addContactPoint(new InetSocketAddress(contactPoint, keyspace.getCluster().getPort()));
}
session = cqlSessionBuilder.build();
}
cluster = builder.build();

Metadata metadata = cluster.getMetadata();
Metadata metadata = session.getMetadata();
LOG.info(getConnectionInfo(metadata));

session = cluster.newSession();
if (null == keyspace.getName() || keyspace.getName().trim().length() == 0)
throw new IllegalArgumentException("Keyspace not specified.");
List<KeyspaceMetadata> keyspaces = metadata.getKeyspaces();
Collection<KeyspaceMetadata> keyspaces = metadata.getKeyspaces().values();
boolean keyspaceExists = false;
for (KeyspaceMetadata keyspaceMetadata : keyspaces) {
if (keyspaceMetadata.getName().equalsIgnoreCase(keyspace.getName()))
if (keyspaceMetadata.getName().asInternal().equalsIgnoreCase(keyspace.getName()))
keyspaceExists = true;
}
if (keyspaceExists)
Expand All @@ -176,20 +192,14 @@ <T> T execute(Action<T> action) {
if (null != session && !session.isClosed())
try {
session.close();
} catch(Exception e) {
} catch (Exception e) {
LOG.warn("Error closing Cassandra session");
}
if (null != cluster && !cluster.isClosed())
try {
cluster.close();
} catch(Exception e) {
LOG.warn("Error closing Cassandra cluster");
}
}
return result;
}

interface Action<T> {
T execute(Session session);
T execute(CqlSession session);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@
import com.contrastsecurity.cassandra.migration.config.Keyspace;
import com.contrastsecurity.cassandra.migration.dao.SchemaVersionDAO;
import com.datastax.driver.core.Session;
import com.datastax.oss.driver.api.core.CqlSession;

public class Initialize {

public void run(Session session, Keyspace keyspace, String migrationVersionTableName) {
SchemaVersionDAO dao = new SchemaVersionDAO(session, keyspace, migrationVersionTableName);
public void run(CqlSession session, Keyspace keyspace) {
SchemaVersionDAO dao = new SchemaVersionDAO(session, keyspace);
dao.createTablesIfNotExist();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,20 @@
import com.contrastsecurity.cassandra.migration.utils.StopWatch;
import com.contrastsecurity.cassandra.migration.utils.TimeFormat;
import com.datastax.driver.core.Session;
import com.datastax.oss.driver.api.core.CqlSession;

public class Migrate {
private static final Log LOG = LogFactory.getLog(Migrate.class);

private final MigrationVersion target;
private final SchemaVersionDAO schemaVersionDAO;
private final MigrationResolver migrationResolver;
private final Session session;
private final CqlSession session;
private final String user;
private final boolean allowOutOfOrder;

public Migrate(MigrationResolver migrationResolver, MigrationVersion target, SchemaVersionDAO schemaVersionDAO,
Session session, String user, boolean allowOutOfOrder) {
CqlSession session, String user, boolean allowOutOfOrder) {
this.migrationResolver = migrationResolver;
this.schemaVersionDAO = schemaVersionDAO;
this.session = session;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package com.contrastsecurity.cassandra.migration.api;

import com.datastax.driver.core.Session;
import com.datastax.oss.driver.api.core.CqlSession;

public interface JavaMigration {
void migrate(Session session) throws Exception;
void migrate(CqlSession session) throws Exception;
}
Original file line number Diff line number Diff line change
@@ -1,13 +1,25 @@
package com.contrastsecurity.cassandra.migration.config;


import com.contrastsecurity.cassandra.migration.utils.Ensure;

import static com.contrastsecurity.cassandra.migration.utils.Ensure.notNull;

/**
* This represents the definition of a key space and is basically
* a builder for the CQL statement that is required to create a keyspace
* before any migration can be executed.
*
* @author Patrick Kranz
*/
public class Keyspace {
private static final String PROPERTY_PREFIX = "cassandra.migration.keyspace.";

public enum KeyspaceProperty {
NAME(PROPERTY_PREFIX + "name", "Name of Cassandra keyspace");

private String name;
private String description;
private final String name;
private final String description;

KeyspaceProperty(String name, String description) {
this.name = name;
Expand All @@ -25,6 +37,8 @@ public String getDescription() {

private Cluster cluster;
private String name;
private boolean durableWrites;
private ReplicationStrategy replicationStrategy;

public Keyspace() {
cluster = new Cluster();
Expand All @@ -41,11 +55,48 @@ public void setCluster(Cluster cluster) {
this.cluster = cluster;
}

/**
* This creates a new instance of a keyspace using the provided keyspace name. It by default
* uses a {@link SimpleStrategy} for replication and sets durable writes to <code>true</code>.
* These default values can be overwritten by the provided methods.
*
* @param name the name of the keyspace to be used.
*/
public Keyspace(String name) {
this.name = Ensure.notNullOrEmpty(name, "keyspaceName");
this.replicationStrategy = new SimpleStrategy();
this.durableWrites = true;
}

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

public Keyspace with(ReplicationStrategy replicationStrategy) {
this.replicationStrategy = notNull(replicationStrategy, "replicationStrategy");
return this;
}

public boolean isDurableWrites() {
return durableWrites;
}

public ReplicationStrategy getReplicationStrategy() {
return replicationStrategy;
}

public String getCqlStatement() {
StringBuilder builder = new StringBuilder(60);
builder.append("CREATE KEYSPACE IF NOT EXISTS ")
.append(getName())
.append(" WITH REPLICATION = ")
.append(getReplicationStrategy().createCqlStatement())
.append(" AND DURABLE_WRITES = ")
.append(Boolean.toString(isDurableWrites()));
return builder.toString();
}
}
Loading