Skip to content

Commit 9f7e62f

Browse files
[sourcedb-to-spanner, datastream-to-spanner] Bulk Migration DLQ uses Spanner Mutations but Live Retry consumes it as Source Row (#3259)
* initial changes * testing * removing logs,etc * add unit tests * CustomTransformationFetcherTest
1 parent 90cea0e commit 9f7e62f

File tree

14 files changed

+257
-14
lines changed

14 files changed

+257
-14
lines changed

v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/transform/ChangeEventTransformerDoFn.java

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -180,8 +180,12 @@ public void processElement(ProcessContext c) {
180180
Map<String, Object> sourceRecord =
181181
ChangeEventToMapConvertor.convertChangeEventToMap(changeEvent);
182182

183+
boolean isSpannerMutation =
184+
changeEvent.has("_metadata_spanner_mutation")
185+
&& changeEvent.get("_metadata_spanner_mutation").asBoolean();
186+
183187
// TODO: Transformation via session file should be marked deprecated and removed.
184-
if (!schema().isEmpty()) {
188+
if (!schema().isEmpty() && !isSpannerMutation) {
185189
schema().verifyTableInSession(changeEvent.get(EVENT_TABLE_NAME_KEY).asText());
186190
changeEvent = changeEventSessionConvertor.transformChangeEventViaSessionFile(changeEvent);
187191
}
@@ -191,20 +195,22 @@ public void processElement(ProcessContext c) {
191195
}
192196

193197
// Perform mapping as per overrides
194-
if (schemaOverridesParser() != null) {
198+
if (schemaOverridesParser() != null && !isSpannerMutation) {
195199
changeEvent = changeEventSessionConvertor.transformChangeEventViaOverrides(changeEvent);
196200
}
197201

198-
changeEvent =
199-
changeEventSessionConvertor.transformChangeEventData(
200-
changeEvent, spannerAccessor.getDatabaseClient(), ddl);
202+
if (!isSpannerMutation) {
203+
changeEvent =
204+
changeEventSessionConvertor.transformChangeEventData(
205+
changeEvent, spannerAccessor.getDatabaseClient(), ddl);
206+
}
201207

202208
// If custom jar is specified apply custom transformation to the change event
203209
if (datastreamToSpannerTransformer != null) {
204210
MigrationTransformationResponse migrationTransformationResponse = null;
205211
try {
206212
migrationTransformationResponse =
207-
getCustomTransformationResponse(changeEvent, sourceRecord);
213+
getTransformationResponse(changeEvent, sourceRecord, isSpannerMutation);
208214
if (migrationTransformationResponse.isEventFiltered()) {
209215
filteredEvents.inc();
210216
c.output(DatastreamToSpannerConstants.FILTERED_EVENT_TAG, msg.getOriginalPayload());
@@ -215,6 +221,7 @@ public void processElement(ProcessContext c) {
215221
changeEvent =
216222
ChangeEventToMapConvertor.transformChangeEventViaCustomTransformation(
217223
changeEvent, migrationTransformationResponse.getResponseRow());
224+
218225
if (changeEvent.get(SHARD_ID_COLUMN_NAME) != null) {
219226
migrationShardId =
220227
changeEvent.get(changeEvent.get(SHARD_ID_COLUMN_NAME).asText()).asText();
@@ -265,17 +272,25 @@ public void processElement(ProcessContext c) {
265272
}
266273
}
267274

268-
MigrationTransformationResponse getCustomTransformationResponse(
269-
JsonNode changeEvent, Map<String, Object> sourceRecord)
275+
MigrationTransformationResponse getTransformationResponse(
276+
JsonNode changeEvent, Map<String, Object> sourceRecord, boolean isSpannerMutation)
270277
throws InvalidTransformationException {
271278
String shardId = changeEventSessionConvertor.getShardId(changeEvent);
272279
String tableName = changeEvent.get(EVENT_TABLE_NAME_KEY).asText();
273280
Instant startTimestamp = Instant.now();
274281
MigrationTransformationRequest migrationTransformationRequest =
275282
new MigrationTransformationRequest(
276283
tableName, sourceRecord, shardId, changeEvent.get(EVENT_CHANGE_TYPE_KEY).asText());
277-
MigrationTransformationResponse migrationTransformationResponse =
278-
datastreamToSpannerTransformer.toSpannerRow(migrationTransformationRequest);
284+
MigrationTransformationResponse migrationTransformationResponse;
285+
if (isSpannerMutation) {
286+
migrationTransformationResponse =
287+
datastreamToSpannerTransformer.transformFailedSpannerMutation(
288+
migrationTransformationRequest);
289+
} else {
290+
migrationTransformationResponse =
291+
datastreamToSpannerTransformer.toSpannerRow(migrationTransformationRequest);
292+
}
293+
279294
Instant endTimestamp = Instant.now();
280295
applyCustomTransformationResponseTimeMetric.update(
281296
new Duration(startTimestamp, endTimestamp).getMillis());

v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/transform/ChangeEventTransformerDoFnTest.java

Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -616,4 +616,146 @@ public void testProcessElementWithInvalidChangeEventException() throws Exception
616616
assertEquals(
617617
"Invalid byte array value for column: invalidKey", argument.getValue().getErrorMessage());
618618
}
619+
620+
@Test
621+
public void testProcessElementWithSpannerMutationAndColumnRename() throws Exception {
622+
ObjectMapper mapper = new ObjectMapper();
623+
mapper.enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS);
624+
Schema schema = mock(Schema.class);
625+
CustomTransformation customTransformation = mock(CustomTransformation.class);
626+
DoFn.ProcessContext processContextMock = mock(DoFn.ProcessContext.class);
627+
ISpannerMigrationTransformer spannerMigrationTransformer =
628+
mock(ISpannerMigrationTransformer.class);
629+
PCollectionView<Ddl> ddl = mock(PCollectionView.class);
630+
SpannerConfig spannerConfig = mock(SpannerConfig.class);
631+
SpannerAccessor spannerAccessor = mock(SpannerAccessor.class);
632+
DatabaseClient databaseClientMock = mock(DatabaseClient.class);
633+
ChangeEventSessionConvertor changeEventSessionConvertor =
634+
mock(ChangeEventSessionConvertor.class);
635+
636+
// Create failsafe element input for the DoFn
637+
// Simulating a failed Spanner mutation where column "first_name" (Source) was renamed to
638+
// "full_name" (Spanner)
639+
ObjectNode changeEvent = mapper.createObjectNode();
640+
changeEvent.put(DatastreamConstants.EVENT_SOURCE_TYPE_KEY, Constants.MYSQL_SOURCE_TYPE);
641+
changeEvent.put(DatastreamConstants.EVENT_TABLE_NAME_KEY, "Users");
642+
changeEvent.put("full_name", "Johnny Rose"); // Spanner column name
643+
changeEvent.put(EVENT_CHANGE_TYPE_KEY, "INSERT");
644+
changeEvent.put("_metadata_spanner_mutation", true);
645+
FailsafeElement<String, String> failsafeElement =
646+
FailsafeElement.of(changeEvent.toString(), changeEvent.toString());
647+
648+
Map<String, Object> sourceRecord =
649+
ChangeEventToMapConvertor.convertChangeEventToMap(changeEvent);
650+
MigrationTransformationRequest expectedRequest =
651+
new MigrationTransformationRequest("Users", sourceRecord, "", "INSERT");
652+
Map<String, Object> spannerRecord = new HashMap<>(sourceRecord);
653+
// The transformer returns the record as is (i.e. custom transformation is a no-op)
654+
MigrationTransformationResponse migrationTransformationResponse =
655+
new MigrationTransformationResponse(spannerRecord, false);
656+
657+
when(processContextMock.element()).thenReturn(failsafeElement);
658+
// Explicitly return false for schema.isEmpty() to prove that we would have entered the schema
659+
// check block if not bypassed
660+
when(schema.isEmpty()).thenReturn(false);
661+
// Throw if verifyTableInSession IS called - proving bypass
662+
doThrow(new RuntimeException("Schema check should be bypassed for spanner mutations"))
663+
.when(schema)
664+
.verifyTableInSession(any());
665+
when(spannerMigrationTransformer.transformFailedSpannerMutation(refEq(expectedRequest)))
666+
.thenReturn(migrationTransformationResponse);
667+
when(spannerAccessor.getDatabaseClient()).thenReturn(databaseClientMock);
668+
when(changeEventSessionConvertor.getShardId(changeEvent)).thenReturn("");
669+
670+
ChangeEventTransformerDoFn changeEventTransformerDoFn =
671+
ChangeEventTransformerDoFn.create(
672+
schema, null, null, null, "mysql", customTransformation, false, ddl, spannerConfig);
673+
changeEventTransformerDoFn.setMapper(mapper);
674+
changeEventTransformerDoFn.setDatastreamToSpannerTransformer(spannerMigrationTransformer);
675+
changeEventTransformerDoFn.setSpannerAccessor(spannerAccessor);
676+
changeEventTransformerDoFn.setChangeEventSessionConvertor(changeEventSessionConvertor);
677+
changeEventTransformerDoFn.processElement(processContextMock);
678+
679+
ArgumentCaptor<FailsafeElement> argument = ArgumentCaptor.forClass(FailsafeElement.class);
680+
verify(processContextMock, times(1))
681+
.output(eq(DatastreamToSpannerConstants.TRANSFORMED_EVENT_TAG), argument.capture());
682+
683+
// Verify standard transformations bypassed (which would have failed or looked for "first_name"
684+
// if session was checking)
685+
verify(schema, times(0)).verifyTableInSession(any());
686+
687+
assertEquals(
688+
"{\"_metadata_source_type\":\"mysql\",\"_metadata_table\":\"Users\",\"full_name\":\"Johnny Rose\",\"_metadata_change_type\":\"INSERT\",\"_metadata_spanner_mutation\":true}",
689+
argument.getValue().getPayload());
690+
}
691+
692+
@Test
693+
public void testProcessElementWithSpannerMutationAndCustomTransformation() throws Exception {
694+
ObjectMapper mapper = new ObjectMapper();
695+
mapper.enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS);
696+
Schema schema = mock(Schema.class);
697+
CustomTransformation customTransformation = mock(CustomTransformation.class);
698+
DoFn.ProcessContext processContextMock = mock(DoFn.ProcessContext.class);
699+
ISpannerMigrationTransformer spannerMigrationTransformer =
700+
mock(ISpannerMigrationTransformer.class);
701+
PCollectionView<Ddl> ddl = mock(PCollectionView.class);
702+
SpannerConfig spannerConfig = mock(SpannerConfig.class);
703+
SpannerAccessor spannerAccessor = mock(SpannerAccessor.class);
704+
DatabaseClient databaseClientMock = mock(DatabaseClient.class);
705+
ChangeEventSessionConvertor changeEventSessionConvertor =
706+
mock(ChangeEventSessionConvertor.class);
707+
708+
// Create failsafe element input for the DoFn
709+
ObjectNode changeEvent = mapper.createObjectNode();
710+
changeEvent.put(DatastreamConstants.EVENT_SOURCE_TYPE_KEY, Constants.MYSQL_SOURCE_TYPE);
711+
changeEvent.put(DatastreamConstants.EVENT_TABLE_NAME_KEY, "Users");
712+
changeEvent.put("first_name", "Johnny");
713+
changeEvent.put(EVENT_CHANGE_TYPE_KEY, "INSERT");
714+
changeEvent.put("_metadata_spanner_mutation", true);
715+
FailsafeElement<String, String> failsafeElement =
716+
FailsafeElement.of(changeEvent.toString(), changeEvent.toString());
717+
718+
Map<String, Object> sourceRecord =
719+
ChangeEventToMapConvertor.convertChangeEventToMap(changeEvent);
720+
MigrationTransformationRequest expectedRequest =
721+
new MigrationTransformationRequest("Users", sourceRecord, "", "INSERT");
722+
Map<String, Object> spannerRecord = new HashMap<>(sourceRecord);
723+
spannerRecord.put("first_name", "Johnny Rose");
724+
MigrationTransformationResponse migrationTransformationResponse =
725+
new MigrationTransformationResponse(spannerRecord, false);
726+
727+
// Explicitly return false for schema.isEmpty() to prove that we would have entered the schema
728+
// check block if not bypassed
729+
when(schema.isEmpty()).thenReturn(false);
730+
// Throw if verifyTableInSession IS called - proving bypass
731+
doThrow(new RuntimeException("Schema check should be bypassed for spanner mutations"))
732+
.when(schema)
733+
.verifyTableInSession(any());
734+
when(processContextMock.element()).thenReturn(failsafeElement);
735+
when(spannerMigrationTransformer.transformFailedSpannerMutation(refEq(expectedRequest)))
736+
.thenReturn(migrationTransformationResponse);
737+
when(spannerAccessor.getDatabaseClient()).thenReturn(databaseClientMock);
738+
when(changeEventSessionConvertor.getShardId(changeEvent)).thenReturn("");
739+
740+
ChangeEventTransformerDoFn changeEventTransformerDoFn =
741+
ChangeEventTransformerDoFn.create(
742+
schema, null, null, null, "mysql", customTransformation, false, ddl, spannerConfig);
743+
changeEventTransformerDoFn.setMapper(mapper);
744+
changeEventTransformerDoFn.setDatastreamToSpannerTransformer(spannerMigrationTransformer);
745+
changeEventTransformerDoFn.setSpannerAccessor(spannerAccessor);
746+
changeEventTransformerDoFn.setChangeEventSessionConvertor(changeEventSessionConvertor);
747+
changeEventTransformerDoFn.processElement(processContextMock);
748+
749+
ArgumentCaptor<FailsafeElement> argument = ArgumentCaptor.forClass(FailsafeElement.class);
750+
verify(processContextMock, times(1))
751+
.output(eq(DatastreamToSpannerConstants.TRANSFORMED_EVENT_TAG), argument.capture());
752+
753+
// Verify standard transformations bypassed
754+
verify(schema, times(0)).verifyTableInSession(any());
755+
verify(changeEventSessionConvertor, times(0)).transformChangeEventViaSessionFile(any());
756+
757+
assertEquals(
758+
"{\"_metadata_source_type\":\"mysql\",\"_metadata_table\":\"Users\",\"first_name\":\"Johnny Rose\",\"_metadata_change_type\":\"INSERT\",\"_metadata_spanner_mutation\":true}",
759+
argument.getValue().getPayload());
760+
}
619761
}

v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/writer/DeadLetterQueue.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -197,8 +197,10 @@ protected FailsafeElement<String, String> rowContextToDlqElement(RowContext r) {
197197
/*
198198
* We take special care that if GenericRecordTypeConvertor throws an exception,
199199
* we would still preserve the original record in DLQ.
200-
* Also note that here we are calling a static utility from GenericRecordTypeConvertor
201-
* Which just marshals types like logical, record etc. It does not pass the data via custom transform.
200+
* Also note that here we are calling a static utility from
201+
* GenericRecordTypeConvertor
202+
* Which just marshals types like logical, record etc. It does not pass the data
203+
* via custom transform.
202204
*/
203205
try {
204206
value =
@@ -214,7 +216,8 @@ protected FailsafeElement<String, String> rowContextToDlqElement(RowContext r) {
214216
putValueToJson(json, f.name(), value);
215217
}
216218
if (r.row().shardId() != null) {
217-
// Added default to not fail in the DLQ flow if the src table is not found in map
219+
// Added default to not fail in the DLQ flow if the src table is not found in
220+
// map
218221
json.put(
219222
srcTableToShardIdColumnMap.getOrDefault(r.row().tableName(), "migration_shard_id"),
220223
r.row().shardId());
@@ -232,7 +235,8 @@ protected FailsafeElement<String, String> rowContextToDlqElement(RowContext r) {
232235
public void failedMutationsToDLQ(
233236
PCollection<@UnknownKeyFor @NonNull @Initialized MutationGroup> failedMutations) {
234237
// TODO - add the exception message
235-
// TODO - Explore windowing with CoGroupByKey to extract source row based on mutation
238+
// TODO - Explore windowing with CoGroupByKey to extract source row based on
239+
// mutation
236240
LOG.warn("added mutation output to pipeline");
237241
failedMutations
238242
.apply(
@@ -262,6 +266,7 @@ public void processElement(
262266
@VisibleForTesting
263267
protected FailsafeElement<String, String> mutationToDlqElement(Mutation m) {
264268
JSONObject json = new JSONObject();
269+
json.put("_metadata_spanner_mutation", true);
265270

266271
Instant instant = Instant.now();
267272
initializeJsonNode(

v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/writer/DeadLetterQueueTest.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -494,6 +494,15 @@ public void testMutationToDlqElementWithBytesArray() {
494494
assertTrue(payload.contains("\"bytes_array_col\":[\"7465737431\",\"7465737432\"]"));
495495
}
496496

497+
@Test
498+
public void testMutationToDlqElementWithMutationFlag() {
499+
DeadLetterQueue dlq = DeadLetterQueue.create("testDir", null, null, SQLDialect.MYSQL, null);
500+
Mutation mutation = Mutation.newInsertBuilder("srcTable").set("firstName").to("abc").build();
501+
FailsafeElement<String, String> dlqElement = dlq.mutationToDlqElement(mutation);
502+
assertNotNull(dlqElement);
503+
assertTrue(dlqElement.getOriginalPayload().contains("\"_metadata_spanner_mutation\":true"));
504+
}
505+
497506
private static ISchemaMapper getIdentityMapper(Ddl spannerDdl) {
498507
return new IdentityMapper(spannerDdl);
499508
}

v2/spanner-common/src/test/java/com/google/cloud/teleport/v2/spanner/migrations/avro/GenericRecordTypeConvertorTest.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1440,6 +1440,12 @@ public MigrationTransformationResponse toSourceRow(
14401440
MigrationTransformationRequest request) {
14411441
return null;
14421442
}
1443+
1444+
@Override
1445+
public MigrationTransformationResponse transformFailedSpannerMutation(
1446+
MigrationTransformationRequest request) throws InvalidTransformationException {
1447+
return new MigrationTransformationResponse(request.getRequestRow(), false);
1448+
}
14431449
};
14441450

14451451
GenericRecordTypeConvertor genericRecordTypeConvertor =
@@ -1497,6 +1503,12 @@ public MigrationTransformationResponse toSourceRow(MigrationTransformationReques
14971503
throws InvalidTransformationException {
14981504
return null;
14991505
}
1506+
1507+
@Override
1508+
public MigrationTransformationResponse transformFailedSpannerMutation(
1509+
MigrationTransformationRequest request) throws InvalidTransformationException {
1510+
return new MigrationTransformationResponse(request.getRequestRow(), false);
1511+
}
15001512
}
15011513

15021514
private CassandraAnnotations getTestCassandraAnnotation(String annotation) {

v2/spanner-custom-shard/src/main/java/com/custom/CustomTransformationAllTypes.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,4 +45,10 @@ public MigrationTransformationResponse toSourceRow(MigrationTransformationReques
4545
throws InvalidTransformationException {
4646
return new MigrationTransformationResponse(null, false);
4747
}
48+
49+
@Override
50+
public MigrationTransformationResponse transformFailedSpannerMutation(
51+
MigrationTransformationRequest request) throws InvalidTransformationException {
52+
return new MigrationTransformationResponse(null, false);
53+
}
4854
}

v2/spanner-custom-shard/src/main/java/com/custom/CustomTransformationAllTypesWithException.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,4 +46,10 @@ public MigrationTransformationResponse toSourceRow(MigrationTransformationReques
4646
throws InvalidTransformationException {
4747
return new MigrationTransformationResponse(null, false);
4848
}
49+
50+
@Override
51+
public MigrationTransformationResponse transformFailedSpannerMutation(
52+
MigrationTransformationRequest request) throws InvalidTransformationException {
53+
return new MigrationTransformationResponse(null, false);
54+
}
4955
}

v2/spanner-custom-shard/src/main/java/com/custom/CustomTransformationFetcher.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,4 +74,10 @@ public MigrationTransformationResponse toSourceRow(MigrationTransformationReques
7474
}
7575
return new MigrationTransformationResponse(null, false);
7676
}
77+
78+
@Override
79+
public MigrationTransformationResponse transformFailedSpannerMutation(
80+
MigrationTransformationRequest request) throws InvalidTransformationException {
81+
return new MigrationTransformationResponse(null, false);
82+
}
7783
}

v2/spanner-custom-shard/src/main/java/com/custom/CustomTransformationForCassandraAllDataTypesIT.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,4 +71,10 @@ public MigrationTransformationResponse toSourceRow(MigrationTransformationReques
7171
throw new UnsupportedOperationException(
7272
"This test custom transform is not intended for reverse replication.");
7373
}
74+
75+
@Override
76+
public MigrationTransformationResponse transformFailedSpannerMutation(
77+
MigrationTransformationRequest request) throws InvalidTransformationException {
78+
return new MigrationTransformationResponse(null, false);
79+
}
7480
}

v2/spanner-custom-shard/src/main/java/com/custom/CustomTransformationWithCassandraForIT.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,4 +53,10 @@ public MigrationTransformationResponse toSourceRow(MigrationTransformationReques
5353
}
5454
return new MigrationTransformationResponse(null, false);
5555
}
56+
57+
@Override
58+
public MigrationTransformationResponse transformFailedSpannerMutation(
59+
MigrationTransformationRequest request) throws InvalidTransformationException {
60+
return new MigrationTransformationResponse(null, false);
61+
}
5662
}

0 commit comments

Comments
 (0)