Skip to content

Commit 46e8839

Browse files
pratickchokhaniever
authored andcommitted
feat(Spanner): Remove generated columns from change events during transformation and add tests for this behavior. (GoogleCloudPlatform#3255)
1 parent 3215790 commit 46e8839

File tree

14 files changed

+807
-99
lines changed

14 files changed

+807
-99
lines changed

v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/SpannerTransactionWriterDoFn.java

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import com.google.cloud.Timestamp;
2929
import com.google.cloud.spanner.Options;
3030
import com.google.cloud.spanner.SpannerException;
31+
import com.google.cloud.spanner.Statement;
3132
import com.google.cloud.spanner.TransactionRunner;
3233
import com.google.cloud.teleport.v2.spanner.ddl.Ddl;
3334
import com.google.cloud.teleport.v2.spanner.migrations.exceptions.ChangeEventConvertorException;
@@ -279,7 +280,7 @@ public void processElement(ProcessContext c) {
279280
c, changeEventContext, currentChangeEventSequence, shadowTableDdl, ddl);
280281
} else {
281282
processSingleDatabaseTransaction(
282-
c, changeEventContext, currentChangeEventSequence, shadowTableDdl);
283+
c, changeEventContext, currentChangeEventSequence, shadowTableDdl, ddl);
283284
}
284285
com.google.cloud.Timestamp timestamp = com.google.cloud.Timestamp.now();
285286
c.output(timestamp);
@@ -378,7 +379,8 @@ private void processSingleDatabaseTransaction(
378379
ProcessContext c,
379380
ChangeEventContext changeEventContext,
380381
ChangeEventSequence currentChangeEventSequence,
381-
Ddl shadowDdl) {
382+
Ddl shadowDdl,
383+
Ddl ddl) {
382384

383385
spannerAccessor
384386
.getDatabaseClient()
@@ -403,7 +405,14 @@ private void processSingleDatabaseTransaction(
403405
skippedEvents.inc();
404406
return null;
405407
}
406-
// Apply shadow and data table mutations.
408+
// Execute DML if applicable
409+
Statement dataDml = changeEventContext.getDataDmlStatement(ddl);
410+
411+
if (dataDml != null) {
412+
transaction.executeUpdate(dataDml);
413+
}
414+
415+
// Apply shadow and data table mutations (only if they exist)
407416
transaction.buffer(changeEventContext.getMutations());
408417
isInTransaction.set(false);
409418
return null;
@@ -504,8 +513,17 @@ void processCrossDatabaseTransaction(
504513
"Shadow table sequence changed during transaction");
505514
}
506515

516+
// Execute Data DML if applicable
517+
Statement dataDml =
518+
changeEventContext.getDataDmlStatement(dataTableDdl);
519+
if (dataDml != null) {
520+
mainTxn.executeUpdate(dataDml);
521+
}
522+
507523
// Write to main table
508-
mainTxn.buffer(changeEventContext.getDataMutation());
524+
if (changeEventContext.getDataMutation() != null) {
525+
mainTxn.buffer(changeEventContext.getDataMutation());
526+
}
509527
return null;
510528
});
511529

v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/ChangeEventContext.java

Lines changed: 85 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,16 +21,19 @@
2121
import com.google.cloud.spanner.ResultSet;
2222
import com.google.cloud.spanner.Statement;
2323
import com.google.cloud.spanner.TransactionContext;
24+
import com.google.cloud.spanner.Value;
2425
import com.google.cloud.teleport.v2.spanner.ddl.Ddl;
26+
import com.google.cloud.teleport.v2.spanner.ddl.IndexColumn;
2527
import com.google.cloud.teleport.v2.spanner.ddl.Table;
2628
import com.google.cloud.teleport.v2.spanner.migrations.convertors.ChangeEventSpannerConvertor;
29+
import com.google.cloud.teleport.v2.spanner.migrations.convertors.ChangeEventTypeConvertor;
2730
import com.google.cloud.teleport.v2.spanner.migrations.exceptions.ChangeEventConvertorException;
2831
import com.google.cloud.teleport.v2.spanner.migrations.exceptions.DroppedTableException;
2932
import com.google.cloud.teleport.v2.spanner.migrations.exceptions.InvalidChangeEventException;
3033
import com.google.cloud.teleport.v2.spanner.migrations.spanner.SpannerReadUtils;
3134
import com.google.cloud.teleport.v2.templates.spanner.ShadowTableCreator;
3235
import com.google.common.collect.ImmutableMap;
33-
import java.util.Arrays;
36+
import java.util.ArrayList;
3437
import java.util.List;
3538
import java.util.Map;
3639
import java.util.Set;
@@ -107,23 +110,94 @@ protected void convertChangeEventToMutation(Ddl ddl, Ddl shadowTableDdl)
107110
ChangeEventConvertor.convertChangeEventColumnKeysToLowerCase(changeEvent);
108111
ChangeEventConvertor.verifySpannerSchema(ddl, changeEvent);
109112

113+
boolean hasGeneratedPK = false;
114+
Table table = ddl.table(this.dataTable);
115+
if (table != null) {
116+
hasGeneratedPK = hasGeneratedPK(table);
117+
}
118+
110119
this.primaryKey =
111120
ChangeEventSpannerConvertor.changeEventToPrimaryKey(
112121
changeEvent.get(DatastreamConstants.EVENT_TABLE_NAME_KEY).asText(),
113122
ddl,
114123
changeEvent,
115124
/* convertNameToLowerCase= */ true);
116-
this.dataMutation = ChangeEventConvertor.changeEventToMutation(ddl, changeEvent);
125+
126+
String changeType = getChangeType(changeEvent);
127+
boolean isDelete = DatastreamConstants.DELETE_EVENT.equalsIgnoreCase(changeType);
128+
129+
if (hasGeneratedPK && isDelete) {
130+
// For delete events on tables with generated primary keys, we need to use DML
131+
// to delete the row.
132+
this.dataMutation = null;
133+
} else {
134+
this.dataMutation = ChangeEventConvertor.changeEventToMutation(ddl, changeEvent);
135+
}
136+
117137
this.shadowTableMutation = generateShadowTableMutation(ddl, shadowTableDdl);
118138
}
119139

140+
public Statement getDataDmlStatement(Ddl ddl) throws ChangeEventConvertorException {
141+
String changeType = getChangeType(changeEvent);
142+
boolean isDelete = DatastreamConstants.DELETE_EVENT.equalsIgnoreCase(changeType);
143+
if (!isDelete) {
144+
return null;
145+
}
146+
Table table = ddl.table(this.dataTable);
147+
if (table != null && hasGeneratedPK(table)) {
148+
return generateDeleteDml(table, this.dataTable, changeEvent);
149+
}
150+
return null;
151+
}
152+
153+
private boolean hasGeneratedPK(Table table) {
154+
for (IndexColumn keyColumn : table.primaryKeys()) {
155+
if (table.column(keyColumn.name()).isGenerated()) {
156+
return true;
157+
}
158+
}
159+
return false;
160+
}
161+
162+
private Statement generateDeleteDml(Table table, String tableName, JsonNode event)
163+
throws ChangeEventConvertorException {
164+
// TODO: Add support for PostgreSQL
165+
StringBuilder sql = new StringBuilder("DELETE FROM ").append(tableName).append(" WHERE ");
166+
Statement.Builder builder = Statement.newBuilder("");
167+
boolean first = true;
168+
for (com.google.cloud.teleport.v2.spanner.ddl.Column column : table.columns()) {
169+
String colName = column.name();
170+
if (column.isGenerated()) {
171+
continue;
172+
}
173+
if (!first) {
174+
sql.append(" AND ");
175+
}
176+
sql.append(colName).append(" = @").append(colName);
177+
// Bind value
178+
Value value =
179+
ChangeEventTypeConvertor.toValue(event, column.type(), colName, /* requiredField */ true);
180+
builder.bind(colName).to(value);
181+
first = false;
182+
}
183+
builder.replace(sql.toString());
184+
return builder.build();
185+
}
186+
120187
public JsonNode getChangeEvent() {
121188
return changeEvent;
122189
}
123190

124191
// Returns an array of data and shadow table mutations.
125192
public Iterable<Mutation> getMutations() {
126-
return Arrays.asList(dataMutation, shadowTableMutation);
193+
List<Mutation> mutations = new ArrayList<>();
194+
if (dataMutation != null) {
195+
mutations.add(dataMutation);
196+
}
197+
if (shadowTableMutation != null) {
198+
mutations.add(shadowTableMutation);
199+
}
200+
return mutations;
127201
}
128202

129203
// Returns the data table mutation
@@ -176,4 +250,12 @@ public void readDataTableRowWithExclusiveLock(
176250
// Read the row in order to acquire the lock and discard it.
177251
resultSet.getCurrentRowAsStruct();
178252
}
253+
254+
public static String getChangeType(JsonNode changeEvent) {
255+
return changeEvent.has(DatastreamConstants.EVENT_CHANGE_TYPE_KEY)
256+
? changeEvent
257+
.get(DatastreamConstants.EVENT_CHANGE_TYPE_KEY)
258+
.asText(DatastreamConstants.EMPTY_EVENT)
259+
: DatastreamConstants.EMPTY_EVENT;
260+
}
179261
}

v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/spanner/ShadowTableCreator.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,10 @@ Table constructShadowTable(Ddl informationSchema, String dataTableName, Dialect
7070
.filter(col -> primaryKeyColNames.contains(col.name()))
7171
.collect(Collectors.toList());
7272
for (Column col : primaryKeyCols) {
73+
// In Shadow table we only have primary keys. If primary key is dependent on
74+
// non-primary key column, then shadow table creation will fail.
75+
// Hence, generated expression should be removed from the shadow table columns.
76+
col = col.toBuilder().isGenerated(false).generationExpression("").autoBuild();
7377
shadowTableBuilder.addColumn(col);
7478
}
7579

0 commit comments

Comments
 (0)