Skip to content

Commit bd488f0

Browse files
manitguptaMnkyGns
authored andcommitted
feat: Remove migration_shard_id check from AssignShardIdFn (GoogleCloudPlatform#2467)
* Remove shardIdColumn check from AssignShardId DoFn * Fix mutation * Add TODO comment * Spotless
1 parent e745887 commit bd488f0

File tree

5 files changed

+25
-61
lines changed

5 files changed

+25
-61
lines changed

v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/transforms/AssignShardIdFn.java

Lines changed: 10 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import com.google.cloud.teleport.v2.spanner.ddl.IndexColumn;
2828
import com.google.cloud.teleport.v2.spanner.ddl.Table;
2929
import com.google.cloud.teleport.v2.spanner.migrations.schema.Schema;
30-
import com.google.cloud.teleport.v2.spanner.migrations.schema.SpannerTable;
3130
import com.google.cloud.teleport.v2.spanner.type.Type;
3231
import com.google.cloud.teleport.v2.spanner.utils.IShardIdFetcher;
3332
import com.google.cloud.teleport.v2.spanner.utils.ShardIdRequest;
@@ -188,8 +187,9 @@ public void processElement(ProcessContext c) throws Exception {
188187
qualifiedShard = this.shardName;
189188
} else {
190189
// Skip from processing if table not in session File
191-
String shardIdColumn = getShardIdColumnForTableName(tableName);
192-
if (shardIdColumn.isEmpty()) {
190+
// TODO: remove dependency on session file when session file is made optional
191+
boolean doesTableExist = doesTableExistInSessionFile(tableName);
192+
if (!doesTableExist) {
193193
LOG.warn(
194194
"Writing record for table {} to skipped directory name {} since table not present in"
195195
+ " the session file.",
@@ -450,27 +450,14 @@ private Object getColumnValueFromRow(Column column, Value value) throws Exceptio
450450
}
451451
}
452452

453-
private String getShardIdColumnForTableName(String tableName) throws IllegalArgumentException {
454-
if (!schema.getSpannerToID().containsKey(tableName)) {
455-
LOG.warn(
456-
"Table {} found in change record but not found in session file. Skipping record",
457-
tableName);
458-
return "";
453+
private boolean doesTableExistInSessionFile(String tableName) throws IllegalArgumentException {
454+
if (schema.getSpannerToID().containsKey(tableName)) {
455+
return true;
459456
}
460-
String tableId = schema.getSpannerToID().get(tableName).getName();
461-
if (!schema.getSpSchema().containsKey(tableId)) {
462-
LOG.warn("Table {} not found in session file. Skipping record.", tableId);
463-
return "";
464-
}
465-
SpannerTable spTable = schema.getSpSchema().get(tableId);
466-
String shardColId = spTable.getShardIdColumn();
467-
if (!spTable.getColDefs().containsKey(shardColId)) {
468-
throw new IllegalArgumentException(
469-
"ColumnId "
470-
+ shardColId
471-
+ " not found in session file. Please provide a valid session file.");
472-
}
473-
return spTable.getColDefs().get(shardColId).getName();
457+
LOG.warn(
458+
"Table {} found in change record but not found in session file. Skipping record",
459+
tableName);
460+
return false;
474461
}
475462

476463
private Map<String, Object> getSpannerRecordFromChangeStreamData(

v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToSourceDbCustomShardIT.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -200,8 +200,6 @@ private void writeSpannerDataForSingers(int singerId, String firstName, String s
200200
.to(singerId)
201201
.set("FirstName")
202202
.to(firstName)
203-
.set("migration_shard_id")
204-
.to(shardId)
205203
.build();
206204
spannerResourceManager.write(m);
207205
}

v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/transforms/AssignShardIdFnTest.java

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -643,7 +643,7 @@ public static Map<String, SpannerTable> getSampleSpSchema() {
643643
return spSchema;
644644
}
645645

646-
public static Map<String, NameAndCols> getSampleSpannerToId() {
646+
private static Map<String, NameAndCols> getSampleSpannerToId() {
647647
Map<String, NameAndCols> spannerToId = new HashMap<String, NameAndCols>();
648648
Map<String, String> t1ColIds = new HashMap<String, String>();
649649
t1ColIds.put("accountId", "c1");
@@ -654,7 +654,7 @@ public static Map<String, NameAndCols> getSampleSpannerToId() {
654654
return spannerToId;
655655
}
656656

657-
static Ddl getTestDdl() {
657+
private static Ddl getTestDdl() {
658658
Ddl ddl =
659659
Ddl.builder()
660660
.createTable("tableName")
@@ -678,7 +678,7 @@ static Ddl getTestDdl() {
678678
return ddl;
679679
}
680680

681-
static Ddl getTestDdlForPrimaryKeyTest() {
681+
private static Ddl getTestDdlForPrimaryKeyTest() {
682682

683683
Ddl ddl =
684684
Ddl.builder()
@@ -743,7 +743,7 @@ static Ddl getTestDdlForPrimaryKeyTest() {
743743
return ddl;
744744
}
745745

746-
public static Schema getSchemaObjectAllDatatypes() {
746+
private static Schema getSchemaObjectAllDatatypes() {
747747
Map<String, SyntheticPKey> syntheticPKeys = new HashMap<String, SyntheticPKey>();
748748
Map<String, SourceTable> srcSchema = new HashMap<String, SourceTable>();
749749
Map<String, SpannerTable> spSchema = getSampleSpSchemaAllDatatypes();
@@ -753,7 +753,7 @@ public static Schema getSchemaObjectAllDatatypes() {
753753
return expectedSchema;
754754
}
755755

756-
public static Map<String, SpannerTable> getSampleSpSchemaAllDatatypes() {
756+
private static Map<String, SpannerTable> getSampleSpSchemaAllDatatypes() {
757757
Map<String, SpannerTable> spSchema = new HashMap<String, SpannerTable>();
758758
Map<String, SpannerColumnDefinition> t1SpColDefs =
759759
new HashMap<String, SpannerColumnDefinition>();
@@ -813,7 +813,7 @@ public static Map<String, SpannerTable> getSampleSpSchemaAllDatatypes() {
813813
return spSchema;
814814
}
815815

816-
public static Map<String, NameAndCols> getSampleSpannerToIdAllDatatypes() {
816+
private static Map<String, NameAndCols> getSampleSpannerToIdAllDatatypes() {
817817
Map<String, NameAndCols> spannerToId = new HashMap<String, NameAndCols>();
818818
Map<String, String> t1ColIds = new HashMap<String, String>();
819819
t1ColIds.put("first_name", "c1");
@@ -834,7 +834,7 @@ public static Map<String, NameAndCols> getSampleSpannerToIdAllDatatypes() {
834834
return spannerToId;
835835
}
836836

837-
public static Schema getBotchedSchemaObjectForInvalidSpannerToOid() {
837+
private static Schema getBotchedSchemaObjectForInvalidSpannerToOid() {
838838
Map<String, SyntheticPKey> syntheticPKeys = new HashMap<String, SyntheticPKey>();
839839
Map<String, SourceTable> srcSchema = new HashMap<String, SourceTable>();
840840
Map<String, SpannerTable> spSchema = getSampleSpSchema();
@@ -844,7 +844,7 @@ public static Schema getBotchedSchemaObjectForInvalidSpannerToOid() {
844844
return expectedSchema;
845845
}
846846

847-
public static Schema getBotchedSchemaObjectForInvalidSpSchema() {
847+
private static Schema getBotchedSchemaObjectForInvalidSpSchema() {
848848
Map<String, SyntheticPKey> syntheticPKeys = new HashMap<String, SyntheticPKey>();
849849
Map<String, SourceTable> srcSchema = new HashMap<String, SourceTable>();
850850
Map<String, SpannerTable> spSchema = getBotchedSampleSpSchema();
@@ -854,7 +854,7 @@ public static Schema getBotchedSchemaObjectForInvalidSpSchema() {
854854
return expectedSchema;
855855
}
856856

857-
public static Schema getBotchedSchemaObjectForMissingShardColumn() {
857+
private static Schema getBotchedSchemaObjectForMissingShardColumn() {
858858
Map<String, SyntheticPKey> syntheticPKeys = new HashMap<String, SyntheticPKey>();
859859
Map<String, SourceTable> srcSchema = new HashMap<String, SourceTable>();
860860
Map<String, SpannerTable> spSchema = getBotchedSampleSpColmSchema();
@@ -864,7 +864,7 @@ public static Schema getBotchedSchemaObjectForMissingShardColumn() {
864864
return expectedSchema;
865865
}
866866

867-
public static Map<String, SpannerTable> getBotchedSampleSpSchema() {
867+
private static Map<String, SpannerTable> getBotchedSampleSpSchema() {
868868
Map<String, SpannerTable> spSchema = new HashMap<String, SpannerTable>();
869869
Map<String, SpannerColumnDefinition> t1SpColDefs =
870870
new HashMap<String, SpannerColumnDefinition>();
@@ -888,7 +888,7 @@ public static Map<String, SpannerTable> getBotchedSampleSpSchema() {
888888
return spSchema;
889889
}
890890

891-
public static Map<String, NameAndCols> getBotchedSampleSpannerToId() {
891+
private static Map<String, NameAndCols> getBotchedSampleSpannerToId() {
892892
Map<String, NameAndCols> spannerToId = new HashMap<String, NameAndCols>();
893893
Map<String, String> t1ColIds = new HashMap<String, String>();
894894
t1ColIds.put("accountId", "c1");
@@ -899,7 +899,7 @@ public static Map<String, NameAndCols> getBotchedSampleSpannerToId() {
899899
return spannerToId;
900900
}
901901

902-
public static Map<String, SpannerTable> getBotchedSampleSpColmSchema() {
902+
private static Map<String, SpannerTable> getBotchedSampleSpColmSchema() {
903903
Map<String, SpannerTable> spSchema = new HashMap<String, SpannerTable>();
904904
Map<String, SpannerColumnDefinition> t1SpColDefs =
905905
new HashMap<String, SpannerColumnDefinition>();

v2/spanner-to-sourcedb/src/test/resources/SpannerToSourceDbCustomShardIT/session.json

Lines changed: 2 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,9 @@
44
"Name": "Singers",
55
"ColIds": [
66
"c3",
7-
"c4",
8-
"c36"
7+
"c4"
98
],
10-
"ShardIdColumn": "c36",
9+
"ShardIdColumn": "",
1110
"ColDefs": {
1211
"c3": {
1312
"Name": "SingerId",
@@ -20,17 +19,6 @@
2019
"Comment": "From: SingerId bigint(19)",
2120
"Id": "c3"
2221
},
23-
"c36": {
24-
"Name": "migration_shard_id",
25-
"T": {
26-
"Name": "STRING",
27-
"Len": 50,
28-
"IsArray": false
29-
},
30-
"NotNull": false,
31-
"Comment": "",
32-
"Id": "c36"
33-
},
3422
"c4": {
3523
"Name": "FirstName",
3624
"T": {
@@ -47,11 +35,6 @@
4735
{
4836
"ColId": "c3",
4937
"Desc": false,
50-
"Order": 2
51-
},
52-
{
53-
"ColId": "c36",
54-
"Desc": false,
5538
"Order": 1
5639
}
5740
],
@@ -129,9 +112,6 @@
129112
"t1": {
130113
"ColumnLevelIssues": {
131114
"c3": [],
132-
"c36": [
133-
28
134-
],
135115
"c4": []
136116
},
137117
"TableLevelIssues": null

v2/spanner-to-sourcedb/src/test/resources/SpannerToSourceDbCustomShardIT/spanner-schema.sql

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
11
CREATE TABLE IF NOT EXISTS Singers (
22
SingerId INT64 NOT NULL,
33
FirstName STRING(MAX),
4-
migration_shard_id STRING(50),
5-
) PRIMARY KEY(SingerId, migration_shard_id);
4+
) PRIMARY KEY(SingerId);
65

76
CREATE CHANGE STREAM allstream
87
FOR ALL OPTIONS (

0 commit comments

Comments
 (0)