Skip to content

Commit 95666a2

Browse files
feat(Spanner): Remove generated columns from change events during transformation and add tests for this behavior.
1 parent 2735749 commit 95666a2

File tree

8 files changed

+331
-84
lines changed

8 files changed

+331
-84
lines changed

v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/SpannerTransactionWriterDoFn.java

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import com.google.cloud.Timestamp;
2929
import com.google.cloud.spanner.Options;
3030
import com.google.cloud.spanner.SpannerException;
31+
import com.google.cloud.spanner.Statement;
3132
import com.google.cloud.spanner.TransactionRunner;
3233
import com.google.cloud.teleport.v2.spanner.ddl.Ddl;
3334
import com.google.cloud.teleport.v2.spanner.migrations.exceptions.ChangeEventConvertorException;
@@ -278,7 +279,7 @@ public void processElement(ProcessContext c) {
278279
c, changeEventContext, currentChangeEventSequence, shadowTableDdl, ddl);
279280
} else {
280281
processSingleDatabaseTransaction(
281-
c, changeEventContext, currentChangeEventSequence, shadowTableDdl);
282+
c, changeEventContext, currentChangeEventSequence, shadowTableDdl, ddl);
282283
}
283284
com.google.cloud.Timestamp timestamp = com.google.cloud.Timestamp.now();
284285
c.output(timestamp);
@@ -377,7 +378,8 @@ private void processSingleDatabaseTransaction(
377378
ProcessContext c,
378379
ChangeEventContext changeEventContext,
379380
ChangeEventSequence currentChangeEventSequence,
380-
Ddl shadowDdl) {
381+
Ddl shadowDdl,
382+
Ddl ddl) {
381383

382384
spannerAccessor
383385
.getDatabaseClient()
@@ -402,7 +404,14 @@ private void processSingleDatabaseTransaction(
402404
skippedEvents.inc();
403405
return null;
404406
}
405-
// Apply shadow and data table mutations.
407+
// Execute DML if applicable
408+
Statement dataDml = changeEventContext.getDataDmlStatement(ddl);
409+
410+
if (dataDml != null) {
411+
transaction.executeUpdate(dataDml);
412+
}
413+
414+
// Apply shadow and data table mutations (only if they exist)
406415
transaction.buffer(changeEventContext.getMutations());
407416
isInTransaction.set(false);
408417
return null;
@@ -472,6 +481,13 @@ void processCrossDatabaseTransaction(
472481
.run(
473482
(TransactionRunner.TransactionCallable<Void>)
474483
mainTxn -> {
484+
// Execute Data DML if applicable
485+
Statement dataDml =
486+
changeEventContext.getDataDmlStatement(dataTableDdl);
487+
if (dataDml != null) {
488+
mainTxn.executeUpdate(dataDml);
489+
}
490+
475491
// Read row from main table with lock scanned ranges to acquire
476492
// exclusive lock on the main table row.
477493
changeEventContext.readDataTableRowWithExclusiveLock(
@@ -504,7 +520,9 @@ void processCrossDatabaseTransaction(
504520
}
505521

506522
// Write to main table
507-
mainTxn.buffer(changeEventContext.getDataMutation());
523+
if (changeEventContext.getDataMutation() != null) {
524+
mainTxn.buffer(changeEventContext.getDataMutation());
525+
}
508526
return null;
509527
});
510528

v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/ChangeEventContext.java

Lines changed: 84 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,16 +21,18 @@
2121
import com.google.cloud.spanner.ResultSet;
2222
import com.google.cloud.spanner.Statement;
2323
import com.google.cloud.spanner.TransactionContext;
24+
import com.google.cloud.spanner.Value;
2425
import com.google.cloud.teleport.v2.spanner.ddl.Ddl;
26+
import com.google.cloud.teleport.v2.spanner.ddl.IndexColumn;
2527
import com.google.cloud.teleport.v2.spanner.ddl.Table;
2628
import com.google.cloud.teleport.v2.spanner.migrations.convertors.ChangeEventSpannerConvertor;
29+
import com.google.cloud.teleport.v2.spanner.migrations.convertors.ChangeEventTypeConvertor;
2730
import com.google.cloud.teleport.v2.spanner.migrations.exceptions.ChangeEventConvertorException;
2831
import com.google.cloud.teleport.v2.spanner.migrations.exceptions.DroppedTableException;
2932
import com.google.cloud.teleport.v2.spanner.migrations.exceptions.InvalidChangeEventException;
3033
import com.google.cloud.teleport.v2.spanner.migrations.spanner.SpannerReadUtils;
3134
import com.google.cloud.teleport.v2.templates.spanner.ShadowTableCreator;
3235
import com.google.common.collect.ImmutableMap;
33-
import java.util.Arrays;
3436
import java.util.List;
3537
import java.util.Map;
3638
import java.util.Set;
@@ -107,23 +109,96 @@ protected void convertChangeEventToMutation(Ddl ddl, Ddl shadowTableDdl)
107109
ChangeEventConvertor.convertChangeEventColumnKeysToLowerCase(changeEvent);
108110
ChangeEventConvertor.verifySpannerSchema(ddl, changeEvent);
109111

110-
this.primaryKey =
111-
ChangeEventSpannerConvertor.changeEventToPrimaryKey(
112-
changeEvent.get(DatastreamConstants.EVENT_TABLE_NAME_KEY).asText(),
113-
ddl,
114-
changeEvent,
115-
/* convertNameToLowerCase= */ true);
116-
this.dataMutation = ChangeEventConvertor.changeEventToMutation(ddl, changeEvent);
112+
boolean hasGeneratedPK = false;
113+
Table table = ddl.table(this.dataTable);
114+
if (table != null) {
115+
hasGeneratedPK = hasGeneratedPK(table);
116+
}
117+
118+
String changeType =
119+
changeEvent.has(DatastreamConstants.EVENT_CHANGE_TYPE_KEY)
120+
? changeEvent
121+
.get(DatastreamConstants.EVENT_CHANGE_TYPE_KEY)
122+
.asText(DatastreamConstants.EMPTY_EVENT)
123+
: DatastreamConstants.EMPTY_EVENT;
124+
boolean isDelete = DatastreamConstants.DELETE_EVENT.equalsIgnoreCase(changeType);
125+
126+
if (hasGeneratedPK && isDelete) {
127+
this.primaryKey = null;
128+
this.dataMutation = null;
129+
} else {
130+
this.primaryKey =
131+
ChangeEventSpannerConvertor.changeEventToPrimaryKey(
132+
changeEvent.get(DatastreamConstants.EVENT_TABLE_NAME_KEY).asText(),
133+
ddl,
134+
changeEvent,
135+
/* convertNameToLowerCase= */ true);
136+
this.dataMutation = ChangeEventConvertor.changeEventToMutation(ddl, changeEvent);
137+
}
138+
117139
this.shadowTableMutation = generateShadowTableMutation(ddl, shadowTableDdl);
118140
}
119141

142+
public Statement getDataDmlStatement(Ddl ddl) throws ChangeEventConvertorException {
143+
String changeType = changeEvent.get(DatastreamConstants.EVENT_CHANGE_TYPE_KEY).asText();
144+
boolean isDelete = DatastreamConstants.DELETE_EVENT.equalsIgnoreCase(changeType);
145+
if (!isDelete) {
146+
return null;
147+
}
148+
Table table = ddl.table(this.dataTable);
149+
if (table != null && hasGeneratedPK(table)) {
150+
return generateDeleteDml(table, this.dataTable, changeEvent);
151+
}
152+
return null;
153+
}
154+
155+
private boolean hasGeneratedPK(Table table) {
156+
for (IndexColumn keyColumn : table.primaryKeys()) {
157+
if (table.column(keyColumn.name()).isGenerated()) {
158+
return true;
159+
}
160+
}
161+
return false;
162+
}
163+
164+
private Statement generateDeleteDml(Table table, String tableName, JsonNode event)
165+
throws ChangeEventConvertorException {
166+
StringBuilder sql = new StringBuilder("DELETE FROM ").append(tableName).append(" WHERE ");
167+
Statement.Builder builder = Statement.newBuilder("");
168+
boolean first = true;
169+
for (com.google.cloud.teleport.v2.spanner.ddl.Column column : table.columns()) {
170+
String colName = column.name();
171+
if (column.isGenerated()) {
172+
continue;
173+
}
174+
if (!first) {
175+
sql.append(" AND ");
176+
}
177+
sql.append(colName).append(" = @").append(colName);
178+
// Bind value
179+
Value value =
180+
ChangeEventTypeConvertor.toValue(event, column.type(), colName, /* requiredField */ true);
181+
builder.bind(colName).to(value);
182+
first = false;
183+
}
184+
builder.replace(sql.toString());
185+
return builder.build();
186+
}
187+
120188
public JsonNode getChangeEvent() {
121189
return changeEvent;
122190
}
123191

124192
// Returns an array of data and shadow table mutations.
125193
public Iterable<Mutation> getMutations() {
126-
return Arrays.asList(dataMutation, shadowTableMutation);
194+
java.util.List<Mutation> mutations = new java.util.ArrayList<>();
195+
if (dataMutation != null) {
196+
mutations.add(dataMutation);
197+
}
198+
if (shadowTableMutation != null) {
199+
mutations.add(shadowTableMutation);
200+
}
201+
return mutations;
127202
}
128203

129204
// Returns the data table mutation

v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/datastream/MySqlChangeEventContextTest.java

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -295,4 +295,76 @@ public void testReadDataTable() throws Exception {
295295
verify(resultSet, times(1)).next();
296296
verify(resultSet, times(0)).getCurrentRowAsStruct();
297297
}
298+
299+
@Test
300+
public void testCanGenerateDataDmlStatement() throws Exception {
301+
long eventTimestamp = 1615159728L;
302+
// DDL with generated PK
303+
Ddl ddl =
304+
Ddl.builder()
305+
.createTable("Users")
306+
.column("id")
307+
.string()
308+
.max()
309+
.generatedAs("uuid()")
310+
.endColumn()
311+
.column("first_name")
312+
.string()
313+
.max()
314+
.endColumn()
315+
.column("last_name")
316+
.string()
317+
.max()
318+
.endColumn()
319+
.primaryKey()
320+
.asc("id")
321+
.end()
322+
.endTable()
323+
.createTable("shadow_Users")
324+
.column("id")
325+
.string()
326+
.max()
327+
.endColumn()
328+
.column("first_name")
329+
.string()
330+
.max()
331+
.endColumn()
332+
.column("last_name")
333+
.string()
334+
.max()
335+
.endColumn()
336+
.primaryKey()
337+
.asc("id")
338+
.end()
339+
.endTable()
340+
.build();
341+
342+
JSONObject changeEvent = new JSONObject();
343+
changeEvent.put(DatastreamConstants.EVENT_TABLE_NAME_KEY, "Users");
344+
changeEvent.put(DatastreamConstants.EVENT_CHANGE_TYPE_KEY, DatastreamConstants.DELETE_EVENT);
345+
changeEvent.put(
346+
DatastreamConstants.EVENT_SOURCE_TYPE_KEY, DatastreamConstants.MYSQL_SOURCE_TYPE);
347+
changeEvent.put(DatastreamConstants.MYSQL_TIMESTAMP_KEY, eventTimestamp);
348+
// Source keys (mapped to non-generated columns in Spanner)
349+
changeEvent.put("first_name", "John");
350+
changeEvent.put("last_name", "Doe");
351+
changeEvent.put("id", "aa");
352+
353+
ChangeEventContext context =
354+
ChangeEventContextFactory.createChangeEventContext(
355+
getJsonNode(changeEvent.toString()),
356+
ddl,
357+
ddl,
358+
"shadow_",
359+
DatastreamConstants.MYSQL_SOURCE_TYPE);
360+
361+
Statement dmlStatement = context.getDataDmlStatement(ddl);
362+
363+
// Verify DML generation
364+
String expectedSql =
365+
"DELETE FROM Users WHERE first_name = @first_name AND last_name = @last_name";
366+
assertEquals(expectedSql, dmlStatement.getSql());
367+
assertEquals("John", dmlStatement.getParameters().get("first_name").getString());
368+
assertEquals("Doe", dmlStatement.getParameters().get("last_name").getString());
369+
}
298370
}

v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/transform/ChangeEventTransformerDoFnTest.java

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -616,4 +616,62 @@ public void testProcessElementWithInvalidChangeEventException() throws Exception
616616
assertEquals(
617617
"Invalid byte array value for column: invalidKey", argument.getValue().getErrorMessage());
618618
}
619+
620+
@Test
621+
public void testProcessElementWithGeneratedColumn() throws Exception {
622+
ObjectMapper mapper = new ObjectMapper();
623+
mapper.enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS);
624+
Schema schema = mock(Schema.class);
625+
CustomTransformation customTransformation = mock(CustomTransformation.class);
626+
DoFn.ProcessContext processContextMock = mock(DoFn.ProcessContext.class);
627+
PCollectionView<Ddl> ddlView = mock(PCollectionView.class);
628+
SpannerConfig spannerConfig = mock(SpannerConfig.class);
629+
SpannerAccessor spannerAccessor = mock(SpannerAccessor.class);
630+
DatabaseClient databaseClientMock = mock(DatabaseClient.class);
631+
ChangeEventSessionConvertor changeEventSessionConvertor =
632+
mock(ChangeEventSessionConvertor.class);
633+
634+
// Create failsafe element input for the DoFn
635+
ObjectNode changeEvent = mapper.createObjectNode();
636+
changeEvent.put(DatastreamConstants.EVENT_SOURCE_TYPE_KEY, Constants.MYSQL_SOURCE_TYPE);
637+
changeEvent.put(DatastreamConstants.EVENT_TABLE_NAME_KEY, "Users");
638+
changeEvent.put("first_name", "Johnny");
639+
changeEvent.put("gen_col", "123");
640+
FailsafeElement<String, String> failsafeElement =
641+
FailsafeElement.of(changeEvent.toString(), changeEvent.toString());
642+
643+
Ddl ddl =
644+
Ddl.builder()
645+
.createTable("Users")
646+
.column("first_name")
647+
.string()
648+
.max()
649+
.endColumn()
650+
.column("gen_col")
651+
.int64()
652+
.generatedAs("1")
653+
.endColumn()
654+
.endTable()
655+
.build();
656+
657+
when(schema.isEmpty()).thenReturn(false);
658+
when(processContextMock.element()).thenReturn(failsafeElement);
659+
when(processContextMock.sideInput(ddlView)).thenReturn(ddl);
660+
when(spannerAccessor.getDatabaseClient()).thenReturn(databaseClientMock);
661+
when(changeEventSessionConvertor.transformChangeEventViaSessionFile(changeEvent))
662+
.thenReturn(changeEvent);
663+
when(changeEventSessionConvertor.transformChangeEventData(changeEvent, databaseClientMock, ddl))
664+
.thenReturn(changeEvent);
665+
666+
ChangeEventTransformerDoFn changeEventTransformerDoFn =
667+
ChangeEventTransformerDoFn.create(
668+
schema, null, null, null, "mysql", customTransformation, false, ddlView, spannerConfig);
669+
changeEventTransformerDoFn.setMapper(mapper);
670+
changeEventTransformerDoFn.setSpannerAccessor(spannerAccessor);
671+
changeEventTransformerDoFn.setChangeEventSessionConvertor(changeEventSessionConvertor);
672+
changeEventTransformerDoFn.processElement(processContextMock);
673+
674+
verify(changeEventSessionConvertor, times(1))
675+
.transformChangeEventData(changeEvent, databaseClientMock, ddl);
676+
}
619677
}

v2/spanner-common/src/main/java/com/google/cloud/teleport/v2/spanner/migrations/convertors/ChangeEventSessionConvertor.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -265,6 +265,10 @@ public JsonNode transformChangeEventData(JsonNode changeEvent, DatabaseClient db
265265
.filter(f -> !f.startsWith(EVENT_METADATA_KEY_PREFIX))
266266
.collect(Collectors.toList());
267267
for (String columnName : columnNames) {
268+
if (ddl.table(tableName).column(columnName).isGenerated()) {
269+
((ObjectNode) changeEvent).remove(columnName);
270+
continue;
271+
}
268272
Type columnType = ddl.table(tableName).column(columnName).type();
269273
if (columnType.getCode() == Type.Code.JSON || columnType.getCode() == Type.Code.PG_JSONB) {
270274
// JSON type cannot be a key column, hence setting requiredField to false.

0 commit comments

Comments
 (0)