Skip to content

Commit dc66812

Browse files
feat(Spanner): Remove generated columns from change events during transformation and add tests for this behavior.
1 parent cd616f0 commit dc66812

File tree

8 files changed

+334
-84
lines changed

8 files changed

+334
-84
lines changed

v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/SpannerTransactionWriterDoFn.java

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import com.google.cloud.Timestamp;
2929
import com.google.cloud.spanner.Options;
3030
import com.google.cloud.spanner.SpannerException;
31+
import com.google.cloud.spanner.Statement;
3132
import com.google.cloud.spanner.TransactionRunner;
3233
import com.google.cloud.teleport.v2.spanner.ddl.Ddl;
3334
import com.google.cloud.teleport.v2.spanner.migrations.exceptions.ChangeEventConvertorException;
@@ -279,7 +280,7 @@ public void processElement(ProcessContext c) {
279280
c, changeEventContext, currentChangeEventSequence, shadowTableDdl, ddl);
280281
} else {
281282
processSingleDatabaseTransaction(
282-
c, changeEventContext, currentChangeEventSequence, shadowTableDdl);
283+
c, changeEventContext, currentChangeEventSequence, shadowTableDdl, ddl);
283284
}
284285
com.google.cloud.Timestamp timestamp = com.google.cloud.Timestamp.now();
285286
c.output(timestamp);
@@ -378,7 +379,8 @@ private void processSingleDatabaseTransaction(
378379
ProcessContext c,
379380
ChangeEventContext changeEventContext,
380381
ChangeEventSequence currentChangeEventSequence,
381-
Ddl shadowDdl) {
382+
Ddl shadowDdl,
383+
Ddl ddl) {
382384

383385
spannerAccessor
384386
.getDatabaseClient()
@@ -403,7 +405,14 @@ private void processSingleDatabaseTransaction(
403405
skippedEvents.inc();
404406
return null;
405407
}
406-
// Apply shadow and data table mutations.
408+
// Execute DML if applicable
409+
Statement dataDml = changeEventContext.getDataDmlStatement(ddl);
410+
411+
if (dataDml != null) {
412+
transaction.executeUpdate(dataDml);
413+
}
414+
415+
// Apply shadow and data table mutations (only if they exist)
407416
transaction.buffer(changeEventContext.getMutations());
408417
isInTransaction.set(false);
409418
return null;
@@ -473,6 +482,13 @@ void processCrossDatabaseTransaction(
473482
.run(
474483
(TransactionRunner.TransactionCallable<Void>)
475484
mainTxn -> {
485+
// Execute Data DML if applicable
486+
Statement dataDml =
487+
changeEventContext.getDataDmlStatement(dataTableDdl);
488+
if (dataDml != null) {
489+
mainTxn.executeUpdate(dataDml);
490+
}
491+
476492
// Read row from main table with lock scanned ranges to acquire
477493
// exclusive lock on the main table row.
478494
changeEventContext.readDataTableRowWithExclusiveLock(
@@ -505,7 +521,9 @@ void processCrossDatabaseTransaction(
505521
}
506522

507523
// Write to main table
508-
mainTxn.buffer(changeEventContext.getDataMutation());
524+
if (changeEventContext.getDataMutation() != null) {
525+
mainTxn.buffer(changeEventContext.getDataMutation());
526+
}
509527
return null;
510528
});
511529

v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/ChangeEventContext.java

Lines changed: 87 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,16 +21,18 @@
2121
import com.google.cloud.spanner.ResultSet;
2222
import com.google.cloud.spanner.Statement;
2323
import com.google.cloud.spanner.TransactionContext;
24+
import com.google.cloud.spanner.Value;
2425
import com.google.cloud.teleport.v2.spanner.ddl.Ddl;
26+
import com.google.cloud.teleport.v2.spanner.ddl.IndexColumn;
2527
import com.google.cloud.teleport.v2.spanner.ddl.Table;
2628
import com.google.cloud.teleport.v2.spanner.migrations.convertors.ChangeEventSpannerConvertor;
29+
import com.google.cloud.teleport.v2.spanner.migrations.convertors.ChangeEventTypeConvertor;
2730
import com.google.cloud.teleport.v2.spanner.migrations.exceptions.ChangeEventConvertorException;
2831
import com.google.cloud.teleport.v2.spanner.migrations.exceptions.DroppedTableException;
2932
import com.google.cloud.teleport.v2.spanner.migrations.exceptions.InvalidChangeEventException;
3033
import com.google.cloud.teleport.v2.spanner.migrations.spanner.SpannerReadUtils;
3134
import com.google.cloud.teleport.v2.templates.spanner.ShadowTableCreator;
3235
import com.google.common.collect.ImmutableMap;
33-
import java.util.Arrays;
3436
import java.util.List;
3537
import java.util.Map;
3638
import java.util.Set;
@@ -107,23 +109,91 @@ protected void convertChangeEventToMutation(Ddl ddl, Ddl shadowTableDdl)
107109
ChangeEventConvertor.convertChangeEventColumnKeysToLowerCase(changeEvent);
108110
ChangeEventConvertor.verifySpannerSchema(ddl, changeEvent);
109111

110-
this.primaryKey =
111-
ChangeEventSpannerConvertor.changeEventToPrimaryKey(
112-
changeEvent.get(DatastreamConstants.EVENT_TABLE_NAME_KEY).asText(),
113-
ddl,
114-
changeEvent,
115-
/* convertNameToLowerCase= */ true);
116-
this.dataMutation = ChangeEventConvertor.changeEventToMutation(ddl, changeEvent);
112+
boolean hasGeneratedPK = false;
113+
Table table = ddl.table(this.dataTable);
114+
if (table != null) {
115+
hasGeneratedPK = hasGeneratedPK(table);
116+
}
117+
118+
String changeType = getChangeType(changeEvent);
119+
boolean isDelete = DatastreamConstants.DELETE_EVENT.equalsIgnoreCase(changeType);
120+
121+
if (hasGeneratedPK && isDelete) {
122+
this.primaryKey = null;
123+
this.dataMutation = null;
124+
} else {
125+
this.primaryKey =
126+
ChangeEventSpannerConvertor.changeEventToPrimaryKey(
127+
changeEvent.get(DatastreamConstants.EVENT_TABLE_NAME_KEY).asText(),
128+
ddl,
129+
changeEvent,
130+
/* convertNameToLowerCase= */ true);
131+
this.dataMutation = ChangeEventConvertor.changeEventToMutation(ddl, changeEvent);
132+
}
133+
117134
this.shadowTableMutation = generateShadowTableMutation(ddl, shadowTableDdl);
118135
}
119136

137+
public Statement getDataDmlStatement(Ddl ddl) throws ChangeEventConvertorException {
138+
String changeType = getChangeType(changeEvent);
139+
boolean isDelete = DatastreamConstants.DELETE_EVENT.equalsIgnoreCase(changeType);
140+
if (!isDelete) {
141+
return null;
142+
}
143+
Table table = ddl.table(this.dataTable);
144+
if (table != null && hasGeneratedPK(table)) {
145+
return generateDeleteDml(table, this.dataTable, changeEvent);
146+
}
147+
return null;
148+
}
149+
150+
private boolean hasGeneratedPK(Table table) {
151+
for (IndexColumn keyColumn : table.primaryKeys()) {
152+
if (table.column(keyColumn.name()).isGenerated()) {
153+
return true;
154+
}
155+
}
156+
return false;
157+
}
158+
159+
private Statement generateDeleteDml(Table table, String tableName, JsonNode event)
160+
throws ChangeEventConvertorException {
161+
StringBuilder sql = new StringBuilder("DELETE FROM ").append(tableName).append(" WHERE ");
162+
Statement.Builder builder = Statement.newBuilder("");
163+
boolean first = true;
164+
for (com.google.cloud.teleport.v2.spanner.ddl.Column column : table.columns()) {
165+
String colName = column.name();
166+
if (column.isGenerated()) {
167+
continue;
168+
}
169+
if (!first) {
170+
sql.append(" AND ");
171+
}
172+
sql.append(colName).append(" = @").append(colName);
173+
// Bind value
174+
Value value =
175+
ChangeEventTypeConvertor.toValue(event, column.type(), colName, /* requiredField */ true);
176+
builder.bind(colName).to(value);
177+
first = false;
178+
}
179+
builder.replace(sql.toString());
180+
return builder.build();
181+
}
182+
120183
public JsonNode getChangeEvent() {
121184
return changeEvent;
122185
}
123186

124187
// Returns an array of data and shadow table mutations.
125188
public Iterable<Mutation> getMutations() {
126-
return Arrays.asList(dataMutation, shadowTableMutation);
189+
java.util.List<Mutation> mutations = new java.util.ArrayList<>();
190+
if (dataMutation != null) {
191+
mutations.add(dataMutation);
192+
}
193+
if (shadowTableMutation != null) {
194+
mutations.add(shadowTableMutation);
195+
}
196+
return mutations;
127197
}
128198

129199
// Returns the data table mutation
@@ -176,4 +246,12 @@ public void readDataTableRowWithExclusiveLock(
176246
// Read the row in order to acquire the lock and discard it.
177247
resultSet.getCurrentRowAsStruct();
178248
}
249+
250+
public static String getChangeType(JsonNode changeEvent) {
251+
return changeEvent.has(DatastreamConstants.EVENT_CHANGE_TYPE_KEY)
252+
? changeEvent
253+
.get(DatastreamConstants.EVENT_CHANGE_TYPE_KEY)
254+
.asText(DatastreamConstants.EMPTY_EVENT)
255+
: DatastreamConstants.EMPTY_EVENT;
256+
}
179257
}

v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/datastream/MySqlChangeEventContextTest.java

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -295,4 +295,76 @@ public void testReadDataTable() throws Exception {
295295
verify(resultSet, times(1)).next();
296296
verify(resultSet, times(0)).getCurrentRowAsStruct();
297297
}
298+
299+
@Test
300+
public void testCanGenerateDataDmlStatement() throws Exception {
301+
long eventTimestamp = 1615159728L;
302+
// DDL with generated PK
303+
Ddl ddl =
304+
Ddl.builder()
305+
.createTable("Users")
306+
.column("id")
307+
.string()
308+
.max()
309+
.generatedAs("uuid()")
310+
.endColumn()
311+
.column("first_name")
312+
.string()
313+
.max()
314+
.endColumn()
315+
.column("last_name")
316+
.string()
317+
.max()
318+
.endColumn()
319+
.primaryKey()
320+
.asc("id")
321+
.end()
322+
.endTable()
323+
.createTable("shadow_Users")
324+
.column("id")
325+
.string()
326+
.max()
327+
.endColumn()
328+
.column("first_name")
329+
.string()
330+
.max()
331+
.endColumn()
332+
.column("last_name")
333+
.string()
334+
.max()
335+
.endColumn()
336+
.primaryKey()
337+
.asc("id")
338+
.end()
339+
.endTable()
340+
.build();
341+
342+
JSONObject changeEvent = new JSONObject();
343+
changeEvent.put(DatastreamConstants.EVENT_TABLE_NAME_KEY, "Users");
344+
changeEvent.put(DatastreamConstants.EVENT_CHANGE_TYPE_KEY, DatastreamConstants.DELETE_EVENT);
345+
changeEvent.put(
346+
DatastreamConstants.EVENT_SOURCE_TYPE_KEY, DatastreamConstants.MYSQL_SOURCE_TYPE);
347+
changeEvent.put(DatastreamConstants.MYSQL_TIMESTAMP_KEY, eventTimestamp);
348+
// Source keys (mapped to non-generated columns in Spanner)
349+
changeEvent.put("first_name", "John");
350+
changeEvent.put("last_name", "Doe");
351+
changeEvent.put("id", "aa");
352+
353+
ChangeEventContext context =
354+
ChangeEventContextFactory.createChangeEventContext(
355+
getJsonNode(changeEvent.toString()),
356+
ddl,
357+
ddl,
358+
"shadow_",
359+
DatastreamConstants.MYSQL_SOURCE_TYPE);
360+
361+
Statement dmlStatement = context.getDataDmlStatement(ddl);
362+
363+
// Verify DML generation
364+
String expectedSql =
365+
"DELETE FROM Users WHERE first_name = @first_name AND last_name = @last_name";
366+
assertEquals(expectedSql, dmlStatement.getSql());
367+
assertEquals("John", dmlStatement.getParameters().get("first_name").getString());
368+
assertEquals("Doe", dmlStatement.getParameters().get("last_name").getString());
369+
}
298370
}

v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/transform/ChangeEventTransformerDoFnTest.java

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -616,4 +616,62 @@ public void testProcessElementWithInvalidChangeEventException() throws Exception
616616
assertEquals(
617617
"Invalid byte array value for column: invalidKey", argument.getValue().getErrorMessage());
618618
}
619+
620+
@Test
621+
public void testProcessElementWithGeneratedColumn() throws Exception {
622+
ObjectMapper mapper = new ObjectMapper();
623+
mapper.enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS);
624+
Schema schema = mock(Schema.class);
625+
CustomTransformation customTransformation = mock(CustomTransformation.class);
626+
DoFn.ProcessContext processContextMock = mock(DoFn.ProcessContext.class);
627+
PCollectionView<Ddl> ddlView = mock(PCollectionView.class);
628+
SpannerConfig spannerConfig = mock(SpannerConfig.class);
629+
SpannerAccessor spannerAccessor = mock(SpannerAccessor.class);
630+
DatabaseClient databaseClientMock = mock(DatabaseClient.class);
631+
ChangeEventSessionConvertor changeEventSessionConvertor =
632+
mock(ChangeEventSessionConvertor.class);
633+
634+
// Create failsafe element input for the DoFn
635+
ObjectNode changeEvent = mapper.createObjectNode();
636+
changeEvent.put(DatastreamConstants.EVENT_SOURCE_TYPE_KEY, Constants.MYSQL_SOURCE_TYPE);
637+
changeEvent.put(DatastreamConstants.EVENT_TABLE_NAME_KEY, "Users");
638+
changeEvent.put("first_name", "Johnny");
639+
changeEvent.put("gen_col", "123");
640+
FailsafeElement<String, String> failsafeElement =
641+
FailsafeElement.of(changeEvent.toString(), changeEvent.toString());
642+
643+
Ddl ddl =
644+
Ddl.builder()
645+
.createTable("Users")
646+
.column("first_name")
647+
.string()
648+
.max()
649+
.endColumn()
650+
.column("gen_col")
651+
.int64()
652+
.generatedAs("1")
653+
.endColumn()
654+
.endTable()
655+
.build();
656+
657+
when(schema.isEmpty()).thenReturn(false);
658+
when(processContextMock.element()).thenReturn(failsafeElement);
659+
when(processContextMock.sideInput(ddlView)).thenReturn(ddl);
660+
when(spannerAccessor.getDatabaseClient()).thenReturn(databaseClientMock);
661+
when(changeEventSessionConvertor.transformChangeEventViaSessionFile(changeEvent))
662+
.thenReturn(changeEvent);
663+
when(changeEventSessionConvertor.transformChangeEventData(changeEvent, databaseClientMock, ddl))
664+
.thenReturn(changeEvent);
665+
666+
ChangeEventTransformerDoFn changeEventTransformerDoFn =
667+
ChangeEventTransformerDoFn.create(
668+
schema, null, null, null, "mysql", customTransformation, false, ddlView, spannerConfig);
669+
changeEventTransformerDoFn.setMapper(mapper);
670+
changeEventTransformerDoFn.setSpannerAccessor(spannerAccessor);
671+
changeEventTransformerDoFn.setChangeEventSessionConvertor(changeEventSessionConvertor);
672+
changeEventTransformerDoFn.processElement(processContextMock);
673+
674+
verify(changeEventSessionConvertor, times(1))
675+
.transformChangeEventData(changeEvent, databaseClientMock, ddl);
676+
}
619677
}

v2/spanner-common/src/main/java/com/google/cloud/teleport/v2/spanner/migrations/convertors/ChangeEventSessionConvertor.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -265,6 +265,10 @@ public JsonNode transformChangeEventData(JsonNode changeEvent, DatabaseClient db
265265
.filter(f -> !f.startsWith(EVENT_METADATA_KEY_PREFIX))
266266
.collect(Collectors.toList());
267267
for (String columnName : columnNames) {
268+
if (ddl.table(tableName).column(columnName).isGenerated()) {
269+
((ObjectNode) changeEvent).remove(columnName);
270+
continue;
271+
}
268272
Type columnType = ddl.table(tableName).column(columnName).type();
269273
if (columnType.getCode() == Type.Code.JSON || columnType.getCode() == Type.Code.PG_JSONB) {
270274
// JSON type cannot be a key column, hence setting requiredField to false.

0 commit comments

Comments
 (0)