From fae3234658e25f3d9ffa2cf959b566ece30f13c5 Mon Sep 17 00:00:00 2001 From: aasthabharill Date: Mon, 1 Dec 2025 23:06:44 +0530 Subject: [PATCH 01/13] initial changes --- .../datastream/DatastreamConstants.java | 12 ++++---- .../spanner/ShadowTableCreatorTest.java | 30 +++++++++++++++++++ 2 files changed, 36 insertions(+), 6 deletions(-) diff --git a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/DatastreamConstants.java b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/DatastreamConstants.java index d852cad0f4..cc354d542d 100644 --- a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/DatastreamConstants.java +++ b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/DatastreamConstants.java @@ -74,10 +74,10 @@ public class DatastreamConstants { public static final String ORACLE_TIMESTAMP_KEY = "_metadata_timestamp"; public static final Pair ORACLE_TIMESTAMP_SHADOW_INFO = - Pair.of("timestamp", "INT64"); + Pair.of("shadow_timestamp", "INT64"); public static final Pair ORACLE_TIMESTAMP_SHADOW_INFO_PG_DIALECT = - Pair.of("timestamp", "bigint"); + Pair.of("shadow_timestamp", "bigint"); public static final String ORACLE_SCN_KEY = "_metadata_scn"; @@ -96,10 +96,10 @@ public class DatastreamConstants { public static final String MYSQL_TIMESTAMP_KEY = "_metadata_timestamp"; public static final Pair MYSQL_TIMESTAMP_SHADOW_INFO = - Pair.of("timestamp", "INT64"); + Pair.of("shadow_timestamp", "INT64"); public static final Pair MYSQL_TIMESTAMP_SHADOW_INFO_PG_DIALECT = - Pair.of("timestamp", "bigint"); + Pair.of("shadow_timestamp", "bigint"); public static final String MYSQL_LOGFILE_KEY = "_metadata_log_file"; @@ -127,10 +127,10 @@ public class DatastreamConstants { public static final String POSTGRES_TIMESTAMP_KEY = "_metadata_timestamp"; public static final Pair POSTGRES_TIMESTAMP_SHADOW_INFO = - Pair.of("timestamp", "INT64"); + Pair.of("shadow_timestamp", "INT64"); public static final Pair POSTGRES_TIMESTAMP_SHADOW_INFO_PG_DIALECT = - Pair.of("timestamp", "bigint"); + Pair.of("shadow_timestamp", "bigint"); public static final String POSTGRES_LSN_KEY = "_metadata_lsn"; diff --git a/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/spanner/ShadowTableCreatorTest.java b/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/spanner/ShadowTableCreatorTest.java index f05089fbcc..5938ee94cb 100644 --- a/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/spanner/ShadowTableCreatorTest.java +++ b/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/spanner/ShadowTableCreatorTest.java @@ -250,4 +250,34 @@ public void canConstructShadowTableForPostgresWithPostgresDialect() { expectedColumnTypes.add(Type.pgVarchar().toString()); assertThat(columnTypes, is(expectedColumnTypes)); } + + @Test + public void canHandlePkColumnNameCollision() { + Ddl ddl = + Ddl.builder() + .createTable("MyTable") + .column("timestamp") + .timestamp() + .endColumn() + .column("data") + .string() + .max() + .endColumn() + .primaryKey() + .asc("timestamp") + .end() + .endTable() + .build(); + + ShadowTableCreator shadowTableCreator = + new ShadowTableCreator("mysql", "shadow_", Dialect.GOOGLE_STANDARD_SQL); + Table shadowTable = + shadowTableCreator.constructShadowTable(ddl, "MyTable", Dialect.GOOGLE_STANDARD_SQL); + + assertEquals(shadowTable.name(), "shadow_MyTable"); + // Verify that the original PK column is preserved with the correct type. + assertEquals(shadowTable.column("timestamp").type().getCode(), Type.Code.TIMESTAMP); + // Verify that the new metadata column is present with the new name and correct type. + assertEquals(shadowTable.column("shadow_timestamp").type().getCode(), Type.Code.INT64); + } } From c1307d2c9487bd1e66670d7d400732349ade5208 Mon Sep 17 00:00:00 2001 From: aasthabharill Date: Tue, 2 Dec 2025 11:21:26 +0530 Subject: [PATCH 02/13] fix failing unit tests --- .../v2/templates/spanner/ShadowTableCreatorTest.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/spanner/ShadowTableCreatorTest.java b/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/spanner/ShadowTableCreatorTest.java index 5938ee94cb..a6d4786091 100644 --- a/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/spanner/ShadowTableCreatorTest.java +++ b/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/spanner/ShadowTableCreatorTest.java @@ -55,7 +55,7 @@ public void canConstructShadowTableForOracleWithGsqlDialect() { testDdl.table("Users_interleaved").primaryKeys().stream() .map(c -> c.name()) .collect(Collectors.toSet()); - expectedColumns.add("timestamp"); + expectedColumns.add("shadow_timestamp"); expectedColumns.add("scn"); assertThat(columns, is(expectedColumns)); } @@ -82,7 +82,7 @@ public void canConstructShadowTableForOracleWithPostgresDialect() { testDdl.table("Users_interleaved").primaryKeys().stream() .map(c -> c.name()) .collect(Collectors.toSet()); - expectedColumns.add("timestamp"); + expectedColumns.add("shadow_timestamp"); expectedColumns.add("scn"); assertThat(columns, is(expectedColumns)); List columnTypes = @@ -127,7 +127,7 @@ public void canConstructShadowTableForMySqlWithGsqlDialect() { testDdl.table("Users_interleaved").primaryKeys().stream() .map(c -> c.name()) .collect(Collectors.toSet()); - expectedColumns.add("timestamp"); + expectedColumns.add("shadow_timestamp"); expectedColumns.add("log_file"); expectedColumns.add("log_position"); assertThat(columns, is(expectedColumns)); @@ -155,7 +155,7 @@ public void canConstructShadowTableForMySqlWithPostgresDialect() { testDdl.table("Users_interleaved").primaryKeys().stream() .map(c -> c.name()) .collect(Collectors.toSet()); - expectedColumns.add("timestamp"); + expectedColumns.add("shadow_timestamp"); expectedColumns.add("log_file"); expectedColumns.add("log_position"); assertThat(columns, is(expectedColumns)); @@ -202,7 +202,7 @@ public void canConstructShadowTableForPostgresWithGsqlDialect() { testDdl.table("Users_interleaved").primaryKeys().stream() .map(c -> c.name()) .collect(Collectors.toSet()); - expectedColumns.add("timestamp"); + expectedColumns.add("shadow_timestamp"); expectedColumns.add("lsn"); assertThat(columns, is(expectedColumns)); } @@ -229,7 +229,7 @@ public void canConstructShadowTableForPostgresWithPostgresDialect() { testDdl.table("Users_interleaved").primaryKeys().stream() .map(c -> c.name()) .collect(Collectors.toSet()); - expectedColumns.add("timestamp"); + expectedColumns.add("shadow_timestamp"); expectedColumns.add("lsn"); assertThat(columns, is(expectedColumns)); List columnTypes = From 055b38a45245afcf83158f0729376da4216723e4 Mon Sep 17 00:00:00 2001 From: aasthabharill Date: Tue, 2 Dec 2025 12:28:58 +0530 Subject: [PATCH 03/13] fix table name --- .../v2/templates/SpannerTransactionWriterDoFnTest.java | 2 +- .../v2/templates/datastream/MySqlChangeEventSequenceTest.java | 4 ++-- .../templates/datastream/OracleChangeEventSequenceTest.java | 4 ++-- .../templates/datastream/PostgresChangeEventSequenceTest.java | 4 ++-- .../v2/templates/spanner/ProcessInformationSchemaTest.java | 4 ++-- 5 files changed, 9 insertions(+), 9 deletions(-) diff --git a/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/SpannerTransactionWriterDoFnTest.java b/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/SpannerTransactionWriterDoFnTest.java index 1d594f4b95..d76c8b8614 100644 --- a/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/SpannerTransactionWriterDoFnTest.java +++ b/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/SpannerTransactionWriterDoFnTest.java @@ -204,7 +204,7 @@ public void testProcessElement() { Mutation.WriteBuilder shadowBuilder = Mutation.newInsertOrUpdateBuilder("shadow_Users"); shadowBuilder.set("first_name").to("Johnny"); shadowBuilder.set("last_name").to("Depp"); - shadowBuilder.set("timestamp").to(12345); + shadowBuilder.set("shadow_timestamp").to(12345); shadowBuilder.set("log_file").to(""); shadowBuilder.set("log_position").to(-1); Mutation expectedShadowMutation = shadowBuilder.build(); diff --git a/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/datastream/MySqlChangeEventSequenceTest.java b/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/datastream/MySqlChangeEventSequenceTest.java index ea771aa077..2d80ce34a3 100644 --- a/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/datastream/MySqlChangeEventSequenceTest.java +++ b/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/datastream/MySqlChangeEventSequenceTest.java @@ -111,7 +111,7 @@ public void testCreateFromShadowTableWithUseSqlStatements() throws Exception { .column("id") .int64() .endColumn() - .column("timestamp") + .column("shadow_timestamp") .int64() .endColumn() .column("log_file") @@ -131,7 +131,7 @@ public void testCreateFromShadowTableWithUseSqlStatements() throws Exception { // Mock the behavior of the transaction context Struct mockRow = mock(Struct.class); when(mockRow.getLong("id")).thenReturn(1L); - when(mockRow.getLong("timestamp")).thenReturn(1615159728L); // Updated to match new column + when(mockRow.getLong("shadow_timestamp")).thenReturn(1615159728L); // Updated to match new column when(mockRow.getString("log_file")).thenReturn("file1.log"); when(mockRow.getLong("log_position")).thenReturn(2L); // Updated to match new column diff --git a/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/datastream/OracleChangeEventSequenceTest.java b/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/datastream/OracleChangeEventSequenceTest.java index df1d15c893..78b6845d50 100644 --- a/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/datastream/OracleChangeEventSequenceTest.java +++ b/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/datastream/OracleChangeEventSequenceTest.java @@ -75,7 +75,7 @@ public void testCreateFromShadowTableWithUseSqlStatements_Oracle() throws Except .column("id") .int64() .endColumn() - .column("timestamp") + .column("shadow_timestamp") .int64() .endColumn() .column("scn") @@ -92,7 +92,7 @@ public void testCreateFromShadowTableWithUseSqlStatements_Oracle() throws Except // Mock the behavior of the transaction context Struct mockRow = mock(Struct.class); when(mockRow.getLong("id")).thenReturn(1L); - when(mockRow.getLong("timestamp")).thenReturn(1615159728L); + when(mockRow.getLong("shadow_timestamp")).thenReturn(1615159728L); when(mockRow.getLong("scn")).thenReturn(100L); ResultSet mockResultSet = mock(ResultSet.class); diff --git a/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/datastream/PostgresChangeEventSequenceTest.java b/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/datastream/PostgresChangeEventSequenceTest.java index ed3a025dc2..0cf615eff8 100644 --- a/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/datastream/PostgresChangeEventSequenceTest.java +++ b/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/datastream/PostgresChangeEventSequenceTest.java @@ -124,7 +124,7 @@ public void testCreateFromShadowTableWithUseSqlStatements_Postgres() throws Exce .column("id") .int64() .endColumn() - .column("timestamp") + .column("shadow_timestamp") .int64() .endColumn() .column("lsn") @@ -141,7 +141,7 @@ public void testCreateFromShadowTableWithUseSqlStatements_Postgres() throws Exce // Mock the behavior of the transaction context Struct mockRow = mock(Struct.class); when(mockRow.getLong("id")).thenReturn(1L); - when(mockRow.getLong("timestamp")).thenReturn(1615159728L); + when(mockRow.getLong("shadow_timestamp")).thenReturn(1615159728L); when(mockRow.getString("lsn")).thenReturn("0/123456"); ResultSet mockResultSet = mock(ResultSet.class); diff --git a/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/spanner/ProcessInformationSchemaTest.java b/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/spanner/ProcessInformationSchemaTest.java index 3b36c4be99..ab1e7c6026 100644 --- a/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/spanner/ProcessInformationSchemaTest.java +++ b/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/spanner/ProcessInformationSchemaTest.java @@ -455,7 +455,7 @@ public void canCreateShadowTablesInSpanner() throws Exception { + "\t`timestamp_field` TIMESTAMP,\n" + "\t`date_field` DATE,\n" + "\t`id` INT64,\n" - + "\t`timestamp` INT64,\n" + + "\t`shadow_timestamp` INT64,\n" + "\t`log_file` STRING(MAX),\n" + "\t`log_position` INT64,\n" + ") PRIMARY KEY (`first_name` ASC, `last_name` DESC, `age` ASC, `bool_field` ASC, `int64_field` ASC, `float64_field` ASC, `string_field` ASC, `bytes_field` ASC, `timestamp_field` ASC, `date_field` ASC, `id` ASC)"); @@ -500,7 +500,7 @@ public void canCreateShadowTablesInSpanner_separateDb() throws Exception { Collections.singletonList( "CREATE TABLE `shadow_table2` (\n" + "\t`id` INT64,\n" - + "\t`timestamp` INT64,\n" + + "\t`shadow_timestamp` INT64,\n" + "\t`log_file` STRING(MAX),\n" + "\t`log_position` INT64,\n" + ") PRIMARY KEY (`id` ASC)"); From c1ee8de3a3727b74c244c72c767866271556ab14 Mon Sep 17 00:00:00 2001 From: aasthabharill Date: Tue, 2 Dec 2025 13:51:40 +0530 Subject: [PATCH 04/13] spotless --- .../v2/templates/datastream/MySqlChangeEventSequenceTest.java | 3 ++- .../v2/templates/datastream/OracleChangeEventSequenceTest.java | 2 +- .../teleport/v2/templates/spanner/ShadowTableCreatorTest.java | 2 -- 3 files changed, 3 insertions(+), 4 deletions(-) diff --git a/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/datastream/MySqlChangeEventSequenceTest.java b/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/datastream/MySqlChangeEventSequenceTest.java index 2d80ce34a3..17317f78d6 100644 --- a/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/datastream/MySqlChangeEventSequenceTest.java +++ b/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/datastream/MySqlChangeEventSequenceTest.java @@ -131,7 +131,8 @@ public void testCreateFromShadowTableWithUseSqlStatements() throws Exception { // Mock the behavior of the transaction context Struct mockRow = mock(Struct.class); when(mockRow.getLong("id")).thenReturn(1L); - when(mockRow.getLong("shadow_timestamp")).thenReturn(1615159728L); // Updated to match new column + when(mockRow.getLong("shadow_timestamp")) + .thenReturn(1615159728L); // Updated to match new column when(mockRow.getString("log_file")).thenReturn("file1.log"); when(mockRow.getLong("log_position")).thenReturn(2L); // Updated to match new column diff --git a/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/datastream/OracleChangeEventSequenceTest.java b/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/datastream/OracleChangeEventSequenceTest.java index 78b6845d50..b6583fc665 100644 --- a/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/datastream/OracleChangeEventSequenceTest.java +++ b/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/datastream/OracleChangeEventSequenceTest.java @@ -75,7 +75,7 @@ public void testCreateFromShadowTableWithUseSqlStatements_Oracle() throws Except .column("id") .int64() .endColumn() - .column("shadow_timestamp") + .column("shadow_timestamp") .int64() .endColumn() .column("scn") diff --git a/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/spanner/ShadowTableCreatorTest.java b/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/spanner/ShadowTableCreatorTest.java index a6d4786091..a29341b6a3 100644 --- a/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/spanner/ShadowTableCreatorTest.java +++ b/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/spanner/ShadowTableCreatorTest.java @@ -275,9 +275,7 @@ public void canHandlePkColumnNameCollision() { shadowTableCreator.constructShadowTable(ddl, "MyTable", Dialect.GOOGLE_STANDARD_SQL); assertEquals(shadowTable.name(), "shadow_MyTable"); - // Verify that the original PK column is preserved with the correct type. assertEquals(shadowTable.column("timestamp").type().getCode(), Type.Code.TIMESTAMP); - // Verify that the new metadata column is present with the new name and correct type. assertEquals(shadowTable.column("shadow_timestamp").type().getCode(), Type.Code.INT64); } } From ed92c68f2b6fcd6bf7f27988bcabf2106605924a Mon Sep 17 00:00:00 2001 From: aasthabharill Date: Mon, 8 Dec 2025 19:30:44 +0530 Subject: [PATCH 05/13] dynamic name changes --- .../datastream/ChangeEventContext.java | 11 +++++-- .../datastream/DatastreamConstants.java | 12 +++---- .../datastream/MySqlChangeEventContext.java | 32 +++++++++++++------ .../datastream/OracleChangeEventContext.java | 27 +++++++++++----- .../PostgresChangeEventContext.java | 27 +++++++++++----- .../templates/spanner/ShadowTableCreator.java | 29 ++++++++++++++--- 6 files changed, 101 insertions(+), 37 deletions(-) diff --git a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/ChangeEventContext.java b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/ChangeEventContext.java index 9aa9cbd4c1..19f575db0d 100644 --- a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/ChangeEventContext.java +++ b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/ChangeEventContext.java @@ -63,13 +63,20 @@ public abstract class ChangeEventContext { protected String dataTable; // Abstract method to generate shadow table mutation. - abstract Mutation generateShadowTableMutation(Ddl ddl) throws ChangeEventConvertorException; + abstract Mutation generateShadowTableMutation(Ddl ddl, Ddl shadowDdl) + throws ChangeEventConvertorException; // Helper method to convert change event to mutation. protected void convertChangeEventToMutation(Ddl ddl, Ddl shadowTableDdl) throws ChangeEventConvertorException, InvalidChangeEventException, DroppedTableException { ChangeEventConvertor.convertChangeEventColumnKeysToLowerCase(changeEvent); ChangeEventConvertor.verifySpannerSchema(ddl, changeEvent); + + LOG.info( + "aastha convertChangeEventToMutation ddl: {} \nshadowTableDdl: {} \nchangeEvent: {}", + ddl, + shadowTableDdl, + changeEvent); this.primaryKey = ChangeEventSpannerConvertor.changeEventToPrimaryKey( changeEvent.get(DatastreamConstants.EVENT_TABLE_NAME_KEY).asText(), @@ -77,7 +84,7 @@ protected void convertChangeEventToMutation(Ddl ddl, Ddl shadowTableDdl) changeEvent, /* convertNameToLowerCase= */ true); this.dataMutation = ChangeEventConvertor.changeEventToMutation(ddl, changeEvent); - this.shadowTableMutation = generateShadowTableMutation(shadowTableDdl); + this.shadowTableMutation = generateShadowTableMutation(ddl, shadowTableDdl); } public JsonNode getChangeEvent() { diff --git a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/DatastreamConstants.java b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/DatastreamConstants.java index cc354d542d..d852cad0f4 100644 --- a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/DatastreamConstants.java +++ b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/DatastreamConstants.java @@ -74,10 +74,10 @@ public class DatastreamConstants { public static final String ORACLE_TIMESTAMP_KEY = "_metadata_timestamp"; public static final Pair ORACLE_TIMESTAMP_SHADOW_INFO = - Pair.of("shadow_timestamp", "INT64"); + Pair.of("timestamp", "INT64"); public static final Pair ORACLE_TIMESTAMP_SHADOW_INFO_PG_DIALECT = - Pair.of("shadow_timestamp", "bigint"); + Pair.of("timestamp", "bigint"); public static final String ORACLE_SCN_KEY = "_metadata_scn"; @@ -96,10 +96,10 @@ public class DatastreamConstants { public static final String MYSQL_TIMESTAMP_KEY = "_metadata_timestamp"; public static final Pair MYSQL_TIMESTAMP_SHADOW_INFO = - Pair.of("shadow_timestamp", "INT64"); + Pair.of("timestamp", "INT64"); public static final Pair MYSQL_TIMESTAMP_SHADOW_INFO_PG_DIALECT = - Pair.of("shadow_timestamp", "bigint"); + Pair.of("timestamp", "bigint"); public static final String MYSQL_LOGFILE_KEY = "_metadata_log_file"; @@ -127,10 +127,10 @@ public class DatastreamConstants { public static final String POSTGRES_TIMESTAMP_KEY = "_metadata_timestamp"; public static final Pair POSTGRES_TIMESTAMP_SHADOW_INFO = - Pair.of("shadow_timestamp", "INT64"); + Pair.of("timestamp", "INT64"); public static final Pair POSTGRES_TIMESTAMP_SHADOW_INFO_PG_DIALECT = - Pair.of("shadow_timestamp", "bigint"); + Pair.of("timestamp", "bigint"); public static final String POSTGRES_LSN_KEY = "_metadata_lsn"; diff --git a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/MySqlChangeEventContext.java b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/MySqlChangeEventContext.java index 250d0011c5..4b9ff99570 100644 --- a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/MySqlChangeEventContext.java +++ b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/MySqlChangeEventContext.java @@ -19,10 +19,14 @@ import com.google.cloud.spanner.Mutation; import com.google.cloud.spanner.Value; import com.google.cloud.teleport.v2.spanner.ddl.Ddl; +import com.google.cloud.teleport.v2.spanner.ddl.Table; import com.google.cloud.teleport.v2.spanner.migrations.convertors.ChangeEventTypeConvertor; import com.google.cloud.teleport.v2.spanner.migrations.exceptions.ChangeEventConvertorException; import com.google.cloud.teleport.v2.spanner.migrations.exceptions.DroppedTableException; import com.google.cloud.teleport.v2.spanner.migrations.exceptions.InvalidChangeEventException; +import com.google.cloud.teleport.v2.templates.spanner.ShadowTableCreator; +import java.util.Set; +import java.util.stream.Collectors; /** * MySql implementation of ChangeEventContext that provides implementation of the @@ -44,19 +48,25 @@ public MySqlChangeEventContext( * Creates shadow table mutation for MySql. */ @Override - Mutation generateShadowTableMutation(Ddl ddl) throws ChangeEventConvertorException { + Mutation generateShadowTableMutation(Ddl ddl, Ddl shadowDdl) + throws ChangeEventConvertorException { // Get shadow information from change event mutation context Mutation.WriteBuilder builder = ChangeEventConvertor.changeEventToShadowTableMutationBuilder( - ddl, changeEvent, shadowTablePrefix); + shadowDdl, changeEvent, shadowTablePrefix); + + Table dataTable = ddl.table(this.dataTable); + Set primaryKeyColNames = + dataTable.primaryKeys().stream().map(k -> k.name()).collect(Collectors.toSet()); // Add timestamp information to shadow table mutation Long changeEventTimestamp = ChangeEventTypeConvertor.toLong( changeEvent, DatastreamConstants.MYSQL_TIMESTAMP_KEY, /* requiredField= */ true); - builder - .set(DatastreamConstants.MYSQL_TIMESTAMP_SHADOW_INFO.getLeft()) - .to(Value.int64(changeEventTimestamp)); + String timestampColumn = + ShadowTableCreator.getSafeShadowColumnName( + DatastreamConstants.MYSQL_TIMESTAMP_SHADOW_INFO.getLeft(), primaryKeyColNames); + builder.set(timestampColumn).to(Value.int64(changeEventTimestamp)); /* MySql backfill events "can" have log file and log file position as null. * Set their value to a value (lexicographically) smaller than any real value. @@ -68,7 +78,10 @@ Mutation generateShadowTableMutation(Ddl ddl) throws ChangeEventConvertorExcepti logFile = ""; } // Add log file information to shadow table mutation - builder.set(DatastreamConstants.MYSQL_LOGFILE_SHADOW_INFO.getLeft()).to(logFile); + String logFileColumn = + ShadowTableCreator.getSafeShadowColumnName( + DatastreamConstants.MYSQL_LOGFILE_SHADOW_INFO.getLeft(), primaryKeyColNames); + builder.set(logFileColumn).to(logFile); Long logPosition = ChangeEventTypeConvertor.toLong( @@ -77,9 +90,10 @@ Mutation generateShadowTableMutation(Ddl ddl) throws ChangeEventConvertorExcepti logPosition = new Long(-1); } // Add logfile position information to shadow table mutation - builder - .set(DatastreamConstants.MYSQL_LOGPOSITION_SHADOW_INFO.getLeft()) - .to(Value.int64(logPosition)); + String logPositionColumn = + ShadowTableCreator.getSafeShadowColumnName( + DatastreamConstants.MYSQL_LOGPOSITION_SHADOW_INFO.getLeft(), primaryKeyColNames); + builder.set(logPositionColumn).to(Value.int64(logPosition)); return builder.build(); } diff --git a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/OracleChangeEventContext.java b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/OracleChangeEventContext.java index 666a44ec3d..4f4f092b26 100644 --- a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/OracleChangeEventContext.java +++ b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/OracleChangeEventContext.java @@ -19,10 +19,14 @@ import com.google.cloud.spanner.Mutation; import com.google.cloud.spanner.Value; import com.google.cloud.teleport.v2.spanner.ddl.Ddl; +import com.google.cloud.teleport.v2.spanner.ddl.Table; import com.google.cloud.teleport.v2.spanner.migrations.convertors.ChangeEventTypeConvertor; import com.google.cloud.teleport.v2.spanner.migrations.exceptions.ChangeEventConvertorException; import com.google.cloud.teleport.v2.spanner.migrations.exceptions.DroppedTableException; import com.google.cloud.teleport.v2.spanner.migrations.exceptions.InvalidChangeEventException; +import com.google.cloud.teleport.v2.templates.spanner.ShadowTableCreator; +import java.util.Set; +import java.util.stream.Collectors; /** * Oracle implementation of ChangeEventContext that provides implementation of the @@ -44,19 +48,25 @@ public OracleChangeEventContext( * Creates shadow table mutation for Oracle. */ @Override - Mutation generateShadowTableMutation(Ddl ddl) throws ChangeEventConvertorException { + Mutation generateShadowTableMutation(Ddl ddl, Ddl shadowDdl) + throws ChangeEventConvertorException { // Get shadow information from change event mutation context Mutation.WriteBuilder builder = ChangeEventConvertor.changeEventToShadowTableMutationBuilder( - ddl, changeEvent, shadowTablePrefix); + shadowDdl, changeEvent, shadowTablePrefix); + + Table dataTable = ddl.table(this.dataTable); + Set primaryKeyColNames = + dataTable.primaryKeys().stream().map(k -> k.name()).collect(Collectors.toSet()); // Add timestamp information to shadow table mutation Long changeEventTimestamp = ChangeEventTypeConvertor.toLong( changeEvent, DatastreamConstants.ORACLE_TIMESTAMP_KEY, /* requiredField= */ true); - builder - .set(DatastreamConstants.ORACLE_TIMESTAMP_SHADOW_INFO.getLeft()) - .to(Value.int64(changeEventTimestamp)); + String timestampColumn = + ShadowTableCreator.getSafeShadowColumnName( + DatastreamConstants.ORACLE_TIMESTAMP_SHADOW_INFO.getLeft(), primaryKeyColNames); + builder.set(timestampColumn).to(Value.int64(changeEventTimestamp)); /* Oracle backfill events "can" have SCN value as null. * Set the value to a value smaller than any real value. @@ -68,9 +78,10 @@ Mutation generateShadowTableMutation(Ddl ddl) throws ChangeEventConvertorExcepti changeEventSCN = new Long(-1); } // Add scn information to shadow table mutation - builder - .set(DatastreamConstants.ORACLE_SCN_SHADOW_INFO.getLeft()) - .to(Value.int64(changeEventSCN)); + String scnColumn = + ShadowTableCreator.getSafeShadowColumnName( + DatastreamConstants.ORACLE_SCN_SHADOW_INFO.getLeft(), primaryKeyColNames); + builder.set(scnColumn).to(Value.int64(changeEventSCN)); return builder.build(); } diff --git a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/PostgresChangeEventContext.java b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/PostgresChangeEventContext.java index 18a31153bf..786c3e084e 100644 --- a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/PostgresChangeEventContext.java +++ b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/PostgresChangeEventContext.java @@ -19,10 +19,14 @@ import com.google.cloud.spanner.Mutation; import com.google.cloud.spanner.Value; import com.google.cloud.teleport.v2.spanner.ddl.Ddl; +import com.google.cloud.teleport.v2.spanner.ddl.Table; import com.google.cloud.teleport.v2.spanner.migrations.convertors.ChangeEventTypeConvertor; import com.google.cloud.teleport.v2.spanner.migrations.exceptions.ChangeEventConvertorException; import com.google.cloud.teleport.v2.spanner.migrations.exceptions.DroppedTableException; import com.google.cloud.teleport.v2.spanner.migrations.exceptions.InvalidChangeEventException; +import com.google.cloud.teleport.v2.templates.spanner.ShadowTableCreator; +import java.util.Set; +import java.util.stream.Collectors; /** * Postgres implementation of ChangeEventContext that provides implementation of the @@ -44,19 +48,25 @@ public PostgresChangeEventContext( * Creates shadow table mutation for Postgres. */ @Override - Mutation generateShadowTableMutation(Ddl ddl) throws ChangeEventConvertorException { + Mutation generateShadowTableMutation(Ddl ddl, Ddl shadowDdl) + throws ChangeEventConvertorException { // Get shadow information from change event mutation context Mutation.WriteBuilder builder = ChangeEventConvertor.changeEventToShadowTableMutationBuilder( - ddl, changeEvent, shadowTablePrefix); + shadowDdl, changeEvent, shadowTablePrefix); + + Table dataTable = ddl.table(this.dataTable); + Set primaryKeyColNames = + dataTable.primaryKeys().stream().map(k -> k.name()).collect(Collectors.toSet()); // Add timestamp information to shadow table mutation Long changeEventTimestamp = ChangeEventTypeConvertor.toLong( changeEvent, DatastreamConstants.POSTGRES_TIMESTAMP_KEY, /* requiredField= */ true); - builder - .set(DatastreamConstants.POSTGRES_TIMESTAMP_SHADOW_INFO.getLeft()) - .to(Value.int64(changeEventTimestamp)); + String timestampColumn = + ShadowTableCreator.getSafeShadowColumnName( + DatastreamConstants.POSTGRES_TIMESTAMP_SHADOW_INFO.getLeft(), primaryKeyColNames); + builder.set(timestampColumn).to(Value.int64(changeEventTimestamp)); /* Postgres backfill events "can" have LSN value as null. * Set the value to a value smaller than any real value. @@ -68,9 +78,10 @@ Mutation generateShadowTableMutation(Ddl ddl) throws ChangeEventConvertorExcepti changeEventLSN = ""; } // Add lsn information to shadow table mutation - builder - .set(DatastreamConstants.POSTGRES_LSN_SHADOW_INFO.getLeft()) - .to(Value.string(changeEventLSN)); + String lsnColumn = + ShadowTableCreator.getSafeShadowColumnName( + DatastreamConstants.POSTGRES_LSN_SHADOW_INFO.getLeft(), primaryKeyColNames); + builder.set(lsnColumn).to(Value.string(changeEventLSN)); return builder.build(); } diff --git a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/spanner/ShadowTableCreator.java b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/spanner/ShadowTableCreator.java index 86ffec4ad3..fb7de78b61 100644 --- a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/spanner/ShadowTableCreator.java +++ b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/spanner/ShadowTableCreator.java @@ -28,7 +28,7 @@ import org.apache.commons.lang3.tuple.Pair; /** Helper class to create shadow tables for different source types. */ -class ShadowTableCreator { +public class ShadowTableCreator { private final String sourceType; private final String shadowTablePrefix; @@ -83,16 +83,37 @@ Table constructShadowTable(Ddl informationSchema, String dataTableName, Dialect } // Add extra column to track ChangeEventSequence information - addChangeEventSequenceColumns(shadowTableBuilder); + addChangeEventSequenceColumns(shadowTableBuilder, primaryKeyColNames); return shadowTableBuilder.build(); } - private void addChangeEventSequenceColumns(Table.Builder shadowTableBuilder) { + private void addChangeEventSequenceColumns( + Table.Builder shadowTableBuilder, Set primaryKeyColNames) { for (Pair shadowInfo : sortOrderMap.values()) { - Column.Builder versionColumnBuilder = shadowTableBuilder.column(shadowInfo.getLeft()); + String desiredName = shadowInfo.getLeft(); + String safeName = getSafeShadowColumnName(desiredName, primaryKeyColNames); + Column.Builder versionColumnBuilder = shadowTableBuilder.column(safeName); versionColumnBuilder.parseType(shadowInfo.getRight()); versionColumnBuilder.endColumn(); } } + + /** + * Generates a safe column name for a shadow table by checking for collisions with existing column + * names and iteratively prepending a prefix until a unique name is found. + * + * @param desiredName the initial desired name for the shadow column + * @param existingColumnNames a set of existing column names in the data table (should be + * lowercase) + * @return a unique and safe column name + */ + public static String getSafeShadowColumnName( + String desiredName, Set existingPrimaryKeyColumnNames) { + String safeName = desiredName; + while (existingPrimaryKeyColumnNames.contains(safeName.toLowerCase())) { + safeName = "shadow_" + safeName; + } + return safeName; + } } From 6115d57008364b785ed448f5c4a6822446968cc4 Mon Sep 17 00:00:00 2001 From: aasthabharill Date: Wed, 10 Dec 2025 10:57:28 +0530 Subject: [PATCH 06/13] dynamic name changes final code --- .../datastream/ChangeEventContext.java | 28 +++++++++++++++ .../datastream/ChangeEventConvertor.java | 3 ++ .../ChangeEventSequenceFactory.java | 18 ++-------- .../datastream/MySqlChangeEventContext.java | 34 ++++++++++--------- .../datastream/MySqlChangeEventSequence.java | 16 ++++----- .../datastream/OracleChangeEventContext.java | 26 +++++++------- .../datastream/OracleChangeEventSequence.java | 10 +++--- .../PostgresChangeEventContext.java | 28 ++++++++------- .../PostgresChangeEventSequence.java | 10 +++--- .../templates/spanner/ShadowTableCreator.java | 16 ++++----- 10 files changed, 105 insertions(+), 84 deletions(-) diff --git a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/ChangeEventContext.java b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/ChangeEventContext.java index 19f575db0d..502906bcc7 100644 --- a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/ChangeEventContext.java +++ b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/ChangeEventContext.java @@ -62,6 +62,13 @@ public abstract class ChangeEventContext { // Data table for the change event. protected String dataTable; + // The following fields store the "safe" names (to avoid collision with data column names) of the shadow table columns. + protected String safeTimestampColumn; + protected String safeLogFileColumn; + protected String safeLogPositionColumn; + protected String safeScnColumn; + protected String safeLsnColumn; + // Abstract method to generate shadow table mutation. abstract Mutation generateShadowTableMutation(Ddl ddl, Ddl shadowDdl) throws ChangeEventConvertorException; @@ -121,6 +128,27 @@ public String getShadowTable() { return shadowTable; } + // Getters for the safe shadow table column names. + public String getSafeTimestampColumn() { + return this.safeTimestampColumn; + } + + public String getSafeLogFileColumn() { + return this.safeLogFileColumn; + } + + public String getSafeLogPositionColumn() { + return this.safeLogPositionColumn; + } + + public String getSafeScnColumn() { + return this.safeScnColumn; + } + + public String getSafeLsnColumn() { + return this.safeLsnColumn; + } + // Fires a read on Data table with lock scanned ranges. Used to acquire exclusive lock on Data row // at the beginning of a readWriteTransaction public void readDataTableRowWithExclusiveLock( diff --git a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/ChangeEventConvertor.java b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/ChangeEventConvertor.java index 62619772d3..3584b3caad 100644 --- a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/ChangeEventConvertor.java +++ b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/ChangeEventConvertor.java @@ -119,6 +119,9 @@ static Mutation.WriteBuilder changeEventToShadowTableMutationBuilder( String shadowTableName = shadowTablePrefix + tableName; try { Table table = ddl.table(shadowTableName); + if (table == null) { + throw new DroppedTableException(shadowTableName + " not found in spanner DDL"); + } ImmutableList keyColumns = table.primaryKeys(); List keyColumnNames = keyColumns.stream() diff --git a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/ChangeEventSequenceFactory.java b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/ChangeEventSequenceFactory.java index 8cf77caceb..a9aef9a273 100644 --- a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/ChangeEventSequenceFactory.java +++ b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/ChangeEventSequenceFactory.java @@ -73,25 +73,13 @@ public static ChangeEventSequence createChangeEventSequenceFromShadowTable( if (DatastreamConstants.MYSQL_SOURCE_TYPE.equals(sourceType)) { return MySqlChangeEventSequence.createFromShadowTable( - transactionContext, - changeEventContext.getShadowTable(), - shadowDdl, - changeEventContext.getPrimaryKey(), - useSqlStatements); + transactionContext, changeEventContext, shadowDdl, useSqlStatements); } else if (DatastreamConstants.ORACLE_SOURCE_TYPE.equals(sourceType)) { return OracleChangeEventSequence.createFromShadowTable( - transactionContext, - changeEventContext.getShadowTable(), - shadowDdl, - changeEventContext.getPrimaryKey(), - useSqlStatements); + transactionContext, changeEventContext, shadowDdl, useSqlStatements); } else if (DatastreamConstants.POSTGRES_SOURCE_TYPE.equals(sourceType)) { return PostgresChangeEventSequence.createFromShadowTable( - transactionContext, - changeEventContext.getShadowTable(), - shadowDdl, - changeEventContext.getPrimaryKey(), - useSqlStatements); + transactionContext, changeEventContext, shadowDdl, useSqlStatements); } throw new InvalidChangeEventException("Unsupported source database: " + sourceType); } diff --git a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/MySqlChangeEventContext.java b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/MySqlChangeEventContext.java index 4b9ff99570..1753b91e0b 100644 --- a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/MySqlChangeEventContext.java +++ b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/MySqlChangeEventContext.java @@ -41,6 +41,21 @@ public MySqlChangeEventContext( this.shadowTablePrefix = shadowTablePrefix; this.dataTable = changeEvent.get(DatastreamConstants.EVENT_TABLE_NAME_KEY).asText(); this.shadowTable = shadowTablePrefix + this.dataTable; + + Table dataTable = ddl.table(this.dataTable); + Set primaryKeyColNames = + dataTable.primaryKeys().stream().map(k -> k.name()).collect(Collectors.toSet()); + + this.safeTimestampColumn = + ShadowTableCreator.getSafeShadowColumnName( + DatastreamConstants.MYSQL_TIMESTAMP_SHADOW_INFO.getLeft(), primaryKeyColNames); + this.safeLogFileColumn = + ShadowTableCreator.getSafeShadowColumnName( + DatastreamConstants.MYSQL_LOGFILE_SHADOW_INFO.getLeft(), primaryKeyColNames); + this.safeLogPositionColumn = + ShadowTableCreator.getSafeShadowColumnName( + DatastreamConstants.MYSQL_LOGPOSITION_SHADOW_INFO.getLeft(), primaryKeyColNames); + convertChangeEventToMutation(ddl, shadowTableDdl); } @@ -55,18 +70,11 @@ Mutation generateShadowTableMutation(Ddl ddl, Ddl shadowDdl) ChangeEventConvertor.changeEventToShadowTableMutationBuilder( shadowDdl, changeEvent, shadowTablePrefix); - Table dataTable = ddl.table(this.dataTable); - Set primaryKeyColNames = - dataTable.primaryKeys().stream().map(k -> k.name()).collect(Collectors.toSet()); - // Add timestamp information to shadow table mutation Long changeEventTimestamp = ChangeEventTypeConvertor.toLong( changeEvent, DatastreamConstants.MYSQL_TIMESTAMP_KEY, /* requiredField= */ true); - String timestampColumn = - ShadowTableCreator.getSafeShadowColumnName( - DatastreamConstants.MYSQL_TIMESTAMP_SHADOW_INFO.getLeft(), primaryKeyColNames); - builder.set(timestampColumn).to(Value.int64(changeEventTimestamp)); + builder.set(this.safeTimestampColumn).to(Value.int64(changeEventTimestamp)); /* MySql backfill events "can" have log file and log file position as null. * Set their value to a value (lexicographically) smaller than any real value. @@ -78,10 +86,7 @@ Mutation generateShadowTableMutation(Ddl ddl, Ddl shadowDdl) logFile = ""; } // Add log file information to shadow table mutation - String logFileColumn = - ShadowTableCreator.getSafeShadowColumnName( - DatastreamConstants.MYSQL_LOGFILE_SHADOW_INFO.getLeft(), primaryKeyColNames); - builder.set(logFileColumn).to(logFile); + builder.set(this.safeLogFileColumn).to(logFile); Long logPosition = ChangeEventTypeConvertor.toLong( @@ -90,10 +95,7 @@ Mutation generateShadowTableMutation(Ddl ddl, Ddl shadowDdl) logPosition = new Long(-1); } // Add logfile position information to shadow table mutation - String logPositionColumn = - ShadowTableCreator.getSafeShadowColumnName( - DatastreamConstants.MYSQL_LOGPOSITION_SHADOW_INFO.getLeft(), primaryKeyColNames); - builder.set(logPositionColumn).to(Value.int64(logPosition)); + builder.set(this.safeLogPositionColumn).to(Value.int64(logPosition)); return builder.build(); } diff --git a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/MySqlChangeEventSequence.java b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/MySqlChangeEventSequence.java index e5d6760484..0a64240920 100644 --- a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/MySqlChangeEventSequence.java +++ b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/MySqlChangeEventSequence.java @@ -26,7 +26,6 @@ import com.google.cloud.teleport.v2.spanner.migrations.exceptions.InvalidChangeEventException; import com.google.cloud.teleport.v2.spanner.migrations.spanner.SpannerReadUtils; import java.util.List; -import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -99,24 +98,25 @@ public static MySqlChangeEventSequence createFromChangeEvent(ChangeEventContext /* * Creates a MySqlChangeEventSequence by reading from a shadow table. * @param transactionContext The transaction context to use for reading from the shadow table - * @param shadowTable The name of the shadow table to read from - * @param primaryKey The primary key to look up in the shadow table + * @param context The change event context with resolved safe shadow column names * @param useSqlStatements If true, performs shadow table read using SQL statement with exclusive lock on row */ public static MySqlChangeEventSequence createFromShadowTable( final TransactionContext transactionContext, - String shadowTable, + ChangeEventContext context, Ddl shadowTableDdl, - Key primaryKey, boolean useSqlStatements) throws ChangeEventSequenceCreationException { try { + String shadowTable = context.getShadowTable(); + Key primaryKey = context.getPrimaryKey(); // Read columns from shadow table List readColumnList = - DatastreamConstants.MYSQL_SORT_ORDER.values().stream() - .map(p -> p.getLeft()) - .collect(Collectors.toList()); + java.util.Arrays.asList( + context.getSafeTimestampColumn(), + context.getSafeLogFileColumn(), + context.getSafeLogPositionColumn()); Struct row; // TODO: After beam release, use the latest client lib version which supports setting lock // hints via the read api. SQL string generation should be removed. diff --git a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/OracleChangeEventContext.java b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/OracleChangeEventContext.java index 4f4f092b26..df1f046b3c 100644 --- a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/OracleChangeEventContext.java +++ b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/OracleChangeEventContext.java @@ -41,6 +41,18 @@ public OracleChangeEventContext( this.shadowTablePrefix = shadowTablePrefix; this.dataTable = changeEvent.get(DatastreamConstants.EVENT_TABLE_NAME_KEY).asText(); this.shadowTable = shadowTablePrefix + this.dataTable; + + Table dataTable = ddl.table(this.dataTable); + Set primaryKeyColNames = + dataTable.primaryKeys().stream().map(k -> k.name()).collect(Collectors.toSet()); + + this.safeTimestampColumn = + ShadowTableCreator.getSafeShadowColumnName( + DatastreamConstants.ORACLE_TIMESTAMP_SHADOW_INFO.getLeft(), primaryKeyColNames); + this.safeScnColumn = + ShadowTableCreator.getSafeShadowColumnName( + DatastreamConstants.ORACLE_SCN_SHADOW_INFO.getLeft(), primaryKeyColNames); + convertChangeEventToMutation(ddl, shadowTableDdl); } @@ -55,18 +67,11 @@ Mutation generateShadowTableMutation(Ddl ddl, Ddl shadowDdl) ChangeEventConvertor.changeEventToShadowTableMutationBuilder( shadowDdl, changeEvent, shadowTablePrefix); - Table dataTable = ddl.table(this.dataTable); - Set primaryKeyColNames = - dataTable.primaryKeys().stream().map(k -> k.name()).collect(Collectors.toSet()); - // Add timestamp information to shadow table mutation Long changeEventTimestamp = ChangeEventTypeConvertor.toLong( changeEvent, DatastreamConstants.ORACLE_TIMESTAMP_KEY, /* requiredField= */ true); - String timestampColumn = - ShadowTableCreator.getSafeShadowColumnName( - DatastreamConstants.ORACLE_TIMESTAMP_SHADOW_INFO.getLeft(), primaryKeyColNames); - builder.set(timestampColumn).to(Value.int64(changeEventTimestamp)); + builder.set(this.safeTimestampColumn).to(Value.int64(changeEventTimestamp)); /* Oracle backfill events "can" have SCN value as null. * Set the value to a value smaller than any real value. @@ -78,10 +83,7 @@ Mutation generateShadowTableMutation(Ddl ddl, Ddl shadowDdl) changeEventSCN = new Long(-1); } // Add scn information to shadow table mutation - String scnColumn = - ShadowTableCreator.getSafeShadowColumnName( - DatastreamConstants.ORACLE_SCN_SHADOW_INFO.getLeft(), primaryKeyColNames); - builder.set(scnColumn).to(Value.int64(changeEventSCN)); + builder.set(this.safeScnColumn).to(Value.int64(changeEventSCN)); return builder.build(); } diff --git a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/OracleChangeEventSequence.java b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/OracleChangeEventSequence.java index e994e7455c..b717077a2c 100644 --- a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/OracleChangeEventSequence.java +++ b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/OracleChangeEventSequence.java @@ -26,7 +26,6 @@ import com.google.cloud.teleport.v2.spanner.migrations.exceptions.InvalidChangeEventException; import com.google.cloud.teleport.v2.spanner.migrations.spanner.SpannerReadUtils; import java.util.List; -import java.util.stream.Collectors; /** * Implementation of ChangeEventSequence for Oracle database which stores change event sequence @@ -78,18 +77,17 @@ public static OracleChangeEventSequence createFromChangeEvent(ChangeEventContext */ public static OracleChangeEventSequence createFromShadowTable( final TransactionContext transactionContext, - String shadowTable, + ChangeEventContext context, Ddl shadowTableDdl, - Key primaryKey, boolean useSqlStatements) throws ChangeEventSequenceCreationException { try { + String shadowTable = context.getShadowTable(); + Key primaryKey = context.getPrimaryKey(); // Read columns from shadow table List readColumnList = - DatastreamConstants.ORACLE_SORT_ORDER.values().stream() - .map(p -> p.getLeft()) - .collect(Collectors.toList()); + java.util.Arrays.asList(context.getSafeTimestampColumn(), context.getSafeScnColumn()); Struct row; // TODO: After beam release, use the latest client lib version which supports setting lock // hints via the read api. SQL string generation should be removed. diff --git a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/PostgresChangeEventContext.java b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/PostgresChangeEventContext.java index 786c3e084e..a9dc925b0e 100644 --- a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/PostgresChangeEventContext.java +++ b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/PostgresChangeEventContext.java @@ -41,6 +41,18 @@ public PostgresChangeEventContext( this.shadowTablePrefix = shadowTablePrefix; this.dataTable = changeEvent.get(DatastreamConstants.EVENT_TABLE_NAME_KEY).asText(); this.shadowTable = shadowTablePrefix + this.dataTable; + + Table dataTable = ddl.table(this.dataTable); + Set primaryKeyColNames = + dataTable.primaryKeys().stream().map(k -> k.name()).collect(Collectors.toSet()); + + this.safeTimestampColumn = + ShadowTableCreator.getSafeShadowColumnName( + DatastreamConstants.POSTGRES_TIMESTAMP_SHADOW_INFO.getLeft(), primaryKeyColNames); + this.safeLsnColumn = + ShadowTableCreator.getSafeShadowColumnName( + DatastreamConstants.POSTGRES_LSN_SHADOW_INFO.getLeft(), primaryKeyColNames); + convertChangeEventToMutation(ddl, shadowTableDdl); } @@ -55,18 +67,11 @@ Mutation generateShadowTableMutation(Ddl ddl, Ddl shadowDdl) ChangeEventConvertor.changeEventToShadowTableMutationBuilder( shadowDdl, changeEvent, shadowTablePrefix); - Table dataTable = ddl.table(this.dataTable); - Set primaryKeyColNames = - dataTable.primaryKeys().stream().map(k -> k.name()).collect(Collectors.toSet()); - // Add timestamp information to shadow table mutation Long changeEventTimestamp = ChangeEventTypeConvertor.toLong( changeEvent, DatastreamConstants.POSTGRES_TIMESTAMP_KEY, /* requiredField= */ true); - String timestampColumn = - ShadowTableCreator.getSafeShadowColumnName( - DatastreamConstants.POSTGRES_TIMESTAMP_SHADOW_INFO.getLeft(), primaryKeyColNames); - builder.set(timestampColumn).to(Value.int64(changeEventTimestamp)); + builder.set(this.safeTimestampColumn).to(Value.int64(changeEventTimestamp)); /* Postgres backfill events "can" have LSN value as null. * Set the value to a value smaller than any real value. @@ -78,11 +83,8 @@ Mutation generateShadowTableMutation(Ddl ddl, Ddl shadowDdl) changeEventLSN = ""; } // Add lsn information to shadow table mutation - String lsnColumn = - ShadowTableCreator.getSafeShadowColumnName( - DatastreamConstants.POSTGRES_LSN_SHADOW_INFO.getLeft(), primaryKeyColNames); - builder.set(lsnColumn).to(Value.string(changeEventLSN)); + builder.set(this.safeLsnColumn).to(changeEventLSN); return builder.build(); } -} +} \ No newline at end of file diff --git a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/PostgresChangeEventSequence.java b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/PostgresChangeEventSequence.java index 1d6482a78c..e74ad7332c 100644 --- a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/PostgresChangeEventSequence.java +++ b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/PostgresChangeEventSequence.java @@ -26,7 +26,6 @@ import com.google.cloud.teleport.v2.spanner.migrations.exceptions.InvalidChangeEventException; import com.google.cloud.teleport.v2.spanner.migrations.spanner.SpannerReadUtils; import java.util.List; -import java.util.stream.Collectors; /** * Implementation of ChangeEventSequence for Postgres database which stores change event sequence @@ -78,18 +77,17 @@ public static PostgresChangeEventSequence createFromChangeEvent(ChangeEventConte */ public static PostgresChangeEventSequence createFromShadowTable( final TransactionContext transactionContext, - String shadowTable, + ChangeEventContext context, Ddl shadowTableDdl, - Key primaryKey, boolean useSqlStatements) throws ChangeEventSequenceCreationException { try { + String shadowTable = context.getShadowTable(); + Key primaryKey = context.getPrimaryKey(); // Read columns from shadow table List readColumnList = - DatastreamConstants.POSTGRES_SORT_ORDER.values().stream() - .map(p -> p.getLeft()) - .collect(Collectors.toList()); + java.util.Arrays.asList(context.getSafeTimestampColumn(), context.getSafeLsnColumn()); Struct row; // TODO: After beam release, use the latest client lib version which supports setting lock // hints via the read api. SQL string generation should be removed. diff --git a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/spanner/ShadowTableCreator.java b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/spanner/ShadowTableCreator.java index fb7de78b61..5975c1305f 100644 --- a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/spanner/ShadowTableCreator.java +++ b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/spanner/ShadowTableCreator.java @@ -91,9 +91,9 @@ Table constructShadowTable(Ddl informationSchema, String dataTableName, Dialect private void addChangeEventSequenceColumns( Table.Builder shadowTableBuilder, Set primaryKeyColNames) { for (Pair shadowInfo : sortOrderMap.values()) { - String desiredName = shadowInfo.getLeft(); - String safeName = getSafeShadowColumnName(desiredName, primaryKeyColNames); - Column.Builder versionColumnBuilder = shadowTableBuilder.column(safeName); + String baseShadowColumnName = shadowInfo.getLeft(); + String finalShadowColumnName = getSafeShadowColumnName(baseShadowColumnName, primaryKeyColNames); + Column.Builder versionColumnBuilder = shadowTableBuilder.column(finalShadowColumnName); versionColumnBuilder.parseType(shadowInfo.getRight()); versionColumnBuilder.endColumn(); } @@ -103,15 +103,15 @@ private void addChangeEventSequenceColumns( * Generates a safe column name for a shadow table by checking for collisions with existing column * names and iteratively prepending a prefix until a unique name is found. * - * @param desiredName the initial desired name for the shadow column - * @param existingColumnNames a set of existing column names in the data table (should be + * @param baseShadowColumnName the initial desired name for the shadow column + * @param existingPrimaryKeyColumnNames a set of existing column names in the data table (should be * lowercase) * @return a unique and safe column name */ public static String getSafeShadowColumnName( - String desiredName, Set existingPrimaryKeyColumnNames) { - String safeName = desiredName; - while (existingPrimaryKeyColumnNames.contains(safeName.toLowerCase())) { + String baseShadowColumnName, Set existingPrimaryKeyColumnNames) { + String safeName = baseShadowColumnName; + while (existingPrimaryKeyColumnNames.stream().anyMatch(safeName::equalsIgnoreCase)) { safeName = "shadow_" + safeName; } return safeName; From 9a4ca538961bd9ca9f7369a39414fd7158a18c96 Mon Sep 17 00:00:00 2001 From: aasthabharill Date: Wed, 10 Dec 2025 11:00:21 +0530 Subject: [PATCH 07/13] remove log from ChangeEventContext --- .../v2/templates/datastream/ChangeEventContext.java | 8 ++------ .../templates/datastream/PostgresChangeEventContext.java | 2 +- .../teleport/v2/templates/spanner/ShadowTableCreator.java | 7 ++++--- 3 files changed, 7 insertions(+), 10 deletions(-) diff --git a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/ChangeEventContext.java b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/ChangeEventContext.java index 502906bcc7..972ffa8e2e 100644 --- a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/ChangeEventContext.java +++ b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/ChangeEventContext.java @@ -62,7 +62,8 @@ public abstract class ChangeEventContext { // Data table for the change event. protected String dataTable; - // The following fields store the "safe" names (to avoid collision with data column names) of the shadow table columns. + // The following fields store the "safe" names (to avoid collision with data column names) of the + // shadow table columns. protected String safeTimestampColumn; protected String safeLogFileColumn; protected String safeLogPositionColumn; @@ -79,11 +80,6 @@ protected void convertChangeEventToMutation(Ddl ddl, Ddl shadowTableDdl) ChangeEventConvertor.convertChangeEventColumnKeysToLowerCase(changeEvent); ChangeEventConvertor.verifySpannerSchema(ddl, changeEvent); - LOG.info( - "aastha convertChangeEventToMutation ddl: {} \nshadowTableDdl: {} \nchangeEvent: {}", - ddl, - shadowTableDdl, - changeEvent); this.primaryKey = ChangeEventSpannerConvertor.changeEventToPrimaryKey( changeEvent.get(DatastreamConstants.EVENT_TABLE_NAME_KEY).asText(), diff --git a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/PostgresChangeEventContext.java b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/PostgresChangeEventContext.java index a9dc925b0e..3f3962bebe 100644 --- a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/PostgresChangeEventContext.java +++ b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/PostgresChangeEventContext.java @@ -87,4 +87,4 @@ Mutation generateShadowTableMutation(Ddl ddl, Ddl shadowDdl) return builder.build(); } -} \ No newline at end of file +} diff --git a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/spanner/ShadowTableCreator.java b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/spanner/ShadowTableCreator.java index 5975c1305f..d455da9126 100644 --- a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/spanner/ShadowTableCreator.java +++ b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/spanner/ShadowTableCreator.java @@ -92,7 +92,8 @@ private void addChangeEventSequenceColumns( Table.Builder shadowTableBuilder, Set primaryKeyColNames) { for (Pair shadowInfo : sortOrderMap.values()) { String baseShadowColumnName = shadowInfo.getLeft(); - String finalShadowColumnName = getSafeShadowColumnName(baseShadowColumnName, primaryKeyColNames); + String finalShadowColumnName = + getSafeShadowColumnName(baseShadowColumnName, primaryKeyColNames); Column.Builder versionColumnBuilder = shadowTableBuilder.column(finalShadowColumnName); versionColumnBuilder.parseType(shadowInfo.getRight()); versionColumnBuilder.endColumn(); @@ -104,8 +105,8 @@ private void addChangeEventSequenceColumns( * names and iteratively prepending a prefix until a unique name is found. * * @param baseShadowColumnName the initial desired name for the shadow column - * @param existingPrimaryKeyColumnNames a set of existing column names in the data table (should be - * lowercase) + * @param existingPrimaryKeyColumnNames a set of existing column names in the data table (should + * be lowercase) * @return a unique and safe column name */ public static String getSafeShadowColumnName( From d09f6368589804970b67bed0ff6ec104b05fe0da Mon Sep 17 00:00:00 2001 From: aasthabharill Date: Wed, 10 Dec 2025 14:54:54 +0530 Subject: [PATCH 08/13] update unit tests --- .../SpannerTransactionWriterDoFnTest.java | 2 +- .../ChangeEventSequenceFactoryTest.java | 76 ++++++++++++++++--- .../MySqlChangeEventContextTest.java | 67 ++++++++++++++++ .../MySqlChangeEventSequenceTest.java | 12 ++- .../OracleChangeEventContextTest.java | 60 +++++++++++++++ .../OracleChangeEventSequenceTest.java | 10 ++- .../PostgresChangeEventContextTest.java | 61 +++++++++++++++ .../PostgresChangeEventSequenceTest.java | 11 ++- .../spanner/ProcessInformationSchemaTest.java | 4 +- .../spanner/ShadowTableCreatorTest.java | 71 +++++++++++++++-- 10 files changed, 345 insertions(+), 29 deletions(-) diff --git a/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/SpannerTransactionWriterDoFnTest.java b/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/SpannerTransactionWriterDoFnTest.java index d76c8b8614..1d594f4b95 100644 --- a/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/SpannerTransactionWriterDoFnTest.java +++ b/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/SpannerTransactionWriterDoFnTest.java @@ -204,7 +204,7 @@ public void testProcessElement() { Mutation.WriteBuilder shadowBuilder = Mutation.newInsertOrUpdateBuilder("shadow_Users"); shadowBuilder.set("first_name").to("Johnny"); shadowBuilder.set("last_name").to("Depp"); - shadowBuilder.set("shadow_timestamp").to(12345); + shadowBuilder.set("timestamp").to(12345); shadowBuilder.set("log_file").to(""); shadowBuilder.set("log_position").to(-1); Mutation expectedShadowMutation = shadowBuilder.build(); diff --git a/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/datastream/ChangeEventSequenceFactoryTest.java b/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/datastream/ChangeEventSequenceFactoryTest.java index 9107abbea5..daf3673701 100644 --- a/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/datastream/ChangeEventSequenceFactoryTest.java +++ b/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/datastream/ChangeEventSequenceFactoryTest.java @@ -186,12 +186,22 @@ public void canCreateMySqlChangeEventSequenceFromShadowTable() throws Exception ChangeEventContext mockContext = getMockMySqlChangeEventContext(/* addMysqlPositionFields= */ true, /* cdcEvent= */ true); + when(mockContext.getSafeTimestampColumn()) + .thenReturn(DatastreamConstants.MYSQL_TIMESTAMP_SHADOW_INFO.getLeft()); + when(mockContext.getSafeLogFileColumn()) + .thenReturn(DatastreamConstants.MYSQL_LOGFILE_SHADOW_INFO.getLeft()); + when(mockContext.getSafeLogPositionColumn()) + .thenReturn(DatastreamConstants.MYSQL_LOGPOSITION_SHADOW_INFO.getLeft()); // Mock transaction which can read a row from shadow table. TransactionContext mockTransaction = mock(TransactionContext.class); Struct mockRow = mock(Struct.class); - when(mockRow.getLong(any(String.class))).thenReturn(previousEventTimestamp, 1L); - when(mockRow.getString(any(String.class))).thenReturn("oldlogfile.log"); + when(mockRow.getLong(DatastreamConstants.MYSQL_TIMESTAMP_SHADOW_INFO.getLeft())) + .thenReturn(previousEventTimestamp); + when(mockRow.getLong(DatastreamConstants.MYSQL_LOGPOSITION_SHADOW_INFO.getLeft())) + .thenReturn(1L); + when(mockRow.getString(DatastreamConstants.MYSQL_LOGFILE_SHADOW_INFO.getLeft())) + .thenReturn("oldlogfile.log"); when(mockTransaction.readRow(any(String.class), any(Key.class), any(Iterable.class))) .thenReturn(mockRow); @@ -214,12 +224,21 @@ public void canCreateMySqlChangeEventSequenceFromShadowTableForDumpEvent() throw ChangeEventContext mockContext = getMockMySqlChangeEventContext(/* addMysqlPositionFields= */ true, /* cdcEvent= */ true); + when(mockContext.getSafeTimestampColumn()) + .thenReturn(DatastreamConstants.MYSQL_TIMESTAMP_SHADOW_INFO.getLeft()); + when(mockContext.getSafeLogFileColumn()) + .thenReturn(DatastreamConstants.MYSQL_LOGFILE_SHADOW_INFO.getLeft()); + when(mockContext.getSafeLogPositionColumn()) + .thenReturn(DatastreamConstants.MYSQL_LOGPOSITION_SHADOW_INFO.getLeft()); // Mock transaction which can read a row from shadow table. TransactionContext mockTransaction = mock(TransactionContext.class); Struct mockRow = mock(Struct.class); - when(mockRow.getLong(any(String.class))).thenReturn(previousEventTimestamp, -1L); - when(mockRow.getString(any(String.class))).thenReturn(""); + when(mockRow.getLong(DatastreamConstants.MYSQL_TIMESTAMP_SHADOW_INFO.getLeft())) + .thenReturn(previousEventTimestamp); + when(mockRow.getLong(DatastreamConstants.MYSQL_LOGPOSITION_SHADOW_INFO.getLeft())) + .thenReturn(-1L); + when(mockRow.getString(DatastreamConstants.MYSQL_LOGFILE_SHADOW_INFO.getLeft())).thenReturn(""); when(mockTransaction.readRow(any(String.class), any(Key.class), any(Iterable.class))) .thenReturn(mockRow); @@ -241,6 +260,12 @@ public void cannotCreateMySqlChangeEventSequenceWhenMissingRecordInShadowTable() ChangeEventContext mockContext = getMockMySqlChangeEventContext(/* addMysqlPositionFields= */ true, /* cdcEvent= */ true); + when(mockContext.getSafeTimestampColumn()) + .thenReturn(DatastreamConstants.MYSQL_TIMESTAMP_SHADOW_INFO.getLeft()); + when(mockContext.getSafeLogFileColumn()) + .thenReturn(DatastreamConstants.MYSQL_LOGFILE_SHADOW_INFO.getLeft()); + when(mockContext.getSafeLogPositionColumn()) + .thenReturn(DatastreamConstants.MYSQL_LOGPOSITION_SHADOW_INFO.getLeft()); // mock transaction which cannot find a row from shadow table. TransactionContext mockTransaction = mock(TransactionContext.class); @@ -311,11 +336,17 @@ public void canCreateOracleChangeEventSequenceFromShadowTable() throws Exception ChangeEventContext mockContext = getMockOracleChangeEventContext(/* addOraclePositionFields= */ true, /* cdcEvent= */ true); + when(mockContext.getSafeTimestampColumn()) + .thenReturn(DatastreamConstants.ORACLE_TIMESTAMP_SHADOW_INFO.getLeft()); + when(mockContext.getSafeScnColumn()) + .thenReturn(DatastreamConstants.ORACLE_SCN_SHADOW_INFO.getLeft()); // Mock transaction which can read a row from shadow table. TransactionContext mockTransaction = mock(TransactionContext.class); Struct mockRow = mock(Struct.class); - when(mockRow.getLong(any(String.class))).thenReturn(previousEventTimestamp, 1L); + when(mockRow.getLong(DatastreamConstants.ORACLE_TIMESTAMP_SHADOW_INFO.getLeft())) + .thenReturn(previousEventTimestamp); + when(mockRow.getLong(DatastreamConstants.ORACLE_SCN_SHADOW_INFO.getLeft())).thenReturn(1L); when(mockTransaction.readRow(any(String.class), any(Key.class), any(Iterable.class))) .thenReturn(mockRow); @@ -337,11 +368,17 @@ public void canCreateOracleChangeEventSequenceFromShadowTableForDumpEvent() thro ChangeEventContext mockContext = getMockOracleChangeEventContext(/* addOraclePositionFields= */ true, /* cdcEvent= */ true); + when(mockContext.getSafeTimestampColumn()) + .thenReturn(DatastreamConstants.ORACLE_TIMESTAMP_SHADOW_INFO.getLeft()); + when(mockContext.getSafeScnColumn()) + .thenReturn(DatastreamConstants.ORACLE_SCN_SHADOW_INFO.getLeft()); // Mock transaction which can read a row from shadow table. TransactionContext mockTransaction = mock(TransactionContext.class); Struct mockRow = mock(Struct.class); - when(mockRow.getLong(any(String.class))).thenReturn(previousEventTimestamp, -1L); + when(mockRow.getLong(DatastreamConstants.ORACLE_TIMESTAMP_SHADOW_INFO.getLeft())) + .thenReturn(previousEventTimestamp); + when(mockRow.getLong(DatastreamConstants.ORACLE_SCN_SHADOW_INFO.getLeft())).thenReturn(-1L); when(mockTransaction.readRow(any(String.class), any(Key.class), any(Iterable.class))) .thenReturn(mockRow); @@ -362,6 +399,10 @@ public void cannotCreateOracleChangeEventSequenceWhenMissingRecordInShadowTable( ChangeEventContext mockContext = getMockOracleChangeEventContext(/* addOraclePositionFields= */ true, /* cdcEvent= */ true); + when(mockContext.getSafeTimestampColumn()) + .thenReturn(DatastreamConstants.ORACLE_TIMESTAMP_SHADOW_INFO.getLeft()); + when(mockContext.getSafeScnColumn()) + .thenReturn(DatastreamConstants.ORACLE_SCN_SHADOW_INFO.getLeft()); // mock transaction which cannot find a row from shadow table. TransactionContext mockTransaction = mock(TransactionContext.class); @@ -435,12 +476,18 @@ public void canCreatePostgresChangeEventSequenceFromShadowTable() throws Excepti ChangeEventContext mockContext = getMockPostgresChangeEventContext( /* addPostgresPositionFields= */ true, /* cdcEvent= */ true); + when(mockContext.getSafeTimestampColumn()) + .thenReturn(DatastreamConstants.POSTGRES_TIMESTAMP_SHADOW_INFO.getLeft()); + when(mockContext.getSafeLsnColumn()) + .thenReturn(DatastreamConstants.POSTGRES_LSN_SHADOW_INFO.getLeft()); // Mock transaction which can read a row from shadow table. TransactionContext mockTransaction = mock(TransactionContext.class); Struct mockRow = mock(Struct.class); - when(mockRow.getLong(any(String.class))).thenReturn(previousEventTimestamp); - when(mockRow.getString(any(String.class))).thenReturn("13/314"); + when(mockRow.getLong(DatastreamConstants.POSTGRES_TIMESTAMP_SHADOW_INFO.getLeft())) + .thenReturn(previousEventTimestamp); + when(mockRow.getString(DatastreamConstants.POSTGRES_LSN_SHADOW_INFO.getLeft())) + .thenReturn("13/314"); when(mockTransaction.readRow(any(String.class), any(Key.class), any(Iterable.class))) .thenReturn(mockRow); @@ -463,12 +510,17 @@ public void canCreatePostgresChangeEventSequenceFromShadowTableForDumpEvent() th ChangeEventContext mockContext = getMockPostgresChangeEventContext( /* addPostgresPositionFields= */ true, /* cdcEvent= */ true); + when(mockContext.getSafeTimestampColumn()) + .thenReturn(DatastreamConstants.POSTGRES_TIMESTAMP_SHADOW_INFO.getLeft()); + when(mockContext.getSafeLsnColumn()) + .thenReturn(DatastreamConstants.POSTGRES_LSN_SHADOW_INFO.getLeft()); // Mock transaction which can read a row from shadow table. TransactionContext mockTransaction = mock(TransactionContext.class); Struct mockRow = mock(Struct.class); - when(mockRow.getLong(any(String.class))).thenReturn(previousEventTimestamp); - when(mockRow.getString(any(String.class))).thenReturn(""); + when(mockRow.getLong(DatastreamConstants.POSTGRES_TIMESTAMP_SHADOW_INFO.getLeft())) + .thenReturn(previousEventTimestamp); + when(mockRow.getString(DatastreamConstants.POSTGRES_LSN_SHADOW_INFO.getLeft())).thenReturn(""); when(mockTransaction.readRow(any(String.class), any(Key.class), any(Iterable.class))) .thenReturn(mockRow); @@ -490,6 +542,10 @@ public void cannotCreatePostgresChangeEventSequenceWhenMissingRecordInShadowTabl ChangeEventContext mockContext = getMockPostgresChangeEventContext( /* addPostgresPositionFields= */ true, /* cdcEvent= */ true); + when(mockContext.getSafeTimestampColumn()) + .thenReturn(DatastreamConstants.POSTGRES_TIMESTAMP_SHADOW_INFO.getLeft()); + when(mockContext.getSafeLsnColumn()) + .thenReturn(DatastreamConstants.POSTGRES_LSN_SHADOW_INFO.getLeft()); // mock transaction which cannot find a row from shadow table. TransactionContext mockTransaction = mock(TransactionContext.class); diff --git a/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/datastream/MySqlChangeEventContextTest.java b/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/datastream/MySqlChangeEventContextTest.java index 11413195ed..92df7d2345 100644 --- a/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/datastream/MySqlChangeEventContextTest.java +++ b/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/datastream/MySqlChangeEventContextTest.java @@ -179,6 +179,73 @@ public void canGenerateShadowTableMutationForBackfillEventsWithMissingSortOrderK assertEquals(shadowMutation.getOperation(), Mutation.Op.INSERT_OR_UPDATE); } + @Test + public void canGenerateShadowTableMutationWithCollision() throws Exception { + long eventTimestamp = 1615159728L; + Ddl ddl = + Ddl.builder() + .createTable("MyTable") + .column("log_file") + .int64() + .max() + .endColumn() + .column("data") + .string() + .max() + .endColumn() + .primaryKey() + .asc("log_file") + .end() + .endTable() + .build(); + Ddl shadowDdl = + Ddl.builder() + .createTable("shadow_MyTable") + .column("log_file") + .int64() + .max() + .endColumn() + .column("data") + .string() + .max() + .endColumn() + .column("shadow_log_file") + .string() + .max() + .endColumn() + .primaryKey() + .asc("log_file") + .end() + .endTable() + .build(); + JSONObject changeEvent = new JSONObject(); + changeEvent.put("log_file", 3); + changeEvent.put(DatastreamConstants.EVENT_TABLE_NAME_KEY, "MyTable"); + changeEvent.put( + DatastreamConstants.EVENT_SOURCE_TYPE_KEY, DatastreamConstants.MYSQL_SOURCE_TYPE); + changeEvent.put(DatastreamConstants.MYSQL_TIMESTAMP_KEY, eventTimestamp); + changeEvent.put(DatastreamConstants.MYSQL_LOGFILE_KEY, "mysql-bin.00001"); + changeEvent.put(DatastreamConstants.MYSQL_LOGPOSITION_KEY, 100L); + + ChangeEventContext changeEventContext = + ChangeEventContextFactory.createChangeEventContext( + getJsonNode(changeEvent.toString()), + ddl, + shadowDdl, + "shadow_", + DatastreamConstants.MYSQL_SOURCE_TYPE); + Mutation shadowMutation = changeEventContext.getShadowTableMutation(); + Map actual = shadowMutation.asMap(); + + // The PK column 'log_file' should be present. + assertEquals(actual.get("log_file"), Value.int64(3)); + // The conflicting shadow column should be renamed to 'shadow_log_file'. + assertEquals(actual.get("shadow_log_file"), Value.string("mysql-bin.00001")); + // The other shadow columns should be present with their default names. + assertEquals(actual.get("timestamp"), Value.int64(eventTimestamp)); + assertEquals(actual.get("log_position"), Value.int64(100L)); + } + @Test public void testReadDataTable() throws Exception { long eventTimestamp = 1615159728L; diff --git a/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/datastream/MySqlChangeEventSequenceTest.java b/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/datastream/MySqlChangeEventSequenceTest.java index 17317f78d6..6965a361dc 100644 --- a/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/datastream/MySqlChangeEventSequenceTest.java +++ b/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/datastream/MySqlChangeEventSequenceTest.java @@ -104,7 +104,6 @@ public void canOrderDumpEventAndCDCEventAtSameTimestamp() { public void testCreateFromShadowTableWithUseSqlStatements() throws Exception { // Arrange TransactionContext transactionContext = mock(TransactionContext.class); - String shadowTable = "shadow_table1"; Ddl shadowTableDdl = Ddl.builder() .createTable("shadow_table1") @@ -125,9 +124,16 @@ public void testCreateFromShadowTableWithUseSqlStatements() throws Exception { .end() .endTable() .build(); - Key primaryKey = Key.of(1L); boolean useSqlStatements = true; + // Mock the ChangeEventContext + ChangeEventContext mockContext = mock(ChangeEventContext.class); + when(mockContext.getShadowTable()).thenReturn("shadow_table1"); + when(mockContext.getPrimaryKey()).thenReturn(Key.of(1L)); + when(mockContext.getSafeTimestampColumn()).thenReturn("shadow_timestamp"); + when(mockContext.getSafeLogFileColumn()).thenReturn("log_file"); + when(mockContext.getSafeLogPositionColumn()).thenReturn("log_position"); + // Mock the behavior of the transaction context Struct mockRow = mock(Struct.class); when(mockRow.getLong("id")).thenReturn(1L); @@ -144,7 +150,7 @@ public void testCreateFromShadowTableWithUseSqlStatements() throws Exception { // Act MySqlChangeEventSequence result = MySqlChangeEventSequence.createFromShadowTable( - transactionContext, shadowTable, shadowTableDdl, primaryKey, useSqlStatements); + transactionContext, mockContext, shadowTableDdl, useSqlStatements); // Assert assertNotNull(result); diff --git a/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/datastream/OracleChangeEventContextTest.java b/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/datastream/OracleChangeEventContextTest.java index 9eb39a40e6..127b2dd46f 100644 --- a/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/datastream/OracleChangeEventContextTest.java +++ b/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/datastream/OracleChangeEventContextTest.java @@ -157,4 +157,64 @@ public void canGenerateShadowTableMutationForBackfillEventWithMissingKeys() thro assertEquals(shadowMutation.getTable(), "shadow_Users2"); assertEquals(shadowMutation.getOperation(), Mutation.Op.INSERT_OR_UPDATE); } + + @Test + public void canGenerateShadowTableMutationWithCollision() throws Exception { + long eventTimestamp = 1615159728L; + Ddl ddl = + Ddl.builder() + .createTable("MyTable") + .column("scn") + .string() + .endColumn() + .column("data") + .string() + .max() + .endColumn() + .primaryKey() + .asc("scn") + .end() + .endTable() + .build(); + Ddl shadowDdl = + Ddl.builder() + .createTable("shadow_MyTable") + .column("scn") + .string() + .max() + .endColumn() + .column("data") + .string() + .max() + .endColumn() + .column("shadow_scn") + .int64() + .max() + .endColumn() + .primaryKey() + .asc("scn") + .end() + .endTable() + .build(); + JSONObject changeEvent = new JSONObject(); + changeEvent.put("scn", "scn_string"); + changeEvent.put(DatastreamConstants.EVENT_TABLE_NAME_KEY, "MyTable"); + changeEvent.put( + DatastreamConstants.EVENT_SOURCE_TYPE_KEY, DatastreamConstants.ORACLE_SOURCE_TYPE); + changeEvent.put(DatastreamConstants.ORACLE_TIMESTAMP_KEY, eventTimestamp); + changeEvent.put(DatastreamConstants.ORACLE_SCN_KEY, 999L); + + ChangeEventContext changeEventContext = + ChangeEventContextFactory.createChangeEventContext( + getJsonNode(changeEvent.toString()), ddl, shadowDdl, "shadow_", "oracle"); + Mutation shadowMutation = changeEventContext.getShadowTableMutation(); + Map actual = shadowMutation.asMap(); + + // The PK column 'scn' should be present. + assertEquals(actual.get("scn"), Value.string("scn_string")); + // The conflicting shadow column should be renamed to 'shadow_scn'. + assertEquals(actual.get("shadow_scn"), Value.int64(999L)); + // The other shadow columns should be present with their default names. + assertEquals(actual.get("timestamp"), Value.int64(eventTimestamp)); + } } diff --git a/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/datastream/OracleChangeEventSequenceTest.java b/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/datastream/OracleChangeEventSequenceTest.java index b6583fc665..64429bd24d 100644 --- a/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/datastream/OracleChangeEventSequenceTest.java +++ b/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/datastream/OracleChangeEventSequenceTest.java @@ -68,7 +68,6 @@ public void canOrderDumpEventAndCDCEventAtSameTimestamp() { public void testCreateFromShadowTableWithUseSqlStatements_Oracle() throws Exception { // Arrange TransactionContext transactionContext = mock(TransactionContext.class); - String shadowTable = "shadow_table_oracle"; Ddl shadowTableDdl = Ddl.builder() .createTable("shadow_table_oracle") @@ -86,9 +85,14 @@ public void testCreateFromShadowTableWithUseSqlStatements_Oracle() throws Except .end() .endTable() .build(); - Key primaryKey = Key.of(1L); boolean useSqlStatements = true; + ChangeEventContext mockContext = mock(ChangeEventContext.class); + when(mockContext.getShadowTable()).thenReturn("shadow_table_oracle"); + when(mockContext.getPrimaryKey()).thenReturn(Key.of(1L)); + when(mockContext.getSafeTimestampColumn()).thenReturn("shadow_timestamp"); + when(mockContext.getSafeScnColumn()).thenReturn("scn"); + // Mock the behavior of the transaction context Struct mockRow = mock(Struct.class); when(mockRow.getLong("id")).thenReturn(1L); @@ -103,7 +107,7 @@ public void testCreateFromShadowTableWithUseSqlStatements_Oracle() throws Except // Act OracleChangeEventSequence result = OracleChangeEventSequence.createFromShadowTable( - transactionContext, shadowTable, shadowTableDdl, primaryKey, useSqlStatements); + transactionContext, mockContext, shadowTableDdl, useSqlStatements); // Assert assertNotNull(result); diff --git a/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/datastream/PostgresChangeEventContextTest.java b/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/datastream/PostgresChangeEventContextTest.java index 9f1bda5fe7..e9c61bd091 100644 --- a/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/datastream/PostgresChangeEventContextTest.java +++ b/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/datastream/PostgresChangeEventContextTest.java @@ -157,4 +157,65 @@ public void canGenerateShadowTableMutationForBackfillEventWithMissingKeys() thro assertEquals(shadowMutation.getTable(), "shadow_Users2"); assertEquals(shadowMutation.getOperation(), Mutation.Op.INSERT_OR_UPDATE); } + + @Test + public void canGenerateShadowTableMutationWithCollision() throws Exception { + long eventTimestamp = 1615159728L; + Ddl ddl = + Ddl.builder() + .createTable("MyTable") + .column("lsn") + .int64() + .max() + .endColumn() + .column("data") + .string() + .max() + .endColumn() + .primaryKey() + .asc("lsn") + .end() + .endTable() + .build(); + Ddl shadowDdl = + Ddl.builder() + .createTable("shadow_MyTable") + .column("lsn") + .int64() + .max() + .endColumn() + .column("data") + .string() + .max() + .endColumn() + .column("shadow_lsn") + .string() + .max() + .endColumn() + .primaryKey() + .asc("lsn") + .end() + .endTable() + .build(); + JSONObject changeEvent = new JSONObject(); + changeEvent.put("lsn", 123); + changeEvent.put(DatastreamConstants.EVENT_TABLE_NAME_KEY, "MyTable"); + changeEvent.put( + DatastreamConstants.EVENT_SOURCE_TYPE_KEY, DatastreamConstants.POSTGRES_SOURCE_TYPE); + changeEvent.put(DatastreamConstants.POSTGRES_TIMESTAMP_KEY, eventTimestamp); + changeEvent.put(DatastreamConstants.POSTGRES_LSN_KEY, "1/ABC"); + + ChangeEventContext changeEventContext = + ChangeEventContextFactory.createChangeEventContext( + getJsonNode(changeEvent.toString()), ddl, shadowDdl, "shadow_", "postgresql"); + Mutation shadowMutation = changeEventContext.getShadowTableMutation(); + Map actual = shadowMutation.asMap(); + + // The PK column 'lsn' should be present. + assertEquals(actual.get("lsn"), Value.int64(123)); + // The conflicting shadow column should be renamed to 'shadow_lsn'. + assertEquals(actual.get("shadow_lsn"), Value.string("1/ABC")); + // The other shadow columns should be present with their default names. + assertEquals(actual.get("timestamp"), Value.int64(eventTimestamp)); + } } diff --git a/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/datastream/PostgresChangeEventSequenceTest.java b/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/datastream/PostgresChangeEventSequenceTest.java index 0cf615eff8..60a1e97012 100644 --- a/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/datastream/PostgresChangeEventSequenceTest.java +++ b/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/datastream/PostgresChangeEventSequenceTest.java @@ -117,7 +117,6 @@ public void canOrderDumpEventAndCDCEventAtSameTimestamp() { public void testCreateFromShadowTableWithUseSqlStatements_Postgres() throws Exception { // Arrange TransactionContext transactionContext = mock(TransactionContext.class); - String shadowTable = "shadow_table_postgres"; Ddl shadowTableDdl = Ddl.builder() .createTable("shadow_table_postgres") @@ -135,12 +134,16 @@ public void testCreateFromShadowTableWithUseSqlStatements_Postgres() throws Exce .end() .endTable() .build(); - Key primaryKey = Key.of(1L); boolean useSqlStatements = true; + ChangeEventContext mockContext = mock(ChangeEventContext.class); + when(mockContext.getShadowTable()).thenReturn("shadow_table_postgres"); + when(mockContext.getPrimaryKey()).thenReturn(Key.of(1L)); + when(mockContext.getSafeTimestampColumn()).thenReturn("shadow_timestamp"); + when(mockContext.getSafeLsnColumn()).thenReturn("lsn"); + // Mock the behavior of the transaction context Struct mockRow = mock(Struct.class); - when(mockRow.getLong("id")).thenReturn(1L); when(mockRow.getLong("shadow_timestamp")).thenReturn(1615159728L); when(mockRow.getString("lsn")).thenReturn("0/123456"); @@ -152,7 +155,7 @@ public void testCreateFromShadowTableWithUseSqlStatements_Postgres() throws Exce // Act PostgresChangeEventSequence result = PostgresChangeEventSequence.createFromShadowTable( - transactionContext, shadowTable, shadowTableDdl, primaryKey, useSqlStatements); + transactionContext, mockContext, shadowTableDdl, useSqlStatements); // Assert assertNotNull(result); diff --git a/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/spanner/ProcessInformationSchemaTest.java b/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/spanner/ProcessInformationSchemaTest.java index ab1e7c6026..3b36c4be99 100644 --- a/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/spanner/ProcessInformationSchemaTest.java +++ b/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/spanner/ProcessInformationSchemaTest.java @@ -455,7 +455,7 @@ public void canCreateShadowTablesInSpanner() throws Exception { + "\t`timestamp_field` TIMESTAMP,\n" + "\t`date_field` DATE,\n" + "\t`id` INT64,\n" - + "\t`shadow_timestamp` INT64,\n" + + "\t`timestamp` INT64,\n" + "\t`log_file` STRING(MAX),\n" + "\t`log_position` INT64,\n" + ") PRIMARY KEY (`first_name` ASC, `last_name` DESC, `age` ASC, `bool_field` ASC, `int64_field` ASC, `float64_field` ASC, `string_field` ASC, `bytes_field` ASC, `timestamp_field` ASC, `date_field` ASC, `id` ASC)"); @@ -500,7 +500,7 @@ public void canCreateShadowTablesInSpanner_separateDb() throws Exception { Collections.singletonList( "CREATE TABLE `shadow_table2` (\n" + "\t`id` INT64,\n" - + "\t`shadow_timestamp` INT64,\n" + + "\t`timestamp` INT64,\n" + "\t`log_file` STRING(MAX),\n" + "\t`log_position` INT64,\n" + ") PRIMARY KEY (`id` ASC)"); diff --git a/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/spanner/ShadowTableCreatorTest.java b/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/spanner/ShadowTableCreatorTest.java index a29341b6a3..2ab79e9eb3 100644 --- a/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/spanner/ShadowTableCreatorTest.java +++ b/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/spanner/ShadowTableCreatorTest.java @@ -55,7 +55,7 @@ public void canConstructShadowTableForOracleWithGsqlDialect() { testDdl.table("Users_interleaved").primaryKeys().stream() .map(c -> c.name()) .collect(Collectors.toSet()); - expectedColumns.add("shadow_timestamp"); + expectedColumns.add("timestamp"); expectedColumns.add("scn"); assertThat(columns, is(expectedColumns)); } @@ -82,7 +82,7 @@ public void canConstructShadowTableForOracleWithPostgresDialect() { testDdl.table("Users_interleaved").primaryKeys().stream() .map(c -> c.name()) .collect(Collectors.toSet()); - expectedColumns.add("shadow_timestamp"); + expectedColumns.add("timestamp"); expectedColumns.add("scn"); assertThat(columns, is(expectedColumns)); List columnTypes = @@ -127,7 +127,7 @@ public void canConstructShadowTableForMySqlWithGsqlDialect() { testDdl.table("Users_interleaved").primaryKeys().stream() .map(c -> c.name()) .collect(Collectors.toSet()); - expectedColumns.add("shadow_timestamp"); + expectedColumns.add("timestamp"); expectedColumns.add("log_file"); expectedColumns.add("log_position"); assertThat(columns, is(expectedColumns)); @@ -155,7 +155,7 @@ public void canConstructShadowTableForMySqlWithPostgresDialect() { testDdl.table("Users_interleaved").primaryKeys().stream() .map(c -> c.name()) .collect(Collectors.toSet()); - expectedColumns.add("shadow_timestamp"); + expectedColumns.add("timestamp"); expectedColumns.add("log_file"); expectedColumns.add("log_position"); assertThat(columns, is(expectedColumns)); @@ -202,7 +202,7 @@ public void canConstructShadowTableForPostgresWithGsqlDialect() { testDdl.table("Users_interleaved").primaryKeys().stream() .map(c -> c.name()) .collect(Collectors.toSet()); - expectedColumns.add("shadow_timestamp"); + expectedColumns.add("timestamp"); expectedColumns.add("lsn"); assertThat(columns, is(expectedColumns)); } @@ -229,7 +229,7 @@ public void canConstructShadowTableForPostgresWithPostgresDialect() { testDdl.table("Users_interleaved").primaryKeys().stream() .map(c -> c.name()) .collect(Collectors.toSet()); - expectedColumns.add("shadow_timestamp"); + expectedColumns.add("timestamp"); expectedColumns.add("lsn"); assertThat(columns, is(expectedColumns)); List columnTypes = @@ -275,7 +275,66 @@ public void canHandlePkColumnNameCollision() { shadowTableCreator.constructShadowTable(ddl, "MyTable", Dialect.GOOGLE_STANDARD_SQL); assertEquals(shadowTable.name(), "shadow_MyTable"); + // The original PK column should exist. assertEquals(shadowTable.column("timestamp").type().getCode(), Type.Code.TIMESTAMP); + // The new shadow column should have been renamed. assertEquals(shadowTable.column("shadow_timestamp").type().getCode(), Type.Code.INT64); } + + @Test + public void canHandleMultiplePkColumnNameCollisions() { + Ddl ddl = + Ddl.builder(Dialect.GOOGLE_STANDARD_SQL) + .createTable("MyTable") + .column("log_file") + .string() + .max() + .endColumn() + .column("shadow_log_file") + .int64() + .max() + .endColumn() + .primaryKey() + .asc("log_file") + .asc("shadow_log_file") + .end() + .endTable() + .build(); + + ShadowTableCreator shadowTableCreator = + new ShadowTableCreator("mysql", "shadow_", Dialect.GOOGLE_STANDARD_SQL); + Table shadowTable = + shadowTableCreator.constructShadowTable(ddl, "MyTable", Dialect.GOOGLE_STANDARD_SQL); + + assertEquals(shadowTable.name(), "shadow_MyTable"); + // The original PK columns should exist. + assertEquals(shadowTable.column("log_file").type().getCode(), Type.Code.STRING); + assertEquals(shadowTable.column("shadow_log_file").type().getCode(), Type.Code.INT64); + // The new shadow column should have been renamed twice. + assertEquals(shadowTable.column("shadow_shadow_log_file").type().getCode(), Type.Code.STRING); + } + + @Test + public void testGetSafeShadowColumnName() { + // Base case: no collision + assertEquals( + "log_file", + ShadowTableCreator.getSafeShadowColumnName("log_file", Set.of("column1", "column2"))); + + // Single collision: should add one prefix + assertEquals( + "shadow_log_file", + ShadowTableCreator.getSafeShadowColumnName("log_file", Set.of("column1", "log_file"))); + + // Multiple collisions: should add two prefixes + assertEquals( + "shadow_shadow_log_file", + ShadowTableCreator.getSafeShadowColumnName( + "log_file", Set.of("log_file", "shadow_log_file"))); + + // Case-insensitivity test + assertEquals( + "shadow_log_file", + ShadowTableCreator.getSafeShadowColumnName("log_file", Set.of("column1", "LOG_FILE"))); + } } From 0d8b8144a4400d71bda5cb18989c389dd69f5936 Mon Sep 17 00:00:00 2001 From: aasthabharill Date: Thu, 11 Dec 2025 11:56:56 +0530 Subject: [PATCH 09/13] immutable changes --- .../datastream/ChangeEventContext.java | 59 +++++++++++-------- .../datastream/ChangeEventConvertor.java | 3 - .../datastream/MySqlChangeEventContext.java | 19 ++---- .../datastream/MySqlChangeEventSequence.java | 10 ++-- .../datastream/OracleChangeEventContext.java | 14 ++--- .../datastream/OracleChangeEventSequence.java | 3 +- .../PostgresChangeEventContext.java | 14 ++--- .../PostgresChangeEventSequence.java | 4 +- .../templates/spanner/ShadowTableCreator.java | 4 +- 9 files changed, 61 insertions(+), 69 deletions(-) diff --git a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/ChangeEventContext.java b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/ChangeEventContext.java index 972ffa8e2e..5f81765884 100644 --- a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/ChangeEventContext.java +++ b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/ChangeEventContext.java @@ -22,17 +22,24 @@ import com.google.cloud.spanner.Statement; import com.google.cloud.spanner.TransactionContext; import com.google.cloud.teleport.v2.spanner.ddl.Ddl; +import com.google.cloud.teleport.v2.spanner.ddl.Table; import com.google.cloud.teleport.v2.spanner.migrations.convertors.ChangeEventSpannerConvertor; import com.google.cloud.teleport.v2.spanner.migrations.exceptions.ChangeEventConvertorException; import com.google.cloud.teleport.v2.spanner.migrations.exceptions.DroppedTableException; import com.google.cloud.teleport.v2.spanner.migrations.exceptions.InvalidChangeEventException; import com.google.cloud.teleport.v2.spanner.migrations.spanner.SpannerReadUtils; +import com.google.cloud.teleport.v2.templates.spanner.ShadowTableCreator; +import com.google.common.collect.ImmutableMap; import java.util.Arrays; import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.stream.Collectors; +import org.apache.commons.lang3.tuple.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + /** * ChangeEventContext class converts change events to Cloud Spanner mutations and stores all * intermediary objects prior to applying them to Cloud Spanner. @@ -64,11 +71,31 @@ public abstract class ChangeEventContext { // The following fields store the "safe" names (to avoid collision with data column names) of the // shadow table columns. - protected String safeTimestampColumn; - protected String safeLogFileColumn; - protected String safeLogPositionColumn; - protected String safeScnColumn; - protected String safeLsnColumn; + protected final ImmutableMap safeShadowColNames; + + protected ChangeEventContext( + JsonNode changeEvent, Ddl ddl, Map> shadowColumnConstants) + throws InvalidChangeEventException, DroppedTableException { + this.changeEvent = changeEvent; + this.dataTable = changeEvent.get(DatastreamConstants.EVENT_TABLE_NAME_KEY).asText(); + Table table = ddl.table(this.dataTable); + if (table == null) { + throw new DroppedTableException("Table not found in DDL: " + this.dataTable); + } + + Set existingPrimaryKeyColumnNames = + table.primaryKeys().stream().map(k -> k.name()).collect(Collectors.toSet()); + + ImmutableMap.Builder mapBuilder = ImmutableMap.builder(); + for (Map.Entry> entry : shadowColumnConstants.entrySet()) { + String metadataKey = entry.getKey(); + String originalShadowName = entry.getValue().getLeft(); + String safeName = + ShadowTableCreator.getSafeShadowColumnName(originalShadowName, existingPrimaryKeyColumnNames); + mapBuilder.put(metadataKey, safeName); + } + this.safeShadowColNames = mapBuilder.build(); + } // Abstract method to generate shadow table mutation. abstract Mutation generateShadowTableMutation(Ddl ddl, Ddl shadowDdl) @@ -124,25 +151,9 @@ public String getShadowTable() { return shadowTable; } - // Getters for the safe shadow table column names. - public String getSafeTimestampColumn() { - return this.safeTimestampColumn; - } - - public String getSafeLogFileColumn() { - return this.safeLogFileColumn; - } - - public String getSafeLogPositionColumn() { - return this.safeLogPositionColumn; - } - - public String getSafeScnColumn() { - return this.safeScnColumn; - } - - public String getSafeLsnColumn() { - return this.safeLsnColumn; + // Getter for the safe shadow table column names. + public String getSafeShadowColumn(String key) { + return safeShadowColNames.get(key); } // Fires a read on Data table with lock scanned ranges. Used to acquire exclusive lock on Data row diff --git a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/ChangeEventConvertor.java b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/ChangeEventConvertor.java index 3584b3caad..62619772d3 100644 --- a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/ChangeEventConvertor.java +++ b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/ChangeEventConvertor.java @@ -119,9 +119,6 @@ static Mutation.WriteBuilder changeEventToShadowTableMutationBuilder( String shadowTableName = shadowTablePrefix + tableName; try { Table table = ddl.table(shadowTableName); - if (table == null) { - throw new DroppedTableException(shadowTableName + " not found in spanner DDL"); - } ImmutableList keyColumns = table.primaryKeys(); List keyColumnNames = keyColumns.stream() diff --git a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/MySqlChangeEventContext.java b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/MySqlChangeEventContext.java index 1753b91e0b..ec0761420d 100644 --- a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/MySqlChangeEventContext.java +++ b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/MySqlChangeEventContext.java @@ -37,7 +37,8 @@ class MySqlChangeEventContext extends ChangeEventContext { public MySqlChangeEventContext( JsonNode changeEvent, Ddl ddl, Ddl shadowTableDdl, String shadowTablePrefix) throws ChangeEventConvertorException, InvalidChangeEventException, DroppedTableException { - this.changeEvent = changeEvent; + super(changeEvent, ddl, DatastreamConstants.MYSQL_SORT_ORDER); + this.changeEvent = changeEvent; this.shadowTablePrefix = shadowTablePrefix; this.dataTable = changeEvent.get(DatastreamConstants.EVENT_TABLE_NAME_KEY).asText(); this.shadowTable = shadowTablePrefix + this.dataTable; @@ -46,16 +47,6 @@ public MySqlChangeEventContext( Set primaryKeyColNames = dataTable.primaryKeys().stream().map(k -> k.name()).collect(Collectors.toSet()); - this.safeTimestampColumn = - ShadowTableCreator.getSafeShadowColumnName( - DatastreamConstants.MYSQL_TIMESTAMP_SHADOW_INFO.getLeft(), primaryKeyColNames); - this.safeLogFileColumn = - ShadowTableCreator.getSafeShadowColumnName( - DatastreamConstants.MYSQL_LOGFILE_SHADOW_INFO.getLeft(), primaryKeyColNames); - this.safeLogPositionColumn = - ShadowTableCreator.getSafeShadowColumnName( - DatastreamConstants.MYSQL_LOGPOSITION_SHADOW_INFO.getLeft(), primaryKeyColNames); - convertChangeEventToMutation(ddl, shadowTableDdl); } @@ -74,7 +65,7 @@ Mutation generateShadowTableMutation(Ddl ddl, Ddl shadowDdl) Long changeEventTimestamp = ChangeEventTypeConvertor.toLong( changeEvent, DatastreamConstants.MYSQL_TIMESTAMP_KEY, /* requiredField= */ true); - builder.set(this.safeTimestampColumn).to(Value.int64(changeEventTimestamp)); + builder.set(getSafeShadowColumn(DatastreamConstants.MYSQL_TIMESTAMP_KEY)).to(Value.int64(changeEventTimestamp)); /* MySql backfill events "can" have log file and log file position as null. * Set their value to a value (lexicographically) smaller than any real value. @@ -86,7 +77,7 @@ Mutation generateShadowTableMutation(Ddl ddl, Ddl shadowDdl) logFile = ""; } // Add log file information to shadow table mutation - builder.set(this.safeLogFileColumn).to(logFile); + builder.set(getSafeShadowColumn(DatastreamConstants.MYSQL_LOGFILE_KEY)).to(logFile); Long logPosition = ChangeEventTypeConvertor.toLong( @@ -95,7 +86,7 @@ Mutation generateShadowTableMutation(Ddl ddl, Ddl shadowDdl) logPosition = new Long(-1); } // Add logfile position information to shadow table mutation - builder.set(this.safeLogPositionColumn).to(Value.int64(logPosition)); + builder.set(getSafeShadowColumn(DatastreamConstants.MYSQL_LOGPOSITION_KEY)).to(Value.int64(logPosition)); return builder.build(); } diff --git a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/MySqlChangeEventSequence.java b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/MySqlChangeEventSequence.java index 0a64240920..18d8b74242 100644 --- a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/MySqlChangeEventSequence.java +++ b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/MySqlChangeEventSequence.java @@ -112,11 +112,11 @@ public static MySqlChangeEventSequence createFromShadowTable( String shadowTable = context.getShadowTable(); Key primaryKey = context.getPrimaryKey(); // Read columns from shadow table - List readColumnList = - java.util.Arrays.asList( - context.getSafeTimestampColumn(), - context.getSafeLogFileColumn(), - context.getSafeLogPositionColumn()); + List readColumnList = + java.util.Arrays.asList( + context.getSafeShadowColumn(DatastreamConstants.MYSQL_TIMESTAMP_KEY), + context.getSafeShadowColumn(DatastreamConstants.MYSQL_LOGFILE_KEY), + context.getSafeShadowColumn(DatastreamConstants.MYSQL_LOGPOSITION_KEY)); Struct row; // TODO: After beam release, use the latest client lib version which supports setting lock // hints via the read api. SQL string generation should be removed. diff --git a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/OracleChangeEventContext.java b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/OracleChangeEventContext.java index df1f046b3c..e9facdeb0d 100644 --- a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/OracleChangeEventContext.java +++ b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/OracleChangeEventContext.java @@ -37,7 +37,8 @@ class OracleChangeEventContext extends ChangeEventContext { public OracleChangeEventContext( JsonNode changeEvent, Ddl ddl, Ddl shadowTableDdl, String shadowTablePrefix) throws ChangeEventConvertorException, InvalidChangeEventException, DroppedTableException { - this.changeEvent = changeEvent; + super(changeEvent, ddl, DatastreamConstants.ORACLE_SORT_ORDER); + this.changeEvent = changeEvent; this.shadowTablePrefix = shadowTablePrefix; this.dataTable = changeEvent.get(DatastreamConstants.EVENT_TABLE_NAME_KEY).asText(); this.shadowTable = shadowTablePrefix + this.dataTable; @@ -46,13 +47,6 @@ public OracleChangeEventContext( Set primaryKeyColNames = dataTable.primaryKeys().stream().map(k -> k.name()).collect(Collectors.toSet()); - this.safeTimestampColumn = - ShadowTableCreator.getSafeShadowColumnName( - DatastreamConstants.ORACLE_TIMESTAMP_SHADOW_INFO.getLeft(), primaryKeyColNames); - this.safeScnColumn = - ShadowTableCreator.getSafeShadowColumnName( - DatastreamConstants.ORACLE_SCN_SHADOW_INFO.getLeft(), primaryKeyColNames); - convertChangeEventToMutation(ddl, shadowTableDdl); } @@ -71,7 +65,7 @@ Mutation generateShadowTableMutation(Ddl ddl, Ddl shadowDdl) Long changeEventTimestamp = ChangeEventTypeConvertor.toLong( changeEvent, DatastreamConstants.ORACLE_TIMESTAMP_KEY, /* requiredField= */ true); - builder.set(this.safeTimestampColumn).to(Value.int64(changeEventTimestamp)); + builder.set(getSafeShadowColumn(DatastreamConstants.ORACLE_TIMESTAMP_KEY)).to(Value.int64(changeEventTimestamp)); /* Oracle backfill events "can" have SCN value as null. * Set the value to a value smaller than any real value. @@ -83,7 +77,7 @@ Mutation generateShadowTableMutation(Ddl ddl, Ddl shadowDdl) changeEventSCN = new Long(-1); } // Add scn information to shadow table mutation - builder.set(this.safeScnColumn).to(Value.int64(changeEventSCN)); + builder.set(getSafeShadowColumn(DatastreamConstants.ORACLE_SCN_KEY)).to(Value.int64(changeEventSCN)); return builder.build(); } diff --git a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/OracleChangeEventSequence.java b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/OracleChangeEventSequence.java index b717077a2c..65be6ba0c0 100644 --- a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/OracleChangeEventSequence.java +++ b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/OracleChangeEventSequence.java @@ -87,7 +87,8 @@ public static OracleChangeEventSequence createFromShadowTable( Key primaryKey = context.getPrimaryKey(); // Read columns from shadow table List readColumnList = - java.util.Arrays.asList(context.getSafeTimestampColumn(), context.getSafeScnColumn()); + java.util.Arrays.asList(context.getSafeShadowColumn(DatastreamConstants.ORACLE_TIMESTAMP_KEY), + context.getSafeShadowColumn(DatastreamConstants.ORACLE_SCN_KEY)); Struct row; // TODO: After beam release, use the latest client lib version which supports setting lock // hints via the read api. SQL string generation should be removed. diff --git a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/PostgresChangeEventContext.java b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/PostgresChangeEventContext.java index 3f3962bebe..54cf154e62 100644 --- a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/PostgresChangeEventContext.java +++ b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/PostgresChangeEventContext.java @@ -37,7 +37,8 @@ class PostgresChangeEventContext extends ChangeEventContext { public PostgresChangeEventContext( JsonNode changeEvent, Ddl ddl, Ddl shadowTableDdl, String shadowTablePrefix) throws ChangeEventConvertorException, InvalidChangeEventException, DroppedTableException { - this.changeEvent = changeEvent; + super(changeEvent, ddl, DatastreamConstants.POSTGRES_SORT_ORDER); + this.changeEvent = changeEvent; this.shadowTablePrefix = shadowTablePrefix; this.dataTable = changeEvent.get(DatastreamConstants.EVENT_TABLE_NAME_KEY).asText(); this.shadowTable = shadowTablePrefix + this.dataTable; @@ -46,13 +47,6 @@ public PostgresChangeEventContext( Set primaryKeyColNames = dataTable.primaryKeys().stream().map(k -> k.name()).collect(Collectors.toSet()); - this.safeTimestampColumn = - ShadowTableCreator.getSafeShadowColumnName( - DatastreamConstants.POSTGRES_TIMESTAMP_SHADOW_INFO.getLeft(), primaryKeyColNames); - this.safeLsnColumn = - ShadowTableCreator.getSafeShadowColumnName( - DatastreamConstants.POSTGRES_LSN_SHADOW_INFO.getLeft(), primaryKeyColNames); - convertChangeEventToMutation(ddl, shadowTableDdl); } @@ -71,7 +65,7 @@ Mutation generateShadowTableMutation(Ddl ddl, Ddl shadowDdl) Long changeEventTimestamp = ChangeEventTypeConvertor.toLong( changeEvent, DatastreamConstants.POSTGRES_TIMESTAMP_KEY, /* requiredField= */ true); - builder.set(this.safeTimestampColumn).to(Value.int64(changeEventTimestamp)); + builder.set(getSafeShadowColumn(DatastreamConstants.POSTGRES_TIMESTAMP_KEY)).to(Value.int64(changeEventTimestamp)); /* Postgres backfill events "can" have LSN value as null. * Set the value to a value smaller than any real value. @@ -83,7 +77,7 @@ Mutation generateShadowTableMutation(Ddl ddl, Ddl shadowDdl) changeEventLSN = ""; } // Add lsn information to shadow table mutation - builder.set(this.safeLsnColumn).to(changeEventLSN); + builder.set(getSafeShadowColumn(DatastreamConstants.POSTGRES_LSN_KEY)).to(changeEventLSN); return builder.build(); } diff --git a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/PostgresChangeEventSequence.java b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/PostgresChangeEventSequence.java index e74ad7332c..f1dc392fbc 100644 --- a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/PostgresChangeEventSequence.java +++ b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/PostgresChangeEventSequence.java @@ -87,7 +87,9 @@ public static PostgresChangeEventSequence createFromShadowTable( Key primaryKey = context.getPrimaryKey(); // Read columns from shadow table List readColumnList = - java.util.Arrays.asList(context.getSafeTimestampColumn(), context.getSafeLsnColumn()); + java.util.Arrays.asList( + context.getSafeShadowColumn(DatastreamConstants.POSTGRES_TIMESTAMP_KEY), + context.getSafeShadowColumn(DatastreamConstants.POSTGRES_LSN_KEY)); Struct row; // TODO: After beam release, use the latest client lib version which supports setting lock // hints via the read api. SQL string generation should be removed. diff --git a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/spanner/ShadowTableCreator.java b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/spanner/ShadowTableCreator.java index d455da9126..e82f961cf1 100644 --- a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/spanner/ShadowTableCreator.java +++ b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/spanner/ShadowTableCreator.java @@ -111,8 +111,10 @@ private void addChangeEventSequenceColumns( */ public static String getSafeShadowColumnName( String baseShadowColumnName, Set existingPrimaryKeyColumnNames) { + Set normalizedKeys = + existingPrimaryKeyColumnNames.stream().map(String::toLowerCase).collect(Collectors.toSet()); String safeName = baseShadowColumnName; - while (existingPrimaryKeyColumnNames.stream().anyMatch(safeName::equalsIgnoreCase)) { + while (normalizedKeys.contains(safeName.toLowerCase())) { safeName = "shadow_" + safeName; } return safeName; From 418b90c8f9647e9467412762f6fc56f5fa98d7db Mon Sep 17 00:00:00 2001 From: aasthabharill Date: Thu, 11 Dec 2025 12:08:27 +0530 Subject: [PATCH 10/13] spotless + test fixes --- .../datastream/ChangeEventContext.java | 4 +- .../datastream/MySqlChangeEventContext.java | 13 +++--- .../datastream/MySqlChangeEventSequence.java | 10 ++--- .../datastream/OracleChangeEventContext.java | 13 +++--- .../datastream/OracleChangeEventSequence.java | 5 ++- .../PostgresChangeEventContext.java | 9 ++-- .../PostgresChangeEventSequence.java | 4 +- .../ChangeEventSequenceFactoryTest.java | 42 +++++++++---------- .../MySqlChangeEventSequenceTest.java | 9 ++-- .../OracleChangeEventSequenceTest.java | 5 ++- .../PostgresChangeEventSequenceTest.java | 5 ++- 11 files changed, 66 insertions(+), 53 deletions(-) diff --git a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/ChangeEventContext.java b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/ChangeEventContext.java index 5f81765884..4bac33395e 100644 --- a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/ChangeEventContext.java +++ b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/ChangeEventContext.java @@ -39,7 +39,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; - /** * ChangeEventContext class converts change events to Cloud Spanner mutations and stores all * intermediary objects prior to applying them to Cloud Spanner. @@ -91,7 +90,8 @@ protected ChangeEventContext( String metadataKey = entry.getKey(); String originalShadowName = entry.getValue().getLeft(); String safeName = - ShadowTableCreator.getSafeShadowColumnName(originalShadowName, existingPrimaryKeyColumnNames); + ShadowTableCreator.getSafeShadowColumnName( + originalShadowName, existingPrimaryKeyColumnNames); mapBuilder.put(metadataKey, safeName); } this.safeShadowColNames = mapBuilder.build(); diff --git a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/MySqlChangeEventContext.java b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/MySqlChangeEventContext.java index ec0761420d..92a0e5d001 100644 --- a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/MySqlChangeEventContext.java +++ b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/MySqlChangeEventContext.java @@ -24,7 +24,6 @@ import com.google.cloud.teleport.v2.spanner.migrations.exceptions.ChangeEventConvertorException; import com.google.cloud.teleport.v2.spanner.migrations.exceptions.DroppedTableException; import com.google.cloud.teleport.v2.spanner.migrations.exceptions.InvalidChangeEventException; -import com.google.cloud.teleport.v2.templates.spanner.ShadowTableCreator; import java.util.Set; import java.util.stream.Collectors; @@ -37,8 +36,8 @@ class MySqlChangeEventContext extends ChangeEventContext { public MySqlChangeEventContext( JsonNode changeEvent, Ddl ddl, Ddl shadowTableDdl, String shadowTablePrefix) throws ChangeEventConvertorException, InvalidChangeEventException, DroppedTableException { - super(changeEvent, ddl, DatastreamConstants.MYSQL_SORT_ORDER); - this.changeEvent = changeEvent; + super(changeEvent, ddl, DatastreamConstants.MYSQL_SORT_ORDER); + this.changeEvent = changeEvent; this.shadowTablePrefix = shadowTablePrefix; this.dataTable = changeEvent.get(DatastreamConstants.EVENT_TABLE_NAME_KEY).asText(); this.shadowTable = shadowTablePrefix + this.dataTable; @@ -65,7 +64,9 @@ Mutation generateShadowTableMutation(Ddl ddl, Ddl shadowDdl) Long changeEventTimestamp = ChangeEventTypeConvertor.toLong( changeEvent, DatastreamConstants.MYSQL_TIMESTAMP_KEY, /* requiredField= */ true); - builder.set(getSafeShadowColumn(DatastreamConstants.MYSQL_TIMESTAMP_KEY)).to(Value.int64(changeEventTimestamp)); + builder + .set(getSafeShadowColumn(DatastreamConstants.MYSQL_TIMESTAMP_KEY)) + .to(Value.int64(changeEventTimestamp)); /* MySql backfill events "can" have log file and log file position as null. * Set their value to a value (lexicographically) smaller than any real value. @@ -86,7 +87,9 @@ Mutation generateShadowTableMutation(Ddl ddl, Ddl shadowDdl) logPosition = new Long(-1); } // Add logfile position information to shadow table mutation - builder.set(getSafeShadowColumn(DatastreamConstants.MYSQL_LOGPOSITION_KEY)).to(Value.int64(logPosition)); + builder + .set(getSafeShadowColumn(DatastreamConstants.MYSQL_LOGPOSITION_KEY)) + .to(Value.int64(logPosition)); return builder.build(); } diff --git a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/MySqlChangeEventSequence.java b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/MySqlChangeEventSequence.java index 18d8b74242..85e0e9377d 100644 --- a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/MySqlChangeEventSequence.java +++ b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/MySqlChangeEventSequence.java @@ -112,11 +112,11 @@ public static MySqlChangeEventSequence createFromShadowTable( String shadowTable = context.getShadowTable(); Key primaryKey = context.getPrimaryKey(); // Read columns from shadow table - List readColumnList = - java.util.Arrays.asList( - context.getSafeShadowColumn(DatastreamConstants.MYSQL_TIMESTAMP_KEY), - context.getSafeShadowColumn(DatastreamConstants.MYSQL_LOGFILE_KEY), - context.getSafeShadowColumn(DatastreamConstants.MYSQL_LOGPOSITION_KEY)); + List readColumnList = + java.util.Arrays.asList( + context.getSafeShadowColumn(DatastreamConstants.MYSQL_TIMESTAMP_KEY), + context.getSafeShadowColumn(DatastreamConstants.MYSQL_LOGFILE_KEY), + context.getSafeShadowColumn(DatastreamConstants.MYSQL_LOGPOSITION_KEY)); Struct row; // TODO: After beam release, use the latest client lib version which supports setting lock // hints via the read api. SQL string generation should be removed. diff --git a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/OracleChangeEventContext.java b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/OracleChangeEventContext.java index e9facdeb0d..50344d16ed 100644 --- a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/OracleChangeEventContext.java +++ b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/OracleChangeEventContext.java @@ -24,7 +24,6 @@ import com.google.cloud.teleport.v2.spanner.migrations.exceptions.ChangeEventConvertorException; import com.google.cloud.teleport.v2.spanner.migrations.exceptions.DroppedTableException; import com.google.cloud.teleport.v2.spanner.migrations.exceptions.InvalidChangeEventException; -import com.google.cloud.teleport.v2.templates.spanner.ShadowTableCreator; import java.util.Set; import java.util.stream.Collectors; @@ -37,8 +36,8 @@ class OracleChangeEventContext extends ChangeEventContext { public OracleChangeEventContext( JsonNode changeEvent, Ddl ddl, Ddl shadowTableDdl, String shadowTablePrefix) throws ChangeEventConvertorException, InvalidChangeEventException, DroppedTableException { - super(changeEvent, ddl, DatastreamConstants.ORACLE_SORT_ORDER); - this.changeEvent = changeEvent; + super(changeEvent, ddl, DatastreamConstants.ORACLE_SORT_ORDER); + this.changeEvent = changeEvent; this.shadowTablePrefix = shadowTablePrefix; this.dataTable = changeEvent.get(DatastreamConstants.EVENT_TABLE_NAME_KEY).asText(); this.shadowTable = shadowTablePrefix + this.dataTable; @@ -65,7 +64,9 @@ Mutation generateShadowTableMutation(Ddl ddl, Ddl shadowDdl) Long changeEventTimestamp = ChangeEventTypeConvertor.toLong( changeEvent, DatastreamConstants.ORACLE_TIMESTAMP_KEY, /* requiredField= */ true); - builder.set(getSafeShadowColumn(DatastreamConstants.ORACLE_TIMESTAMP_KEY)).to(Value.int64(changeEventTimestamp)); + builder + .set(getSafeShadowColumn(DatastreamConstants.ORACLE_TIMESTAMP_KEY)) + .to(Value.int64(changeEventTimestamp)); /* Oracle backfill events "can" have SCN value as null. * Set the value to a value smaller than any real value. @@ -77,7 +78,9 @@ Mutation generateShadowTableMutation(Ddl ddl, Ddl shadowDdl) changeEventSCN = new Long(-1); } // Add scn information to shadow table mutation - builder.set(getSafeShadowColumn(DatastreamConstants.ORACLE_SCN_KEY)).to(Value.int64(changeEventSCN)); + builder + .set(getSafeShadowColumn(DatastreamConstants.ORACLE_SCN_KEY)) + .to(Value.int64(changeEventSCN)); return builder.build(); } diff --git a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/OracleChangeEventSequence.java b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/OracleChangeEventSequence.java index 65be6ba0c0..f56853617d 100644 --- a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/OracleChangeEventSequence.java +++ b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/OracleChangeEventSequence.java @@ -87,8 +87,9 @@ public static OracleChangeEventSequence createFromShadowTable( Key primaryKey = context.getPrimaryKey(); // Read columns from shadow table List readColumnList = - java.util.Arrays.asList(context.getSafeShadowColumn(DatastreamConstants.ORACLE_TIMESTAMP_KEY), - context.getSafeShadowColumn(DatastreamConstants.ORACLE_SCN_KEY)); + java.util.Arrays.asList( + context.getSafeShadowColumn(DatastreamConstants.ORACLE_TIMESTAMP_KEY), + context.getSafeShadowColumn(DatastreamConstants.ORACLE_SCN_KEY)); Struct row; // TODO: After beam release, use the latest client lib version which supports setting lock // hints via the read api. SQL string generation should be removed. diff --git a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/PostgresChangeEventContext.java b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/PostgresChangeEventContext.java index 54cf154e62..57c420be45 100644 --- a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/PostgresChangeEventContext.java +++ b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/PostgresChangeEventContext.java @@ -24,7 +24,6 @@ import com.google.cloud.teleport.v2.spanner.migrations.exceptions.ChangeEventConvertorException; import com.google.cloud.teleport.v2.spanner.migrations.exceptions.DroppedTableException; import com.google.cloud.teleport.v2.spanner.migrations.exceptions.InvalidChangeEventException; -import com.google.cloud.teleport.v2.templates.spanner.ShadowTableCreator; import java.util.Set; import java.util.stream.Collectors; @@ -37,8 +36,8 @@ class PostgresChangeEventContext extends ChangeEventContext { public PostgresChangeEventContext( JsonNode changeEvent, Ddl ddl, Ddl shadowTableDdl, String shadowTablePrefix) throws ChangeEventConvertorException, InvalidChangeEventException, DroppedTableException { - super(changeEvent, ddl, DatastreamConstants.POSTGRES_SORT_ORDER); - this.changeEvent = changeEvent; + super(changeEvent, ddl, DatastreamConstants.POSTGRES_SORT_ORDER); + this.changeEvent = changeEvent; this.shadowTablePrefix = shadowTablePrefix; this.dataTable = changeEvent.get(DatastreamConstants.EVENT_TABLE_NAME_KEY).asText(); this.shadowTable = shadowTablePrefix + this.dataTable; @@ -65,7 +64,9 @@ Mutation generateShadowTableMutation(Ddl ddl, Ddl shadowDdl) Long changeEventTimestamp = ChangeEventTypeConvertor.toLong( changeEvent, DatastreamConstants.POSTGRES_TIMESTAMP_KEY, /* requiredField= */ true); - builder.set(getSafeShadowColumn(DatastreamConstants.POSTGRES_TIMESTAMP_KEY)).to(Value.int64(changeEventTimestamp)); + builder + .set(getSafeShadowColumn(DatastreamConstants.POSTGRES_TIMESTAMP_KEY)) + .to(Value.int64(changeEventTimestamp)); /* Postgres backfill events "can" have LSN value as null. * Set the value to a value smaller than any real value. diff --git a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/PostgresChangeEventSequence.java b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/PostgresChangeEventSequence.java index f1dc392fbc..1ef41b6464 100644 --- a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/PostgresChangeEventSequence.java +++ b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/PostgresChangeEventSequence.java @@ -88,8 +88,8 @@ public static PostgresChangeEventSequence createFromShadowTable( // Read columns from shadow table List readColumnList = java.util.Arrays.asList( - context.getSafeShadowColumn(DatastreamConstants.POSTGRES_TIMESTAMP_KEY), - context.getSafeShadowColumn(DatastreamConstants.POSTGRES_LSN_KEY)); + context.getSafeShadowColumn(DatastreamConstants.POSTGRES_TIMESTAMP_KEY), + context.getSafeShadowColumn(DatastreamConstants.POSTGRES_LSN_KEY)); Struct row; // TODO: After beam release, use the latest client lib version which supports setting lock // hints via the read api. SQL string generation should be removed. diff --git a/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/datastream/ChangeEventSequenceFactoryTest.java b/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/datastream/ChangeEventSequenceFactoryTest.java index daf3673701..7a8068f619 100644 --- a/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/datastream/ChangeEventSequenceFactoryTest.java +++ b/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/datastream/ChangeEventSequenceFactoryTest.java @@ -186,11 +186,11 @@ public void canCreateMySqlChangeEventSequenceFromShadowTable() throws Exception ChangeEventContext mockContext = getMockMySqlChangeEventContext(/* addMysqlPositionFields= */ true, /* cdcEvent= */ true); - when(mockContext.getSafeTimestampColumn()) + when(mockContext.getSafeShadowColumn(DatastreamConstants.MYSQL_TIMESTAMP_KEY)) .thenReturn(DatastreamConstants.MYSQL_TIMESTAMP_SHADOW_INFO.getLeft()); - when(mockContext.getSafeLogFileColumn()) + when(mockContext.getSafeShadowColumn(DatastreamConstants.MYSQL_LOGFILE_KEY)) .thenReturn(DatastreamConstants.MYSQL_LOGFILE_SHADOW_INFO.getLeft()); - when(mockContext.getSafeLogPositionColumn()) + when(mockContext.getSafeShadowColumn(DatastreamConstants.MYSQL_LOGPOSITION_KEY)) .thenReturn(DatastreamConstants.MYSQL_LOGPOSITION_SHADOW_INFO.getLeft()); // Mock transaction which can read a row from shadow table. @@ -224,11 +224,11 @@ public void canCreateMySqlChangeEventSequenceFromShadowTableForDumpEvent() throw ChangeEventContext mockContext = getMockMySqlChangeEventContext(/* addMysqlPositionFields= */ true, /* cdcEvent= */ true); - when(mockContext.getSafeTimestampColumn()) + when(mockContext.getSafeShadowColumn(DatastreamConstants.MYSQL_TIMESTAMP_KEY)) .thenReturn(DatastreamConstants.MYSQL_TIMESTAMP_SHADOW_INFO.getLeft()); - when(mockContext.getSafeLogFileColumn()) + when(mockContext.getSafeShadowColumn(DatastreamConstants.MYSQL_LOGFILE_KEY)) .thenReturn(DatastreamConstants.MYSQL_LOGFILE_SHADOW_INFO.getLeft()); - when(mockContext.getSafeLogPositionColumn()) + when(mockContext.getSafeShadowColumn(DatastreamConstants.MYSQL_LOGPOSITION_KEY)) .thenReturn(DatastreamConstants.MYSQL_LOGPOSITION_SHADOW_INFO.getLeft()); // Mock transaction which can read a row from shadow table. @@ -260,11 +260,11 @@ public void cannotCreateMySqlChangeEventSequenceWhenMissingRecordInShadowTable() ChangeEventContext mockContext = getMockMySqlChangeEventContext(/* addMysqlPositionFields= */ true, /* cdcEvent= */ true); - when(mockContext.getSafeTimestampColumn()) + when(mockContext.getSafeShadowColumn(DatastreamConstants.MYSQL_TIMESTAMP_KEY)) .thenReturn(DatastreamConstants.MYSQL_TIMESTAMP_SHADOW_INFO.getLeft()); - when(mockContext.getSafeLogFileColumn()) + when(mockContext.getSafeShadowColumn(DatastreamConstants.MYSQL_LOGFILE_KEY)) .thenReturn(DatastreamConstants.MYSQL_LOGFILE_SHADOW_INFO.getLeft()); - when(mockContext.getSafeLogPositionColumn()) + when(mockContext.getSafeShadowColumn(DatastreamConstants.MYSQL_LOGPOSITION_KEY)) .thenReturn(DatastreamConstants.MYSQL_LOGPOSITION_SHADOW_INFO.getLeft()); // mock transaction which cannot find a row from shadow table. @@ -336,9 +336,9 @@ public void canCreateOracleChangeEventSequenceFromShadowTable() throws Exception ChangeEventContext mockContext = getMockOracleChangeEventContext(/* addOraclePositionFields= */ true, /* cdcEvent= */ true); - when(mockContext.getSafeTimestampColumn()) + when(mockContext.getSafeShadowColumn(DatastreamConstants.ORACLE_TIMESTAMP_KEY)) .thenReturn(DatastreamConstants.ORACLE_TIMESTAMP_SHADOW_INFO.getLeft()); - when(mockContext.getSafeScnColumn()) + when(mockContext.getSafeShadowColumn(DatastreamConstants.ORACLE_SCN_KEY)) .thenReturn(DatastreamConstants.ORACLE_SCN_SHADOW_INFO.getLeft()); // Mock transaction which can read a row from shadow table. @@ -368,9 +368,9 @@ public void canCreateOracleChangeEventSequenceFromShadowTableForDumpEvent() thro ChangeEventContext mockContext = getMockOracleChangeEventContext(/* addOraclePositionFields= */ true, /* cdcEvent= */ true); - when(mockContext.getSafeTimestampColumn()) + when(mockContext.getSafeShadowColumn(DatastreamConstants.ORACLE_TIMESTAMP_KEY)) .thenReturn(DatastreamConstants.ORACLE_TIMESTAMP_SHADOW_INFO.getLeft()); - when(mockContext.getSafeScnColumn()) + when(mockContext.getSafeShadowColumn(DatastreamConstants.ORACLE_SCN_KEY)) .thenReturn(DatastreamConstants.ORACLE_SCN_SHADOW_INFO.getLeft()); // Mock transaction which can read a row from shadow table. @@ -399,9 +399,9 @@ public void cannotCreateOracleChangeEventSequenceWhenMissingRecordInShadowTable( ChangeEventContext mockContext = getMockOracleChangeEventContext(/* addOraclePositionFields= */ true, /* cdcEvent= */ true); - when(mockContext.getSafeTimestampColumn()) + when(mockContext.getSafeShadowColumn(DatastreamConstants.ORACLE_TIMESTAMP_KEY)) .thenReturn(DatastreamConstants.ORACLE_TIMESTAMP_SHADOW_INFO.getLeft()); - when(mockContext.getSafeScnColumn()) + when(mockContext.getSafeShadowColumn(DatastreamConstants.ORACLE_SCN_KEY)) .thenReturn(DatastreamConstants.ORACLE_SCN_SHADOW_INFO.getLeft()); // mock transaction which cannot find a row from shadow table. @@ -476,9 +476,9 @@ public void canCreatePostgresChangeEventSequenceFromShadowTable() throws Excepti ChangeEventContext mockContext = getMockPostgresChangeEventContext( /* addPostgresPositionFields= */ true, /* cdcEvent= */ true); - when(mockContext.getSafeTimestampColumn()) + when(mockContext.getSafeShadowColumn(DatastreamConstants.POSTGRES_TIMESTAMP_KEY)) .thenReturn(DatastreamConstants.POSTGRES_TIMESTAMP_SHADOW_INFO.getLeft()); - when(mockContext.getSafeLsnColumn()) + when(mockContext.getSafeShadowColumn(DatastreamConstants.POSTGRES_LSN_KEY)) .thenReturn(DatastreamConstants.POSTGRES_LSN_SHADOW_INFO.getLeft()); // Mock transaction which can read a row from shadow table. @@ -510,9 +510,9 @@ public void canCreatePostgresChangeEventSequenceFromShadowTableForDumpEvent() th ChangeEventContext mockContext = getMockPostgresChangeEventContext( /* addPostgresPositionFields= */ true, /* cdcEvent= */ true); - when(mockContext.getSafeTimestampColumn()) + when(mockContext.getSafeShadowColumn(DatastreamConstants.POSTGRES_TIMESTAMP_KEY)) .thenReturn(DatastreamConstants.POSTGRES_TIMESTAMP_SHADOW_INFO.getLeft()); - when(mockContext.getSafeLsnColumn()) + when(mockContext.getSafeShadowColumn(DatastreamConstants.POSTGRES_LSN_KEY)) .thenReturn(DatastreamConstants.POSTGRES_LSN_SHADOW_INFO.getLeft()); // Mock transaction which can read a row from shadow table. @@ -542,9 +542,9 @@ public void cannotCreatePostgresChangeEventSequenceWhenMissingRecordInShadowTabl ChangeEventContext mockContext = getMockPostgresChangeEventContext( /* addPostgresPositionFields= */ true, /* cdcEvent= */ true); - when(mockContext.getSafeTimestampColumn()) + when(mockContext.getSafeShadowColumn(DatastreamConstants.POSTGRES_TIMESTAMP_KEY)) .thenReturn(DatastreamConstants.POSTGRES_TIMESTAMP_SHADOW_INFO.getLeft()); - when(mockContext.getSafeLsnColumn()) + when(mockContext.getSafeShadowColumn(DatastreamConstants.POSTGRES_LSN_KEY)) .thenReturn(DatastreamConstants.POSTGRES_LSN_SHADOW_INFO.getLeft()); // mock transaction which cannot find a row from shadow table. diff --git a/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/datastream/MySqlChangeEventSequenceTest.java b/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/datastream/MySqlChangeEventSequenceTest.java index 6965a361dc..1fe028d53e 100644 --- a/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/datastream/MySqlChangeEventSequenceTest.java +++ b/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/datastream/MySqlChangeEventSequenceTest.java @@ -130,9 +130,12 @@ public void testCreateFromShadowTableWithUseSqlStatements() throws Exception { ChangeEventContext mockContext = mock(ChangeEventContext.class); when(mockContext.getShadowTable()).thenReturn("shadow_table1"); when(mockContext.getPrimaryKey()).thenReturn(Key.of(1L)); - when(mockContext.getSafeTimestampColumn()).thenReturn("shadow_timestamp"); - when(mockContext.getSafeLogFileColumn()).thenReturn("log_file"); - when(mockContext.getSafeLogPositionColumn()).thenReturn("log_position"); + when(mockContext.getSafeShadowColumn(DatastreamConstants.MYSQL_TIMESTAMP_KEY)) + .thenReturn("shadow_timestamp"); + when(mockContext.getSafeShadowColumn(DatastreamConstants.MYSQL_LOGFILE_KEY)) + .thenReturn("log_file"); + when(mockContext.getSafeShadowColumn(DatastreamConstants.MYSQL_LOGPOSITION_KEY)) + .thenReturn("log_position"); // Mock the behavior of the transaction context Struct mockRow = mock(Struct.class); diff --git a/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/datastream/OracleChangeEventSequenceTest.java b/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/datastream/OracleChangeEventSequenceTest.java index 64429bd24d..3aec91c4b1 100644 --- a/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/datastream/OracleChangeEventSequenceTest.java +++ b/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/datastream/OracleChangeEventSequenceTest.java @@ -90,8 +90,9 @@ public void testCreateFromShadowTableWithUseSqlStatements_Oracle() throws Except ChangeEventContext mockContext = mock(ChangeEventContext.class); when(mockContext.getShadowTable()).thenReturn("shadow_table_oracle"); when(mockContext.getPrimaryKey()).thenReturn(Key.of(1L)); - when(mockContext.getSafeTimestampColumn()).thenReturn("shadow_timestamp"); - when(mockContext.getSafeScnColumn()).thenReturn("scn"); + when(mockContext.getSafeShadowColumn(DatastreamConstants.ORACLE_TIMESTAMP_KEY)) + .thenReturn("shadow_timestamp"); + when(mockContext.getSafeShadowColumn(DatastreamConstants.ORACLE_SCN_KEY)).thenReturn("scn"); // Mock the behavior of the transaction context Struct mockRow = mock(Struct.class); diff --git a/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/datastream/PostgresChangeEventSequenceTest.java b/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/datastream/PostgresChangeEventSequenceTest.java index 60a1e97012..2fcb99344c 100644 --- a/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/datastream/PostgresChangeEventSequenceTest.java +++ b/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/datastream/PostgresChangeEventSequenceTest.java @@ -139,8 +139,9 @@ public void testCreateFromShadowTableWithUseSqlStatements_Postgres() throws Exce ChangeEventContext mockContext = mock(ChangeEventContext.class); when(mockContext.getShadowTable()).thenReturn("shadow_table_postgres"); when(mockContext.getPrimaryKey()).thenReturn(Key.of(1L)); - when(mockContext.getSafeTimestampColumn()).thenReturn("shadow_timestamp"); - when(mockContext.getSafeLsnColumn()).thenReturn("lsn"); + when(mockContext.getSafeShadowColumn(DatastreamConstants.POSTGRES_TIMESTAMP_KEY)) + .thenReturn("shadow_timestamp"); + when(mockContext.getSafeShadowColumn(DatastreamConstants.POSTGRES_LSN_KEY)).thenReturn("lsn"); // Mock the behavior of the transaction context Struct mockRow = mock(Struct.class); From f64b89e25843b73fdb37dac8a65889dd256ea99a Mon Sep 17 00:00:00 2001 From: aasthabharill Date: Thu, 11 Dec 2025 12:13:21 +0530 Subject: [PATCH 11/13] minor changes --- .../teleport/v2/templates/datastream/ChangeEventContext.java | 4 ++-- .../v2/templates/datastream/PostgresChangeEventContext.java | 4 +++- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/ChangeEventContext.java b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/ChangeEventContext.java index 4bac33395e..a253528819 100644 --- a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/ChangeEventContext.java +++ b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/ChangeEventContext.java @@ -68,8 +68,8 @@ public abstract class ChangeEventContext { // Data table for the change event. protected String dataTable; - // The following fields store the "safe" names (to avoid collision with data column names) of the - // shadow table columns. + // Immutable map to store the "safe" column names of the shadow table (to avoid collision with + // data column names). protected final ImmutableMap safeShadowColNames; protected ChangeEventContext( diff --git a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/PostgresChangeEventContext.java b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/PostgresChangeEventContext.java index 57c420be45..922d1a46a0 100644 --- a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/PostgresChangeEventContext.java +++ b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/PostgresChangeEventContext.java @@ -78,7 +78,9 @@ Mutation generateShadowTableMutation(Ddl ddl, Ddl shadowDdl) changeEventLSN = ""; } // Add lsn information to shadow table mutation - builder.set(getSafeShadowColumn(DatastreamConstants.POSTGRES_LSN_KEY)).to(changeEventLSN); + builder + .set(getSafeShadowColumn(DatastreamConstants.POSTGRES_LSN_KEY)) + .to(Value.string(changeEventLSN)); return builder.build(); } From 0276b6c18a38e7cea8581f1f126fddffcb1b1f28 Mon Sep 17 00:00:00 2001 From: aasthabharill Date: Thu, 11 Dec 2025 16:30:07 +0530 Subject: [PATCH 12/13] minor changes --- .../teleport/v2/templates/datastream/ChangeEventContext.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/ChangeEventContext.java b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/ChangeEventContext.java index a253528819..9264ea4127 100644 --- a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/ChangeEventContext.java +++ b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/ChangeEventContext.java @@ -78,10 +78,6 @@ protected ChangeEventContext( this.changeEvent = changeEvent; this.dataTable = changeEvent.get(DatastreamConstants.EVENT_TABLE_NAME_KEY).asText(); Table table = ddl.table(this.dataTable); - if (table == null) { - throw new DroppedTableException("Table not found in DDL: " + this.dataTable); - } - Set existingPrimaryKeyColumnNames = table.primaryKeys().stream().map(k -> k.name()).collect(Collectors.toSet()); From 3faf25e782a43dd0b76d8b706ee7c943fc9379bd Mon Sep 17 00:00:00 2001 From: aasthabharill Date: Fri, 12 Dec 2025 13:05:33 +0530 Subject: [PATCH 13/13] add null table exception --- .../teleport/v2/templates/datastream/ChangeEventContext.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/ChangeEventContext.java b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/ChangeEventContext.java index 9264ea4127..7fa5560304 100644 --- a/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/ChangeEventContext.java +++ b/v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/datastream/ChangeEventContext.java @@ -78,6 +78,10 @@ protected ChangeEventContext( this.changeEvent = changeEvent; this.dataTable = changeEvent.get(DatastreamConstants.EVENT_TABLE_NAME_KEY).asText(); Table table = ddl.table(this.dataTable); + if (table == null) { + throw new DroppedTableException( + "Table from change event does not exist in Spanner. table=" + this.dataTable); + } Set existingPrimaryKeyColumnNames = table.primaryKeys().stream().map(k -> k.name()).collect(Collectors.toSet());