Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,20 @@
import com.google.cloud.spanner.Statement;
import com.google.cloud.spanner.TransactionContext;
import com.google.cloud.teleport.v2.spanner.ddl.Ddl;
import com.google.cloud.teleport.v2.spanner.ddl.Table;
import com.google.cloud.teleport.v2.spanner.migrations.convertors.ChangeEventSpannerConvertor;
import com.google.cloud.teleport.v2.spanner.migrations.exceptions.ChangeEventConvertorException;
import com.google.cloud.teleport.v2.spanner.migrations.exceptions.DroppedTableException;
import com.google.cloud.teleport.v2.spanner.migrations.exceptions.InvalidChangeEventException;
import com.google.cloud.teleport.v2.spanner.migrations.spanner.SpannerReadUtils;
import com.google.cloud.teleport.v2.templates.spanner.ShadowTableCreator;
import com.google.common.collect.ImmutableMap;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -62,22 +68,53 @@ public abstract class ChangeEventContext {
// Data table for the change event.
protected String dataTable;

// Immutable map to store the "safe" column names of the shadow table (to avoid collision with
// data column names). <key: metadata name, value: safe shadow col name>
protected final ImmutableMap<String, String> safeShadowColNames;

protected ChangeEventContext(
JsonNode changeEvent, Ddl ddl, Map<String, Pair<String, String>> shadowColumnConstants)
throws InvalidChangeEventException, DroppedTableException {
this.changeEvent = changeEvent;
this.dataTable = changeEvent.get(DatastreamConstants.EVENT_TABLE_NAME_KEY).asText();
Table table = ddl.table(this.dataTable);
if (table == null) {
throw new DroppedTableException(
"Table from change event does not exist in Spanner. table=" + this.dataTable);
}
Set<String> existingPrimaryKeyColumnNames =
table.primaryKeys().stream().map(k -> k.name()).collect(Collectors.toSet());

ImmutableMap.Builder<String, String> mapBuilder = ImmutableMap.builder();
for (Map.Entry<String, Pair<String, String>> entry : shadowColumnConstants.entrySet()) {
String metadataKey = entry.getKey();
String originalShadowName = entry.getValue().getLeft();
String safeName =
ShadowTableCreator.getSafeShadowColumnName(
originalShadowName, existingPrimaryKeyColumnNames);
mapBuilder.put(metadataKey, safeName);
}
this.safeShadowColNames = mapBuilder.build();
}

// Abstract method to generate shadow table mutation.
abstract Mutation generateShadowTableMutation(Ddl ddl) throws ChangeEventConvertorException;
abstract Mutation generateShadowTableMutation(Ddl ddl, Ddl shadowDdl)
throws ChangeEventConvertorException;

// Helper method to convert change event to mutation.
protected void convertChangeEventToMutation(Ddl ddl, Ddl shadowTableDdl)
throws ChangeEventConvertorException, InvalidChangeEventException, DroppedTableException {
ChangeEventConvertor.convertChangeEventColumnKeysToLowerCase(changeEvent);
ChangeEventConvertor.verifySpannerSchema(ddl, changeEvent);

this.primaryKey =
ChangeEventSpannerConvertor.changeEventToPrimaryKey(
changeEvent.get(DatastreamConstants.EVENT_TABLE_NAME_KEY).asText(),
ddl,
changeEvent,
/* convertNameToLowerCase= */ true);
this.dataMutation = ChangeEventConvertor.changeEventToMutation(ddl, changeEvent);
this.shadowTableMutation = generateShadowTableMutation(shadowTableDdl);
this.shadowTableMutation = generateShadowTableMutation(ddl, shadowTableDdl);
}

public JsonNode getChangeEvent() {
Expand Down Expand Up @@ -114,6 +151,11 @@ public String getShadowTable() {
return shadowTable;
}

// Getter for the safe shadow table column names.
public String getSafeShadowColumn(String key) {
return safeShadowColNames.get(key);
}

// Fires a read on Data table with lock scanned ranges. Used to acquire exclusive lock on Data row
// at the beginning of a readWriteTransaction
public void readDataTableRowWithExclusiveLock(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,25 +73,13 @@ public static ChangeEventSequence createChangeEventSequenceFromShadowTable(

if (DatastreamConstants.MYSQL_SOURCE_TYPE.equals(sourceType)) {
return MySqlChangeEventSequence.createFromShadowTable(
transactionContext,
changeEventContext.getShadowTable(),
shadowDdl,
changeEventContext.getPrimaryKey(),
useSqlStatements);
transactionContext, changeEventContext, shadowDdl, useSqlStatements);
} else if (DatastreamConstants.ORACLE_SOURCE_TYPE.equals(sourceType)) {
return OracleChangeEventSequence.createFromShadowTable(
transactionContext,
changeEventContext.getShadowTable(),
shadowDdl,
changeEventContext.getPrimaryKey(),
useSqlStatements);
transactionContext, changeEventContext, shadowDdl, useSqlStatements);
} else if (DatastreamConstants.POSTGRES_SOURCE_TYPE.equals(sourceType)) {
return PostgresChangeEventSequence.createFromShadowTable(
transactionContext,
changeEventContext.getShadowTable(),
shadowDdl,
changeEventContext.getPrimaryKey(),
useSqlStatements);
transactionContext, changeEventContext, shadowDdl, useSqlStatements);
}
throw new InvalidChangeEventException("Unsupported source database: " + sourceType);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@
import com.google.cloud.spanner.Mutation;
import com.google.cloud.spanner.Value;
import com.google.cloud.teleport.v2.spanner.ddl.Ddl;
import com.google.cloud.teleport.v2.spanner.ddl.Table;
import com.google.cloud.teleport.v2.spanner.migrations.convertors.ChangeEventTypeConvertor;
import com.google.cloud.teleport.v2.spanner.migrations.exceptions.ChangeEventConvertorException;
import com.google.cloud.teleport.v2.spanner.migrations.exceptions.DroppedTableException;
import com.google.cloud.teleport.v2.spanner.migrations.exceptions.InvalidChangeEventException;
import java.util.Set;
import java.util.stream.Collectors;

/**
* MySql implementation of ChangeEventContext that provides implementation of the
Expand All @@ -33,29 +36,36 @@ class MySqlChangeEventContext extends ChangeEventContext {
public MySqlChangeEventContext(
JsonNode changeEvent, Ddl ddl, Ddl shadowTableDdl, String shadowTablePrefix)
throws ChangeEventConvertorException, InvalidChangeEventException, DroppedTableException {
super(changeEvent, ddl, DatastreamConstants.MYSQL_SORT_ORDER);
this.changeEvent = changeEvent;
this.shadowTablePrefix = shadowTablePrefix;
this.dataTable = changeEvent.get(DatastreamConstants.EVENT_TABLE_NAME_KEY).asText();
this.shadowTable = shadowTablePrefix + this.dataTable;

Table dataTable = ddl.table(this.dataTable);
Set<String> primaryKeyColNames =
dataTable.primaryKeys().stream().map(k -> k.name()).collect(Collectors.toSet());

convertChangeEventToMutation(ddl, shadowTableDdl);
}

/*
* Creates shadow table mutation for MySql.
*/
@Override
Mutation generateShadowTableMutation(Ddl ddl) throws ChangeEventConvertorException {
Mutation generateShadowTableMutation(Ddl ddl, Ddl shadowDdl)
throws ChangeEventConvertorException {
// Get shadow information from change event mutation context
Mutation.WriteBuilder builder =
ChangeEventConvertor.changeEventToShadowTableMutationBuilder(
ddl, changeEvent, shadowTablePrefix);
shadowDdl, changeEvent, shadowTablePrefix);

// Add timestamp information to shadow table mutation
Long changeEventTimestamp =
ChangeEventTypeConvertor.toLong(
changeEvent, DatastreamConstants.MYSQL_TIMESTAMP_KEY, /* requiredField= */ true);
builder
.set(DatastreamConstants.MYSQL_TIMESTAMP_SHADOW_INFO.getLeft())
.set(getSafeShadowColumn(DatastreamConstants.MYSQL_TIMESTAMP_KEY))
.to(Value.int64(changeEventTimestamp));

/* MySql backfill events "can" have log file and log file position as null.
Expand All @@ -68,7 +78,7 @@ Mutation generateShadowTableMutation(Ddl ddl) throws ChangeEventConvertorExcepti
logFile = "";
}
// Add log file information to shadow table mutation
builder.set(DatastreamConstants.MYSQL_LOGFILE_SHADOW_INFO.getLeft()).to(logFile);
builder.set(getSafeShadowColumn(DatastreamConstants.MYSQL_LOGFILE_KEY)).to(logFile);

Long logPosition =
ChangeEventTypeConvertor.toLong(
Expand All @@ -78,7 +88,7 @@ Mutation generateShadowTableMutation(Ddl ddl) throws ChangeEventConvertorExcepti
}
// Add logfile position information to shadow table mutation
builder
.set(DatastreamConstants.MYSQL_LOGPOSITION_SHADOW_INFO.getLeft())
.set(getSafeShadowColumn(DatastreamConstants.MYSQL_LOGPOSITION_KEY))
.to(Value.int64(logPosition));

return builder.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import com.google.cloud.teleport.v2.spanner.migrations.exceptions.InvalidChangeEventException;
import com.google.cloud.teleport.v2.spanner.migrations.spanner.SpannerReadUtils;
import java.util.List;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -99,24 +98,25 @@ public static MySqlChangeEventSequence createFromChangeEvent(ChangeEventContext
/*
* Creates a MySqlChangeEventSequence by reading from a shadow table.
* @param transactionContext The transaction context to use for reading from the shadow table
* @param shadowTable The name of the shadow table to read from
* @param primaryKey The primary key to look up in the shadow table
* @param context The change event context with resolved safe shadow column names
* @param useSqlStatements If true, performs shadow table read using SQL statement with exclusive lock on row
*/
public static MySqlChangeEventSequence createFromShadowTable(
final TransactionContext transactionContext,
String shadowTable,
ChangeEventContext context,
Ddl shadowTableDdl,
Key primaryKey,
boolean useSqlStatements)
throws ChangeEventSequenceCreationException {

try {
String shadowTable = context.getShadowTable();
Key primaryKey = context.getPrimaryKey();
// Read columns from shadow table
List<String> readColumnList =
DatastreamConstants.MYSQL_SORT_ORDER.values().stream()
.map(p -> p.getLeft())
.collect(Collectors.toList());
java.util.Arrays.asList(
context.getSafeShadowColumn(DatastreamConstants.MYSQL_TIMESTAMP_KEY),
context.getSafeShadowColumn(DatastreamConstants.MYSQL_LOGFILE_KEY),
context.getSafeShadowColumn(DatastreamConstants.MYSQL_LOGPOSITION_KEY));
Struct row;
// TODO: After beam release, use the latest client lib version which supports setting lock
// hints via the read api. SQL string generation should be removed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@
import com.google.cloud.spanner.Mutation;
import com.google.cloud.spanner.Value;
import com.google.cloud.teleport.v2.spanner.ddl.Ddl;
import com.google.cloud.teleport.v2.spanner.ddl.Table;
import com.google.cloud.teleport.v2.spanner.migrations.convertors.ChangeEventTypeConvertor;
import com.google.cloud.teleport.v2.spanner.migrations.exceptions.ChangeEventConvertorException;
import com.google.cloud.teleport.v2.spanner.migrations.exceptions.DroppedTableException;
import com.google.cloud.teleport.v2.spanner.migrations.exceptions.InvalidChangeEventException;
import java.util.Set;
import java.util.stream.Collectors;

/**
* Oracle implementation of ChangeEventContext that provides implementation of the
Expand All @@ -33,29 +36,36 @@ class OracleChangeEventContext extends ChangeEventContext {
public OracleChangeEventContext(
JsonNode changeEvent, Ddl ddl, Ddl shadowTableDdl, String shadowTablePrefix)
throws ChangeEventConvertorException, InvalidChangeEventException, DroppedTableException {
super(changeEvent, ddl, DatastreamConstants.ORACLE_SORT_ORDER);
this.changeEvent = changeEvent;
this.shadowTablePrefix = shadowTablePrefix;
this.dataTable = changeEvent.get(DatastreamConstants.EVENT_TABLE_NAME_KEY).asText();
this.shadowTable = shadowTablePrefix + this.dataTable;

Table dataTable = ddl.table(this.dataTable);
Set<String> primaryKeyColNames =
dataTable.primaryKeys().stream().map(k -> k.name()).collect(Collectors.toSet());

convertChangeEventToMutation(ddl, shadowTableDdl);
}

/*
* Creates shadow table mutation for Oracle.
*/
@Override
Mutation generateShadowTableMutation(Ddl ddl) throws ChangeEventConvertorException {
Mutation generateShadowTableMutation(Ddl ddl, Ddl shadowDdl)
throws ChangeEventConvertorException {
// Get shadow information from change event mutation context
Mutation.WriteBuilder builder =
ChangeEventConvertor.changeEventToShadowTableMutationBuilder(
ddl, changeEvent, shadowTablePrefix);
shadowDdl, changeEvent, shadowTablePrefix);

// Add timestamp information to shadow table mutation
Long changeEventTimestamp =
ChangeEventTypeConvertor.toLong(
changeEvent, DatastreamConstants.ORACLE_TIMESTAMP_KEY, /* requiredField= */ true);
builder
.set(DatastreamConstants.ORACLE_TIMESTAMP_SHADOW_INFO.getLeft())
.set(getSafeShadowColumn(DatastreamConstants.ORACLE_TIMESTAMP_KEY))
.to(Value.int64(changeEventTimestamp));

/* Oracle backfill events "can" have SCN value as null.
Expand All @@ -69,7 +79,7 @@ Mutation generateShadowTableMutation(Ddl ddl) throws ChangeEventConvertorExcepti
}
// Add scn information to shadow table mutation
builder
.set(DatastreamConstants.ORACLE_SCN_SHADOW_INFO.getLeft())
.set(getSafeShadowColumn(DatastreamConstants.ORACLE_SCN_KEY))
.to(Value.int64(changeEventSCN));

return builder.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import com.google.cloud.teleport.v2.spanner.migrations.exceptions.InvalidChangeEventException;
import com.google.cloud.teleport.v2.spanner.migrations.spanner.SpannerReadUtils;
import java.util.List;
import java.util.stream.Collectors;

/**
* Implementation of ChangeEventSequence for Oracle database which stores change event sequence
Expand Down Expand Up @@ -78,18 +77,19 @@ public static OracleChangeEventSequence createFromChangeEvent(ChangeEventContext
*/
public static OracleChangeEventSequence createFromShadowTable(
final TransactionContext transactionContext,
String shadowTable,
ChangeEventContext context,
Ddl shadowTableDdl,
Key primaryKey,
boolean useSqlStatements)
throws ChangeEventSequenceCreationException {

try {
String shadowTable = context.getShadowTable();
Key primaryKey = context.getPrimaryKey();
// Read columns from shadow table
List<String> readColumnList =
DatastreamConstants.ORACLE_SORT_ORDER.values().stream()
.map(p -> p.getLeft())
.collect(Collectors.toList());
java.util.Arrays.asList(
context.getSafeShadowColumn(DatastreamConstants.ORACLE_TIMESTAMP_KEY),
context.getSafeShadowColumn(DatastreamConstants.ORACLE_SCN_KEY));
Struct row;
// TODO: After beam release, use the latest client lib version which supports setting lock
// hints via the read api. SQL string generation should be removed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@
import com.google.cloud.spanner.Mutation;
import com.google.cloud.spanner.Value;
import com.google.cloud.teleport.v2.spanner.ddl.Ddl;
import com.google.cloud.teleport.v2.spanner.ddl.Table;
import com.google.cloud.teleport.v2.spanner.migrations.convertors.ChangeEventTypeConvertor;
import com.google.cloud.teleport.v2.spanner.migrations.exceptions.ChangeEventConvertorException;
import com.google.cloud.teleport.v2.spanner.migrations.exceptions.DroppedTableException;
import com.google.cloud.teleport.v2.spanner.migrations.exceptions.InvalidChangeEventException;
import java.util.Set;
import java.util.stream.Collectors;

/**
* Postgres implementation of ChangeEventContext that provides implementation of the
Expand All @@ -33,29 +36,36 @@ class PostgresChangeEventContext extends ChangeEventContext {
public PostgresChangeEventContext(
JsonNode changeEvent, Ddl ddl, Ddl shadowTableDdl, String shadowTablePrefix)
throws ChangeEventConvertorException, InvalidChangeEventException, DroppedTableException {
super(changeEvent, ddl, DatastreamConstants.POSTGRES_SORT_ORDER);
this.changeEvent = changeEvent;
this.shadowTablePrefix = shadowTablePrefix;
this.dataTable = changeEvent.get(DatastreamConstants.EVENT_TABLE_NAME_KEY).asText();
this.shadowTable = shadowTablePrefix + this.dataTable;

Table dataTable = ddl.table(this.dataTable);
Set<String> primaryKeyColNames =
dataTable.primaryKeys().stream().map(k -> k.name()).collect(Collectors.toSet());

convertChangeEventToMutation(ddl, shadowTableDdl);
}

/*
* Creates shadow table mutation for Postgres.
*/
@Override
Mutation generateShadowTableMutation(Ddl ddl) throws ChangeEventConvertorException {
Mutation generateShadowTableMutation(Ddl ddl, Ddl shadowDdl)
throws ChangeEventConvertorException {
// Get shadow information from change event mutation context
Mutation.WriteBuilder builder =
ChangeEventConvertor.changeEventToShadowTableMutationBuilder(
ddl, changeEvent, shadowTablePrefix);
shadowDdl, changeEvent, shadowTablePrefix);

// Add timestamp information to shadow table mutation
Long changeEventTimestamp =
ChangeEventTypeConvertor.toLong(
changeEvent, DatastreamConstants.POSTGRES_TIMESTAMP_KEY, /* requiredField= */ true);
builder
.set(DatastreamConstants.POSTGRES_TIMESTAMP_SHADOW_INFO.getLeft())
.set(getSafeShadowColumn(DatastreamConstants.POSTGRES_TIMESTAMP_KEY))
.to(Value.int64(changeEventTimestamp));

/* Postgres backfill events "can" have LSN value as null.
Expand All @@ -69,7 +79,7 @@ Mutation generateShadowTableMutation(Ddl ddl) throws ChangeEventConvertorExcepti
}
// Add lsn information to shadow table mutation
builder
.set(DatastreamConstants.POSTGRES_LSN_SHADOW_INFO.getLeft())
.set(getSafeShadowColumn(DatastreamConstants.POSTGRES_LSN_KEY))
.to(Value.string(changeEventLSN));

return builder.build();
Expand Down
Loading
Loading