diff --git a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/ChangeEventContext.java b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/ChangeEventContext.java index 9aa9cbd4c1..7fa5560304 100644 --- a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/ChangeEventContext.java +++ b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/ChangeEventContext.java @@ -22,14 +22,20 @@ import com.google.cloud.spanner.Statement; import com.google.cloud.spanner.TransactionContext; import com.google.cloud.teleport.v2.spanner.ddl.Ddl; +import com.google.cloud.teleport.v2.spanner.ddl.Table; import com.google.cloud.teleport.v2.spanner.migrations.convertors.ChangeEventSpannerConvertor; import com.google.cloud.teleport.v2.spanner.migrations.exceptions.ChangeEventConvertorException; import com.google.cloud.teleport.v2.spanner.migrations.exceptions.DroppedTableException; import com.google.cloud.teleport.v2.spanner.migrations.exceptions.InvalidChangeEventException; import com.google.cloud.teleport.v2.spanner.migrations.spanner.SpannerReadUtils; +import com.google.cloud.teleport.v2.templates.spanner.ShadowTableCreator; +import com.google.common.collect.ImmutableMap; import java.util.Arrays; import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.stream.Collectors; +import org.apache.commons.lang3.tuple.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -62,14 +68,45 @@ public abstract class ChangeEventContext { // Data table for the change event. protected String dataTable; + // Immutable map to store the "safe" column names of the shadow table (to avoid collision with + // data column names). 
+  protected final ImmutableMap<String, String> safeShadowColNames;
+
+  protected ChangeEventContext(
+      JsonNode changeEvent, Ddl ddl, Map<String, Pair<String, String>> shadowColumnConstants)
+      throws InvalidChangeEventException, DroppedTableException {
+    this.changeEvent = changeEvent;
+    this.dataTable = changeEvent.get(DatastreamConstants.EVENT_TABLE_NAME_KEY).asText();
+    Table table = ddl.table(this.dataTable);
+    if (table == null) {
+      throw new DroppedTableException(
+          "Table from change event does not exist in Spanner. table=" + this.dataTable);
+    }
+    Set<String> existingPrimaryKeyColumnNames =
+        table.primaryKeys().stream().map(k -> k.name()).collect(Collectors.toSet());
+
+    ImmutableMap.Builder<String, String> mapBuilder = ImmutableMap.builder();
+    for (Map.Entry<String, Pair<String, String>> entry : shadowColumnConstants.entrySet()) {
+      String metadataKey = entry.getKey();
+      String originalShadowName = entry.getValue().getLeft();
+      String safeName =
+          ShadowTableCreator.getSafeShadowColumnName(
+              originalShadowName, existingPrimaryKeyColumnNames);
+      mapBuilder.put(metadataKey, safeName);
+    }
+    this.safeShadowColNames = mapBuilder.build();
+  }
+
   // Abstract method to generate shadow table mutation.
-  abstract Mutation generateShadowTableMutation(Ddl ddl) throws ChangeEventConvertorException;
+  abstract Mutation generateShadowTableMutation(Ddl ddl, Ddl shadowDdl)
+      throws ChangeEventConvertorException;
 
   // Helper method to convert change event to mutation.
protected void convertChangeEventToMutation(Ddl ddl, Ddl shadowTableDdl) throws ChangeEventConvertorException, InvalidChangeEventException, DroppedTableException { ChangeEventConvertor.convertChangeEventColumnKeysToLowerCase(changeEvent); ChangeEventConvertor.verifySpannerSchema(ddl, changeEvent); + this.primaryKey = ChangeEventSpannerConvertor.changeEventToPrimaryKey( changeEvent.get(DatastreamConstants.EVENT_TABLE_NAME_KEY).asText(), @@ -77,7 +114,7 @@ protected void convertChangeEventToMutation(Ddl ddl, Ddl shadowTableDdl) changeEvent, /* convertNameToLowerCase= */ true); this.dataMutation = ChangeEventConvertor.changeEventToMutation(ddl, changeEvent); - this.shadowTableMutation = generateShadowTableMutation(shadowTableDdl); + this.shadowTableMutation = generateShadowTableMutation(ddl, shadowTableDdl); } public JsonNode getChangeEvent() { @@ -114,6 +151,11 @@ public String getShadowTable() { return shadowTable; } + // Getter for the safe shadow table column names. + public String getSafeShadowColumn(String key) { + return safeShadowColNames.get(key); + } + // Fires a read on Data table with lock scanned ranges. 
Used to acquire exclusive lock on Data row // at the beginning of a readWriteTransaction public void readDataTableRowWithExclusiveLock( diff --git a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/ChangeEventSequenceFactory.java b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/ChangeEventSequenceFactory.java index 8cf77caceb..a9aef9a273 100644 --- a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/ChangeEventSequenceFactory.java +++ b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/ChangeEventSequenceFactory.java @@ -73,25 +73,13 @@ public static ChangeEventSequence createChangeEventSequenceFromShadowTable( if (DatastreamConstants.MYSQL_SOURCE_TYPE.equals(sourceType)) { return MySqlChangeEventSequence.createFromShadowTable( - transactionContext, - changeEventContext.getShadowTable(), - shadowDdl, - changeEventContext.getPrimaryKey(), - useSqlStatements); + transactionContext, changeEventContext, shadowDdl, useSqlStatements); } else if (DatastreamConstants.ORACLE_SOURCE_TYPE.equals(sourceType)) { return OracleChangeEventSequence.createFromShadowTable( - transactionContext, - changeEventContext.getShadowTable(), - shadowDdl, - changeEventContext.getPrimaryKey(), - useSqlStatements); + transactionContext, changeEventContext, shadowDdl, useSqlStatements); } else if (DatastreamConstants.POSTGRES_SOURCE_TYPE.equals(sourceType)) { return PostgresChangeEventSequence.createFromShadowTable( - transactionContext, - changeEventContext.getShadowTable(), - shadowDdl, - changeEventContext.getPrimaryKey(), - useSqlStatements); + transactionContext, changeEventContext, shadowDdl, useSqlStatements); } throw new InvalidChangeEventException("Unsupported source database: " + sourceType); } diff --git a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/MySqlChangeEventContext.java 
b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/MySqlChangeEventContext.java
index 250d0011c5..92a0e5d001 100644
--- a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/MySqlChangeEventContext.java
+++ b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/MySqlChangeEventContext.java
@@ -19,10 +19,13 @@
 import com.google.cloud.spanner.Mutation;
 import com.google.cloud.spanner.Value;
 import com.google.cloud.teleport.v2.spanner.ddl.Ddl;
+import com.google.cloud.teleport.v2.spanner.ddl.Table;
 import com.google.cloud.teleport.v2.spanner.migrations.convertors.ChangeEventTypeConvertor;
 import com.google.cloud.teleport.v2.spanner.migrations.exceptions.ChangeEventConvertorException;
 import com.google.cloud.teleport.v2.spanner.migrations.exceptions.DroppedTableException;
 import com.google.cloud.teleport.v2.spanner.migrations.exceptions.InvalidChangeEventException;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 /**
  * MySql implementation of ChangeEventContext that provides implementation of the
@@ -33,10 +36,10 @@ class MySqlChangeEventContext extends ChangeEventContext {
   public MySqlChangeEventContext(
       JsonNode changeEvent, Ddl ddl, Ddl shadowTableDdl, String shadowTablePrefix)
       throws ChangeEventConvertorException, InvalidChangeEventException, DroppedTableException {
-    this.changeEvent = changeEvent;
+    super(changeEvent, ddl, DatastreamConstants.MYSQL_SORT_ORDER);
+    // changeEvent and dataTable are initialized by the base constructor.
     this.shadowTablePrefix = shadowTablePrefix;
-    this.dataTable = changeEvent.get(DatastreamConstants.EVENT_TABLE_NAME_KEY).asText();
     this.shadowTable = shadowTablePrefix + this.dataTable;
     convertChangeEventToMutation(ddl, shadowTableDdl);
   }
 
@@ -44,18 +47,19 @@ public MySqlChangeEventContext(
    * Creates shadow table mutation for 
MySql. */ @Override - Mutation generateShadowTableMutation(Ddl ddl) throws ChangeEventConvertorException { + Mutation generateShadowTableMutation(Ddl ddl, Ddl shadowDdl) + throws ChangeEventConvertorException { // Get shadow information from change event mutation context Mutation.WriteBuilder builder = ChangeEventConvertor.changeEventToShadowTableMutationBuilder( - ddl, changeEvent, shadowTablePrefix); + shadowDdl, changeEvent, shadowTablePrefix); // Add timestamp information to shadow table mutation Long changeEventTimestamp = ChangeEventTypeConvertor.toLong( changeEvent, DatastreamConstants.MYSQL_TIMESTAMP_KEY, /* requiredField= */ true); builder - .set(DatastreamConstants.MYSQL_TIMESTAMP_SHADOW_INFO.getLeft()) + .set(getSafeShadowColumn(DatastreamConstants.MYSQL_TIMESTAMP_KEY)) .to(Value.int64(changeEventTimestamp)); /* MySql backfill events "can" have log file and log file position as null. @@ -68,7 +78,7 @@ Mutation generateShadowTableMutation(Ddl ddl) throws ChangeEventConvertorExcepti logFile = ""; } // Add log file information to shadow table mutation - builder.set(DatastreamConstants.MYSQL_LOGFILE_SHADOW_INFO.getLeft()).to(logFile); + builder.set(getSafeShadowColumn(DatastreamConstants.MYSQL_LOGFILE_KEY)).to(logFile); Long logPosition = ChangeEventTypeConvertor.toLong( @@ -78,7 +88,7 @@ Mutation generateShadowTableMutation(Ddl ddl) throws ChangeEventConvertorExcepti } // Add logfile position information to shadow table mutation builder - .set(DatastreamConstants.MYSQL_LOGPOSITION_SHADOW_INFO.getLeft()) + .set(getSafeShadowColumn(DatastreamConstants.MYSQL_LOGPOSITION_KEY)) .to(Value.int64(logPosition)); return builder.build(); diff --git a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/MySqlChangeEventSequence.java b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/MySqlChangeEventSequence.java index e5d6760484..85e0e9377d 100644 --- 
a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/MySqlChangeEventSequence.java +++ b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/MySqlChangeEventSequence.java @@ -26,7 +26,6 @@ import com.google.cloud.teleport.v2.spanner.migrations.exceptions.InvalidChangeEventException; import com.google.cloud.teleport.v2.spanner.migrations.spanner.SpannerReadUtils; import java.util.List; -import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -99,24 +98,25 @@ public static MySqlChangeEventSequence createFromChangeEvent(ChangeEventContext /* * Creates a MySqlChangeEventSequence by reading from a shadow table. * @param transactionContext The transaction context to use for reading from the shadow table - * @param shadowTable The name of the shadow table to read from - * @param primaryKey The primary key to look up in the shadow table + * @param context The change event context with resolved safe shadow column names * @param useSqlStatements If true, performs shadow table read using SQL statement with exclusive lock on row */ public static MySqlChangeEventSequence createFromShadowTable( final TransactionContext transactionContext, - String shadowTable, + ChangeEventContext context, Ddl shadowTableDdl, - Key primaryKey, boolean useSqlStatements) throws ChangeEventSequenceCreationException { try { + String shadowTable = context.getShadowTable(); + Key primaryKey = context.getPrimaryKey(); // Read columns from shadow table List readColumnList = - DatastreamConstants.MYSQL_SORT_ORDER.values().stream() - .map(p -> p.getLeft()) - .collect(Collectors.toList()); + java.util.Arrays.asList( + context.getSafeShadowColumn(DatastreamConstants.MYSQL_TIMESTAMP_KEY), + context.getSafeShadowColumn(DatastreamConstants.MYSQL_LOGFILE_KEY), + context.getSafeShadowColumn(DatastreamConstants.MYSQL_LOGPOSITION_KEY)); Struct row; // TODO: After beam release, use the latest client 
lib version which supports setting lock
       // hints via the read api. SQL string generation should be removed.
diff --git a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/OracleChangeEventContext.java b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/OracleChangeEventContext.java
index 666a44ec3d..50344d16ed 100644
--- a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/OracleChangeEventContext.java
+++ b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/OracleChangeEventContext.java
@@ -19,10 +19,13 @@
 import com.google.cloud.spanner.Mutation;
 import com.google.cloud.spanner.Value;
 import com.google.cloud.teleport.v2.spanner.ddl.Ddl;
+import com.google.cloud.teleport.v2.spanner.ddl.Table;
 import com.google.cloud.teleport.v2.spanner.migrations.convertors.ChangeEventTypeConvertor;
 import com.google.cloud.teleport.v2.spanner.migrations.exceptions.ChangeEventConvertorException;
 import com.google.cloud.teleport.v2.spanner.migrations.exceptions.DroppedTableException;
 import com.google.cloud.teleport.v2.spanner.migrations.exceptions.InvalidChangeEventException;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 /**
  * Oracle implementation of ChangeEventContext that provides implementation of the
@@ -33,10 +36,10 @@ class OracleChangeEventContext extends ChangeEventContext {
   public OracleChangeEventContext(
       JsonNode changeEvent, Ddl ddl, Ddl shadowTableDdl, String shadowTablePrefix)
       throws ChangeEventConvertorException, InvalidChangeEventException, DroppedTableException {
-    this.changeEvent = changeEvent;
+    super(changeEvent, ddl, DatastreamConstants.ORACLE_SORT_ORDER);
+    // changeEvent and dataTable are initialized by the base constructor.
     this.shadowTablePrefix = shadowTablePrefix;
-    this.dataTable = changeEvent.get(DatastreamConstants.EVENT_TABLE_NAME_KEY).asText();
     this.shadowTable = shadowTablePrefix + this.dataTable;
     convertChangeEventToMutation(ddl, shadowTableDdl);
   }
 
@@ -44,18 +47,19 @@ public OracleChangeEventContext(
    * Creates shadow table mutation for Oracle.
    */
   @Override
-  Mutation generateShadowTableMutation(Ddl ddl) throws ChangeEventConvertorException {
+  Mutation generateShadowTableMutation(Ddl ddl, Ddl shadowDdl)
+      throws ChangeEventConvertorException {
     // Get shadow information from change event mutation context
     Mutation.WriteBuilder builder =
         ChangeEventConvertor.changeEventToShadowTableMutationBuilder(
-            ddl, changeEvent, shadowTablePrefix);
+            shadowDdl, changeEvent, shadowTablePrefix);
 
     // Add timestamp information to shadow table mutation
     Long changeEventTimestamp =
         ChangeEventTypeConvertor.toLong(
             changeEvent, DatastreamConstants.ORACLE_TIMESTAMP_KEY, /* requiredField= */ true);
     builder
-        .set(DatastreamConstants.ORACLE_TIMESTAMP_SHADOW_INFO.getLeft())
+        .set(getSafeShadowColumn(DatastreamConstants.ORACLE_TIMESTAMP_KEY))
         .to(Value.int64(changeEventTimestamp));
 
     /* Oracle backfill events "can" have SCN value as null.
@@ -69,7 +79,7 @@ Mutation generateShadowTableMutation(Ddl ddl) throws ChangeEventConvertorExcepti } // Add scn information to shadow table mutation builder - .set(DatastreamConstants.ORACLE_SCN_SHADOW_INFO.getLeft()) + .set(getSafeShadowColumn(DatastreamConstants.ORACLE_SCN_KEY)) .to(Value.int64(changeEventSCN)); return builder.build(); diff --git a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/OracleChangeEventSequence.java b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/OracleChangeEventSequence.java index e994e7455c..f56853617d 100644 --- a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/OracleChangeEventSequence.java +++ b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/OracleChangeEventSequence.java @@ -26,7 +26,6 @@ import com.google.cloud.teleport.v2.spanner.migrations.exceptions.InvalidChangeEventException; import com.google.cloud.teleport.v2.spanner.migrations.spanner.SpannerReadUtils; import java.util.List; -import java.util.stream.Collectors; /** * Implementation of ChangeEventSequence for Oracle database which stores change event sequence @@ -78,18 +77,19 @@ public static OracleChangeEventSequence createFromChangeEvent(ChangeEventContext */ public static OracleChangeEventSequence createFromShadowTable( final TransactionContext transactionContext, - String shadowTable, + ChangeEventContext context, Ddl shadowTableDdl, - Key primaryKey, boolean useSqlStatements) throws ChangeEventSequenceCreationException { try { + String shadowTable = context.getShadowTable(); + Key primaryKey = context.getPrimaryKey(); // Read columns from shadow table List readColumnList = - DatastreamConstants.ORACLE_SORT_ORDER.values().stream() - .map(p -> p.getLeft()) - .collect(Collectors.toList()); + java.util.Arrays.asList( + context.getSafeShadowColumn(DatastreamConstants.ORACLE_TIMESTAMP_KEY), + 
context.getSafeShadowColumn(DatastreamConstants.ORACLE_SCN_KEY));
       Struct row;
       // TODO: After beam release, use the latest client lib version which supports setting lock
       // hints via the read api. SQL string generation should be removed.
diff --git a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/PostgresChangeEventContext.java b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/PostgresChangeEventContext.java
index 18a31153bf..922d1a46a0 100644
--- a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/PostgresChangeEventContext.java
+++ b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/PostgresChangeEventContext.java
@@ -19,10 +19,13 @@
 import com.google.cloud.spanner.Mutation;
 import com.google.cloud.spanner.Value;
 import com.google.cloud.teleport.v2.spanner.ddl.Ddl;
+import com.google.cloud.teleport.v2.spanner.ddl.Table;
 import com.google.cloud.teleport.v2.spanner.migrations.convertors.ChangeEventTypeConvertor;
 import com.google.cloud.teleport.v2.spanner.migrations.exceptions.ChangeEventConvertorException;
 import com.google.cloud.teleport.v2.spanner.migrations.exceptions.DroppedTableException;
 import com.google.cloud.teleport.v2.spanner.migrations.exceptions.InvalidChangeEventException;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 /**
  * Postgres implementation of ChangeEventContext that provides implementation of the
@@ -33,10 +36,10 @@ class PostgresChangeEventContext extends ChangeEventContext {
   public PostgresChangeEventContext(
       JsonNode changeEvent, Ddl ddl, Ddl shadowTableDdl, String shadowTablePrefix)
       throws ChangeEventConvertorException, InvalidChangeEventException, DroppedTableException {
-    this.changeEvent = changeEvent;
+    super(changeEvent, ddl, DatastreamConstants.POSTGRES_SORT_ORDER);
+    // changeEvent and dataTable are initialized by the base constructor.
     this.shadowTablePrefix = shadowTablePrefix;
-    this.dataTable = changeEvent.get(DatastreamConstants.EVENT_TABLE_NAME_KEY).asText();
     this.shadowTable = shadowTablePrefix + this.dataTable;
     convertChangeEventToMutation(ddl, shadowTableDdl);
   }
 
@@ -44,18 +47,19 @@ public PostgresChangeEventContext(
    * Creates shadow table mutation for Postgres.
    */
   @Override
-  Mutation generateShadowTableMutation(Ddl ddl) throws ChangeEventConvertorException {
+  Mutation generateShadowTableMutation(Ddl ddl, Ddl shadowDdl)
+      throws ChangeEventConvertorException {
     // Get shadow information from change event mutation context
     Mutation.WriteBuilder builder =
         ChangeEventConvertor.changeEventToShadowTableMutationBuilder(
-            ddl, changeEvent, shadowTablePrefix);
+            shadowDdl, changeEvent, shadowTablePrefix);
 
     // Add timestamp information to shadow table mutation
     Long changeEventTimestamp =
         ChangeEventTypeConvertor.toLong(
             changeEvent, DatastreamConstants.POSTGRES_TIMESTAMP_KEY, /* requiredField= */ true);
     builder
-        .set(DatastreamConstants.POSTGRES_TIMESTAMP_SHADOW_INFO.getLeft())
+        .set(getSafeShadowColumn(DatastreamConstants.POSTGRES_TIMESTAMP_KEY))
         .to(Value.int64(changeEventTimestamp));
 
     /* Postgres backfill events "can" have LSN value as null.
@@ -69,7 +79,7 @@ Mutation generateShadowTableMutation(Ddl ddl) throws ChangeEventConvertorExcepti } // Add lsn information to shadow table mutation builder - .set(DatastreamConstants.POSTGRES_LSN_SHADOW_INFO.getLeft()) + .set(getSafeShadowColumn(DatastreamConstants.POSTGRES_LSN_KEY)) .to(Value.string(changeEventLSN)); return builder.build(); diff --git a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/PostgresChangeEventSequence.java b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/PostgresChangeEventSequence.java index 1d6482a78c..1ef41b6464 100644 --- a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/PostgresChangeEventSequence.java +++ b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/PostgresChangeEventSequence.java @@ -26,7 +26,6 @@ import com.google.cloud.teleport.v2.spanner.migrations.exceptions.InvalidChangeEventException; import com.google.cloud.teleport.v2.spanner.migrations.spanner.SpannerReadUtils; import java.util.List; -import java.util.stream.Collectors; /** * Implementation of ChangeEventSequence for Postgres database which stores change event sequence @@ -78,18 +77,19 @@ public static PostgresChangeEventSequence createFromChangeEvent(ChangeEventConte */ public static PostgresChangeEventSequence createFromShadowTable( final TransactionContext transactionContext, - String shadowTable, + ChangeEventContext context, Ddl shadowTableDdl, - Key primaryKey, boolean useSqlStatements) throws ChangeEventSequenceCreationException { try { + String shadowTable = context.getShadowTable(); + Key primaryKey = context.getPrimaryKey(); // Read columns from shadow table List readColumnList = - DatastreamConstants.POSTGRES_SORT_ORDER.values().stream() - .map(p -> p.getLeft()) - .collect(Collectors.toList()); + java.util.Arrays.asList( + 
context.getSafeShadowColumn(DatastreamConstants.POSTGRES_TIMESTAMP_KEY),
+              context.getSafeShadowColumn(DatastreamConstants.POSTGRES_LSN_KEY));
       Struct row;
       // TODO: After beam release, use the latest client lib version which supports setting lock
       // hints via the read api. SQL string generation should be removed.
diff --git a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/spanner/ShadowTableCreator.java b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/spanner/ShadowTableCreator.java
index 86ffec4ad3..e82f961cf1 100644
--- a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/spanner/ShadowTableCreator.java
+++ b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/spanner/ShadowTableCreator.java
@@ -28,7 +28,7 @@
 import org.apache.commons.lang3.tuple.Pair;
 
 /** Helper class to create shadow tables for different source types. */
-class ShadowTableCreator {
+public class ShadowTableCreator {
 
   private final String sourceType;
   private final String shadowTablePrefix;
@@ -83,16 +83,51 @@ Table constructShadowTable(Ddl informationSchema, String dataTableName, Dialect
     }
 
     // Add extra column to track ChangeEventSequence information
-    addChangeEventSequenceColumns(shadowTableBuilder);
+    addChangeEventSequenceColumns(shadowTableBuilder, primaryKeyColNames);
 
     return shadowTableBuilder.build();
   }
 
-  private void addChangeEventSequenceColumns(Table.Builder shadowTableBuilder) {
+  private void addChangeEventSequenceColumns(
+      Table.Builder shadowTableBuilder, Set<String> primaryKeyColNames) {
     for (Pair<String, String> shadowInfo : sortOrderMap.values()) {
-      Column.Builder versionColumnBuilder = shadowTableBuilder.column(shadowInfo.getLeft());
+      String baseShadowColumnName = shadowInfo.getLeft();
+      String finalShadowColumnName =
+          getSafeShadowColumnName(baseShadowColumnName, primaryKeyColNames);
+      Column.Builder versionColumnBuilder = shadowTableBuilder.column(finalShadowColumnName);
       versionColumnBuilder.parseType(shadowInfo.getRight());
       versionColumnBuilder.endColumn();
     }
   }
+
+  /**
+   * Generates a safe column name for a shadow table by checking for collisions with the primary
+   * key column names of the data table and iteratively prepending a prefix until a unique name is
+   * found.
+   *
+   * <p>Names are compared ignoring case and without consulting the JVM default locale, so the
+   * given key column names may be in any case.
+   *
+   * @param baseShadowColumnName the initial desired name for the shadow column
+   * @param existingPrimaryKeyColumnNames names of the primary key columns of the data table
+   * @return a unique and safe column name
+   */
+  public static String getSafeShadowColumnName(
+      String baseShadowColumnName, Set<String> existingPrimaryKeyColumnNames) {
+    String safeName = baseShadowColumnName;
+    while (containsIgnoreCase(existingPrimaryKeyColumnNames, safeName)) {
+      safeName = "shadow_" + safeName;
+    }
+    return safeName;
+  }
+
+  /** Returns whether {@code names} contains {@code candidate}, ignoring case. */
+  private static boolean containsIgnoreCase(Set<String> names, String candidate) {
+    for (String name : names) {
+      if (name.equalsIgnoreCase(candidate)) {
+        return true;
+      }
+    }
+    return false;
+  }
 }
diff --git a/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/datastream/ChangeEventSequenceFactoryTest.java b/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/datastream/ChangeEventSequenceFactoryTest.java
index 9107abbea5..7a8068f619 100644
--- a/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/datastream/ChangeEventSequenceFactoryTest.java
+++ b/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/datastream/ChangeEventSequenceFactoryTest.java
@@ -186,12 +186,22 @@ public void canCreateMySqlChangeEventSequenceFromShadowTable() throws Exception
 
     ChangeEventContext mockContext =
         getMockMySqlChangeEventContext(/* addMysqlPositionFields= */ true, /* cdcEvent= */ true);
+    when(mockContext.getSafeShadowColumn(DatastreamConstants.MYSQL_TIMESTAMP_KEY))
+        .thenReturn(DatastreamConstants.MYSQL_TIMESTAMP_SHADOW_INFO.getLeft());
+    when(mockContext.getSafeShadowColumn(DatastreamConstants.MYSQL_LOGFILE_KEY))
+ 
.thenReturn(DatastreamConstants.MYSQL_LOGFILE_SHADOW_INFO.getLeft()); + when(mockContext.getSafeShadowColumn(DatastreamConstants.MYSQL_LOGPOSITION_KEY)) + .thenReturn(DatastreamConstants.MYSQL_LOGPOSITION_SHADOW_INFO.getLeft()); // Mock transaction which can read a row from shadow table. TransactionContext mockTransaction = mock(TransactionContext.class); Struct mockRow = mock(Struct.class); - when(mockRow.getLong(any(String.class))).thenReturn(previousEventTimestamp, 1L); - when(mockRow.getString(any(String.class))).thenReturn("oldlogfile.log"); + when(mockRow.getLong(DatastreamConstants.MYSQL_TIMESTAMP_SHADOW_INFO.getLeft())) + .thenReturn(previousEventTimestamp); + when(mockRow.getLong(DatastreamConstants.MYSQL_LOGPOSITION_SHADOW_INFO.getLeft())) + .thenReturn(1L); + when(mockRow.getString(DatastreamConstants.MYSQL_LOGFILE_SHADOW_INFO.getLeft())) + .thenReturn("oldlogfile.log"); when(mockTransaction.readRow(any(String.class), any(Key.class), any(Iterable.class))) .thenReturn(mockRow); @@ -214,12 +224,21 @@ public void canCreateMySqlChangeEventSequenceFromShadowTableForDumpEvent() throw ChangeEventContext mockContext = getMockMySqlChangeEventContext(/* addMysqlPositionFields= */ true, /* cdcEvent= */ true); + when(mockContext.getSafeShadowColumn(DatastreamConstants.MYSQL_TIMESTAMP_KEY)) + .thenReturn(DatastreamConstants.MYSQL_TIMESTAMP_SHADOW_INFO.getLeft()); + when(mockContext.getSafeShadowColumn(DatastreamConstants.MYSQL_LOGFILE_KEY)) + .thenReturn(DatastreamConstants.MYSQL_LOGFILE_SHADOW_INFO.getLeft()); + when(mockContext.getSafeShadowColumn(DatastreamConstants.MYSQL_LOGPOSITION_KEY)) + .thenReturn(DatastreamConstants.MYSQL_LOGPOSITION_SHADOW_INFO.getLeft()); // Mock transaction which can read a row from shadow table. 
TransactionContext mockTransaction = mock(TransactionContext.class); Struct mockRow = mock(Struct.class); - when(mockRow.getLong(any(String.class))).thenReturn(previousEventTimestamp, -1L); - when(mockRow.getString(any(String.class))).thenReturn(""); + when(mockRow.getLong(DatastreamConstants.MYSQL_TIMESTAMP_SHADOW_INFO.getLeft())) + .thenReturn(previousEventTimestamp); + when(mockRow.getLong(DatastreamConstants.MYSQL_LOGPOSITION_SHADOW_INFO.getLeft())) + .thenReturn(-1L); + when(mockRow.getString(DatastreamConstants.MYSQL_LOGFILE_SHADOW_INFO.getLeft())).thenReturn(""); when(mockTransaction.readRow(any(String.class), any(Key.class), any(Iterable.class))) .thenReturn(mockRow); @@ -241,6 +260,12 @@ public void cannotCreateMySqlChangeEventSequenceWhenMissingRecordInShadowTable() ChangeEventContext mockContext = getMockMySqlChangeEventContext(/* addMysqlPositionFields= */ true, /* cdcEvent= */ true); + when(mockContext.getSafeShadowColumn(DatastreamConstants.MYSQL_TIMESTAMP_KEY)) + .thenReturn(DatastreamConstants.MYSQL_TIMESTAMP_SHADOW_INFO.getLeft()); + when(mockContext.getSafeShadowColumn(DatastreamConstants.MYSQL_LOGFILE_KEY)) + .thenReturn(DatastreamConstants.MYSQL_LOGFILE_SHADOW_INFO.getLeft()); + when(mockContext.getSafeShadowColumn(DatastreamConstants.MYSQL_LOGPOSITION_KEY)) + .thenReturn(DatastreamConstants.MYSQL_LOGPOSITION_SHADOW_INFO.getLeft()); // mock transaction which cannot find a row from shadow table. 
TransactionContext mockTransaction = mock(TransactionContext.class); @@ -311,11 +336,17 @@ public void canCreateOracleChangeEventSequenceFromShadowTable() throws Exception ChangeEventContext mockContext = getMockOracleChangeEventContext(/* addOraclePositionFields= */ true, /* cdcEvent= */ true); + when(mockContext.getSafeShadowColumn(DatastreamConstants.ORACLE_TIMESTAMP_KEY)) + .thenReturn(DatastreamConstants.ORACLE_TIMESTAMP_SHADOW_INFO.getLeft()); + when(mockContext.getSafeShadowColumn(DatastreamConstants.ORACLE_SCN_KEY)) + .thenReturn(DatastreamConstants.ORACLE_SCN_SHADOW_INFO.getLeft()); // Mock transaction which can read a row from shadow table. TransactionContext mockTransaction = mock(TransactionContext.class); Struct mockRow = mock(Struct.class); - when(mockRow.getLong(any(String.class))).thenReturn(previousEventTimestamp, 1L); + when(mockRow.getLong(DatastreamConstants.ORACLE_TIMESTAMP_SHADOW_INFO.getLeft())) + .thenReturn(previousEventTimestamp); + when(mockRow.getLong(DatastreamConstants.ORACLE_SCN_SHADOW_INFO.getLeft())).thenReturn(1L); when(mockTransaction.readRow(any(String.class), any(Key.class), any(Iterable.class))) .thenReturn(mockRow); @@ -337,11 +368,17 @@ public void canCreateOracleChangeEventSequenceFromShadowTableForDumpEvent() thro ChangeEventContext mockContext = getMockOracleChangeEventContext(/* addOraclePositionFields= */ true, /* cdcEvent= */ true); + when(mockContext.getSafeShadowColumn(DatastreamConstants.ORACLE_TIMESTAMP_KEY)) + .thenReturn(DatastreamConstants.ORACLE_TIMESTAMP_SHADOW_INFO.getLeft()); + when(mockContext.getSafeShadowColumn(DatastreamConstants.ORACLE_SCN_KEY)) + .thenReturn(DatastreamConstants.ORACLE_SCN_SHADOW_INFO.getLeft()); // Mock transaction which can read a row from shadow table. 
TransactionContext mockTransaction = mock(TransactionContext.class); Struct mockRow = mock(Struct.class); - when(mockRow.getLong(any(String.class))).thenReturn(previousEventTimestamp, -1L); + when(mockRow.getLong(DatastreamConstants.ORACLE_TIMESTAMP_SHADOW_INFO.getLeft())) + .thenReturn(previousEventTimestamp); + when(mockRow.getLong(DatastreamConstants.ORACLE_SCN_SHADOW_INFO.getLeft())).thenReturn(-1L); when(mockTransaction.readRow(any(String.class), any(Key.class), any(Iterable.class))) .thenReturn(mockRow); @@ -362,6 +399,10 @@ public void cannotCreateOracleChangeEventSequenceWhenMissingRecordInShadowTable( ChangeEventContext mockContext = getMockOracleChangeEventContext(/* addOraclePositionFields= */ true, /* cdcEvent= */ true); + when(mockContext.getSafeShadowColumn(DatastreamConstants.ORACLE_TIMESTAMP_KEY)) + .thenReturn(DatastreamConstants.ORACLE_TIMESTAMP_SHADOW_INFO.getLeft()); + when(mockContext.getSafeShadowColumn(DatastreamConstants.ORACLE_SCN_KEY)) + .thenReturn(DatastreamConstants.ORACLE_SCN_SHADOW_INFO.getLeft()); // mock transaction which cannot find a row from shadow table. TransactionContext mockTransaction = mock(TransactionContext.class); @@ -435,12 +476,18 @@ public void canCreatePostgresChangeEventSequenceFromShadowTable() throws Excepti ChangeEventContext mockContext = getMockPostgresChangeEventContext( /* addPostgresPositionFields= */ true, /* cdcEvent= */ true); + when(mockContext.getSafeShadowColumn(DatastreamConstants.POSTGRES_TIMESTAMP_KEY)) + .thenReturn(DatastreamConstants.POSTGRES_TIMESTAMP_SHADOW_INFO.getLeft()); + when(mockContext.getSafeShadowColumn(DatastreamConstants.POSTGRES_LSN_KEY)) + .thenReturn(DatastreamConstants.POSTGRES_LSN_SHADOW_INFO.getLeft()); // Mock transaction which can read a row from shadow table. 
TransactionContext mockTransaction = mock(TransactionContext.class); Struct mockRow = mock(Struct.class); - when(mockRow.getLong(any(String.class))).thenReturn(previousEventTimestamp); - when(mockRow.getString(any(String.class))).thenReturn("13/314"); + when(mockRow.getLong(DatastreamConstants.POSTGRES_TIMESTAMP_SHADOW_INFO.getLeft())) + .thenReturn(previousEventTimestamp); + when(mockRow.getString(DatastreamConstants.POSTGRES_LSN_SHADOW_INFO.getLeft())) + .thenReturn("13/314"); when(mockTransaction.readRow(any(String.class), any(Key.class), any(Iterable.class))) .thenReturn(mockRow); @@ -463,12 +510,17 @@ public void canCreatePostgresChangeEventSequenceFromShadowTableForDumpEvent() th ChangeEventContext mockContext = getMockPostgresChangeEventContext( /* addPostgresPositionFields= */ true, /* cdcEvent= */ true); + when(mockContext.getSafeShadowColumn(DatastreamConstants.POSTGRES_TIMESTAMP_KEY)) + .thenReturn(DatastreamConstants.POSTGRES_TIMESTAMP_SHADOW_INFO.getLeft()); + when(mockContext.getSafeShadowColumn(DatastreamConstants.POSTGRES_LSN_KEY)) + .thenReturn(DatastreamConstants.POSTGRES_LSN_SHADOW_INFO.getLeft()); // Mock transaction which can read a row from shadow table. 
TransactionContext mockTransaction = mock(TransactionContext.class); Struct mockRow = mock(Struct.class); - when(mockRow.getLong(any(String.class))).thenReturn(previousEventTimestamp); - when(mockRow.getString(any(String.class))).thenReturn(""); + when(mockRow.getLong(DatastreamConstants.POSTGRES_TIMESTAMP_SHADOW_INFO.getLeft())) + .thenReturn(previousEventTimestamp); + when(mockRow.getString(DatastreamConstants.POSTGRES_LSN_SHADOW_INFO.getLeft())).thenReturn(""); when(mockTransaction.readRow(any(String.class), any(Key.class), any(Iterable.class))) .thenReturn(mockRow); @@ -490,6 +542,10 @@ public void cannotCreatePostgresChangeEventSequenceWhenMissingRecordInShadowTabl ChangeEventContext mockContext = getMockPostgresChangeEventContext( /* addPostgresPositionFields= */ true, /* cdcEvent= */ true); + when(mockContext.getSafeShadowColumn(DatastreamConstants.POSTGRES_TIMESTAMP_KEY)) + .thenReturn(DatastreamConstants.POSTGRES_TIMESTAMP_SHADOW_INFO.getLeft()); + when(mockContext.getSafeShadowColumn(DatastreamConstants.POSTGRES_LSN_KEY)) + .thenReturn(DatastreamConstants.POSTGRES_LSN_SHADOW_INFO.getLeft()); // mock transaction which cannot find a row from shadow table. 
TransactionContext mockTransaction = mock(TransactionContext.class); diff --git a/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/datastream/MySqlChangeEventContextTest.java b/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/datastream/MySqlChangeEventContextTest.java index 11413195ed..92df7d2345 100644 --- a/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/datastream/MySqlChangeEventContextTest.java +++ b/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/datastream/MySqlChangeEventContextTest.java @@ -179,6 +179,73 @@ public void canGenerateShadowTableMutationForBackfillEventsWithMissingSortOrderK assertEquals(shadowMutation.getOperation(), Mutation.Op.INSERT_OR_UPDATE); } + @Test + public void canGenerateShadowTableMutationWithCollision() throws Exception { + long eventTimestamp = 1615159728L; + Ddl ddl = + Ddl.builder() + .createTable("MyTable") + .column("log_file") + .int64() + .max() + .endColumn() + .column("data") + .string() + .max() + .endColumn() + .primaryKey() + .asc("log_file") + .end() + .endTable() + .build(); + Ddl shadowDdl = + Ddl.builder() + .createTable("shadow_MyTable") + .column("log_file") + .int64() + .max() + .endColumn() + .column("data") + .string() + .max() + .endColumn() + .column("shadow_log_file") + .string() + .max() + .endColumn() + .primaryKey() + .asc("log_file") + .end() + .endTable() + .build(); + JSONObject changeEvent = new JSONObject(); + changeEvent.put("log_file", 3); + changeEvent.put(DatastreamConstants.EVENT_TABLE_NAME_KEY, "MyTable"); + changeEvent.put( + DatastreamConstants.EVENT_SOURCE_TYPE_KEY, DatastreamConstants.MYSQL_SOURCE_TYPE); + changeEvent.put(DatastreamConstants.MYSQL_TIMESTAMP_KEY, eventTimestamp); + changeEvent.put(DatastreamConstants.MYSQL_LOGFILE_KEY, "mysql-bin.00001"); + changeEvent.put(DatastreamConstants.MYSQL_LOGPOSITION_KEY, 100L); + + ChangeEventContext changeEventContext = + 
ChangeEventContextFactory.createChangeEventContext( + getJsonNode(changeEvent.toString()), + ddl, + shadowDdl, + "shadow_", + DatastreamConstants.MYSQL_SOURCE_TYPE); + Mutation shadowMutation = changeEventContext.getShadowTableMutation(); + Map actual = shadowMutation.asMap(); + + // The PK column 'log_file' should be present. + assertEquals(actual.get("log_file"), Value.int64(3)); + // The conflicting shadow column should be renamed to 'shadow_log_file'. + assertEquals(actual.get("shadow_log_file"), Value.string("mysql-bin.00001")); + // The other shadow columns should be present with their default names. + assertEquals(actual.get("timestamp"), Value.int64(eventTimestamp)); + assertEquals(actual.get("log_position"), Value.int64(100L)); + } + @Test public void testReadDataTable() throws Exception { long eventTimestamp = 1615159728L; diff --git a/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/datastream/MySqlChangeEventSequenceTest.java b/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/datastream/MySqlChangeEventSequenceTest.java index ea771aa077..1fe028d53e 100644 --- a/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/datastream/MySqlChangeEventSequenceTest.java +++ b/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/datastream/MySqlChangeEventSequenceTest.java @@ -104,14 +104,13 @@ public void canOrderDumpEventAndCDCEventAtSameTimestamp() { public void testCreateFromShadowTableWithUseSqlStatements() throws Exception { // Arrange TransactionContext transactionContext = mock(TransactionContext.class); - String shadowTable = "shadow_table1"; Ddl shadowTableDdl = Ddl.builder() .createTable("shadow_table1") .column("id") .int64() .endColumn() - .column("timestamp") + .column("shadow_timestamp") .int64() .endColumn() .column("log_file") @@ -125,13 +124,24 @@ public void testCreateFromShadowTableWithUseSqlStatements() throws Exception { .end() .endTable() 
.build(); - Key primaryKey = Key.of(1L); boolean useSqlStatements = true; + // Mock the ChangeEventContext + ChangeEventContext mockContext = mock(ChangeEventContext.class); + when(mockContext.getShadowTable()).thenReturn("shadow_table1"); + when(mockContext.getPrimaryKey()).thenReturn(Key.of(1L)); + when(mockContext.getSafeShadowColumn(DatastreamConstants.MYSQL_TIMESTAMP_KEY)) + .thenReturn("shadow_timestamp"); + when(mockContext.getSafeShadowColumn(DatastreamConstants.MYSQL_LOGFILE_KEY)) + .thenReturn("log_file"); + when(mockContext.getSafeShadowColumn(DatastreamConstants.MYSQL_LOGPOSITION_KEY)) + .thenReturn("log_position"); + // Mock the behavior of the transaction context Struct mockRow = mock(Struct.class); when(mockRow.getLong("id")).thenReturn(1L); - when(mockRow.getLong("timestamp")).thenReturn(1615159728L); // Updated to match new column + when(mockRow.getLong("shadow_timestamp")) + .thenReturn(1615159728L); // Updated to match new column when(mockRow.getString("log_file")).thenReturn("file1.log"); when(mockRow.getLong("log_position")).thenReturn(2L); // Updated to match new column @@ -143,7 +153,7 @@ public void testCreateFromShadowTableWithUseSqlStatements() throws Exception { // Act MySqlChangeEventSequence result = MySqlChangeEventSequence.createFromShadowTable( - transactionContext, shadowTable, shadowTableDdl, primaryKey, useSqlStatements); + transactionContext, mockContext, shadowTableDdl, useSqlStatements); // Assert assertNotNull(result); diff --git a/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/datastream/OracleChangeEventContextTest.java b/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/datastream/OracleChangeEventContextTest.java index 9eb39a40e6..127b2dd46f 100644 --- a/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/datastream/OracleChangeEventContextTest.java +++ 
b/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/datastream/OracleChangeEventContextTest.java @@ -157,4 +157,64 @@ public void canGenerateShadowTableMutationForBackfillEventWithMissingKeys() thro assertEquals(shadowMutation.getTable(), "shadow_Users2"); assertEquals(shadowMutation.getOperation(), Mutation.Op.INSERT_OR_UPDATE); } + + @Test + public void canGenerateShadowTableMutationWithCollision() throws Exception { + long eventTimestamp = 1615159728L; + Ddl ddl = + Ddl.builder() + .createTable("MyTable") + .column("scn") + .string() + .endColumn() + .column("data") + .string() + .max() + .endColumn() + .primaryKey() + .asc("scn") + .end() + .endTable() + .build(); + Ddl shadowDdl = + Ddl.builder() + .createTable("shadow_MyTable") + .column("scn") + .string() + .max() + .endColumn() + .column("data") + .string() + .max() + .endColumn() + .column("shadow_scn") + .int64() + .max() + .endColumn() + .primaryKey() + .asc("scn") + .end() + .endTable() + .build(); + JSONObject changeEvent = new JSONObject(); + changeEvent.put("scn", "scn_string"); + changeEvent.put(DatastreamConstants.EVENT_TABLE_NAME_KEY, "MyTable"); + changeEvent.put( + DatastreamConstants.EVENT_SOURCE_TYPE_KEY, DatastreamConstants.ORACLE_SOURCE_TYPE); + changeEvent.put(DatastreamConstants.ORACLE_TIMESTAMP_KEY, eventTimestamp); + changeEvent.put(DatastreamConstants.ORACLE_SCN_KEY, 999L); + + ChangeEventContext changeEventContext = + ChangeEventContextFactory.createChangeEventContext( + getJsonNode(changeEvent.toString()), ddl, shadowDdl, "shadow_", "oracle"); + Mutation shadowMutation = changeEventContext.getShadowTableMutation(); + Map actual = shadowMutation.asMap(); + + // The PK column 'scn' should be present. + assertEquals(actual.get("scn"), Value.string("scn_string")); + // The conflicting shadow column should be renamed to 'shadow_scn'. 
+ assertEquals(actual.get("shadow_scn"), Value.int64(999L)); + // The other shadow columns should be present with their default names. + assertEquals(actual.get("timestamp"), Value.int64(eventTimestamp)); + } } diff --git a/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/datastream/OracleChangeEventSequenceTest.java b/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/datastream/OracleChangeEventSequenceTest.java index df1d15c893..3aec91c4b1 100644 --- a/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/datastream/OracleChangeEventSequenceTest.java +++ b/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/datastream/OracleChangeEventSequenceTest.java @@ -68,14 +68,13 @@ public void canOrderDumpEventAndCDCEventAtSameTimestamp() { public void testCreateFromShadowTableWithUseSqlStatements_Oracle() throws Exception { // Arrange TransactionContext transactionContext = mock(TransactionContext.class); - String shadowTable = "shadow_table_oracle"; Ddl shadowTableDdl = Ddl.builder() .createTable("shadow_table_oracle") .column("id") .int64() .endColumn() - .column("timestamp") + .column("shadow_timestamp") .int64() .endColumn() .column("scn") @@ -86,13 +85,19 @@ public void testCreateFromShadowTableWithUseSqlStatements_Oracle() throws Except .end() .endTable() .build(); - Key primaryKey = Key.of(1L); boolean useSqlStatements = true; + ChangeEventContext mockContext = mock(ChangeEventContext.class); + when(mockContext.getShadowTable()).thenReturn("shadow_table_oracle"); + when(mockContext.getPrimaryKey()).thenReturn(Key.of(1L)); + when(mockContext.getSafeShadowColumn(DatastreamConstants.ORACLE_TIMESTAMP_KEY)) + .thenReturn("shadow_timestamp"); + when(mockContext.getSafeShadowColumn(DatastreamConstants.ORACLE_SCN_KEY)).thenReturn("scn"); + // Mock the behavior of the transaction context Struct mockRow = mock(Struct.class); when(mockRow.getLong("id")).thenReturn(1L); - 
when(mockRow.getLong("timestamp")).thenReturn(1615159728L); + when(mockRow.getLong("shadow_timestamp")).thenReturn(1615159728L); when(mockRow.getLong("scn")).thenReturn(100L); ResultSet mockResultSet = mock(ResultSet.class); @@ -103,7 +108,7 @@ public void testCreateFromShadowTableWithUseSqlStatements_Oracle() throws Except // Act OracleChangeEventSequence result = OracleChangeEventSequence.createFromShadowTable( - transactionContext, shadowTable, shadowTableDdl, primaryKey, useSqlStatements); + transactionContext, mockContext, shadowTableDdl, useSqlStatements); // Assert assertNotNull(result); diff --git a/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/datastream/PostgresChangeEventContextTest.java b/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/datastream/PostgresChangeEventContextTest.java index 9f1bda5fe7..e9c61bd091 100644 --- a/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/datastream/PostgresChangeEventContextTest.java +++ b/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/datastream/PostgresChangeEventContextTest.java @@ -157,4 +157,65 @@ public void canGenerateShadowTableMutationForBackfillEventWithMissingKeys() thro assertEquals(shadowMutation.getTable(), "shadow_Users2"); assertEquals(shadowMutation.getOperation(), Mutation.Op.INSERT_OR_UPDATE); } + + @Test + public void canGenerateShadowTableMutationWithCollision() throws Exception { + long eventTimestamp = 1615159728L; + Ddl ddl = + Ddl.builder() + .createTable("MyTable") + .column("lsn") + .int64() + .max() + .endColumn() + .column("data") + .string() + .max() + .endColumn() + .primaryKey() + .asc("lsn") + .end() + .endTable() + .build(); + Ddl shadowDdl = + Ddl.builder() + .createTable("shadow_MyTable") + .column("lsn") + .int64() + .max() + .endColumn() + .column("data") + .string() + .max() + .endColumn() + .column("shadow_lsn") + .string() + .max() + .endColumn() + 
.primaryKey() + .asc("lsn") + .end() + .endTable() + .build(); + JSONObject changeEvent = new JSONObject(); + changeEvent.put("lsn", 123); + changeEvent.put(DatastreamConstants.EVENT_TABLE_NAME_KEY, "MyTable"); + changeEvent.put( + DatastreamConstants.EVENT_SOURCE_TYPE_KEY, DatastreamConstants.POSTGRES_SOURCE_TYPE); + changeEvent.put(DatastreamConstants.POSTGRES_TIMESTAMP_KEY, eventTimestamp); + changeEvent.put(DatastreamConstants.POSTGRES_LSN_KEY, "1/ABC"); + + ChangeEventContext changeEventContext = + ChangeEventContextFactory.createChangeEventContext( + getJsonNode(changeEvent.toString()), ddl, shadowDdl, "shadow_", "postgresql"); + Mutation shadowMutation = changeEventContext.getShadowTableMutation(); + Map actual = shadowMutation.asMap(); + + // The PK column 'lsn' should be present. + assertEquals(actual.get("lsn"), Value.int64(123)); + // The conflicting shadow column should be renamed to 'shadow_lsn'. + assertEquals(actual.get("shadow_lsn"), Value.string("1/ABC")); + // The other shadow columns should be present with their default names. 
+ assertEquals(actual.get("timestamp"), Value.int64(eventTimestamp)); + } } diff --git a/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/datastream/PostgresChangeEventSequenceTest.java b/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/datastream/PostgresChangeEventSequenceTest.java index ed3a025dc2..2fcb99344c 100644 --- a/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/datastream/PostgresChangeEventSequenceTest.java +++ b/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/datastream/PostgresChangeEventSequenceTest.java @@ -117,14 +117,13 @@ public void canOrderDumpEventAndCDCEventAtSameTimestamp() { public void testCreateFromShadowTableWithUseSqlStatements_Postgres() throws Exception { // Arrange TransactionContext transactionContext = mock(TransactionContext.class); - String shadowTable = "shadow_table_postgres"; Ddl shadowTableDdl = Ddl.builder() .createTable("shadow_table_postgres") .column("id") .int64() .endColumn() - .column("timestamp") + .column("shadow_timestamp") .int64() .endColumn() .column("lsn") @@ -135,13 +134,18 @@ public void testCreateFromShadowTableWithUseSqlStatements_Postgres() throws Exce .end() .endTable() .build(); - Key primaryKey = Key.of(1L); boolean useSqlStatements = true; + ChangeEventContext mockContext = mock(ChangeEventContext.class); + when(mockContext.getShadowTable()).thenReturn("shadow_table_postgres"); + when(mockContext.getPrimaryKey()).thenReturn(Key.of(1L)); + when(mockContext.getSafeShadowColumn(DatastreamConstants.POSTGRES_TIMESTAMP_KEY)) + .thenReturn("shadow_timestamp"); + when(mockContext.getSafeShadowColumn(DatastreamConstants.POSTGRES_LSN_KEY)).thenReturn("lsn"); + // Mock the behavior of the transaction context Struct mockRow = mock(Struct.class); - when(mockRow.getLong("id")).thenReturn(1L); - when(mockRow.getLong("timestamp")).thenReturn(1615159728L); + 
when(mockRow.getLong("shadow_timestamp")).thenReturn(1615159728L); when(mockRow.getString("lsn")).thenReturn("0/123456"); ResultSet mockResultSet = mock(ResultSet.class); @@ -152,7 +156,7 @@ public void testCreateFromShadowTableWithUseSqlStatements_Postgres() throws Exce // Act PostgresChangeEventSequence result = PostgresChangeEventSequence.createFromShadowTable( - transactionContext, shadowTable, shadowTableDdl, primaryKey, useSqlStatements); + transactionContext, mockContext, shadowTableDdl, useSqlStatements); // Assert assertNotNull(result); diff --git a/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/spanner/ShadowTableCreatorTest.java b/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/spanner/ShadowTableCreatorTest.java index f05089fbcc..2ab79e9eb3 100644 --- a/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/spanner/ShadowTableCreatorTest.java +++ b/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/spanner/ShadowTableCreatorTest.java @@ -250,4 +250,91 @@ public void canConstructShadowTableForPostgresWithPostgresDialect() { expectedColumnTypes.add(Type.pgVarchar().toString()); assertThat(columnTypes, is(expectedColumnTypes)); } + + @Test + public void canHandlePkColumnNameCollision() { + Ddl ddl = + Ddl.builder() + .createTable("MyTable") + .column("timestamp") + .timestamp() + .endColumn() + .column("data") + .string() + .max() + .endColumn() + .primaryKey() + .asc("timestamp") + .end() + .endTable() + .build(); + + ShadowTableCreator shadowTableCreator = + new ShadowTableCreator("mysql", "shadow_", Dialect.GOOGLE_STANDARD_SQL); + Table shadowTable = + shadowTableCreator.constructShadowTable(ddl, "MyTable", Dialect.GOOGLE_STANDARD_SQL); + + assertEquals(shadowTable.name(), "shadow_MyTable"); + // The original PK column should exist. 
+ assertEquals(shadowTable.column("timestamp").type().getCode(), Type.Code.TIMESTAMP); + // The new shadow column should have been renamed. + assertEquals(shadowTable.column("shadow_timestamp").type().getCode(), Type.Code.INT64); + } + + @Test + public void canHandleMultiplePkColumnNameCollisions() { + Ddl ddl = + Ddl.builder(Dialect.GOOGLE_STANDARD_SQL) + .createTable("MyTable") + .column("log_file") + .string() + .max() + .endColumn() + .column("shadow_log_file") + .int64() + .max() + .endColumn() + .primaryKey() + .asc("log_file") + .asc("shadow_log_file") + .end() + .endTable() + .build(); + + ShadowTableCreator shadowTableCreator = + new ShadowTableCreator("mysql", "shadow_", Dialect.GOOGLE_STANDARD_SQL); + Table shadowTable = + shadowTableCreator.constructShadowTable(ddl, "MyTable", Dialect.GOOGLE_STANDARD_SQL); + + assertEquals(shadowTable.name(), "shadow_MyTable"); + // The original PK columns should exist. + assertEquals(shadowTable.column("log_file").type().getCode(), Type.Code.STRING); + assertEquals(shadowTable.column("shadow_log_file").type().getCode(), Type.Code.INT64); + // The new shadow column should have been renamed twice. + assertEquals(shadowTable.column("shadow_shadow_log_file").type().getCode(), Type.Code.STRING); + } + + @Test + public void testGetSafeShadowColumnName() { + // Base case: no collision + assertEquals( + "log_file", + ShadowTableCreator.getSafeShadowColumnName("log_file", Set.of("column1", "column2"))); + + // Single collision: should add one prefix + assertEquals( + "shadow_log_file", + ShadowTableCreator.getSafeShadowColumnName("log_file", Set.of("column1", "log_file"))); + + // Multiple collisions: should add two prefixes + assertEquals( + "shadow_shadow_log_file", + ShadowTableCreator.getSafeShadowColumnName( + "log_file", Set.of("log_file", "shadow_log_file"))); + + // Case-insensitivity test + assertEquals( + "shadow_log_file", + ShadowTableCreator.getSafeShadowColumnName("log_file", Set.of("column1", "LOG_FILE"))); + } }