Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,7 @@
import com.google.cloud.teleport.v2.templates.exceptions.InvalidDMLGenerationException;
import com.google.cloud.teleport.v2.templates.models.DMLGeneratorRequest;
import com.google.cloud.teleport.v2.templates.models.DMLGeneratorResponse;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Map;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Metrics;
import org.joda.time.Duration;
Expand Down Expand Up @@ -132,18 +129,6 @@ public static boolean processRecord(
break;
}

Counter numRecProcessedMetric =
Metrics.counter(shardId, "records_written_to_source_" + shardId);

numRecProcessedMetric.inc(1); // update the number of records processed metric
Distribution lagMetric =
Metrics.distribution(shardId, "replication_lag_in_seconds_" + shardId);

Instant instTime = Instant.now();
Instant commitTsInst = spannerRecord.getCommitTimestamp().toSqlTimestamp().toInstant();
long replicationLag = ChronoUnit.SECONDS.between(commitTsInst, instTime);

lagMetric.update(replicationLag); // update the lag metric
return false;
} catch (Exception e) {
// Not logging the error here since the error can be retryable error and high number of them
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,13 @@
import com.google.common.collect.ImmutableList;
import com.google.gson.Gson;
import java.io.Serializable;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
import org.apache.beam.sdk.metrics.Counter;
Expand Down Expand Up @@ -221,83 +224,100 @@ public void processElement(ProcessContext c) {
ChangeEventSpannerConvertor.changeEventToPrimaryKey(
tableName, ddl, keysJson, /* convertNameToLowerCase= */ false);
String shadowTableName = shadowTablePrefix + tableName;
spannerDao
.getDatabaseClient()
.readWriteTransaction(Options.priority(spannerConfig.getRpcPriority().get()))
.run(
(TransactionRunner.TransactionCallable<Void>)
shadowTransaction -> {
boolean isSourceAhead = false;
ShadowTableRecord shadowTableRecord =
spannerDao.readShadowTableRecordWithExclusiveLock(
shadowTableName, primaryKey, shadowTableDdl, shadowTransaction);
isSourceAhead =
shadowTableRecord != null
&& ((shadowTableRecord
.getProcessedCommitTimestamp()
.compareTo(spannerRec.getCommitTimestamp())
> 0) // either the source already has record with greater
// commit
// timestamp
|| (shadowTableRecord // or the source has the same commit
// timestamp but
// greater record sequence
Boolean transactionResult =
spannerDao
.getDatabaseClient()
.readWriteTransaction(Options.priority(spannerConfig.getRpcPriority().get()))
.run(
(TransactionRunner.TransactionCallable<Boolean>)
shadowTransaction -> {
boolean isSourceAhead = false;
// Boolean reference to capture if the record was written in the
// transaction
AtomicBoolean isRecordWritten = new AtomicBoolean(false);
ShadowTableRecord shadowTableRecord =
spannerDao.readShadowTableRecordWithExclusiveLock(
shadowTableName, primaryKey, shadowTableDdl, shadowTransaction);
isSourceAhead =
shadowTableRecord != null
&& ((shadowTableRecord
.getProcessedCommitTimestamp()
.compareTo(spannerRec.getCommitTimestamp())
== 0
&& shadowTableRecord.getRecordSequence()
> Long.parseLong(spannerRec.getRecordSequence())));
> 0) // either the source already has record with greater
// commit
// timestamp
|| (shadowTableRecord // or the source has the same commit
// timestamp but
// greater record sequence
.getProcessedCommitTimestamp()
.compareTo(spannerRec.getCommitTimestamp())
== 0
&& shadowTableRecord.getRecordSequence()
>= Long.parseLong(spannerRec.getRecordSequence())));

if (!isSourceAhead) {
IDao sourceDao = sourceProcessor.getSourceDao(shardId);
TransactionalCheck check =
() -> {
ShadowTableRecord newShadowTableRecord =
spannerDao.readShadowTableRecordWithExclusiveLock(
shadowTableName,
primaryKey,
shadowTableDdl,
shadowTransaction);
if (!ShadowTableRecord.isEquals(
shadowTableRecord, newShadowTableRecord)) {
throw new TransactionalCheckException(
"Shadow table sequence changed during transaction");
}
};
boolean isEventFiltered =
InputRecordProcessor.processRecord(
spannerRec,
schemaMapper,
ddl,
sourceSchema,
sourceDao,
shardId,
sourceDbTimezoneOffset,
sourceProcessor.getDmlGenerator(),
spannerToSourceTransformer,
this.source,
check);
if (isEventFiltered) {
outputWithTag(
c,
Constants.FILTERED_TAG,
Constants.FILTERED_TAG_MESSAGE,
spannerRec);
}
if (!isSourceAhead) {
IDao sourceDao = sourceProcessor.getSourceDao(shardId);
TransactionalCheck check =
() -> {
ShadowTableRecord newShadowTableRecord =
spannerDao.readShadowTableRecordWithExclusiveLock(
shadowTableName,
primaryKey,
shadowTableDdl,
shadowTransaction);
if (!ShadowTableRecord.isEquals(
shadowTableRecord, newShadowTableRecord)) {
throw new TransactionalCheckException(
"Shadow table sequence changed during transaction");
}
};
boolean isEventFiltered =
InputRecordProcessor.processRecord(
spannerRec,
schemaMapper,
ddl,
sourceSchema,
sourceDao,
shardId,
sourceDbTimezoneOffset,
sourceProcessor.getDmlGenerator(),
spannerToSourceTransformer,
this.source,
check);
isRecordWritten.set(!isEventFiltered);
if (isEventFiltered) {
outputWithTag(
c,
Constants.FILTERED_TAG,
Constants.FILTERED_TAG_MESSAGE,
spannerRec);
}

spannerDao.updateShadowTable(
getShadowTableMutation(
tableName,
shadowTableName,
keysJson,
spannerRec.getCommitTimestamp(),
spannerRec.getRecordSequence(),
ddl),
shadowTransaction);
}
return null;
});
successRecordCountMetric.inc();
spannerDao.updateShadowTable(
getShadowTableMutation(
tableName,
shadowTableName,
keysJson,
spannerRec.getCommitTimestamp(),
spannerRec.getRecordSequence(),
ddl),
shadowTransaction);
}
return isRecordWritten.get();
});
if (Boolean.TRUE.equals(transactionResult)) {
successRecordCountMetric.inc();
Counter recordsWrittenToSource =
Metrics.counter(shardId, "records_written_to_source_" + shardId);
recordsWrittenToSource.inc(1);
Distribution lagMetric =
Metrics.distribution(shardId, "replication_lag_in_seconds_" + shardId);
Instant instTime = Instant.now();
Instant commitTsInst = spannerRec.getCommitTimestamp().toSqlTimestamp().toInstant();
long replicationLag = ChronoUnit.SECONDS.between(commitTsInst, instTime);
lagMetric.update(replicationLag);
SUCCESSFUL_WRITE_LATENCY_MS.update(timer.elapsed(TimeUnit.MILLISECONDS));
}
if (spannerRec.isRetryRecord()) {
retryableRecordCountMetric.dec();
}
Expand All @@ -306,7 +326,6 @@ public void processElement(ProcessContext c) {
// Since we have wrapped the logic inside Spanner transaction, the exceptions would also be
// wrapped inside a SpannerException.
// We need to get and inspect the cause while handling the exception.
SUCCESSFUL_WRITE_LATENCY_MS.update(timer.elapsed(TimeUnit.MILLISECONDS));
} catch (Exception ex) {
Throwable cause = ex.getCause();
String message = ex.getMessage();
Expand Down
Loading