Skip to content

Commit a31684f

Browse files
committed
Initial draft untested
1 parent 51fa0a0 commit a31684f

35 files changed

+3515
-1593
lines changed
Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
/*
2+
* Copyright (C) 2025 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package com.google.cloud.teleport.v2.spanner.migrations.utils;
17+
18+
import com.fasterxml.jackson.databind.ObjectMapper;
19+
import com.google.cloud.teleport.v2.spanner.ddl.Ddl;
20+
import com.google.cloud.teleport.v2.spanner.ddl.IndexColumn;
21+
import com.google.cloud.teleport.v2.spanner.ddl.Table;
22+
import com.google.cloud.teleport.v2.spanner.sourceddl.SourceColumn;
23+
import com.google.cloud.teleport.v2.spanner.sourceddl.SourceDatabaseType;
24+
import com.google.cloud.teleport.v2.spanner.sourceddl.SourceSchema;
25+
import com.google.cloud.teleport.v2.spanner.sourceddl.SourceTable;
26+
import com.google.common.collect.ImmutableList;
27+
import com.google.common.collect.ImmutableMap;
28+
import java.io.File;
29+
import java.util.List;
30+
import java.util.Map;
31+
32+
public class SchemaUtils {
33+
34+
public static Ddl buildDdlFromSessionFile(String sessionFile) {
35+
try {
36+
ObjectMapper mapper = new ObjectMapper();
37+
Map<String, Object> session = mapper.readValue(new File(sessionFile), Map.class);
38+
Map<String, Object> spSchema = (Map<String, Object>) session.get("SpSchema");
39+
Ddl.Builder ddlBuilder = Ddl.builder();
40+
for (Map.Entry<String, Object> entry : spSchema.entrySet()) {
41+
Map<String, Object> tableMap = (Map<String, Object>) entry.getValue();
42+
String tableName = (String) tableMap.get("Name");
43+
Table.Builder tableBuilder = ddlBuilder.createTable(tableName);
44+
Map<String, Object> colDefs = (Map<String, Object>) tableMap.get("ColDefs");
45+
for (String colId : ((Map<String, Object>) colDefs).keySet()) {
46+
Map<String, Object> colMap = (Map<String, Object>) colDefs.get(colId);
47+
String colName = (String) colMap.get("Name");
48+
Map<String, Object> typeMap = (Map<String, Object>) colMap.get("T");
49+
String typeName = (String) typeMap.get("Name");
50+
if (typeName.equals("STRING")) {
51+
tableBuilder.column(colName).string().max().endColumn();
52+
} else if (typeName.equals("INT64")) {
53+
tableBuilder.column(colName).int64().endColumn();
54+
} else if (typeName.equals("TIMESTAMP")) {
55+
tableBuilder.column(colName).timestamp().endColumn();
56+
} else {
57+
tableBuilder.column(colName).string().max().endColumn();
58+
}
59+
}
60+
List<Map<String, Object>> pks = (List<Map<String, Object>>) tableMap.get("PrimaryKeys");
61+
if (pks != null && !pks.isEmpty()) {
62+
IndexColumn.IndexColumnsBuilder<Table.Builder> pkBuilder = tableBuilder.primaryKey();
63+
for (Map<String, Object> pk : pks) {
64+
String colId = (String) pk.get("ColId");
65+
Map<String, Object> colMap = (Map<String, Object>) colDefs.get(colId);
66+
String colName = (String) colMap.get("Name");
67+
pkBuilder.asc(colName);
68+
}
69+
pkBuilder.end();
70+
}
71+
tableBuilder.endTable();
72+
}
73+
return ddlBuilder.build();
74+
} catch (Exception e) {
75+
throw new RuntimeException(e);
76+
}
77+
}
78+
79+
public static SourceSchema buildSourceSchemaFromSessionFile(String sessionFile) {
80+
try {
81+
ObjectMapper mapper = new ObjectMapper();
82+
Map<String, Object> session = mapper.readValue(new File(sessionFile), Map.class);
83+
84+
SourceDatabaseType dbType = SourceDatabaseType.MYSQL;
85+
String dbName = "test-db";
86+
Map<String, Object> srcSchema = (Map<String, Object>) session.get("SrcSchema");
87+
SourceSchema.Builder schemaBuilder = SourceSchema.builder(dbType).databaseName(dbName);
88+
ImmutableMap.Builder<String, SourceTable> tablesBuilder = ImmutableMap.builder();
89+
if (srcSchema != null) {
90+
for (Map.Entry<String, Object> entry : srcSchema.entrySet()) {
91+
Map<String, Object> tableMap = (Map<String, Object>) entry.getValue();
92+
SourceTable.Builder tableBuilder =
93+
SourceTable.builder(dbType)
94+
.name((String) tableMap.get("Name"))
95+
.schema((String) tableMap.get("Schema"));
96+
Map<String, Object> colDefs = (Map<String, Object>) tableMap.get("ColDefs");
97+
ImmutableList.Builder<SourceColumn> columnsBuilder = ImmutableList.builder();
98+
if (colDefs != null) {
99+
for (String colId : ((Map<String, Object>) colDefs).keySet()) {
100+
Map<String, Object> colMap = (Map<String, Object>) colDefs.get(colId);
101+
Map<String, Object> typeMap = (Map<String, Object>) colMap.get("Type");
102+
String typeName = (String) typeMap.get("Name");
103+
Integer size = null;
104+
if (typeMap.get("Len") instanceof Number) {
105+
size = ((Number) typeMap.get("Len")).intValue();
106+
}
107+
SourceColumn.Builder colBuilder =
108+
SourceColumn.builder(dbType)
109+
.name((String) colMap.get("Name"))
110+
.type(typeName)
111+
.isNullable(
112+
!(colMap.get("NotNull") != null && (Boolean) colMap.get("NotNull")))
113+
.size(size);
114+
columnsBuilder.add(colBuilder.build());
115+
}
116+
}
117+
tableBuilder.columns(columnsBuilder.build());
118+
ImmutableList.Builder<String> pkCols = ImmutableList.builder();
119+
if (tableMap.get("PrimaryKeys") != null) {
120+
for (Map<String, Object> pk :
121+
(Iterable<Map<String, Object>>) tableMap.get("PrimaryKeys")) {
122+
String colId = (String) pk.get("ColId");
123+
String colName = ((Map<String, Object>) colDefs.get(colId)).get("Name").toString();
124+
pkCols.add(colName);
125+
}
126+
}
127+
tableBuilder.primaryKeyColumns(pkCols.build());
128+
tablesBuilder.put(entry.getKey(), tableBuilder.build());
129+
}
130+
}
131+
schemaBuilder.tables(tablesBuilder.build());
132+
return schemaBuilder.build();
133+
} catch (Exception e) {
134+
throw new RuntimeException(e);
135+
}
136+
}
137+
}
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
/*
2+
* Copyright (C) 2025 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package com.google.cloud.teleport.v2.spanner.sourceddl;
17+
18+
import com.google.common.collect.ImmutableList;
19+
import com.google.common.collect.ImmutableMap;
20+
import java.sql.Connection;
21+
import java.sql.ResultSet;
22+
import java.sql.SQLException;
23+
import java.sql.Statement;
24+
import java.util.HashMap;
25+
import java.util.List;
26+
import java.util.Map;
27+
import org.slf4j.Logger;
28+
import org.slf4j.LoggerFactory;
29+
30+
/**
31+
* Abstract base class for source information schema scanners. Provides common functionality for
32+
* scanning database schemas.
33+
*/
34+
public abstract class AbstractSourceInformationSchemaScanner
35+
implements SourceInformationSchemaScanner {
36+
37+
private static final Logger LOG =
38+
LoggerFactory.getLogger(AbstractSourceInformationSchemaScanner.class);
39+
40+
protected final Connection connection;
41+
protected final String databaseName;
42+
protected final SourceDatabaseType sourceType;
43+
44+
protected AbstractSourceInformationSchemaScanner(
45+
Connection connection, String databaseName, SourceDatabaseType sourceType) {
46+
this.connection = connection;
47+
this.databaseName = databaseName;
48+
this.sourceType = sourceType;
49+
}
50+
51+
@Override
52+
public SourceSchema scan() {
53+
SourceSchema.Builder schemaBuilder =
54+
SourceSchema.builder(sourceType).databaseName(databaseName);
55+
56+
try {
57+
Map<String, SourceTable> tables = scanTables();
58+
schemaBuilder.tables(ImmutableMap.copyOf(tables));
59+
} catch (SQLException e) {
60+
throw new RuntimeException("Error scanning database schema", e);
61+
}
62+
63+
return schemaBuilder.build();
64+
}
65+
66+
/** Scans all tables in the database. */
67+
protected Map<String, SourceTable> scanTables() throws SQLException {
68+
Map<String, SourceTable> tables = new HashMap<>();
69+
try (Statement stmt = connection.createStatement();
70+
ResultSet rs = stmt.executeQuery(getTablesQuery())) {
71+
while (rs.next()) {
72+
String tableName = rs.getString(1);
73+
String schema = rs.getString(2);
74+
75+
SourceTable table = scanTable(tableName, schema);
76+
tables.put(tableName, table);
77+
}
78+
}
79+
return tables;
80+
}
81+
82+
/** Scans a single table's structure. */
83+
protected SourceTable scanTable(String tableName, String schema) throws SQLException {
84+
SourceTable.Builder tableBuilder =
85+
SourceTable.builder(sourceType).name(tableName).schema(schema);
86+
87+
// Scan columns
88+
List<SourceColumn> columns = scanColumns(tableName, schema);
89+
tableBuilder.columns(ImmutableList.copyOf(columns));
90+
91+
// Scan primary keys
92+
List<String> primaryKeys = scanPrimaryKeys(tableName, schema);
93+
tableBuilder.primaryKeyColumns(ImmutableList.copyOf(primaryKeys));
94+
95+
return tableBuilder.build();
96+
}
97+
98+
/** Gets the SQL query to list all tables. */
99+
protected abstract String getTablesQuery();
100+
101+
/** Scans columns for a given table. */
102+
protected abstract List<SourceColumn> scanColumns(String tableName, String schema)
103+
throws SQLException;
104+
105+
/** Scans primary keys for a given table. */
106+
protected abstract List<String> scanPrimaryKeys(String tableName, String schema)
107+
throws SQLException;
108+
}
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
/*
2+
* Copyright (C) 2025 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package com.google.cloud.teleport.v2.spanner.sourceddl;
17+
18+
import com.datastax.oss.driver.api.core.CqlSession;
19+
import com.datastax.oss.driver.api.core.cql.BoundStatement;
20+
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
21+
import com.datastax.oss.driver.api.core.cql.ResultSet;
22+
import com.google.common.collect.ImmutableList;
23+
import com.google.common.collect.ImmutableMap;
24+
import java.util.ArrayList;
25+
import java.util.HashMap;
26+
import java.util.List;
27+
import java.util.Map;
28+
import org.slf4j.Logger;
29+
import org.slf4j.LoggerFactory;
30+
31+
/**
32+
* Scanner for reading Cassandra database schema information. Implements
33+
* SourceInformationSchemaScanner to provide schema scanning functionality for Cassandra.
34+
*/
35+
public class CassandraInformationSchemaScanner implements SourceInformationSchemaScanner {
36+
private static final Logger LOG =
37+
LoggerFactory.getLogger(CassandraInformationSchemaScanner.class);
38+
39+
private final CqlSession session;
40+
private final String keyspaceName;
41+
42+
public CassandraInformationSchemaScanner(CqlSession session, String keyspaceName) {
43+
this.session = session;
44+
this.keyspaceName = keyspaceName;
45+
}
46+
47+
@Override
48+
public SourceSchema scan() {
49+
SourceSchema.Builder schemaBuilder =
50+
SourceSchema.builder(SourceDatabaseType.CASSANDRA).databaseName(keyspaceName);
51+
52+
try {
53+
Map<String, SourceTable> tables = scanTables();
54+
schemaBuilder.tables(ImmutableMap.copyOf(tables));
55+
} catch (Exception e) {
56+
throw new RuntimeException("Error scanning Cassandra schema", e);
57+
}
58+
59+
return schemaBuilder.build();
60+
}
61+
62+
private Map<String, SourceTable> scanTables() throws Exception {
63+
Map<String, SourceTable> tables = new HashMap<>();
64+
65+
// Query to get all tables in the keyspace
66+
String tableQuery = "SELECT table_name FROM system_schema.tables WHERE keyspace_name = ?";
67+
PreparedStatement tableStmt = session.prepare(tableQuery);
68+
BoundStatement tableBoundStmt = tableStmt.bind(keyspaceName);
69+
ResultSet tableResult = session.execute(tableBoundStmt);
70+
71+
// For each table, get its columns
72+
while (tableResult.iterator().hasNext()) {
73+
String tableName = tableResult.iterator().next().getString("table_name");
74+
tables.put(tableName, scanTableColumns(tableName));
75+
}
76+
77+
return tables;
78+
}
79+
80+
private SourceTable scanTableColumns(String tableName) throws Exception {
81+
// Query to get column information for the table
82+
String columnQuery =
83+
"SELECT column_name, type, kind FROM system_schema.columns "
84+
+ "WHERE keyspace_name = ? AND table_name = ?";
85+
PreparedStatement columnStmt = session.prepare(columnQuery);
86+
BoundStatement columnBoundStmt = columnStmt.bind(keyspaceName, tableName);
87+
ResultSet columnResult = session.execute(columnBoundStmt);
88+
89+
List<SourceColumn> columns = new ArrayList<>();
90+
List<String> primaryKeyColumns = new ArrayList<>();
91+
92+
while (columnResult.iterator().hasNext()) {
93+
var row = columnResult.iterator().next();
94+
String columnName = row.getString("column_name");
95+
String type = row.getString("type");
96+
String kind = row.getString("kind");
97+
98+
// Create SourceColumn with appropriate type information
99+
SourceColumn.Builder columnBuilder =
100+
SourceColumn.builder(SourceDatabaseType.CASSANDRA)
101+
.name(columnName)
102+
.type(type.toUpperCase().replaceAll("\\s+", "")) // Normalize type name
103+
.isNullable(true) // Cassandra columns are nullable by default
104+
.isPrimaryKey("partition_key".equals(kind) || "clustering".equals(kind));
105+
106+
if ("partition_key".equals(kind) || "clustering".equals(kind)) {
107+
primaryKeyColumns.add(columnName);
108+
}
109+
110+
columns.add(columnBuilder.build());
111+
}
112+
113+
return SourceTable.builder(SourceDatabaseType.CASSANDRA)
114+
.name(tableName)
115+
.schema(keyspaceName)
116+
.columns(ImmutableList.copyOf(columns))
117+
.primaryKeyColumns(ImmutableList.copyOf(primaryKeyColumns))
118+
.build();
119+
}
120+
}

0 commit comments

Comments
 (0)