Skip to content

Commit 6ab22b3

Browse files
authored
Add source ddl classes (#2525)
* Add source ddl classes * Update soruceddl with columna and table getter * Resolve comments
1 parent 4597785 commit 6ab22b3

File tree

15 files changed

+1391
-0
lines changed

15 files changed

+1391
-0
lines changed

v2/spanner-common/src/main/java/com/google/cloud/teleport/v2/spanner/ddl/Column.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -321,6 +321,10 @@ public Table.Builder endColumn() {
321321
}
322322

323323
public abstract Builder columnOptions(ImmutableList<String> options);
324+
325+
public Builder array(Type t) {
326+
return type(Type.array(t));
327+
}
324328
}
325329

326330
private static class SizedType {

v2/spanner-common/src/main/java/com/google/cloud/teleport/v2/spanner/migrations/schema/Schema.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,10 @@ public Schema(
8989
this.spSchema = spSchema;
9090
this.syntheticPKeys = syntheticPKeys;
9191
this.srcSchema = srcSchema;
92+
this.toSpanner = new HashMap<String, NameAndCols>();
93+
this.toSource = new HashMap<String, NameAndCols>();
94+
this.srcToID = new HashMap<String, NameAndCols>();
95+
this.spannerToID = new HashMap<String, NameAndCols>();
9296
this.empty = (spSchema == null || srcSchema == null);
9397
}
9498

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
/*
2+
* Copyright (C) 2025 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package com.google.cloud.teleport.v2.spanner.sourceddl;
17+
18+
import com.datastax.oss.driver.api.core.CqlSession;
19+
import com.datastax.oss.driver.api.core.cql.BoundStatement;
20+
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
21+
import com.datastax.oss.driver.api.core.cql.ResultSet;
22+
import com.google.common.collect.ImmutableList;
23+
import com.google.common.collect.ImmutableMap;
24+
import java.util.ArrayList;
25+
import java.util.HashMap;
26+
import java.util.List;
27+
import java.util.Map;
28+
import org.slf4j.Logger;
29+
import org.slf4j.LoggerFactory;
30+
31+
/**
32+
* Scanner for reading Cassandra database schema information. Implements SourceSchemaScanner to
33+
* provide schema scanning functionality for Cassandra.
34+
*/
35+
public class CassandraInformationSchemaScanner implements SourceSchemaScanner {
36+
private static final Logger LOG =
37+
LoggerFactory.getLogger(CassandraInformationSchemaScanner.class);
38+
39+
private final CqlSession session;
40+
private final String keyspaceName;
41+
private final SourceDatabaseType sourceType = SourceDatabaseType.CASSANDRA;
42+
43+
public CassandraInformationSchemaScanner(CqlSession session, String keyspaceName) {
44+
this.session = session;
45+
this.keyspaceName = keyspaceName;
46+
}
47+
48+
@Override
49+
public SourceSchema scan() {
50+
SourceSchema.Builder schemaBuilder =
51+
SourceSchema.builder(sourceType).databaseName(keyspaceName);
52+
53+
try {
54+
Map<String, SourceTable> tables = scanTables();
55+
schemaBuilder.tables(ImmutableMap.copyOf(tables));
56+
} catch (Exception e) {
57+
throw new RuntimeException("Error scanning Cassandra schema", e);
58+
}
59+
60+
return schemaBuilder.build();
61+
}
62+
63+
private Map<String, SourceTable> scanTables() throws Exception {
64+
Map<String, SourceTable> tables = new HashMap<>();
65+
66+
// Query to get all tables in the keyspace
67+
String tableQuery = "SELECT table_name FROM system_schema.tables WHERE keyspace_name = ?";
68+
PreparedStatement tableStmt = session.prepare(tableQuery);
69+
BoundStatement tableBoundStmt = tableStmt.bind(keyspaceName);
70+
ResultSet tableResult = session.execute(tableBoundStmt);
71+
72+
// For each table, get its columns
73+
while (tableResult.iterator().hasNext()) {
74+
String tableName = tableResult.iterator().next().getString("table_name");
75+
if (tableName == null) {
76+
continue;
77+
}
78+
tables.put(tableName, scanTableColumns(tableName));
79+
}
80+
81+
return tables;
82+
}
83+
84+
private SourceTable scanTableColumns(String tableName) throws Exception {
85+
// Query to get column information for the table
86+
String columnQuery =
87+
"SELECT column_name, type, kind FROM system_schema.columns "
88+
+ "WHERE keyspace_name = ? AND table_name = ?";
89+
PreparedStatement columnStmt = session.prepare(columnQuery);
90+
BoundStatement columnBoundStmt = columnStmt.bind(keyspaceName, tableName);
91+
ResultSet columnResult = session.execute(columnBoundStmt);
92+
93+
List<SourceColumn> columns = new ArrayList<>();
94+
List<String> primaryKeyColumns = new ArrayList<>();
95+
96+
while (columnResult.iterator().hasNext()) {
97+
var row = columnResult.iterator().next();
98+
String columnName = row.getString("column_name");
99+
String type = row.getString("type");
100+
String kind = row.getString("kind");
101+
102+
// Create SourceColumn with appropriate type information
103+
SourceColumn.Builder columnBuilder =
104+
SourceColumn.builder(sourceType)
105+
.name(columnName)
106+
.type(type.toUpperCase().replaceAll("\\s+", "")) // Normalize type name
107+
.isNullable(true) // Cassandra columns are nullable by default
108+
.isPrimaryKey("partition_key".equals(kind) || "clustering".equals(kind));
109+
110+
if ("partition_key".equals(kind) || "clustering".equals(kind)) {
111+
primaryKeyColumns.add(columnName);
112+
}
113+
114+
columns.add(columnBuilder.build());
115+
}
116+
117+
return SourceTable.builder(sourceType)
118+
.name(tableName)
119+
.schema(keyspaceName)
120+
.columns(ImmutableList.copyOf(columns))
121+
.primaryKeyColumns(ImmutableList.copyOf(primaryKeyColumns))
122+
.build();
123+
}
124+
}
Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
/*
2+
* Copyright (C) 2025 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package com.google.cloud.teleport.v2.spanner.sourceddl;
17+
18+
import com.google.common.collect.ImmutableList;
19+
import java.sql.Connection;
20+
import java.sql.ResultSet;
21+
import java.sql.SQLException;
22+
import java.sql.Statement;
23+
import java.util.ArrayList;
24+
import java.util.List;
25+
import java.util.Map;
26+
import org.slf4j.Logger;
27+
import org.slf4j.LoggerFactory;
28+
29+
/** MySQL implementation of {@link SourceSchemaScanner}. */
30+
public class MySqlInformationSchemaScanner implements SourceSchemaScanner {
31+
32+
private static final Logger LOG = LoggerFactory.getLogger(MySqlInformationSchemaScanner.class);
33+
34+
private final Connection connection;
35+
private final String databaseName;
36+
private final SourceDatabaseType sourceType = SourceDatabaseType.MYSQL;
37+
38+
public MySqlInformationSchemaScanner(Connection connection, String databaseName) {
39+
this.connection = connection;
40+
this.databaseName = databaseName;
41+
}
42+
43+
@Override
44+
public SourceSchema scan() {
45+
SourceSchema.Builder schemaBuilder =
46+
SourceSchema.builder(sourceType).databaseName(databaseName);
47+
48+
try {
49+
Map<String, SourceTable> tables = scanTables();
50+
schemaBuilder.tables(com.google.common.collect.ImmutableMap.copyOf(tables));
51+
} catch (SQLException e) {
52+
throw new RuntimeException("Error scanning database schema", e);
53+
}
54+
55+
return schemaBuilder.build();
56+
}
57+
58+
private Map<String, SourceTable> scanTables() throws SQLException {
59+
Map<String, SourceTable> tables = new java.util.HashMap<>();
60+
try (Statement stmt = connection.createStatement();
61+
ResultSet rs = stmt.executeQuery(getTablesQuery())) {
62+
while (rs.next()) {
63+
String tableName = rs.getString(1);
64+
String schema = rs.getString(2);
65+
if (tableName == null) {
66+
continue;
67+
}
68+
SourceTable table = scanTable(tableName, schema);
69+
tables.put(tableName, table);
70+
}
71+
}
72+
return tables;
73+
}
74+
75+
private SourceTable scanTable(String tableName, String schema) throws SQLException {
76+
SourceTable.Builder tableBuilder =
77+
SourceTable.builder(sourceType).name(tableName).schema(schema);
78+
79+
// Scan columns
80+
List<SourceColumn> columns = scanColumns(tableName, schema);
81+
tableBuilder.columns(com.google.common.collect.ImmutableList.copyOf(columns));
82+
83+
// Scan primary keys
84+
List<String> primaryKeys = scanPrimaryKeys(tableName, schema);
85+
tableBuilder.primaryKeyColumns(com.google.common.collect.ImmutableList.copyOf(primaryKeys));
86+
87+
return tableBuilder.build();
88+
}
89+
90+
private String getTablesQuery() {
91+
return String.format(
92+
"SELECT table_name, table_schema "
93+
+ "FROM information_schema.tables "
94+
+ "WHERE table_schema = '%s' "
95+
+ "AND table_type = 'BASE TABLE'",
96+
databaseName);
97+
}
98+
99+
private List<SourceColumn> scanColumns(String tableName, String schema) throws SQLException {
100+
List<SourceColumn> columns = new ArrayList<>();
101+
String query =
102+
String.format(
103+
"SELECT column_name, data_type, character_maximum_length, "
104+
+ "numeric_precision, numeric_scale, is_nullable, column_key "
105+
+ "FROM information_schema.columns "
106+
+ "WHERE table_schema = '%s' AND table_name = '%s' "
107+
+ "ORDER BY ordinal_position",
108+
schema, tableName);
109+
110+
try (Statement stmt = connection.createStatement();
111+
ResultSet rs = stmt.executeQuery(query)) {
112+
while (rs.next()) {
113+
SourceColumn.Builder columnBuilder =
114+
SourceColumn.builder(sourceType)
115+
.name(rs.getString("column_name"))
116+
.type(rs.getString("data_type"))
117+
.isNullable("YES".equals(rs.getString("is_nullable")))
118+
.isPrimaryKey("PRI".equals(rs.getString("column_key")));
119+
120+
// Handle size/precision/scale
121+
String maxLength = rs.getString("character_maximum_length");
122+
if (maxLength != null) {
123+
columnBuilder.size(Long.parseLong(maxLength));
124+
}
125+
126+
String precision = rs.getString("numeric_precision");
127+
if (precision != null) {
128+
columnBuilder.precision(Integer.parseInt(precision));
129+
}
130+
131+
String scale = rs.getString("numeric_scale");
132+
if (scale != null) {
133+
columnBuilder.scale(Integer.parseInt(scale));
134+
}
135+
136+
columnBuilder.columnOptions(ImmutableList.of());
137+
columns.add(columnBuilder.build());
138+
}
139+
}
140+
return columns;
141+
}
142+
143+
private List<String> scanPrimaryKeys(String tableName, String schema) throws SQLException {
144+
List<String> primaryKeys = new ArrayList<>();
145+
String query =
146+
String.format(
147+
"SELECT column_name "
148+
+ "FROM information_schema.key_column_usage "
149+
+ "WHERE table_schema = '%s' AND table_name = '%s' "
150+
+ "AND constraint_name = 'PRIMARY' "
151+
+ "ORDER BY ordinal_position",
152+
schema, tableName);
153+
154+
try (Statement stmt = connection.createStatement();
155+
ResultSet rs = stmt.executeQuery(query)) {
156+
while (rs.next()) {
157+
primaryKeys.add(rs.getString("column_name"));
158+
}
159+
}
160+
return primaryKeys;
161+
}
162+
}
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
/*
2+
* Copyright (C) 2025 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package com.google.cloud.teleport.v2.spanner.sourceddl;
17+
18+
import com.google.auto.value.AutoValue;
19+
import com.google.common.collect.ImmutableList;
20+
import java.io.Serializable;
21+
import javax.annotation.Nullable;
22+
23+
/** Represents a column in a source database table. */
24+
@AutoValue
25+
public abstract class SourceColumn implements Serializable {
26+
27+
private static final long serialVersionUID = 1L;
28+
29+
public abstract String name();
30+
31+
public abstract String type();
32+
33+
public abstract boolean isNullable();
34+
35+
public abstract boolean isPrimaryKey();
36+
37+
@Nullable
38+
public abstract Long size();
39+
40+
@Nullable
41+
public abstract Integer precision();
42+
43+
@Nullable
44+
public abstract Integer scale();
45+
46+
public abstract ImmutableList<String> columnOptions();
47+
48+
public abstract SourceDatabaseType sourceType();
49+
50+
public static Builder builder(SourceDatabaseType sourceType) {
51+
return new AutoValue_SourceColumn.Builder()
52+
.sourceType(sourceType)
53+
.isNullable(true)
54+
.isPrimaryKey(false)
55+
.columnOptions(ImmutableList.of());
56+
}
57+
58+
public abstract Builder toBuilder();
59+
60+
/** A builder for {@link SourceColumn}. */
61+
@AutoValue.Builder
62+
public abstract static class Builder {
63+
public abstract Builder name(String name);
64+
65+
public abstract Builder type(String type);
66+
67+
public abstract Builder isNullable(boolean isNullable);
68+
69+
public abstract Builder isPrimaryKey(boolean isPrimaryKey);
70+
71+
public abstract Builder size(Long size);
72+
73+
public abstract Builder precision(Integer precision);
74+
75+
public abstract Builder scale(Integer scale);
76+
77+
public abstract Builder columnOptions(ImmutableList<String> columnOptions);
78+
79+
public abstract Builder sourceType(SourceDatabaseType sourceType);
80+
81+
public abstract SourceColumn build();
82+
}
83+
}

0 commit comments

Comments
 (0)