Skip to content

Commit 8c90808

Browse files
feat(Spanner): Remove generated columns from change events during transformation and add tests for this behavior.
1 parent cd616f0 commit 8c90808

File tree

12 files changed

+465
-84
lines changed

12 files changed

+465
-84
lines changed

v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/SpannerTransactionWriterDoFn.java

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import com.google.cloud.Timestamp;
2929
import com.google.cloud.spanner.Options;
3030
import com.google.cloud.spanner.SpannerException;
31+
import com.google.cloud.spanner.Statement;
3132
import com.google.cloud.spanner.TransactionRunner;
3233
import com.google.cloud.teleport.v2.spanner.ddl.Ddl;
3334
import com.google.cloud.teleport.v2.spanner.migrations.exceptions.ChangeEventConvertorException;
@@ -279,7 +280,7 @@ public void processElement(ProcessContext c) {
279280
c, changeEventContext, currentChangeEventSequence, shadowTableDdl, ddl);
280281
} else {
281282
processSingleDatabaseTransaction(
282-
c, changeEventContext, currentChangeEventSequence, shadowTableDdl);
283+
c, changeEventContext, currentChangeEventSequence, shadowTableDdl, ddl);
283284
}
284285
com.google.cloud.Timestamp timestamp = com.google.cloud.Timestamp.now();
285286
c.output(timestamp);
@@ -378,7 +379,8 @@ private void processSingleDatabaseTransaction(
378379
ProcessContext c,
379380
ChangeEventContext changeEventContext,
380381
ChangeEventSequence currentChangeEventSequence,
381-
Ddl shadowDdl) {
382+
Ddl shadowDdl,
383+
Ddl ddl) {
382384

383385
spannerAccessor
384386
.getDatabaseClient()
@@ -403,7 +405,14 @@ private void processSingleDatabaseTransaction(
403405
skippedEvents.inc();
404406
return null;
405407
}
406-
// Apply shadow and data table mutations.
408+
// Execute DML if applicable
409+
Statement dataDml = changeEventContext.getDataDmlStatement(ddl);
410+
411+
if (dataDml != null) {
412+
transaction.executeUpdate(dataDml);
413+
}
414+
415+
// Apply shadow and data table mutations (only if they exist)
407416
transaction.buffer(changeEventContext.getMutations());
408417
isInTransaction.set(false);
409418
return null;
@@ -473,6 +482,13 @@ void processCrossDatabaseTransaction(
473482
.run(
474483
(TransactionRunner.TransactionCallable<Void>)
475484
mainTxn -> {
485+
// Execute Data DML if applicable
486+
Statement dataDml =
487+
changeEventContext.getDataDmlStatement(dataTableDdl);
488+
if (dataDml != null) {
489+
mainTxn.executeUpdate(dataDml);
490+
}
491+
476492
// Read row from main table with lock scanned ranges to acquire
477493
// exclusive lock on the main table row.
478494
changeEventContext.readDataTableRowWithExclusiveLock(
@@ -505,7 +521,9 @@ void processCrossDatabaseTransaction(
505521
}
506522

507523
// Write to main table
508-
mainTxn.buffer(changeEventContext.getDataMutation());
524+
if (changeEventContext.getDataMutation() != null) {
525+
mainTxn.buffer(changeEventContext.getDataMutation());
526+
}
509527
return null;
510528
});
511529

v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/ChangeEventContext.java

Lines changed: 89 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,16 +21,18 @@
2121
import com.google.cloud.spanner.ResultSet;
2222
import com.google.cloud.spanner.Statement;
2323
import com.google.cloud.spanner.TransactionContext;
24+
import com.google.cloud.spanner.Value;
2425
import com.google.cloud.teleport.v2.spanner.ddl.Ddl;
26+
import com.google.cloud.teleport.v2.spanner.ddl.IndexColumn;
2527
import com.google.cloud.teleport.v2.spanner.ddl.Table;
2628
import com.google.cloud.teleport.v2.spanner.migrations.convertors.ChangeEventSpannerConvertor;
29+
import com.google.cloud.teleport.v2.spanner.migrations.convertors.ChangeEventTypeConvertor;
2730
import com.google.cloud.teleport.v2.spanner.migrations.exceptions.ChangeEventConvertorException;
2831
import com.google.cloud.teleport.v2.spanner.migrations.exceptions.DroppedTableException;
2932
import com.google.cloud.teleport.v2.spanner.migrations.exceptions.InvalidChangeEventException;
3033
import com.google.cloud.teleport.v2.spanner.migrations.spanner.SpannerReadUtils;
3134
import com.google.cloud.teleport.v2.templates.spanner.ShadowTableCreator;
3235
import com.google.common.collect.ImmutableMap;
33-
import java.util.Arrays;
3436
import java.util.List;
3537
import java.util.Map;
3638
import java.util.Set;
@@ -107,23 +109,93 @@ protected void convertChangeEventToMutation(Ddl ddl, Ddl shadowTableDdl)
107109
ChangeEventConvertor.convertChangeEventColumnKeysToLowerCase(changeEvent);
108110
ChangeEventConvertor.verifySpannerSchema(ddl, changeEvent);
109111

110-
this.primaryKey =
111-
ChangeEventSpannerConvertor.changeEventToPrimaryKey(
112-
changeEvent.get(DatastreamConstants.EVENT_TABLE_NAME_KEY).asText(),
113-
ddl,
114-
changeEvent,
115-
/* convertNameToLowerCase= */ true);
116-
this.dataMutation = ChangeEventConvertor.changeEventToMutation(ddl, changeEvent);
112+
boolean hasGeneratedPK = false;
113+
Table table = ddl.table(this.dataTable);
114+
if (table != null) {
115+
hasGeneratedPK = hasGeneratedPK(table);
116+
}
117+
118+
String changeType = getChangeType(changeEvent);
119+
boolean isDelete = DatastreamConstants.DELETE_EVENT.equalsIgnoreCase(changeType);
120+
121+
if (hasGeneratedPK && isDelete) {
122+
// For delete events on tables with generated primary keys, we need to use DML
123+
// to delete the row.
124+
this.primaryKey = null;
125+
this.dataMutation = null;
126+
} else {
127+
this.primaryKey =
128+
ChangeEventSpannerConvertor.changeEventToPrimaryKey(
129+
changeEvent.get(DatastreamConstants.EVENT_TABLE_NAME_KEY).asText(),
130+
ddl,
131+
changeEvent,
132+
/* convertNameToLowerCase= */ true);
133+
this.dataMutation = ChangeEventConvertor.changeEventToMutation(ddl, changeEvent);
134+
}
135+
117136
this.shadowTableMutation = generateShadowTableMutation(ddl, shadowTableDdl);
118137
}
119138

139+
public Statement getDataDmlStatement(Ddl ddl) throws ChangeEventConvertorException {
140+
String changeType = getChangeType(changeEvent);
141+
boolean isDelete = DatastreamConstants.DELETE_EVENT.equalsIgnoreCase(changeType);
142+
if (!isDelete) {
143+
return null;
144+
}
145+
Table table = ddl.table(this.dataTable);
146+
if (table != null && hasGeneratedPK(table)) {
147+
return generateDeleteDml(table, this.dataTable, changeEvent);
148+
}
149+
return null;
150+
}
151+
152+
private boolean hasGeneratedPK(Table table) {
153+
for (IndexColumn keyColumn : table.primaryKeys()) {
154+
if (table.column(keyColumn.name()).isGenerated()) {
155+
return true;
156+
}
157+
}
158+
return false;
159+
}
160+
161+
private Statement generateDeleteDml(Table table, String tableName, JsonNode event)
162+
throws ChangeEventConvertorException {
163+
StringBuilder sql = new StringBuilder("DELETE FROM ").append(tableName).append(" WHERE ");
164+
Statement.Builder builder = Statement.newBuilder("");
165+
boolean first = true;
166+
for (com.google.cloud.teleport.v2.spanner.ddl.Column column : table.columns()) {
167+
String colName = column.name();
168+
if (column.isGenerated()) {
169+
continue;
170+
}
171+
if (!first) {
172+
sql.append(" AND ");
173+
}
174+
sql.append(colName).append(" = @").append(colName);
175+
// Bind value
176+
Value value =
177+
ChangeEventTypeConvertor.toValue(event, column.type(), colName, /* requiredField */ true);
178+
builder.bind(colName).to(value);
179+
first = false;
180+
}
181+
builder.replace(sql.toString());
182+
return builder.build();
183+
}
184+
120185
public JsonNode getChangeEvent() {
121186
return changeEvent;
122187
}
123188

124189
// Returns an array of data and shadow table mutations.
125190
public Iterable<Mutation> getMutations() {
126-
return Arrays.asList(dataMutation, shadowTableMutation);
191+
java.util.List<Mutation> mutations = new java.util.ArrayList<>();
192+
if (dataMutation != null) {
193+
mutations.add(dataMutation);
194+
}
195+
if (shadowTableMutation != null) {
196+
mutations.add(shadowTableMutation);
197+
}
198+
return mutations;
127199
}
128200

129201
// Returns the data table mutation
@@ -176,4 +248,12 @@ public void readDataTableRowWithExclusiveLock(
176248
// Read the row in order to acquire the lock and discard it.
177249
resultSet.getCurrentRowAsStruct();
178250
}
251+
252+
public static String getChangeType(JsonNode changeEvent) {
253+
return changeEvent.has(DatastreamConstants.EVENT_CHANGE_TYPE_KEY)
254+
? changeEvent
255+
.get(DatastreamConstants.EVENT_CHANGE_TYPE_KEY)
256+
.asText(DatastreamConstants.EMPTY_EVENT)
257+
: DatastreamConstants.EMPTY_EVENT;
258+
}
179259
}

v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/MySQLDatastreamToSpannerDataTypesIT.java

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import java.util.List;
3232
import java.util.Map;
3333
import java.util.Set;
34+
import kotlin.Pair;
3435
import org.apache.beam.it.common.PipelineLauncher;
3536
import org.apache.beam.it.common.PipelineOperator;
3637
import org.apache.beam.it.common.utils.ResourceManagerUtils;
@@ -292,6 +293,19 @@ private List<Map<String, Object>> createRows(String colPrefix, Object... values)
292293
return rows;
293294
}
294295

296+
private List<Map<String, Object>> createMultiColumnRows(
297+
List<List<Pair<String, Object>>> rows_values) {
298+
List<Map<String, Object>> rows = new ArrayList<>();
299+
for (List<Pair<String, Object>> row_values : rows_values) {
300+
Map<String, Object> row = new HashMap<>();
301+
for (Pair<String, Object> col_value : row_values) {
302+
row.put(col_value.getFirst(), col_value.getSecond());
303+
}
304+
rows.add(row);
305+
}
306+
return rows;
307+
}
308+
295309
private List<String> getAllowedTables() {
296310
Map<String, List<Map<String, Object>>> expectedData = getExpectedData();
297311
List<String> tableNames = new ArrayList<>(expectedData.size() + UNSUPPORTED_TYPE_TABLES.size());
@@ -579,6 +593,46 @@ private Map<String, List<Map<String, Object>>> getExpectedData() {
579593
expectedData.put("set", createRows("set", "v1,v2", "NULL"));
580594
expectedData.put(
581595
"integer_unsigned", createRows("integer_unsigned", "0", "42", "4294967295", "NULL"));
596+
597+
// Generated Column data
598+
expectedData.put(
599+
"generated_pk_column",
600+
createMultiColumnRows(
601+
Arrays.asList(
602+
Arrays.asList(
603+
new Pair<>("first_name", "AA"),
604+
new Pair<>("last_name", "BB"),
605+
new Pair<>("generated_column", "AA ")))));
606+
607+
expectedData.put(
608+
"generated_non_pk_column",
609+
createMultiColumnRows(
610+
Arrays.asList(
611+
Arrays.asList(
612+
new Pair<>("id", 1),
613+
new Pair<>("first_name", "AA"),
614+
new Pair<>("last_name", "BB"),
615+
new Pair<>("generated_column", "AA ")))));
616+
617+
expectedData.put(
618+
"non_generated_to_generated_column",
619+
createMultiColumnRows(
620+
Arrays.asList(
621+
Arrays.asList(
622+
new Pair<>("first_name", "AA"),
623+
new Pair<>("last_name", "BB"),
624+
new Pair<>("generated_column", "AA "),
625+
new Pair<>("generated_column_pk", "AA ")))));
626+
627+
expectedData.put(
628+
"generated_to_non_generated_column",
629+
createMultiColumnRows(
630+
Arrays.asList(
631+
Arrays.asList(
632+
new Pair<>("first_name", "AA"),
633+
new Pair<>("last_name", "BB"),
634+
new Pair<>("generated_column", "AA "),
635+
new Pair<>("generated_column_pk", "AA ")))));
582636
return expectedData;
583637
}
584638

v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/datastream/MySqlChangeEventContextTest.java

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -295,4 +295,76 @@ public void testReadDataTable() throws Exception {
295295
verify(resultSet, times(1)).next();
296296
verify(resultSet, times(0)).getCurrentRowAsStruct();
297297
}
298+
299+
@Test
300+
public void testCanGenerateDataDmlStatement() throws Exception {
301+
long eventTimestamp = 1615159728L;
302+
// DDL with generated PK
303+
Ddl ddl =
304+
Ddl.builder()
305+
.createTable("Users")
306+
.column("id")
307+
.string()
308+
.max()
309+
.generatedAs("uuid()")
310+
.endColumn()
311+
.column("first_name")
312+
.string()
313+
.max()
314+
.endColumn()
315+
.column("last_name")
316+
.string()
317+
.max()
318+
.endColumn()
319+
.primaryKey()
320+
.asc("id")
321+
.end()
322+
.endTable()
323+
.createTable("shadow_Users")
324+
.column("id")
325+
.string()
326+
.max()
327+
.endColumn()
328+
.column("first_name")
329+
.string()
330+
.max()
331+
.endColumn()
332+
.column("last_name")
333+
.string()
334+
.max()
335+
.endColumn()
336+
.primaryKey()
337+
.asc("id")
338+
.end()
339+
.endTable()
340+
.build();
341+
342+
JSONObject changeEvent = new JSONObject();
343+
changeEvent.put(DatastreamConstants.EVENT_TABLE_NAME_KEY, "Users");
344+
changeEvent.put(DatastreamConstants.EVENT_CHANGE_TYPE_KEY, DatastreamConstants.DELETE_EVENT);
345+
changeEvent.put(
346+
DatastreamConstants.EVENT_SOURCE_TYPE_KEY, DatastreamConstants.MYSQL_SOURCE_TYPE);
347+
changeEvent.put(DatastreamConstants.MYSQL_TIMESTAMP_KEY, eventTimestamp);
348+
// Source keys (mapped to non-generated columns in Spanner)
349+
changeEvent.put("first_name", "John");
350+
changeEvent.put("last_name", "Doe");
351+
changeEvent.put("id", "aa");
352+
353+
ChangeEventContext context =
354+
ChangeEventContextFactory.createChangeEventContext(
355+
getJsonNode(changeEvent.toString()),
356+
ddl,
357+
ddl,
358+
"shadow_",
359+
DatastreamConstants.MYSQL_SOURCE_TYPE);
360+
361+
Statement dmlStatement = context.getDataDmlStatement(ddl);
362+
363+
// Verify DML generation
364+
String expectedSql =
365+
"DELETE FROM Users WHERE first_name = @first_name AND last_name = @last_name";
366+
assertEquals(expectedSql, dmlStatement.getSql());
367+
assertEquals("John", dmlStatement.getParameters().get("first_name").getString());
368+
assertEquals("Doe", dmlStatement.getParameters().get("last_name").getString());
369+
}
298370
}

0 commit comments

Comments
 (0)