Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
import static com.google.cloud.teleport.v2.spanner.migrations.constants.Constants.CASSANDRA_SOURCE_TYPE;
import static com.google.cloud.teleport.v2.spanner.migrations.constants.Constants.MYSQL_SOURCE_TYPE;

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.CqlSessionBuilder;
import com.datastax.oss.driver.api.core.config.DriverConfigLoader;
import com.google.cloud.Timestamp;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
Expand All @@ -38,9 +41,14 @@
import com.google.cloud.teleport.v2.spanner.migrations.spanner.SpannerSchema;
import com.google.cloud.teleport.v2.spanner.migrations.transformation.CustomTransformation;
import com.google.cloud.teleport.v2.spanner.migrations.utils.CassandraConfigFileReader;
import com.google.cloud.teleport.v2.spanner.migrations.utils.CassandraDriverConfigLoader;
import com.google.cloud.teleport.v2.spanner.migrations.utils.SecretManagerAccessorImpl;
import com.google.cloud.teleport.v2.spanner.migrations.utils.SessionFileReader;
import com.google.cloud.teleport.v2.spanner.migrations.utils.ShardFileReader;
import com.google.cloud.teleport.v2.spanner.sourceddl.CassandraInformationSchemaScanner;
import com.google.cloud.teleport.v2.spanner.sourceddl.MySqlInformationSchemaScanner;
import com.google.cloud.teleport.v2.spanner.sourceddl.SourceSchema;
import com.google.cloud.teleport.v2.spanner.sourceddl.SourceSchemaScanner;
import com.google.cloud.teleport.v2.templates.SpannerToSourceDb.Options;
import com.google.cloud.teleport.v2.templates.changestream.TrimmedShardedDataChangeRecord;
import com.google.cloud.teleport.v2.templates.constants.Constants;
Expand All @@ -56,6 +64,10 @@
import com.google.cloud.teleport.v2.transforms.DLQWriteTransform;
import com.google.cloud.teleport.v2.values.FailsafeElement;
import com.google.common.base.Strings;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
Expand Down Expand Up @@ -551,6 +563,9 @@ public static PipelineResult run(Options options) {
LOG.info("Cassandra config is: {}", shards.get(0));
shardingMode = Constants.SHARDING_MODE_SINGLE_SHARD;
}
SourceSchema sourceSchema = fetchSourceSchema(options, shards);
LOG.info("Source schema: {}", sourceSchema);

if (shards.size() == 1 && !options.getIsShardedMigration()) {
shardingMode = Constants.SHARDING_MODE_SINGLE_SHARD;
Shard shard = shards.get(0);
Expand All @@ -562,6 +577,7 @@ public static PipelineResult run(Options options) {
}

if (options.getSourceType().equals(CASSANDRA_SOURCE_TYPE)) {
// TODO: Remove this once session file support is removed.
Map<String, SpannerTable> spannerTableMap =
SpannerSchema.convertDDLTableToSpannerTable(ddl.allTables());
Map<String, NameAndCols> spannerTableNameColsMap =
Expand Down Expand Up @@ -668,6 +684,7 @@ public static PipelineResult run(Options options) {
spannerConfig,
schema,
ddl,
sourceSchema,
shardingMode,
shards.get(0).getLogicalShardId(),
options.getSkipDirectoryName(),
Expand All @@ -689,6 +706,7 @@ public static PipelineResult run(Options options) {
spannerMetadataConfig,
options.getSourceDbTimezoneOffset(),
ddl,
sourceSchema,
options.getShadowTablePrefix(),
options.getSkipDirectoryName(),
connectionPoolSizePerWorker,
Expand Down Expand Up @@ -825,4 +843,58 @@ private static DeadLetterQueueManager buildDlqManager(Options options) {
return DeadLetterQueueManager.create(dlqDirectory, retryDlqUri, 0);
}
}

private static Connection createJdbcConnection(Shard shard) {
try {
String sourceConnectionUrl =
"jdbc:mysql://" + shard.getHost() + ":" + shard.getPort() + "/" + shard.getDbName();
HikariConfig config = new HikariConfig();
config.setJdbcUrl(sourceConnectionUrl);
config.setUsername(shard.getUserName());
config.setPassword(shard.getPassword());
config.setDriverClassName("com.mysql.cj.jdbc.Driver");
HikariDataSource ds = new HikariDataSource(config);
return ds.getConnection();
} catch (java.sql.SQLException e) {
LOG.error("Sql error while discovering mysql schema", e);
throw new RuntimeException(e);
}
}

/**
* Creates a {@link CqlSession} for the given {@link CassandraShard}.
*
* @param cassandraShard The shard containing connection details.
* @return A {@link CqlSession} instance.
*/
private static CqlSession createCqlSession(CassandraShard cassandraShard) {
CqlSessionBuilder builder = CqlSession.builder();
DriverConfigLoader configLoader =
CassandraDriverConfigLoader.fromOptionsMap(cassandraShard.getOptionsMap());
builder.withConfigLoader(configLoader);
return builder.build();
}

private static SourceSchema fetchSourceSchema(Options options, List<Shard> shards) {
SourceSchemaScanner scanner = null;
SourceSchema sourceSchema = null;
try {
if (options.getSourceType().equals(MYSQL_SOURCE_TYPE)) {
Connection connection = createJdbcConnection(shards.get(0));
scanner = new MySqlInformationSchemaScanner(connection, shards.get(0).getDbName());
sourceSchema = scanner.scan();
connection.close();
} else {
try (CqlSession session = createCqlSession((CassandraShard) shards.get(0))) {
scanner =
new CassandraInformationSchemaScanner(
session, ((CassandraShard) shards.get(0)).getKeySpaceName());
sourceSchema = scanner.scan();
}
}
} catch (SQLException e) {
throw new RuntimeException("Unable to discover jdbc schema", e);
}
return sourceSchema;
}
}
Loading
Loading