Skip to content

Commit cd4c9b0

Browse files
feat(Spanner): Remove generated columns from change events during transformation and add tests for this behavior.
1 parent ea80a3b commit cd4c9b0

File tree

14 files changed

+773
-89
lines changed

14 files changed

+773
-89
lines changed

v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/SpannerTransactionWriterDoFn.java

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import com.google.cloud.Timestamp;
2929
import com.google.cloud.spanner.Options;
3030
import com.google.cloud.spanner.SpannerException;
31+
import com.google.cloud.spanner.Statement;
3132
import com.google.cloud.spanner.TransactionRunner;
3233
import com.google.cloud.teleport.v2.spanner.ddl.Ddl;
3334
import com.google.cloud.teleport.v2.spanner.migrations.exceptions.ChangeEventConvertorException;
@@ -279,7 +280,7 @@ public void processElement(ProcessContext c) {
279280
c, changeEventContext, currentChangeEventSequence, shadowTableDdl, ddl);
280281
} else {
281282
processSingleDatabaseTransaction(
282-
c, changeEventContext, currentChangeEventSequence, shadowTableDdl);
283+
c, changeEventContext, currentChangeEventSequence, shadowTableDdl, ddl);
283284
}
284285
com.google.cloud.Timestamp timestamp = com.google.cloud.Timestamp.now();
285286
c.output(timestamp);
@@ -378,7 +379,8 @@ private void processSingleDatabaseTransaction(
378379
ProcessContext c,
379380
ChangeEventContext changeEventContext,
380381
ChangeEventSequence currentChangeEventSequence,
381-
Ddl shadowDdl) {
382+
Ddl shadowDdl,
383+
Ddl ddl) {
382384

383385
spannerAccessor
384386
.getDatabaseClient()
@@ -403,7 +405,14 @@ private void processSingleDatabaseTransaction(
403405
skippedEvents.inc();
404406
return null;
405407
}
406-
// Apply shadow and data table mutations.
408+
// Execute DML if applicable
409+
Statement dataDml = changeEventContext.getDataDmlStatement(ddl);
410+
411+
if (dataDml != null) {
412+
transaction.executeUpdate(dataDml);
413+
}
414+
415+
// Apply shadow and data table mutations (only if they exist)
407416
transaction.buffer(changeEventContext.getMutations());
408417
isInTransaction.set(false);
409418
return null;
@@ -473,6 +482,13 @@ void processCrossDatabaseTransaction(
473482
.run(
474483
(TransactionRunner.TransactionCallable<Void>)
475484
mainTxn -> {
485+
// Execute Data DML if applicable
486+
Statement dataDml =
487+
changeEventContext.getDataDmlStatement(dataTableDdl);
488+
if (dataDml != null) {
489+
mainTxn.executeUpdate(dataDml);
490+
}
491+
476492
// Read row from main table with lock scanned ranges to acquire
477493
// exclusive lock on the main table row.
478494
changeEventContext.readDataTableRowWithExclusiveLock(
@@ -505,7 +521,9 @@ void processCrossDatabaseTransaction(
505521
}
506522

507523
// Write to main table
508-
mainTxn.buffer(changeEventContext.getDataMutation());
524+
if (changeEventContext.getDataMutation() != null) {
525+
mainTxn.buffer(changeEventContext.getDataMutation());
526+
}
509527
return null;
510528
});
511529

v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/ChangeEventContext.java

Lines changed: 84 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,16 +21,19 @@
2121
import com.google.cloud.spanner.ResultSet;
2222
import com.google.cloud.spanner.Statement;
2323
import com.google.cloud.spanner.TransactionContext;
24+
import com.google.cloud.spanner.Value;
2425
import com.google.cloud.teleport.v2.spanner.ddl.Ddl;
26+
import com.google.cloud.teleport.v2.spanner.ddl.IndexColumn;
2527
import com.google.cloud.teleport.v2.spanner.ddl.Table;
2628
import com.google.cloud.teleport.v2.spanner.migrations.convertors.ChangeEventSpannerConvertor;
29+
import com.google.cloud.teleport.v2.spanner.migrations.convertors.ChangeEventTypeConvertor;
2730
import com.google.cloud.teleport.v2.spanner.migrations.exceptions.ChangeEventConvertorException;
2831
import com.google.cloud.teleport.v2.spanner.migrations.exceptions.DroppedTableException;
2932
import com.google.cloud.teleport.v2.spanner.migrations.exceptions.InvalidChangeEventException;
3033
import com.google.cloud.teleport.v2.spanner.migrations.spanner.SpannerReadUtils;
3134
import com.google.cloud.teleport.v2.templates.spanner.ShadowTableCreator;
3235
import com.google.common.collect.ImmutableMap;
33-
import java.util.Arrays;
36+
import java.util.ArrayList;
3437
import java.util.List;
3538
import java.util.Map;
3639
import java.util.Set;
@@ -107,23 +110,93 @@ protected void convertChangeEventToMutation(Ddl ddl, Ddl shadowTableDdl)
107110
ChangeEventConvertor.convertChangeEventColumnKeysToLowerCase(changeEvent);
108111
ChangeEventConvertor.verifySpannerSchema(ddl, changeEvent);
109112

113+
boolean hasGeneratedPK = false;
114+
Table table = ddl.table(this.dataTable);
115+
if (table != null) {
116+
hasGeneratedPK = hasGeneratedPK(table);
117+
}
118+
110119
this.primaryKey =
111120
ChangeEventSpannerConvertor.changeEventToPrimaryKey(
112121
changeEvent.get(DatastreamConstants.EVENT_TABLE_NAME_KEY).asText(),
113122
ddl,
114123
changeEvent,
115124
/* convertNameToLowerCase= */ true);
116-
this.dataMutation = ChangeEventConvertor.changeEventToMutation(ddl, changeEvent);
125+
126+
String changeType = getChangeType(changeEvent);
127+
boolean isDelete = DatastreamConstants.DELETE_EVENT.equalsIgnoreCase(changeType);
128+
129+
if (hasGeneratedPK && isDelete) {
130+
// For delete events on tables with generated primary keys, we need to use DML
131+
// to delete the row.
132+
this.dataMutation = null;
133+
} else {
134+
this.dataMutation = ChangeEventConvertor.changeEventToMutation(ddl, changeEvent);
135+
}
136+
117137
this.shadowTableMutation = generateShadowTableMutation(ddl, shadowTableDdl);
118138
}
119139

140+
public Statement getDataDmlStatement(Ddl ddl) throws ChangeEventConvertorException {
141+
String changeType = getChangeType(changeEvent);
142+
boolean isDelete = DatastreamConstants.DELETE_EVENT.equalsIgnoreCase(changeType);
143+
if (!isDelete) {
144+
return null;
145+
}
146+
Table table = ddl.table(this.dataTable);
147+
if (table != null && hasGeneratedPK(table)) {
148+
return generateDeleteDml(table, this.dataTable, changeEvent);
149+
}
150+
return null;
151+
}
152+
153+
private boolean hasGeneratedPK(Table table) {
154+
for (IndexColumn keyColumn : table.primaryKeys()) {
155+
if (table.column(keyColumn.name()).isGenerated()) {
156+
return true;
157+
}
158+
}
159+
return false;
160+
}
161+
162+
private Statement generateDeleteDml(Table table, String tableName, JsonNode event)
163+
throws ChangeEventConvertorException {
164+
StringBuilder sql = new StringBuilder("DELETE FROM ").append(tableName).append(" WHERE ");
165+
Statement.Builder builder = Statement.newBuilder("");
166+
boolean first = true;
167+
for (com.google.cloud.teleport.v2.spanner.ddl.Column column : table.columns()) {
168+
String colName = column.name();
169+
if (column.isGenerated()) {
170+
continue;
171+
}
172+
if (!first) {
173+
sql.append(" AND ");
174+
}
175+
sql.append(colName).append(" = @").append(colName);
176+
// Bind value
177+
Value value =
178+
ChangeEventTypeConvertor.toValue(event, column.type(), colName, /* requiredField */ true);
179+
builder.bind(colName).to(value);
180+
first = false;
181+
}
182+
builder.replace(sql.toString());
183+
return builder.build();
184+
}
185+
120186
public JsonNode getChangeEvent() {
121187
return changeEvent;
122188
}
123189

124190
// Returns an array of data and shadow table mutations.
125191
public Iterable<Mutation> getMutations() {
126-
return Arrays.asList(dataMutation, shadowTableMutation);
192+
List<Mutation> mutations = new ArrayList<>();
193+
if (dataMutation != null) {
194+
mutations.add(dataMutation);
195+
}
196+
if (shadowTableMutation != null) {
197+
mutations.add(shadowTableMutation);
198+
}
199+
return mutations;
127200
}
128201

129202
// Returns the data table mutation
@@ -176,4 +249,12 @@ public void readDataTableRowWithExclusiveLock(
176249
// Read the row in order to acquire the lock and discard it.
177250
resultSet.getCurrentRowAsStruct();
178251
}
252+
253+
public static String getChangeType(JsonNode changeEvent) {
254+
return changeEvent.has(DatastreamConstants.EVENT_CHANGE_TYPE_KEY)
255+
? changeEvent
256+
.get(DatastreamConstants.EVENT_CHANGE_TYPE_KEY)
257+
.asText(DatastreamConstants.EMPTY_EVENT)
258+
: DatastreamConstants.EMPTY_EVENT;
259+
}
179260
}

v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/spanner/ShadowTableCreator.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,9 @@ Table constructShadowTable(Ddl informationSchema, String dataTableName, Dialect
7070
.filter(col -> primaryKeyColNames.contains(col.name()))
7171
.collect(Collectors.toList());
7272
for (Column col : primaryKeyCols) {
73+
// Generated expression should be removed from the shadow table columns.
74+
// This is requried as shadow table only contains primary key columns.
75+
col = col.toBuilder().isGenerated(false).generationExpression("").autoBuild();
7376
shadowTableBuilder.addColumn(col);
7477
}
7578

0 commit comments

Comments
 (0)