Skip to content

Commit 7ce8030

Browse files
committed
Refactor code to avoid session file for anything other than name mappings
1 parent 03ea6fc commit 7ce8030

File tree

18 files changed

+3047
-2100
lines changed

18 files changed

+3047
-2100
lines changed
Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
/*
2+
* Copyright (C) 2025 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package com.google.cloud.teleport.v2.spanner.migrations.utils;
17+
18+
import com.fasterxml.jackson.databind.ObjectMapper;
19+
import com.google.cloud.teleport.v2.spanner.ddl.Ddl;
20+
import com.google.cloud.teleport.v2.spanner.ddl.IndexColumn;
21+
import com.google.cloud.teleport.v2.spanner.ddl.Table;
22+
import com.google.cloud.teleport.v2.spanner.sourceddl.SourceColumn;
23+
import com.google.cloud.teleport.v2.spanner.sourceddl.SourceDatabaseType;
24+
import com.google.cloud.teleport.v2.spanner.sourceddl.SourceSchema;
25+
import com.google.cloud.teleport.v2.spanner.sourceddl.SourceTable;
26+
import com.google.cloud.teleport.v2.spanner.type.Type;
27+
import com.google.common.collect.ImmutableList;
28+
import com.google.common.collect.ImmutableMap;
29+
import java.io.File;
30+
import java.util.List;
31+
import java.util.Map;
32+
33+
public class SchemaUtils {
34+
35+
public static Ddl buildDdlFromSessionFile(String sessionFile) {
36+
try {
37+
ObjectMapper mapper = new ObjectMapper();
38+
Map<String, Object> session = mapper.readValue(new File(sessionFile), Map.class);
39+
Map<String, Object> spSchema = (Map<String, Object>) session.get("SpSchema");
40+
Ddl.Builder ddlBuilder = Ddl.builder();
41+
for (Map.Entry<String, Object> entry : spSchema.entrySet()) {
42+
Map<String, Object> tableMap = (Map<String, Object>) entry.getValue();
43+
String tableName = (String) tableMap.get("Name");
44+
Table.Builder tableBuilder = ddlBuilder.createTable(tableName);
45+
Map<String, Object> colDefs = (Map<String, Object>) tableMap.get("ColDefs");
46+
for (String colId : ((Map<String, Object>) colDefs).keySet()) {
47+
Map<String, Object> colMap = (Map<String, Object>) colDefs.get(colId);
48+
String colName = (String) colMap.get("Name");
49+
Map<String, Object> typeMap = (Map<String, Object>) colMap.get("T");
50+
String typeName = (String) typeMap.get("Name");
51+
Boolean isArray = (Boolean) typeMap.get("IsArray");
52+
if (typeName.equals("STRING")) {
53+
if (isArray) {
54+
tableBuilder.column(colName).array(Type.string()).endColumn();
55+
} else {
56+
tableBuilder.column(colName).string().max().endColumn();
57+
}
58+
} else if (typeName.equals("INT64")) {
59+
tableBuilder.column(colName).int64().endColumn();
60+
} else if (typeName.equals("FLOAT32")) {
61+
tableBuilder.column(colName).float32().endColumn();
62+
} else if (typeName.equals("FLOAT64")) {
63+
tableBuilder.column(colName).float64().endColumn();
64+
} else if (typeName.equals("BOOL")) {
65+
tableBuilder.column(colName).bool().endColumn();
66+
} else if (typeName.equals("BYTES")) {
67+
tableBuilder.column(colName).bytes().max().endColumn();
68+
} else if (typeName.equals("TIMESTAMP")) {
69+
tableBuilder.column(colName).timestamp().endColumn();
70+
} else if (typeName.equals("DATE")) {
71+
tableBuilder.column(colName).date().endColumn();
72+
} else if (typeName.equals("NUMERIC")) {
73+
tableBuilder.column(colName).numeric().endColumn();
74+
} else if (typeName.equals("JSON")) {
75+
tableBuilder.column(colName).json().endColumn();
76+
} else {
77+
throw new IllegalArgumentException(
78+
"This spanner type in session file is not yet supported");
79+
}
80+
// TODO: Add other types like arrays etc.
81+
}
82+
List<Map<String, Object>> pks = (List<Map<String, Object>>) tableMap.get("PrimaryKeys");
83+
if (pks != null && !pks.isEmpty()) {
84+
IndexColumn.IndexColumnsBuilder<Table.Builder> pkBuilder = tableBuilder.primaryKey();
85+
for (Map<String, Object> pk : pks) {
86+
String colId = (String) pk.get("ColId");
87+
Map<String, Object> colMap = (Map<String, Object>) colDefs.get(colId);
88+
String colName = (String) colMap.get("Name");
89+
pkBuilder.asc(colName);
90+
}
91+
pkBuilder.end();
92+
}
93+
tableBuilder.endTable();
94+
}
95+
return ddlBuilder.build();
96+
} catch (Exception e) {
97+
throw new RuntimeException(e);
98+
}
99+
}
100+
101+
public static SourceSchema buildSourceSchemaFromSessionFile(String sessionFile) {
102+
try {
103+
ObjectMapper mapper = new ObjectMapper();
104+
Map<String, Object> session = mapper.readValue(new File(sessionFile), Map.class);
105+
106+
SourceDatabaseType dbType = SourceDatabaseType.MYSQL;
107+
String dbName = "test-db";
108+
Map<String, Object> srcSchema = (Map<String, Object>) session.get("SrcSchema");
109+
SourceSchema.Builder schemaBuilder = SourceSchema.builder(dbType).databaseName(dbName);
110+
ImmutableMap.Builder<String, SourceTable> tablesBuilder = ImmutableMap.builder();
111+
if (srcSchema != null) {
112+
for (Map.Entry<String, Object> entry : srcSchema.entrySet()) {
113+
Map<String, Object> tableMap = (Map<String, Object>) entry.getValue();
114+
SourceTable.Builder tableBuilder =
115+
SourceTable.builder(dbType)
116+
.name((String) tableMap.get("Name"))
117+
.schema((String) tableMap.get("Schema"));
118+
Map<String, Object> colDefs = (Map<String, Object>) tableMap.get("ColDefs");
119+
ImmutableList.Builder<SourceColumn> columnsBuilder = ImmutableList.builder();
120+
if (colDefs != null) {
121+
for (String colId : ((Map<String, Object>) colDefs).keySet()) {
122+
Map<String, Object> colMap = (Map<String, Object>) colDefs.get(colId);
123+
Map<String, Object> typeMap = (Map<String, Object>) colMap.get("Type");
124+
String typeName = (String) typeMap.get("Name");
125+
Long size = null;
126+
if (typeMap.get("Len") instanceof Number) {
127+
size = ((Number) typeMap.get("Len")).longValue();
128+
}
129+
SourceColumn.Builder colBuilder =
130+
SourceColumn.builder(dbType)
131+
.name((String) colMap.get("Name"))
132+
.type(typeName)
133+
.isNullable(
134+
!(colMap.get("NotNull") != null && (Boolean) colMap.get("NotNull")))
135+
.size(size);
136+
columnsBuilder.add(colBuilder.build());
137+
}
138+
}
139+
tableBuilder.columns(columnsBuilder.build());
140+
ImmutableList.Builder<String> pkCols = ImmutableList.builder();
141+
if (tableMap.get("PrimaryKeys") != null) {
142+
for (Map<String, Object> pk :
143+
(Iterable<Map<String, Object>>) tableMap.get("PrimaryKeys")) {
144+
String colId = (String) pk.get("ColId");
145+
String colName = ((Map<String, Object>) colDefs.get(colId)).get("Name").toString();
146+
pkCols.add(colName);
147+
}
148+
}
149+
tableBuilder.primaryKeyColumns(pkCols.build());
150+
tablesBuilder.put((String) tableMap.get("Name"), tableBuilder.build());
151+
}
152+
}
153+
schemaBuilder.tables(tablesBuilder.build());
154+
return schemaBuilder.build();
155+
} catch (Exception e) {
156+
throw new RuntimeException(e);
157+
}
158+
}
159+
}
Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
/*
2+
* Copyright (C) 2025 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package com.google.cloud.teleport.v2.spanner.migrations.utils;
17+
18+
import static org.junit.Assert.assertEquals;
19+
import static org.junit.Assert.assertTrue;
20+
21+
import com.google.cloud.teleport.v2.spanner.ddl.Ddl;
22+
import com.google.cloud.teleport.v2.spanner.sourceddl.SourceSchema;
23+
import com.google.cloud.teleport.v2.spanner.type.Type;
24+
import com.google.common.io.Resources;
25+
import java.nio.file.Path;
26+
import java.nio.file.Paths;
27+
import org.junit.Test;
28+
29+
public class SchemaUtilsTest {
30+
31+
@Test
32+
public void testBuildDdlFromSessionFile_valid() throws Exception {
33+
Path sessionFile = Paths.get(Resources.getResource("session-file.json").toURI());
34+
Ddl ddl = SchemaUtils.buildDdlFromSessionFile(sessionFile.toString());
35+
// Basic assertions: table names and column names
36+
assertTrue(ddl.table("new_cart") != null);
37+
assertTrue(ddl.table("new_people") != null);
38+
assertEquals("new_cart", ddl.table("new_cart").name());
39+
assertEquals("new_people", ddl.table("new_people").name());
40+
assertTrue(
41+
ddl.table("new_cart").columns().stream().anyMatch(c -> c.name().equals("new_product_id")));
42+
assertTrue(
43+
ddl.table("new_cart").columns().stream().anyMatch(c -> c.name().equals("new_quantity")));
44+
assertTrue(
45+
ddl.table("new_cart").columns().stream().anyMatch(c -> c.name().equals("new_user_id")));
46+
47+
// Assert types
48+
assertEquals(
49+
Type.string(),
50+
ddl.table("new_cart").columns().stream()
51+
.filter(c -> c.name().equals("new_product_id"))
52+
.findFirst()
53+
.get()
54+
.type());
55+
assertEquals(
56+
Type.int64(),
57+
ddl.table("new_cart").columns().stream()
58+
.filter(c -> c.name().equals("new_quantity"))
59+
.findFirst()
60+
.get()
61+
.type());
62+
assertEquals(
63+
Type.string(),
64+
ddl.table("new_cart").columns().stream()
65+
.filter(c -> c.name().equals("new_user_id"))
66+
.findFirst()
67+
.get()
68+
.type());
69+
}
70+
71+
@Test
72+
public void testBuildDdlFromSessionFile_empty() throws Exception {
73+
Path sessionFile = Paths.get(Resources.getResource("session-file-empty.json").toURI());
74+
Ddl ddl = SchemaUtils.buildDdlFromSessionFile(sessionFile.toString());
75+
// Should have no tables
76+
assertTrue(ddl.allTables().isEmpty());
77+
}
78+
79+
@Test(expected = RuntimeException.class)
80+
public void testBuildDdlFromSessionFile_nonexistent() {
81+
SchemaUtils.buildDdlFromSessionFile("/nonexistent/path/to/session-file.json");
82+
}
83+
84+
@Test
85+
public void testBuildSourceSchemaFromSessionFile_valid() throws Exception {
86+
Path sessionFile = Paths.get(Resources.getResource("session-file.json").toURI());
87+
SourceSchema schema = SchemaUtils.buildSourceSchemaFromSessionFile(sessionFile.toString());
88+
assertEquals("cart", schema.tables().get("cart").name());
89+
assertEquals("people", schema.tables().get("people").name());
90+
assertEquals("my_schema", schema.tables().get("cart").schema());
91+
assertTrue(
92+
schema.tables().get("cart").columns().stream()
93+
.anyMatch(c -> c.name().equals("product_id")));
94+
assertTrue(
95+
schema.tables().get("cart").columns().stream().anyMatch(c -> c.name().equals("quantity")));
96+
assertTrue(
97+
schema.tables().get("cart").columns().stream().anyMatch(c -> c.name().equals("user_id")));
98+
99+
// Assert types
100+
assertEquals(
101+
"varchar",
102+
schema.tables().get("cart").columns().stream()
103+
.filter(c -> c.name().equals("product_id"))
104+
.findFirst()
105+
.get()
106+
.type());
107+
assertEquals(
108+
"bigint",
109+
schema.tables().get("cart").columns().stream()
110+
.filter(c -> c.name().equals("quantity"))
111+
.findFirst()
112+
.get()
113+
.type());
114+
assertEquals(
115+
"varchar",
116+
schema.tables().get("cart").columns().stream()
117+
.filter(c -> c.name().equals("user_id"))
118+
.findFirst()
119+
.get()
120+
.type());
121+
}
122+
123+
@Test
124+
public void testBuildSourceSchemaFromSessionFile_empty() throws Exception {
125+
Path sessionFile = Paths.get(Resources.getResource("session-file-empty.json").toURI());
126+
SourceSchema schema = SchemaUtils.buildSourceSchemaFromSessionFile(sessionFile.toString());
127+
// Should have no tables
128+
assertTrue(schema.tables().isEmpty());
129+
}
130+
131+
@Test(expected = RuntimeException.class)
132+
public void testBuildSourceSchemaFromSessionFile_nonexistent() {
133+
SchemaUtils.buildSourceSchemaFromSessionFile("/nonexistent/path/to/session-file.json");
134+
}
135+
}

v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/SpannerToSourceDb.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -702,6 +702,7 @@ public static PipelineResult run(Options options) {
702702
spannerConfig,
703703
schema,
704704
ddl,
705+
sourceSchema,
705706
shardingMode,
706707
shards.get(0).getLogicalShardId(),
707708
options.getSkipDirectoryName(),
@@ -723,6 +724,7 @@ public static PipelineResult run(Options options) {
723724
spannerMetadataConfig,
724725
options.getSourceDbTimezoneOffset(),
725726
ddl,
727+
sourceSchema,
726728
options.getShadowTablePrefix(),
727729
options.getSkipDirectoryName(),
728730
connectionPoolSizePerWorker,

0 commit comments

Comments
 (0)