Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import com.google.cloud.teleport.v2.spanner.ddl.IndexColumn;
import com.google.cloud.teleport.v2.spanner.ddl.Table;
import com.google.cloud.teleport.v2.spanner.migrations.schema.Schema;
import com.google.cloud.teleport.v2.spanner.migrations.schema.SpannerTable;
import com.google.cloud.teleport.v2.spanner.type.Type;
import com.google.cloud.teleport.v2.spanner.utils.IShardIdFetcher;
import com.google.cloud.teleport.v2.spanner.utils.ShardIdRequest;
Expand Down Expand Up @@ -188,8 +187,9 @@ public void processElement(ProcessContext c) throws Exception {
qualifiedShard = this.shardName;
} else {
// Skip from processing if table not in session File
String shardIdColumn = getShardIdColumnForTableName(tableName);
if (shardIdColumn.isEmpty()) {
// TODO: remove dependency on session file when session file is made optional
boolean doesTableExist = doesTableExistInSessionFile(tableName);
if (!doesTableExist) {
LOG.warn(
"Writing record for table {} to skipped directory name {} since table not present in"
+ " the session file.",
Expand Down Expand Up @@ -450,27 +450,14 @@ private Object getColumnValueFromRow(Column column, Value value) throws Exceptio
}
}

private String getShardIdColumnForTableName(String tableName) throws IllegalArgumentException {
if (!schema.getSpannerToID().containsKey(tableName)) {
LOG.warn(
"Table {} found in change record but not found in session file. Skipping record",
tableName);
return "";
private boolean doesTableExistInSessionFile(String tableName) throws IllegalArgumentException {
if (schema.getSpannerToID().containsKey(tableName)) {
return true;
}
String tableId = schema.getSpannerToID().get(tableName).getName();
if (!schema.getSpSchema().containsKey(tableId)) {
LOG.warn("Table {} not found in session file. Skipping record.", tableId);
return "";
}
SpannerTable spTable = schema.getSpSchema().get(tableId);
String shardColId = spTable.getShardIdColumn();
if (!spTable.getColDefs().containsKey(shardColId)) {
throw new IllegalArgumentException(
"ColumnId "
+ shardColId
+ " not found in session file. Please provide a valid session file.");
}
return spTable.getColDefs().get(shardColId).getName();
LOG.warn(
"Table {} found in change record but not found in session file. Skipping record",
tableName);
return false;
}

private Map<String, Object> getSpannerRecordFromChangeStreamData(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,8 +200,6 @@ private void writeSpannerDataForSingers(int singerId, String firstName, String s
.to(singerId)
.set("FirstName")
.to(firstName)
.set("migration_shard_id")
.to(shardId)
.build();
spannerResourceManager.write(m);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -643,7 +643,7 @@ public static Map<String, SpannerTable> getSampleSpSchema() {
return spSchema;
}

public static Map<String, NameAndCols> getSampleSpannerToId() {
private static Map<String, NameAndCols> getSampleSpannerToId() {
Map<String, NameAndCols> spannerToId = new HashMap<String, NameAndCols>();
Map<String, String> t1ColIds = new HashMap<String, String>();
t1ColIds.put("accountId", "c1");
Expand All @@ -654,7 +654,7 @@ public static Map<String, NameAndCols> getSampleSpannerToId() {
return spannerToId;
}

static Ddl getTestDdl() {
private static Ddl getTestDdl() {
Ddl ddl =
Ddl.builder()
.createTable("tableName")
Expand All @@ -678,7 +678,7 @@ static Ddl getTestDdl() {
return ddl;
}

static Ddl getTestDdlForPrimaryKeyTest() {
private static Ddl getTestDdlForPrimaryKeyTest() {

Ddl ddl =
Ddl.builder()
Expand Down Expand Up @@ -743,7 +743,7 @@ static Ddl getTestDdlForPrimaryKeyTest() {
return ddl;
}

public static Schema getSchemaObjectAllDatatypes() {
private static Schema getSchemaObjectAllDatatypes() {
Map<String, SyntheticPKey> syntheticPKeys = new HashMap<String, SyntheticPKey>();
Map<String, SourceTable> srcSchema = new HashMap<String, SourceTable>();
Map<String, SpannerTable> spSchema = getSampleSpSchemaAllDatatypes();
Expand All @@ -753,7 +753,7 @@ public static Schema getSchemaObjectAllDatatypes() {
return expectedSchema;
}

public static Map<String, SpannerTable> getSampleSpSchemaAllDatatypes() {
private static Map<String, SpannerTable> getSampleSpSchemaAllDatatypes() {
Map<String, SpannerTable> spSchema = new HashMap<String, SpannerTable>();
Map<String, SpannerColumnDefinition> t1SpColDefs =
new HashMap<String, SpannerColumnDefinition>();
Expand Down Expand Up @@ -813,7 +813,7 @@ public static Map<String, SpannerTable> getSampleSpSchemaAllDatatypes() {
return spSchema;
}

public static Map<String, NameAndCols> getSampleSpannerToIdAllDatatypes() {
private static Map<String, NameAndCols> getSampleSpannerToIdAllDatatypes() {
Map<String, NameAndCols> spannerToId = new HashMap<String, NameAndCols>();
Map<String, String> t1ColIds = new HashMap<String, String>();
t1ColIds.put("first_name", "c1");
Expand All @@ -834,7 +834,7 @@ public static Map<String, NameAndCols> getSampleSpannerToIdAllDatatypes() {
return spannerToId;
}

public static Schema getBotchedSchemaObjectForInvalidSpannerToOid() {
private static Schema getBotchedSchemaObjectForInvalidSpannerToOid() {
Map<String, SyntheticPKey> syntheticPKeys = new HashMap<String, SyntheticPKey>();
Map<String, SourceTable> srcSchema = new HashMap<String, SourceTable>();
Map<String, SpannerTable> spSchema = getSampleSpSchema();
Expand All @@ -844,7 +844,7 @@ public static Schema getBotchedSchemaObjectForInvalidSpannerToOid() {
return expectedSchema;
}

public static Schema getBotchedSchemaObjectForInvalidSpSchema() {
private static Schema getBotchedSchemaObjectForInvalidSpSchema() {
Map<String, SyntheticPKey> syntheticPKeys = new HashMap<String, SyntheticPKey>();
Map<String, SourceTable> srcSchema = new HashMap<String, SourceTable>();
Map<String, SpannerTable> spSchema = getBotchedSampleSpSchema();
Expand All @@ -854,7 +854,7 @@ public static Schema getBotchedSchemaObjectForInvalidSpSchema() {
return expectedSchema;
}

public static Schema getBotchedSchemaObjectForMissingShardColumn() {
private static Schema getBotchedSchemaObjectForMissingShardColumn() {
Map<String, SyntheticPKey> syntheticPKeys = new HashMap<String, SyntheticPKey>();
Map<String, SourceTable> srcSchema = new HashMap<String, SourceTable>();
Map<String, SpannerTable> spSchema = getBotchedSampleSpColmSchema();
Expand All @@ -864,7 +864,7 @@ public static Schema getBotchedSchemaObjectForMissingShardColumn() {
return expectedSchema;
}

public static Map<String, SpannerTable> getBotchedSampleSpSchema() {
private static Map<String, SpannerTable> getBotchedSampleSpSchema() {
Map<String, SpannerTable> spSchema = new HashMap<String, SpannerTable>();
Map<String, SpannerColumnDefinition> t1SpColDefs =
new HashMap<String, SpannerColumnDefinition>();
Expand All @@ -888,7 +888,7 @@ public static Map<String, SpannerTable> getBotchedSampleSpSchema() {
return spSchema;
}

public static Map<String, NameAndCols> getBotchedSampleSpannerToId() {
private static Map<String, NameAndCols> getBotchedSampleSpannerToId() {
Map<String, NameAndCols> spannerToId = new HashMap<String, NameAndCols>();
Map<String, String> t1ColIds = new HashMap<String, String>();
t1ColIds.put("accountId", "c1");
Expand All @@ -899,7 +899,7 @@ public static Map<String, NameAndCols> getBotchedSampleSpannerToId() {
return spannerToId;
}

public static Map<String, SpannerTable> getBotchedSampleSpColmSchema() {
private static Map<String, SpannerTable> getBotchedSampleSpColmSchema() {
Map<String, SpannerTable> spSchema = new HashMap<String, SpannerTable>();
Map<String, SpannerColumnDefinition> t1SpColDefs =
new HashMap<String, SpannerColumnDefinition>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,9 @@
"Name": "Singers",
"ColIds": [
"c3",
"c4",
"c36"
"c4"
],
"ShardIdColumn": "c36",
"ShardIdColumn": "",
"ColDefs": {
"c3": {
"Name": "SingerId",
Expand All @@ -20,17 +19,6 @@
"Comment": "From: SingerId bigint(19)",
"Id": "c3"
},
"c36": {
"Name": "migration_shard_id",
"T": {
"Name": "STRING",
"Len": 50,
"IsArray": false
},
"NotNull": false,
"Comment": "",
"Id": "c36"
},
"c4": {
"Name": "FirstName",
"T": {
Expand All @@ -47,11 +35,6 @@
{
"ColId": "c3",
"Desc": false,
"Order": 2
},
{
"ColId": "c36",
"Desc": false,
"Order": 1
}
],
Expand Down Expand Up @@ -129,9 +112,6 @@
"t1": {
"ColumnLevelIssues": {
"c3": [],
"c36": [
28
],
"c4": []
},
"TableLevelIssues": null
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
CREATE TABLE IF NOT EXISTS Singers (
SingerId INT64 NOT NULL,
FirstName STRING(MAX),
migration_shard_id STRING(50),
) PRIMARY KEY(SingerId, migration_shard_id);
) PRIMARY KEY(SingerId);

CREATE CHANGE STREAM allstream
FOR ALL OPTIONS (
Expand Down
Loading