Skip to content

Commit 9ad5345

Browse files
sm745052Sandeep Mishra
andauthored
Exposing spannerio maxCommitDelay as a DF job parameter. (#2928)
set maxCommitDelay=-1 to use the default setting --------- Co-authored-by: Sandeep Mishra <iamsandeep@google.com>
1 parent e6c8692 commit 9ad5345

File tree

3 files changed

+34
-6
lines changed

3 files changed

+34
-6
lines changed

v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/options/SourceDbToSpannerOptions.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -415,4 +415,18 @@ public interface SourceDbToSpannerOptions extends CommonTemplateOptions {
415415
String getFailureInjectionParameter();
416416

417417
void setFailureInjectionParameter(String value);
418+
419+
@TemplateParameter.Text(
420+
order = 33,
421+
optional = true,
422+
description =
423+
"Maximum commit delay time (in milliseconds) to optimize write throughput in Spanner. Reference https://cloud.google.com/spanner/docs/throughput-optimized-writes",
424+
helpText =
425+
"Maximum commit delay time to optimize write throughput in Spanner. Reference https://cloud.google.com/spanner/docs/throughput-optimized-writes."
426+
+ "Set -1 to let spanner choose the default. Set to a positive value to override for best suited tradeoff of throughput vs latency."
427+
+ "Defaults to -1.")
428+
@Default.Long(-1)
429+
Long getMaxCommitDelay();
430+
431+
void setMaxCommitDelay(Long value);
418432
}

v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/SourceDbToSpanner.java

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -131,11 +131,16 @@ private static PipelineResult executeJdbcMigration(
131131

132132
@VisibleForTesting
133133
static SpannerConfig createSpannerConfig(SourceDbToSpannerOptions options) {
134-
return SpannerConfig.create()
135-
.withProjectId(ValueProvider.StaticValueProvider.of(options.getProjectId()))
136-
.withHost(ValueProvider.StaticValueProvider.of(options.getSpannerHost()))
137-
.withInstanceId(ValueProvider.StaticValueProvider.of(options.getInstanceId()))
138-
.withDatabaseId(ValueProvider.StaticValueProvider.of(options.getDatabaseId()))
139-
.withRpcPriority(ValueProvider.StaticValueProvider.of(options.getSpannerPriority()));
134+
SpannerConfig spannerConfig =
135+
SpannerConfig.create()
136+
.withProjectId(ValueProvider.StaticValueProvider.of(options.getProjectId()))
137+
.withHost(ValueProvider.StaticValueProvider.of(options.getSpannerHost()))
138+
.withInstanceId(ValueProvider.StaticValueProvider.of(options.getInstanceId()))
139+
.withDatabaseId(ValueProvider.StaticValueProvider.of(options.getDatabaseId()))
140+
.withRpcPriority(ValueProvider.StaticValueProvider.of(options.getSpannerPriority()));
141+
if (options.getMaxCommitDelay() >= 0) {
142+
spannerConfig = spannerConfig.withMaxCommitDelay(options.getMaxCommitDelay());
143+
}
144+
return spannerConfig;
140145
}
141146
}

v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/SourceDbToSpannerTest.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,14 @@
1515
*/
1616
package com.google.cloud.teleport.v2.templates;
1717

18+
import static com.google.common.truth.Truth.assertThat;
1819
import static org.junit.Assert.assertEquals;
1920
import static org.mockito.Mockito.mock;
2021
import static org.mockito.Mockito.when;
2122

2223
import com.google.cloud.teleport.v2.options.SourceDbToSpannerOptions;
2324
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
25+
import org.joda.time.Duration;
2426
import org.junit.Test;
2527
import org.mockito.Mockito;
2628

@@ -34,10 +36,17 @@ public void testCreateSpannerConfig() {
3436
when(mockOptions.getSpannerHost()).thenReturn("testHost");
3537
when(mockOptions.getInstanceId()).thenReturn("testInstance");
3638
when(mockOptions.getDatabaseId()).thenReturn("testDatabaseId");
39+
when(mockOptions.getMaxCommitDelay()).thenReturn(-1L).thenReturn(42L);
3740

41+
// For first set of mocks.
3842
SpannerConfig config = SourceDbToSpanner.createSpannerConfig(mockOptions);
3943
assertEquals(config.getProjectId().get(), "testProject");
4044
assertEquals(config.getInstanceId().get(), "testInstance");
4145
assertEquals(config.getDatabaseId().get(), "testDatabaseId");
46+
assertThat(config.getMaxCommitDelay()).isNull();
47+
48+
// For second set of mocks.
49+
config = SourceDbToSpanner.createSpannerConfig(mockOptions);
50+
assertEquals(config.getMaxCommitDelay().get(), Duration.millis(42L));
4251
}
4352
}

0 commit comments

Comments
 (0)