Skip to content

Commit df11b8b

Browse files
committed
Read cassandra schema but not do anything
1 parent 80ad7f4 commit df11b8b

File tree

2 files changed

+70
-1
lines changed

2 files changed

+70
-1
lines changed

v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/SpannerToSourceDb.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import static com.google.cloud.teleport.v2.spanner.migrations.constants.Constants.CASSANDRA_SOURCE_TYPE;
1919
import static com.google.cloud.teleport.v2.spanner.migrations.constants.Constants.MYSQL_SOURCE_TYPE;
2020

21+
import com.datastax.oss.driver.api.core.CqlSession;
2122
import com.google.cloud.Timestamp;
2223
import com.google.cloud.teleport.metadata.Template;
2324
import com.google.cloud.teleport.metadata.TemplateCategory;
@@ -41,9 +42,16 @@
4142
import com.google.cloud.teleport.v2.spanner.migrations.utils.SecretManagerAccessorImpl;
4243
import com.google.cloud.teleport.v2.spanner.migrations.utils.SessionFileReader;
4344
import com.google.cloud.teleport.v2.spanner.migrations.utils.ShardFileReader;
45+
import com.google.cloud.teleport.v2.spanner.sourceddl.CassandraInformationSchemaScanner;
46+
import com.google.cloud.teleport.v2.spanner.sourceddl.MySqlInformationSchemaScanner;
47+
import com.google.cloud.teleport.v2.spanner.sourceddl.SourceInformationSchemaScanner;
48+
import com.google.cloud.teleport.v2.spanner.sourceddl.SourceSchema;
4449
import com.google.cloud.teleport.v2.templates.SpannerToSourceDb.Options;
4550
import com.google.cloud.teleport.v2.templates.changestream.TrimmedShardedDataChangeRecord;
4651
import com.google.cloud.teleport.v2.templates.constants.Constants;
52+
import com.google.cloud.teleport.v2.templates.dbutils.processor.SourceProcessorFactory;
53+
import com.google.cloud.teleport.v2.templates.exceptions.ConnectionException;
54+
import com.google.cloud.teleport.v2.templates.exceptions.UnsupportedSourceException;
4755
import com.google.cloud.teleport.v2.templates.transforms.AssignShardIdFn;
4856
import com.google.cloud.teleport.v2.templates.transforms.ConvertChangeStreamErrorRecordToFailsafeElementFn;
4957
import com.google.cloud.teleport.v2.templates.transforms.ConvertDlqRecordToTrimmedShardedDataChangeRecordFn;
@@ -56,6 +64,7 @@
5664
import com.google.cloud.teleport.v2.transforms.DLQWriteTransform;
5765
import com.google.cloud.teleport.v2.values.FailsafeElement;
5866
import com.google.common.base.Strings;
67+
import java.sql.Connection;
5968
import java.util.ArrayList;
6069
import java.util.Arrays;
6170
import java.util.List;
@@ -551,6 +560,33 @@ public static PipelineResult run(Options options) {
551560
LOG.info("Cassandra config is: {}", shards.get(0));
552561
shardingMode = Constants.SHARDING_MODE_SINGLE_SHARD;
553562
}
563+
SourceInformationSchemaScanner scanner = null;
564+
try {
565+
SourceProcessorFactory.initializeConnectionHelper(
566+
options.getSourceType(), shards, connectionPoolSizePerWorker);
567+
if (options.getSourceType().equals(MYSQL_SOURCE_TYPE)) {
568+
Connection connection =
569+
(Connection)
570+
SourceProcessorFactory.getConnectionToShard(options.getSourceType(), shards.get(0));
571+
scanner = new MySqlInformationSchemaScanner(connection, shards.get(0).getDbName());
572+
} else {
573+
CqlSession session =
574+
(CqlSession)
575+
SourceProcessorFactory.getConnectionToShard(options.getSourceType(), shards.get(0));
576+
scanner =
577+
new CassandraInformationSchemaScanner(
578+
session, ((CassandraShard) shards.get(0)).getKeySpaceName());
579+
}
580+
} catch (UnsupportedSourceException e) {
581+
LOG.error("Unsupported source type: {}", options.getSourceType());
582+
throw new IllegalArgumentException(e);
583+
} catch (ConnectionException e) {
584+
LOG.error("Error getting connection to shard: {}", shards.get(0));
585+
throw new IllegalArgumentException(e);
586+
}
587+
SourceSchema sourceSchema = scanner.scan();
588+
LOG.info("Source schema: {}", sourceSchema);
589+
554590
if (shards.size() == 1 && !options.getIsShardedMigration()) {
555591
shardingMode = Constants.SHARDING_MODE_SINGLE_SHARD;
556592
Shard shard = shards.get(0);
@@ -562,6 +598,7 @@ public static PipelineResult run(Options options) {
562598
}
563599

564600
if (options.getSourceType().equals(CASSANDRA_SOURCE_TYPE)) {
601+
// TODO: Remove this once session file support is removed.
565602
Map<String, SpannerTable> spannerTableMap =
566603
SpannerSchema.convertDDLTableToSpannerTable(ddl.allTables());
567604
Map<String, NameAndCols> spannerTableNameColsMap =

v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/dbutils/processor/SourceProcessorFactory.java

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package com.google.cloud.teleport.v2.templates.dbutils.processor;
1717

18+
import com.datastax.oss.driver.api.core.CqlSession;
1819
import com.google.cloud.teleport.v2.spanner.migrations.shard.CassandraShard;
1920
import com.google.cloud.teleport.v2.spanner.migrations.shard.Shard;
2021
import com.google.cloud.teleport.v2.templates.constants.Constants;
@@ -27,8 +28,10 @@
2728
import com.google.cloud.teleport.v2.templates.dbutils.dml.CassandraDMLGenerator;
2829
import com.google.cloud.teleport.v2.templates.dbutils.dml.IDMLGenerator;
2930
import com.google.cloud.teleport.v2.templates.dbutils.dml.MySQLDMLGenerator;
31+
import com.google.cloud.teleport.v2.templates.exceptions.ConnectionException;
3032
import com.google.cloud.teleport.v2.templates.exceptions.UnsupportedSourceException;
3133
import com.google.cloud.teleport.v2.templates.models.ConnectionHelperRequest;
34+
import java.sql.Connection;
3235
import java.util.HashMap;
3336
import java.util.List;
3437
import java.util.Map;
@@ -137,7 +140,7 @@ private static IConnectionHelper getConnectionHelper(String source)
137140
"Invalid source type for connection helper: " + source));
138141
}
139142

140-
private static void initializeConnectionHelper(
143+
public static void initializeConnectionHelper(
141144
String source, List<Shard> shards, int maxConnections) throws UnsupportedSourceException {
142145
IConnectionHelper connectionHelper = getConnectionHelper(source);
143146
if (!connectionHelper.isConnectionPoolInitialized()) {
@@ -177,4 +180,33 @@ private static Map<String, IDao> createSourceDaoMap(String source, List<Shard> s
177180
}
178181
return sourceDaoMap;
179182
}
183+
184+
/**
185+
* Returns a connection for the specified shard.
186+
*
187+
* @param source The source database type (e.g. "mysql", "cassandra")
188+
* @param shard The shard to get a connection for
189+
* @return The connection object for the specified shard
190+
* @throws UnsupportedSourceException If the source type is not supported
191+
* @throws ConnectionException If there is an error getting the connection
192+
*/
193+
public static Object getConnectionToShard(String source, Shard shard)
194+
throws UnsupportedSourceException, ConnectionException {
195+
IConnectionHelper connectionHelper = getConnectionHelper(source);
196+
Function<Shard, String> urlGenerator =
197+
Optional.ofNullable(connectionUrl.get(source))
198+
.orElseThrow(
199+
() ->
200+
new UnsupportedSourceException(
201+
"Invalid source type for URL generation: " + source));
202+
String sqlURL = urlGenerator.apply(shard);
203+
204+
if (source.equals(Constants.SOURCE_MYSQL)) {
205+
return (Connection) connectionHelper.getConnection(sqlURL + "/" + shard.getUserName());
206+
} else if (source.equals(Constants.SOURCE_CASSANDRA)) {
207+
return (CqlSession) connectionHelper.getConnection(sqlURL);
208+
} else {
209+
throw new UnsupportedSourceException("Unsupported source type: " + source);
210+
}
211+
}
180212
}

0 commit comments

Comments
 (0)