Skip to content

Commit ce12473

Browse files
authored
Refactor template to avoid using session file for type info (#2529)
* Use tmp cql session builder * Refactor code to avoid session file for anything other than name mappings * Rebase onto sourceddl branch with resolved comments * Move schema utils to test folder and resolve comments
1 parent 9a98cb4 commit ce12473

File tree

17 files changed

+3051
-2101
lines changed

17 files changed

+3051
-2101
lines changed

v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/SpannerToSourceDb.java

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@
1818
import static com.google.cloud.teleport.v2.spanner.migrations.constants.Constants.CASSANDRA_SOURCE_TYPE;
1919
import static com.google.cloud.teleport.v2.spanner.migrations.constants.Constants.MYSQL_SOURCE_TYPE;
2020

21+
import com.datastax.oss.driver.api.core.CqlSession;
22+
import com.datastax.oss.driver.api.core.CqlSessionBuilder;
23+
import com.datastax.oss.driver.api.core.config.DriverConfigLoader;
2124
import com.google.cloud.Timestamp;
2225
import com.google.cloud.teleport.metadata.Template;
2326
import com.google.cloud.teleport.metadata.TemplateCategory;
@@ -38,9 +41,14 @@
3841
import com.google.cloud.teleport.v2.spanner.migrations.spanner.SpannerSchema;
3942
import com.google.cloud.teleport.v2.spanner.migrations.transformation.CustomTransformation;
4043
import com.google.cloud.teleport.v2.spanner.migrations.utils.CassandraConfigFileReader;
44+
import com.google.cloud.teleport.v2.spanner.migrations.utils.CassandraDriverConfigLoader;
4145
import com.google.cloud.teleport.v2.spanner.migrations.utils.SecretManagerAccessorImpl;
4246
import com.google.cloud.teleport.v2.spanner.migrations.utils.SessionFileReader;
4347
import com.google.cloud.teleport.v2.spanner.migrations.utils.ShardFileReader;
48+
import com.google.cloud.teleport.v2.spanner.sourceddl.CassandraInformationSchemaScanner;
49+
import com.google.cloud.teleport.v2.spanner.sourceddl.MySqlInformationSchemaScanner;
50+
import com.google.cloud.teleport.v2.spanner.sourceddl.SourceSchema;
51+
import com.google.cloud.teleport.v2.spanner.sourceddl.SourceSchemaScanner;
4452
import com.google.cloud.teleport.v2.templates.SpannerToSourceDb.Options;
4553
import com.google.cloud.teleport.v2.templates.changestream.TrimmedShardedDataChangeRecord;
4654
import com.google.cloud.teleport.v2.templates.constants.Constants;
@@ -56,6 +64,10 @@
5664
import com.google.cloud.teleport.v2.transforms.DLQWriteTransform;
5765
import com.google.cloud.teleport.v2.values.FailsafeElement;
5866
import com.google.common.base.Strings;
67+
import com.zaxxer.hikari.HikariConfig;
68+
import com.zaxxer.hikari.HikariDataSource;
69+
import java.sql.Connection;
70+
import java.sql.SQLException;
5971
import java.util.ArrayList;
6072
import java.util.Arrays;
6173
import java.util.List;
@@ -551,6 +563,9 @@ public static PipelineResult run(Options options) {
551563
LOG.info("Cassandra config is: {}", shards.get(0));
552564
shardingMode = Constants.SHARDING_MODE_SINGLE_SHARD;
553565
}
566+
SourceSchema sourceSchema = fetchSourceSchema(options, shards);
567+
LOG.info("Source schema: {}", sourceSchema);
568+
554569
if (shards.size() == 1 && !options.getIsShardedMigration()) {
555570
shardingMode = Constants.SHARDING_MODE_SINGLE_SHARD;
556571
Shard shard = shards.get(0);
@@ -562,6 +577,7 @@ public static PipelineResult run(Options options) {
562577
}
563578

564579
if (options.getSourceType().equals(CASSANDRA_SOURCE_TYPE)) {
580+
// TODO: Remove this once session file support is removed.
565581
Map<String, SpannerTable> spannerTableMap =
566582
SpannerSchema.convertDDLTableToSpannerTable(ddl.allTables());
567583
Map<String, NameAndCols> spannerTableNameColsMap =
@@ -668,6 +684,7 @@ public static PipelineResult run(Options options) {
668684
spannerConfig,
669685
schema,
670686
ddl,
687+
sourceSchema,
671688
shardingMode,
672689
shards.get(0).getLogicalShardId(),
673690
options.getSkipDirectoryName(),
@@ -689,6 +706,7 @@ public static PipelineResult run(Options options) {
689706
spannerMetadataConfig,
690707
options.getSourceDbTimezoneOffset(),
691708
ddl,
709+
sourceSchema,
692710
options.getShadowTablePrefix(),
693711
options.getSkipDirectoryName(),
694712
connectionPoolSizePerWorker,
@@ -825,4 +843,58 @@ private static DeadLetterQueueManager buildDlqManager(Options options) {
825843
return DeadLetterQueueManager.create(dlqDirectory, retryDlqUri, 0);
826844
}
827845
}
846+
847+
private static Connection createJdbcConnection(Shard shard) {
848+
try {
849+
String sourceConnectionUrl =
850+
"jdbc:mysql://" + shard.getHost() + ":" + shard.getPort() + "/" + shard.getDbName();
851+
HikariConfig config = new HikariConfig();
852+
config.setJdbcUrl(sourceConnectionUrl);
853+
config.setUsername(shard.getUserName());
854+
config.setPassword(shard.getPassword());
855+
config.setDriverClassName("com.mysql.cj.jdbc.Driver");
856+
HikariDataSource ds = new HikariDataSource(config);
857+
return ds.getConnection();
858+
} catch (java.sql.SQLException e) {
859+
LOG.error("Sql error while discovering mysql schema", e);
860+
throw new RuntimeException(e);
861+
}
862+
}
863+
864+
/**
865+
* Creates a {@link CqlSession} for the given {@link CassandraShard}.
866+
*
867+
* @param cassandraShard The shard containing connection details.
868+
* @return A {@link CqlSession} instance.
869+
*/
870+
private static CqlSession createCqlSession(CassandraShard cassandraShard) {
871+
CqlSessionBuilder builder = CqlSession.builder();
872+
DriverConfigLoader configLoader =
873+
CassandraDriverConfigLoader.fromOptionsMap(cassandraShard.getOptionsMap());
874+
builder.withConfigLoader(configLoader);
875+
return builder.build();
876+
}
877+
878+
private static SourceSchema fetchSourceSchema(Options options, List<Shard> shards) {
879+
SourceSchemaScanner scanner = null;
880+
SourceSchema sourceSchema = null;
881+
try {
882+
if (options.getSourceType().equals(MYSQL_SOURCE_TYPE)) {
883+
Connection connection = createJdbcConnection(shards.get(0));
884+
scanner = new MySqlInformationSchemaScanner(connection, shards.get(0).getDbName());
885+
sourceSchema = scanner.scan();
886+
connection.close();
887+
} else {
888+
try (CqlSession session = createCqlSession((CassandraShard) shards.get(0))) {
889+
scanner =
890+
new CassandraInformationSchemaScanner(
891+
session, ((CassandraShard) shards.get(0)).getKeySpaceName());
892+
sourceSchema = scanner.scan();
893+
}
894+
}
895+
} catch (SQLException e) {
896+
throw new RuntimeException("Unable to discover jdbc schema", e);
897+
}
898+
return sourceSchema;
899+
}
828900
}

0 commit comments

Comments
 (0)