Skip to content

Commit 88b5b28

Browse files
Exclude forward transaction from change streams (#2601)
1 parent 00f1184 commit 88b5b28

File tree

15 files changed

+34
-16
lines changed

15 files changed

+34
-16
lines changed

v2/datastream-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/SpannerTransactionWriterDoFn.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -383,6 +383,7 @@ private void processSingleDatabaseTransaction(
383383
.getDatabaseClient()
384384
.readWriteTransaction(
385385
Options.tag(getTxnTag(c.getPipelineOptions())),
386+
Options.excludeTxnFromChangeStreams(),
386387
Options.priority(spannerConfig.getRpcPriority().get()))
387388
.run(
388389
(TransactionRunner.TransactionCallable<Void>)
@@ -439,6 +440,7 @@ void processCrossDatabaseTransaction(
439440
.getDatabaseClient()
440441
.readWriteTransaction(
441442
Options.tag(getTxnTag(c.getPipelineOptions())),
443+
Options.excludeTxnFromChangeStreams(),
442444
Options.priority(spannerConfig.getRpcPriority().get()))
443445
.allowNestedTransaction()
444446
.run(
@@ -463,6 +465,7 @@ void processCrossDatabaseTransaction(
463465
.getDatabaseClient()
464466
.readWriteTransaction(
465467
Options.tag(getTxnTag(c.getPipelineOptions())),
468+
Options.excludeTxnFromChangeStreams(),
466469
Options.priority(spannerConfig.getRpcPriority().get()))
467470
.run(
468471
(TransactionRunner.TransactionCallable<Void>)

v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/SpannerTransactionWriterDoFnTest.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,8 @@ public void testProcessElement() {
166166
TransactionRunner.TransactionCallable<Void> callable = invocation.getArgument(0);
167167
return callable.run(transactionContext);
168168
});
169-
when(databaseClientMock.readWriteTransaction(any(), any())).thenReturn(transactionCallableMock);
169+
when(databaseClientMock.readWriteTransaction(any(), any(), any()))
170+
.thenReturn(transactionCallableMock);
170171

171172
SpannerTransactionWriterDoFn spannerTransactionWriterDoFn =
172173
new SpannerTransactionWriterDoFn(
@@ -307,7 +308,8 @@ public void testProcessElementWithRetryableSpannerException() {
307308
throw SpannerExceptionFactory.newSpannerException(
308309
ErrorCode.ABORTED, "Transaction Aborted");
309310
});
310-
when(databaseClientMock.readWriteTransaction(any(), any())).thenReturn(transactionCallableMock);
311+
when(databaseClientMock.readWriteTransaction(any(), any(), any()))
312+
.thenReturn(transactionCallableMock);
311313

312314
SpannerTransactionWriterDoFn spannerTransactionWriterDoFn =
313315
new SpannerTransactionWriterDoFn(
@@ -372,7 +374,8 @@ public void testProcessElementWithNonRetryableSpannerException() {
372374
throw SpannerExceptionFactory.newSpannerException(
373375
ErrorCode.FAILED_PRECONDITION, "title must not be NULL in table Books");
374376
});
375-
when(databaseClientMock.readWriteTransaction(any(), any())).thenReturn(transactionCallableMock);
377+
when(databaseClientMock.readWriteTransaction(any(), any(), any()))
378+
.thenReturn(transactionCallableMock);
376379

377380
SpannerTransactionWriterDoFn spannerTransactionWriterDoFn =
378381
new SpannerTransactionWriterDoFn(

v2/spanner-to-sourcedb/README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,8 @@ A few prerequisites must be considered before starting with reverse replication.
109109
CREATE CHANGE STREAM allstream
110110
FOR ALL OPTIONS (
111111
retention_period = '7d',
112-
value_capture_type = 'NEW_ROW'
112+
value_capture_type = 'NEW_ROW',
113+
allow_txn_exclusion = true
113114
);
114115
```
115116
15. The Dataflow template creates a pool of database connections per Dataflow worker. The maxShardConnections template parameter, defaulting to 10,000 represents the maximum connections allowed for a given database. The maxWorkers Dataflow configuration should not exceed the maxShardConnections value, else the template launch will fail as we do not want to overload the database.

v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToSourceDbITBase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -358,7 +358,7 @@ protected SpannerResourceManager createSpannerDBAndTableWithNColumns(
358358
}
359359

360360
String ddlStream =
361-
"CREATE CHANGE STREAM allstream FOR ALL OPTIONS (value_capture_type = 'NEW_ROW', retention_period = '7d')";
361+
"CREATE CHANGE STREAM allstream FOR ALL OPTIONS (value_capture_type = 'NEW_ROW', retention_period = '7d', allow_txn_exclusion = true)";
362362
try {
363363
spannerResourceManager.executeDdlStatement(ddlStream);
364364
} catch (Exception e) {

v2/spanner-to-sourcedb/src/test/resources/SpannerToCassandraSourceIT/spanner-schema.sql

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -237,5 +237,6 @@ CREATE TABLE testtable_03tpcovf16ed0klxm3v808ch3btgq0uk (
237237
CREATE CHANGE STREAM allstream
238238
FOR ALL OPTIONS (
239239
value_capture_type = 'NEW_ROW',
240-
retention_period = '7d'
240+
retention_period = '7d',
241+
allow_txn_exclusion = true
241242
);

v2/spanner-to-sourcedb/src/test/resources/SpannerToCassandraSourceIT/spanner-transformation-schema.sql

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,5 +10,6 @@ CREATE TABLE IF NOT EXISTS customers (
1010
CREATE CHANGE STREAM allstream
1111
FOR ALL OPTIONS (
1212
value_capture_type = 'NEW_ROW',
13-
retention_period = '7d'
13+
retention_period = '7d',
14+
allow_txn_exclusion = true
1415
);

v2/spanner-to-sourcedb/src/test/resources/SpannerToCassandraSourceLT/spanner-schema.sql

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,5 +11,6 @@ CREATE TABLE `person` (
1111
CREATE CHANGE STREAM allstream
1212
FOR ALL OPTIONS (
1313
value_capture_type = 'NEW_ROW',
14-
retention_period = '7d'
14+
retention_period = '7d',
15+
allow_txn_exclusion = true
1516
);

v2/spanner-to-sourcedb/src/test/resources/SpannerToMySqlSourceLT/spanner-schema.sql

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,5 +11,6 @@ CREATE TABLE IF NOT EXISTS `Person` (
1111
CREATE CHANGE STREAM allstream
1212
FOR ALL OPTIONS (
1313
value_capture_type = 'NEW_ROW',
14-
retention_period = '7d'
14+
retention_period = '7d',
15+
allow_txn_exclusion = true
1516
);

v2/spanner-to-sourcedb/src/test/resources/SpannerToSourceDbCustomShardIT/spanner-schema.sql

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,5 +6,6 @@ CREATE TABLE IF NOT EXISTS Singers (
66
CREATE CHANGE STREAM allstream
77
FOR ALL OPTIONS (
88
value_capture_type = 'NEW_ROW',
9-
retention_period = '7d'
9+
retention_period = '7d',
10+
allow_txn_exclusion = true
1011
);

v2/spanner-to-sourcedb/src/test/resources/SpannerToSourceDbCustomTransformationIT/spanner-schema.sql

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,5 +30,6 @@ CREATE TABLE IF NOT EXISTS AllDatatypeTransformation (
3030
CREATE CHANGE STREAM allstream
3131
FOR ALL OPTIONS (
3232
value_capture_type = 'NEW_ROW',
33-
retention_period = '7d'
33+
retention_period = '7d',
34+
allow_txn_exclusion = true
3435
);

0 commit comments

Comments
 (0)