Skip to content

Commit 454f3a4

Browse files
committed
Initial draft untested
1 parent 51fa0a0 commit 454f3a4

File tree

42 files changed

+4635
-2926
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

42 files changed

+4635
-2926
lines changed

v2/spanner-common/src/main/java/com/google/cloud/teleport/v2/spanner/ddl/Column.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -321,6 +321,10 @@ public Table.Builder endColumn() {
321321
}
322322

323323
public abstract Builder columnOptions(ImmutableList<String> options);
324+
325+
public Builder array(Type t) {
326+
return type(Type.array(t));
327+
}
324328
}
325329

326330
private static class SizedType {

v2/spanner-common/src/main/java/com/google/cloud/teleport/v2/spanner/migrations/schema/Schema.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,10 @@ public Schema(
8989
this.spSchema = spSchema;
9090
this.syntheticPKeys = syntheticPKeys;
9191
this.srcSchema = srcSchema;
92+
this.toSpanner = new HashMap<String, NameAndCols>();
93+
this.toSource = new HashMap<String, NameAndCols>();
94+
this.srcToID = new HashMap<String, NameAndCols>();
95+
this.spannerToID = new HashMap<String, NameAndCols>();
9296
this.empty = (spSchema == null || srcSchema == null);
9397
}
9498

v2/spanner-common/src/main/java/com/google/cloud/teleport/v2/spanner/migrations/schema/SessionBasedMapper.java

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -70,15 +70,13 @@ public SessionBasedMapper(Schema schema, Ddl ddl, boolean strictCheckSchema)
7070
throws InputMismatchException {
7171
this.schema = schema;
7272
this.ddl = ddl;
73-
try {
74-
validateSchemaAndDdl(schema, ddl);
75-
LOG.info("schema matches between session file and spanner");
76-
} catch (InputMismatchException e) {
77-
if (strictCheckSchema) {
73+
if (strictCheckSchema) {
74+
try {
75+
validateSchemaAndDdl(schema, ddl);
76+
LOG.info("schema matches between session file and spanner");
77+
} catch (InputMismatchException e) {
7878
LOG.warn("schema does not match between session and spanner: {}", e.getMessage());
7979
throw e;
80-
} else {
81-
LOG.warn("proceeding without schema match between session and spanner");
8280
}
8381
}
8482
}
Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
/*
2+
* Copyright (C) 2025 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package com.google.cloud.teleport.v2.spanner.migrations.utils;
17+
18+
import com.fasterxml.jackson.databind.ObjectMapper;
19+
import com.google.cloud.teleport.v2.spanner.ddl.Ddl;
20+
import com.google.cloud.teleport.v2.spanner.ddl.IndexColumn;
21+
import com.google.cloud.teleport.v2.spanner.ddl.Table;
22+
import com.google.cloud.teleport.v2.spanner.sourceddl.SourceColumn;
23+
import com.google.cloud.teleport.v2.spanner.sourceddl.SourceDatabaseType;
24+
import com.google.cloud.teleport.v2.spanner.sourceddl.SourceSchema;
25+
import com.google.cloud.teleport.v2.spanner.sourceddl.SourceTable;
26+
import com.google.cloud.teleport.v2.spanner.type.Type;
27+
import com.google.common.collect.ImmutableList;
28+
import com.google.common.collect.ImmutableMap;
29+
import java.io.File;
30+
import java.util.List;
31+
import java.util.Map;
32+
33+
public class SchemaUtils {
34+
35+
public static Ddl buildDdlFromSessionFile(String sessionFile) {
36+
try {
37+
ObjectMapper mapper = new ObjectMapper();
38+
Map<String, Object> session = mapper.readValue(new File(sessionFile), Map.class);
39+
Map<String, Object> spSchema = (Map<String, Object>) session.get("SpSchema");
40+
Ddl.Builder ddlBuilder = Ddl.builder();
41+
for (Map.Entry<String, Object> entry : spSchema.entrySet()) {
42+
Map<String, Object> tableMap = (Map<String, Object>) entry.getValue();
43+
String tableName = (String) tableMap.get("Name");
44+
Table.Builder tableBuilder = ddlBuilder.createTable(tableName);
45+
Map<String, Object> colDefs = (Map<String, Object>) tableMap.get("ColDefs");
46+
for (String colId : ((Map<String, Object>) colDefs).keySet()) {
47+
Map<String, Object> colMap = (Map<String, Object>) colDefs.get(colId);
48+
String colName = (String) colMap.get("Name");
49+
Map<String, Object> typeMap = (Map<String, Object>) colMap.get("T");
50+
String typeName = (String) typeMap.get("Name");
51+
Boolean isArray = (Boolean) typeMap.get("IsArray");
52+
if (typeName.equals("STRING")) {
53+
if (isArray) {
54+
tableBuilder.column(colName).array(Type.string()).endColumn();
55+
} else {
56+
tableBuilder.column(colName).string().max().endColumn();
57+
}
58+
} else if (typeName.equals("INT64")) {
59+
tableBuilder.column(colName).int64().endColumn();
60+
} else if (typeName.equals("FLOAT32")) {
61+
tableBuilder.column(colName).float32().endColumn();
62+
} else if (typeName.equals("FLOAT64")) {
63+
tableBuilder.column(colName).float64().endColumn();
64+
} else if (typeName.equals("BOOL")) {
65+
tableBuilder.column(colName).bool().endColumn();
66+
} else if (typeName.equals("BYTES")) {
67+
tableBuilder.column(colName).bytes().max().endColumn();
68+
} else if (typeName.equals("TIMESTAMP")) {
69+
tableBuilder.column(colName).timestamp().endColumn();
70+
} else if (typeName.equals("DATE")) {
71+
tableBuilder.column(colName).date().endColumn();
72+
} else if (typeName.equals("NUMERIC")) {
73+
tableBuilder.column(colName).numeric().endColumn();
74+
} else if (typeName.equals("JSON")) {
75+
tableBuilder.column(colName).json().endColumn();
76+
} else {
77+
throw new IllegalArgumentException(
78+
"This spanner type in session file is not yet supported");
79+
}
80+
// TODO: Add other types like arrays etc.
81+
}
82+
List<Map<String, Object>> pks = (List<Map<String, Object>>) tableMap.get("PrimaryKeys");
83+
if (pks != null && !pks.isEmpty()) {
84+
IndexColumn.IndexColumnsBuilder<Table.Builder> pkBuilder = tableBuilder.primaryKey();
85+
for (Map<String, Object> pk : pks) {
86+
String colId = (String) pk.get("ColId");
87+
Map<String, Object> colMap = (Map<String, Object>) colDefs.get(colId);
88+
String colName = (String) colMap.get("Name");
89+
pkBuilder.asc(colName);
90+
}
91+
pkBuilder.end();
92+
}
93+
tableBuilder.endTable();
94+
}
95+
return ddlBuilder.build();
96+
} catch (Exception e) {
97+
throw new RuntimeException(e);
98+
}
99+
}
100+
101+
public static SourceSchema buildSourceSchemaFromSessionFile(String sessionFile) {
102+
try {
103+
ObjectMapper mapper = new ObjectMapper();
104+
Map<String, Object> session = mapper.readValue(new File(sessionFile), Map.class);
105+
106+
SourceDatabaseType dbType = SourceDatabaseType.MYSQL;
107+
String dbName = "test-db";
108+
Map<String, Object> srcSchema = (Map<String, Object>) session.get("SrcSchema");
109+
SourceSchema.Builder schemaBuilder = SourceSchema.builder(dbType).databaseName(dbName);
110+
ImmutableMap.Builder<String, SourceTable> tablesBuilder = ImmutableMap.builder();
111+
if (srcSchema != null) {
112+
for (Map.Entry<String, Object> entry : srcSchema.entrySet()) {
113+
Map<String, Object> tableMap = (Map<String, Object>) entry.getValue();
114+
SourceTable.Builder tableBuilder =
115+
SourceTable.builder(dbType)
116+
.name((String) tableMap.get("Name"))
117+
.schema((String) tableMap.get("Schema"));
118+
Map<String, Object> colDefs = (Map<String, Object>) tableMap.get("ColDefs");
119+
ImmutableList.Builder<SourceColumn> columnsBuilder = ImmutableList.builder();
120+
if (colDefs != null) {
121+
for (String colId : ((Map<String, Object>) colDefs).keySet()) {
122+
Map<String, Object> colMap = (Map<String, Object>) colDefs.get(colId);
123+
Map<String, Object> typeMap = (Map<String, Object>) colMap.get("Type");
124+
String typeName = (String) typeMap.get("Name");
125+
Integer size = null;
126+
if (typeMap.get("Len") instanceof Number) {
127+
size = ((Number) typeMap.get("Len")).intValue();
128+
}
129+
SourceColumn.Builder colBuilder =
130+
SourceColumn.builder(dbType)
131+
.name((String) colMap.get("Name"))
132+
.type(typeName)
133+
.isNullable(
134+
!(colMap.get("NotNull") != null && (Boolean) colMap.get("NotNull")))
135+
.size(size);
136+
columnsBuilder.add(colBuilder.build());
137+
}
138+
}
139+
tableBuilder.columns(columnsBuilder.build());
140+
ImmutableList.Builder<String> pkCols = ImmutableList.builder();
141+
if (tableMap.get("PrimaryKeys") != null) {
142+
for (Map<String, Object> pk :
143+
(Iterable<Map<String, Object>>) tableMap.get("PrimaryKeys")) {
144+
String colId = (String) pk.get("ColId");
145+
String colName = ((Map<String, Object>) colDefs.get(colId)).get("Name").toString();
146+
pkCols.add(colName);
147+
}
148+
}
149+
tableBuilder.primaryKeyColumns(pkCols.build());
150+
tablesBuilder.put((String) tableMap.get("Name"), tableBuilder.build());
151+
}
152+
}
153+
schemaBuilder.tables(tablesBuilder.build());
154+
return schemaBuilder.build();
155+
} catch (Exception e) {
156+
throw new RuntimeException(e);
157+
}
158+
}
159+
}
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
/*
2+
* Copyright (C) 2025 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package com.google.cloud.teleport.v2.spanner.sourceddl;
17+
18+
import com.google.common.collect.ImmutableList;
19+
import com.google.common.collect.ImmutableMap;
20+
import java.sql.Connection;
21+
import java.sql.ResultSet;
22+
import java.sql.SQLException;
23+
import java.sql.Statement;
24+
import java.util.HashMap;
25+
import java.util.List;
26+
import java.util.Map;
27+
import org.slf4j.Logger;
28+
import org.slf4j.LoggerFactory;
29+
30+
/**
31+
* Abstract base class for source information schema scanners. Provides common functionality for
32+
* scanning database schemas.
33+
*/
34+
public abstract class AbstractSourceInformationSchemaScanner
35+
implements SourceInformationSchemaScanner {
36+
37+
private static final Logger LOG =
38+
LoggerFactory.getLogger(AbstractSourceInformationSchemaScanner.class);
39+
40+
protected final Connection connection;
41+
protected final String databaseName;
42+
protected final SourceDatabaseType sourceType;
43+
44+
protected AbstractSourceInformationSchemaScanner(
45+
Connection connection, String databaseName, SourceDatabaseType sourceType) {
46+
this.connection = connection;
47+
this.databaseName = databaseName;
48+
this.sourceType = sourceType;
49+
}
50+
51+
@Override
52+
public SourceSchema scan() {
53+
SourceSchema.Builder schemaBuilder =
54+
SourceSchema.builder(sourceType).databaseName(databaseName);
55+
56+
try {
57+
Map<String, SourceTable> tables = scanTables();
58+
schemaBuilder.tables(ImmutableMap.copyOf(tables));
59+
} catch (SQLException e) {
60+
throw new RuntimeException("Error scanning database schema", e);
61+
}
62+
63+
return schemaBuilder.build();
64+
}
65+
66+
/** Scans all tables in the database. */
67+
protected Map<String, SourceTable> scanTables() throws SQLException {
68+
Map<String, SourceTable> tables = new HashMap<>();
69+
try (Statement stmt = connection.createStatement();
70+
ResultSet rs = stmt.executeQuery(getTablesQuery())) {
71+
while (rs.next()) {
72+
String tableName = rs.getString(1);
73+
String schema = rs.getString(2);
74+
75+
SourceTable table = scanTable(tableName, schema);
76+
tables.put(tableName, table);
77+
}
78+
}
79+
return tables;
80+
}
81+
82+
/** Scans a single table's structure. */
83+
protected SourceTable scanTable(String tableName, String schema) throws SQLException {
84+
SourceTable.Builder tableBuilder =
85+
SourceTable.builder(sourceType).name(tableName).schema(schema);
86+
87+
// Scan columns
88+
List<SourceColumn> columns = scanColumns(tableName, schema);
89+
tableBuilder.columns(ImmutableList.copyOf(columns));
90+
91+
// Scan primary keys
92+
List<String> primaryKeys = scanPrimaryKeys(tableName, schema);
93+
tableBuilder.primaryKeyColumns(ImmutableList.copyOf(primaryKeys));
94+
95+
return tableBuilder.build();
96+
}
97+
98+
/** Gets the SQL query to list all tables. */
99+
protected abstract String getTablesQuery();
100+
101+
/** Scans columns for a given table. */
102+
protected abstract List<SourceColumn> scanColumns(String tableName, String schema)
103+
throws SQLException;
104+
105+
/** Scans primary keys for a given table. */
106+
protected abstract List<String> scanPrimaryKeys(String tableName, String schema)
107+
throws SQLException;
108+
}

0 commit comments

Comments
 (0)