Skip to content

Commit 81cf9c5

Browse files
Fix Transformation Failed for Dropped Columns (#3466)
* initial change * review changes
1 parent 3c5c4b5 commit 81cf9c5

File tree

2 files changed

+109
-4
lines changed

2 files changed

+109
-4
lines changed

v2/spanner-common/src/main/java/com/google/cloud/teleport/v2/spanner/migrations/avro/GenericRecordTypeConvertor.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -302,10 +302,18 @@ private Map<String, Object> genericRecordToMap(GenericRecord record, String srcT
302302
Schema fieldSchema = filterNullSchema(field.schema(), fieldName, fieldValue);
303303
// Handle logical/record types.
304304
final CassandraAnnotations cassandraAnnotations =
305-
schemaMapper.getSpannerColumnCassandraAnnotations(
306-
namespace,
307-
schemaMapper.getSpannerTableName(namespace, srcTableName),
308-
schemaMapper.getSpannerColumnName(namespace, srcTableName, fieldName));
305+
((java.util.function.Supplier<CassandraAnnotations>)
306+
() -> {
307+
try {
308+
return schemaMapper.getSpannerColumnCassandraAnnotations(
309+
namespace,
310+
schemaMapper.getSpannerTableName(namespace, srcTableName),
311+
schemaMapper.getSpannerColumnName(namespace, srcTableName, fieldName));
312+
} catch (NoSuchElementException e) {
313+
return CassandraAnnotations.fromColumnOptions(List.of(), fieldName);
314+
}
315+
})
316+
.get();
309317
fieldValue =
310318
handleNonPrimitiveAvroTypes(fieldValue, fieldSchema, fieldName, cassandraAnnotations);
311319
// Standardizing the types for custom jar input.

v2/spanner-common/src/test/java/com/google/cloud/teleport/v2/spanner/migrations/avro/GenericRecordTypeConvertorTest.java

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -701,6 +701,103 @@ public void testPrimitiveAndNonPrimitiveTypesHandling() throws InvalidTransforma
701701
"timeStampArrayCol", Value.timestampArray(expectedTimeStampArray)));
702702
}
703703

704+
@Test
705+
public void testDroppedColumnHandlingInCustomTransformation()
706+
throws InvalidTransformationException {
707+
final String tableName = "test_table";
708+
final ISchemaMapper schemaMapper = org.mockito.Mockito.mock(ISchemaMapper.class);
709+
710+
org.mockito.Mockito.when(schemaMapper.getDialect())
711+
.thenReturn(com.google.cloud.spanner.Dialect.GOOGLE_STANDARD_SQL);
712+
org.mockito.Mockito.when(
713+
schemaMapper.getSpannerTableName(
714+
org.mockito.ArgumentMatchers.anyString(), org.mockito.ArgumentMatchers.anyString()))
715+
.thenReturn(tableName);
716+
org.mockito.Mockito.when(
717+
schemaMapper.getSpannerColumns(
718+
org.mockito.ArgumentMatchers.anyString(), org.mockito.ArgumentMatchers.anyString()))
719+
.thenReturn(ImmutableList.of("id"));
720+
721+
org.mockito.Mockito.when(
722+
schemaMapper.getSpannerColumnName(
723+
org.mockito.ArgumentMatchers.anyString(),
724+
org.mockito.ArgumentMatchers.anyString(),
725+
org.mockito.ArgumentMatchers.eq("id")))
726+
.thenReturn("id");
727+
org.mockito.Mockito.when(
728+
schemaMapper.getSpannerColumnCassandraAnnotations(
729+
org.mockito.ArgumentMatchers.anyString(),
730+
org.mockito.ArgumentMatchers.anyString(),
731+
org.mockito.ArgumentMatchers.eq("id")))
732+
.thenReturn(getTestCassandraAnnotationNone());
733+
org.mockito.Mockito.when(
734+
schemaMapper.getSpannerColumnType(
735+
org.mockito.ArgumentMatchers.anyString(),
736+
org.mockito.ArgumentMatchers.anyString(),
737+
org.mockito.ArgumentMatchers.eq("id")))
738+
.thenReturn(Type.int64());
739+
740+
org.mockito.Mockito.when(
741+
schemaMapper.getSpannerColumnName(
742+
org.mockito.ArgumentMatchers.anyString(),
743+
org.mockito.ArgumentMatchers.anyString(),
744+
org.mockito.ArgumentMatchers.eq("dropped_col")))
745+
.thenThrow(new java.util.NoSuchElementException("Dropped column"));
746+
747+
ISpannerMigrationTransformer customTransformer =
748+
new ISpannerMigrationTransformer() {
749+
@Override
750+
public void init(String customParameters) {}
751+
752+
@Override
753+
public MigrationTransformationResponse toSpannerRow(
754+
MigrationTransformationRequest request) {
755+
Map<String, Object> requestRow = request.getRequestRow();
756+
Map<String, Object> responseRow = new HashMap<>();
757+
if (requestRow.containsKey("dropped_col")) {
758+
responseRow.put("id", 100L);
759+
}
760+
return new MigrationTransformationResponse(responseRow, false);
761+
}
762+
763+
@Override
764+
public MigrationTransformationResponse toSourceRow(
765+
MigrationTransformationRequest request) {
766+
return new MigrationTransformationResponse(new HashMap<>(), false);
767+
}
768+
769+
@Override
770+
public MigrationTransformationResponse transformFailedSpannerMutation(
771+
MigrationTransformationRequest request) {
772+
return new MigrationTransformationResponse(new HashMap<>(), false);
773+
}
774+
};
775+
776+
GenericRecordTypeConvertor genericRecordTypeConvertor =
777+
new GenericRecordTypeConvertor(schemaMapper, "", null, customTransformer);
778+
779+
Schema payloadSchema =
780+
SchemaBuilder.record("payload")
781+
.fields()
782+
.name("id")
783+
.type(Schema.create(Schema.Type.LONG))
784+
.noDefault()
785+
.name("dropped_col")
786+
.type(Schema.create(Schema.Type.STRING))
787+
.noDefault()
788+
.endRecord();
789+
790+
GenericRecord payload =
791+
new GenericRecordBuilder(payloadSchema)
792+
.set("id", 1L)
793+
.set("dropped_col", "some_value")
794+
.build();
795+
796+
Map<String, Value> result = genericRecordTypeConvertor.transformChangeEvent(payload, tableName);
797+
798+
assertThat(result.get("id")).isEqualTo(Value.int64(100L));
799+
}
800+
704801
/*
705802
* Test conversion of Interval Nano to String for various cases.
706803
*/

0 commit comments

Comments
 (0)