Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.Options;
import com.google.cloud.spanner.SpannerException;
import com.google.cloud.spanner.Statement;
import com.google.cloud.spanner.TransactionRunner;
import com.google.cloud.teleport.v2.spanner.ddl.Ddl;
import com.google.cloud.teleport.v2.spanner.migrations.exceptions.ChangeEventConvertorException;
Expand Down Expand Up @@ -279,7 +280,7 @@ public void processElement(ProcessContext c) {
c, changeEventContext, currentChangeEventSequence, shadowTableDdl, ddl);
} else {
processSingleDatabaseTransaction(
c, changeEventContext, currentChangeEventSequence, shadowTableDdl);
c, changeEventContext, currentChangeEventSequence, shadowTableDdl, ddl);
}
com.google.cloud.Timestamp timestamp = com.google.cloud.Timestamp.now();
c.output(timestamp);
Expand Down Expand Up @@ -378,7 +379,8 @@ private void processSingleDatabaseTransaction(
ProcessContext c,
ChangeEventContext changeEventContext,
ChangeEventSequence currentChangeEventSequence,
Ddl shadowDdl) {
Ddl shadowDdl,
Ddl ddl) {

spannerAccessor
.getDatabaseClient()
Expand All @@ -403,7 +405,14 @@ private void processSingleDatabaseTransaction(
skippedEvents.inc();
return null;
}
// Apply shadow and data table mutations.
// Execute DML if applicable
Statement dataDml = changeEventContext.getDataDmlStatement(ddl);

if (dataDml != null) {
transaction.executeUpdate(dataDml);
}

// Apply shadow and data table mutations (only if they exist)
transaction.buffer(changeEventContext.getMutations());
isInTransaction.set(false);
return null;
Expand Down Expand Up @@ -504,8 +513,17 @@ void processCrossDatabaseTransaction(
"Shadow table sequence changed during transaction");
}

// Execute Data DML if applicable
Statement dataDml =
changeEventContext.getDataDmlStatement(dataTableDdl);
if (dataDml != null) {
mainTxn.executeUpdate(dataDml);
}

// Write to main table
mainTxn.buffer(changeEventContext.getDataMutation());
if (changeEventContext.getDataMutation() != null) {
mainTxn.buffer(changeEventContext.getDataMutation());
}
return null;
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,19 @@
import com.google.cloud.spanner.ResultSet;
import com.google.cloud.spanner.Statement;
import com.google.cloud.spanner.TransactionContext;
import com.google.cloud.spanner.Value;
import com.google.cloud.teleport.v2.spanner.ddl.Ddl;
import com.google.cloud.teleport.v2.spanner.ddl.IndexColumn;
import com.google.cloud.teleport.v2.spanner.ddl.Table;
import com.google.cloud.teleport.v2.spanner.migrations.convertors.ChangeEventSpannerConvertor;
import com.google.cloud.teleport.v2.spanner.migrations.convertors.ChangeEventTypeConvertor;
import com.google.cloud.teleport.v2.spanner.migrations.exceptions.ChangeEventConvertorException;
import com.google.cloud.teleport.v2.spanner.migrations.exceptions.DroppedTableException;
import com.google.cloud.teleport.v2.spanner.migrations.exceptions.InvalidChangeEventException;
import com.google.cloud.teleport.v2.spanner.migrations.spanner.SpannerReadUtils;
import com.google.cloud.teleport.v2.templates.spanner.ShadowTableCreator;
import com.google.common.collect.ImmutableMap;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -107,23 +110,94 @@ protected void convertChangeEventToMutation(Ddl ddl, Ddl shadowTableDdl)
ChangeEventConvertor.convertChangeEventColumnKeysToLowerCase(changeEvent);
ChangeEventConvertor.verifySpannerSchema(ddl, changeEvent);

boolean hasGeneratedPK = false;
Table table = ddl.table(this.dataTable);
if (table != null) {
hasGeneratedPK = hasGeneratedPK(table);
}

this.primaryKey =
ChangeEventSpannerConvertor.changeEventToPrimaryKey(
changeEvent.get(DatastreamConstants.EVENT_TABLE_NAME_KEY).asText(),
ddl,
changeEvent,
/* convertNameToLowerCase= */ true);
this.dataMutation = ChangeEventConvertor.changeEventToMutation(ddl, changeEvent);

String changeType = getChangeType(changeEvent);
boolean isDelete = DatastreamConstants.DELETE_EVENT.equalsIgnoreCase(changeType);

if (hasGeneratedPK && isDelete) {
// For delete events on tables with generated primary keys, we need to use DML
// to delete the row.
this.dataMutation = null;
} else {
this.dataMutation = ChangeEventConvertor.changeEventToMutation(ddl, changeEvent);
}

this.shadowTableMutation = generateShadowTableMutation(ddl, shadowTableDdl);
}

public Statement getDataDmlStatement(Ddl ddl) throws ChangeEventConvertorException {
String changeType = getChangeType(changeEvent);
boolean isDelete = DatastreamConstants.DELETE_EVENT.equalsIgnoreCase(changeType);
if (!isDelete) {
return null;
}
Table table = ddl.table(this.dataTable);
if (table != null && hasGeneratedPK(table)) {
return generateDeleteDml(table, this.dataTable, changeEvent);
}
return null;
}

private boolean hasGeneratedPK(Table table) {
for (IndexColumn keyColumn : table.primaryKeys()) {
if (table.column(keyColumn.name()).isGenerated()) {
return true;
}
}
return false;
}

private Statement generateDeleteDml(Table table, String tableName, JsonNode event)
throws ChangeEventConvertorException {
// TODO: Add support for PostgreSQL
StringBuilder sql = new StringBuilder("DELETE FROM ").append(tableName).append(" WHERE ");
Statement.Builder builder = Statement.newBuilder("");
boolean first = true;
for (com.google.cloud.teleport.v2.spanner.ddl.Column column : table.columns()) {
String colName = column.name();
if (column.isGenerated()) {
continue;
}
if (!first) {
sql.append(" AND ");
}
sql.append(colName).append(" = @").append(colName);
// Bind value
Value value =
ChangeEventTypeConvertor.toValue(event, column.type(), colName, /* requiredField */ true);
builder.bind(colName).to(value);
first = false;
}
builder.replace(sql.toString());
return builder.build();
}

public JsonNode getChangeEvent() {
return changeEvent;
}

// Returns an array of data and shadow table mutations.
public Iterable<Mutation> getMutations() {
return Arrays.asList(dataMutation, shadowTableMutation);
List<Mutation> mutations = new ArrayList<>();
if (dataMutation != null) {
mutations.add(dataMutation);
}
if (shadowTableMutation != null) {
mutations.add(shadowTableMutation);
}
return mutations;
}

// Returns the data table mutation
Expand Down Expand Up @@ -176,4 +250,12 @@ public void readDataTableRowWithExclusiveLock(
// Read the row in order to acquire the lock and discard it.
resultSet.getCurrentRowAsStruct();
}

public static String getChangeType(JsonNode changeEvent) {
return changeEvent.has(DatastreamConstants.EVENT_CHANGE_TYPE_KEY)
? changeEvent
.get(DatastreamConstants.EVENT_CHANGE_TYPE_KEY)
.asText(DatastreamConstants.EMPTY_EVENT)
: DatastreamConstants.EMPTY_EVENT;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ Table constructShadowTable(Ddl informationSchema, String dataTableName, Dialect
.filter(col -> primaryKeyColNames.contains(col.name()))
.collect(Collectors.toList());
for (Column col : primaryKeyCols) {
// In Shadow table we only have primary keys. If primary key is dependent on
// non-primary key column, then shadow table creation will fail.
// Hence, generated expression should be removed from the shadow table columns.
col = col.toBuilder().isGenerated(false).generationExpression("").autoBuild();
shadowTableBuilder.addColumn(col);
}

Expand Down
Loading
Loading