Skip to content

Commit 9ac64e0

Browse files
authored
Spanner: Extend SpannerIO support for experimental hosts (#36048)
1 parent f10fbf0 commit 9ac64e0

File tree

5 files changed

+261
-0
lines changed

5 files changed

+261
-0
lines changed

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,9 @@ public class SpannerAccessor implements AutoCloseable {
6161
*/
6262
private static final String USER_AGENT_PREFIX = "Apache_Beam_Java";
6363

64+
/** Instance ID to use when connecting to an experimental host. */
65+
public static final String EXPERIMENTAL_HOST_INSTANCE_ID = "default";
66+
6467
// Only create one SpannerAccessor for each different SpannerConfig.
6568
private static final ConcurrentHashMap<SpannerConfig, SpannerAccessor> spannerAccessors =
6669
new ConcurrentHashMap<>();
@@ -220,6 +223,24 @@ static SpannerOptions buildSpannerOptions(SpannerConfig spannerConfig) {
220223
builder.setServiceFactory(serviceFactory);
221224
}
222225
builder.setHost(spannerConfig.getHostValue());
226+
227+
ValueProvider<String> experimentalHost = spannerConfig.getExperimentalHost();
228+
if (experimentalHost != null && !Strings.isNullOrEmpty(experimentalHost.get())) {
229+
builder.setExperimentalHost(experimentalHost.get());
230+
ValueProvider<Boolean> plainText = spannerConfig.getPlainText();
231+
ValueProvider<String> instanceId = spannerConfig.getInstanceId();
232+
if (Strings.isNullOrEmpty(instanceId.get())
233+
|| !instanceId.get().equals(EXPERIMENTAL_HOST_INSTANCE_ID)) {
234+
throw new IllegalArgumentException(
235+
"Experimental host can only be used with instance id: "
236+
+ EXPERIMENTAL_HOST_INSTANCE_ID);
237+
}
238+
if (plainText != null && Boolean.TRUE.equals(plainText.get())) {
239+
builder.setChannelConfigurator(b -> b.usePlaintext());
240+
builder.setCredentials(NoCredentials.getInstance());
241+
}
242+
}
243+
223244
ValueProvider<String> emulatorHost = spannerConfig.getEmulatorHost();
224245
if (emulatorHost != null) {
225246
builder.setEmulatorHost(emulatorHost.get());

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerConfig.java

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,9 @@ public abstract class SpannerConfig implements Serializable {
4848
private static final Duration DEFAULT_COMMIT_DEADLINE = Duration.standardSeconds(15);
4949
// Total allowable backoff time.
5050
private static final Duration DEFAULT_MAX_CUMULATIVE_BACKOFF = Duration.standardMinutes(15);
51+
// Instance id of experimental hosts
52+
private static final ValueProvider<String> EXPERIMENTAL_HOST_INSTANCE_ID =
53+
ValueProvider.StaticValueProvider.of("default");
5154
// A default priority for batch traffic.
5255
static final RpcPriority DEFAULT_RPC_PRIORITY = RpcPriority.MEDIUM;
5356

@@ -68,6 +71,8 @@ public String getHostValue() {
6871

6972
public abstract @Nullable ValueProvider<String> getEmulatorHost();
7073

74+
public abstract @Nullable ValueProvider<String> getExperimentalHost();
75+
7176
public abstract @Nullable ValueProvider<Boolean> getIsLocalChannelProvider();
7277

7378
public abstract @Nullable ValueProvider<Duration> getCommitDeadline();
@@ -90,6 +95,8 @@ public String getHostValue() {
9095

9196
public abstract @Nullable ValueProvider<Duration> getPartitionReadTimeout();
9297

98+
public abstract @Nullable ValueProvider<Boolean> getPlainText();
99+
93100
@VisibleForTesting
94101
abstract @Nullable ServiceFactory<Spanner, SpannerOptions> getServiceFactory();
95102

@@ -149,6 +156,8 @@ public abstract static class Builder {
149156

150157
abstract Builder setEmulatorHost(ValueProvider<String> emulatorHost);
151158

159+
abstract Builder setExperimentalHost(ValueProvider<String> experimentalHost);
160+
152161
abstract Builder setIsLocalChannelProvider(ValueProvider<Boolean> isLocalChannelProvider);
153162

154163
abstract Builder setCommitDeadline(ValueProvider<Duration> commitDeadline);
@@ -178,6 +187,8 @@ abstract Builder setExecuteStreamingSqlRetrySettings(
178187

179188
abstract Builder setCredentials(ValueProvider<Credentials> credentials);
180189

190+
abstract Builder setPlainText(ValueProvider<Boolean> plainText);
191+
181192
public abstract SpannerConfig build();
182193
}
183194

@@ -345,4 +356,37 @@ public SpannerConfig withCredentials(Credentials credentials) {
345356
public SpannerConfig withCredentials(ValueProvider<Credentials> credentials) {
346357
return toBuilder().setCredentials(credentials).build();
347358
}
359+
360+
/** Specifies the experimental host to set on SpannerOptions (setExperimentalHost). */
361+
public SpannerConfig withExperimentalHost(ValueProvider<String> experimentalHost) {
362+
return toBuilder()
363+
.setInstanceId(EXPERIMENTAL_HOST_INSTANCE_ID)
364+
.setExperimentalHost(experimentalHost)
365+
.build();
366+
}
367+
368+
/** Specifies the experimental host to set on SpannerOptions (setExperimentalHost). */
369+
public SpannerConfig withExperimentalHost(String experimentalHost) {
370+
return withExperimentalHost(ValueProvider.StaticValueProvider.of(experimentalHost));
371+
}
372+
373+
/**
374+
* Specifies whether to use plaintext channel.
375+
*
376+
* <p>Note: This parameter is only valid when using an experimental host (set via {@code
377+
* withExperimentalHost}).
378+
*/
379+
public SpannerConfig withUsingPlainTextChannel(ValueProvider<Boolean> plainText) {
380+
return toBuilder().setPlainText(plainText).build();
381+
}
382+
383+
/**
384+
* Specifies whether to use plaintext channel.
385+
*
386+
* <p>Note: This parameter is only valid when using an experimental host (set via {@code
387+
* withExperimentalHost}).
388+
*/
389+
public SpannerConfig withUsingPlainTextChannel(boolean plainText) {
390+
return withUsingPlainTextChannel(ValueProvider.StaticValueProvider.of(plainText));
391+
}
348392
}

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java

Lines changed: 164 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -612,6 +612,37 @@ public ReadAll withEmulatorHost(String emulatorHost) {
612612
return withEmulatorHost(ValueProvider.StaticValueProvider.of(emulatorHost));
613613
}
614614

615+
/** Specifies the SpannerOptions experimental host (setExperimentalHost). */
616+
public ReadAll withExperimentalHost(ValueProvider<String> experimentalHost) {
617+
SpannerConfig config = getSpannerConfig();
618+
return withSpannerConfig(config.withExperimentalHost(experimentalHost));
619+
}
620+
621+
public ReadAll withExperimentalHost(String experimentalHost) {
622+
return withExperimentalHost(ValueProvider.StaticValueProvider.of(experimentalHost));
623+
}
624+
625+
/**
626+
* Specifies whether to use plaintext channel.
627+
*
628+
* <p>Note: This parameter is only valid when using an experimental host (set via {@code
629+
* withExperimentalHost}).
630+
*/
631+
public ReadAll withUsingPlainTextChannel(ValueProvider<Boolean> plainText) {
632+
SpannerConfig config = getSpannerConfig();
633+
return withSpannerConfig(config.withUsingPlainTextChannel(plainText));
634+
}
635+
636+
/**
637+
* Specifies whether to use plaintext channel.
638+
*
639+
* <p>Note: This parameter is only valid when using an experimental host (set via {@code
640+
* withExperimentalHost}).
641+
*/
642+
public ReadAll withUsingPlainTextChannel(boolean plainText) {
643+
return withUsingPlainTextChannel(ValueProvider.StaticValueProvider.of(plainText));
644+
}
645+
615646
/** Specifies the Cloud Spanner database. */
616647
public ReadAll withDatabaseId(ValueProvider<String> databaseId) {
617648
SpannerConfig config = getSpannerConfig();
@@ -841,6 +872,37 @@ public Read withEmulatorHost(String emulatorHost) {
841872
return withEmulatorHost(ValueProvider.StaticValueProvider.of(emulatorHost));
842873
}
843874

875+
/** Specifies the SpannerOptions experimental host (setExperimentalHost). */
876+
public Read withExperimentalHost(ValueProvider<String> experimentalHost) {
877+
SpannerConfig config = getSpannerConfig();
878+
return withSpannerConfig(config.withExperimentalHost(experimentalHost));
879+
}
880+
881+
public Read withExperimentalHost(String experimentalHost) {
882+
return withExperimentalHost(ValueProvider.StaticValueProvider.of(experimentalHost));
883+
}
884+
885+
/**
886+
* Specifies whether to use plaintext channel.
887+
*
888+
* <p>Note: This parameter is only valid when using an experimental host (set via {@code
889+
* withExperimentalHost}).
890+
*/
891+
public Read withUsingPlainTextChannel(ValueProvider<Boolean> plainText) {
892+
SpannerConfig config = getSpannerConfig();
893+
return withSpannerConfig(config.withUsingPlainTextChannel(plainText));
894+
}
895+
896+
/**
897+
* Specifies whether to use plaintext channel.
898+
*
899+
* <p>Note: This parameter is only valid when using an experimental host (set via {@code
900+
* withExperimentalHost}).
901+
*/
902+
public Read withUsingPlainTextChannel(boolean plainText) {
903+
return withUsingPlainTextChannel(ValueProvider.StaticValueProvider.of(plainText));
904+
}
905+
844906
/** If true the uses Cloud Spanner batch API. */
845907
public Read withBatching(boolean batching) {
846908
return toBuilder().setBatching(batching).build();
@@ -1137,6 +1199,37 @@ public CreateTransaction withEmulatorHost(String emulatorHost) {
11371199
return withEmulatorHost(ValueProvider.StaticValueProvider.of(emulatorHost));
11381200
}
11391201

1202+
/** Specifies the SpannerOptions experimental host (setExperimentalHost). */
1203+
public CreateTransaction withExperimentalHost(ValueProvider<String> experimentalHost) {
1204+
SpannerConfig config = getSpannerConfig();
1205+
return withSpannerConfig(config.withExperimentalHost(experimentalHost));
1206+
}
1207+
1208+
public CreateTransaction withExperimentalHost(String experimentalHost) {
1209+
return withExperimentalHost(ValueProvider.StaticValueProvider.of(experimentalHost));
1210+
}
1211+
1212+
/**
1213+
* Specifies whether to use plaintext channel.
1214+
*
1215+
* <p>Note: This parameter is only valid when using an experimental host (set via {@code
1216+
* withExperimentalHost}).
1217+
*/
1218+
public CreateTransaction withUsingPlainTextChannel(ValueProvider<Boolean> plainText) {
1219+
SpannerConfig config = getSpannerConfig();
1220+
return withSpannerConfig(config.withUsingPlainTextChannel(plainText));
1221+
}
1222+
1223+
/**
1224+
* Specifies whether to use plaintext channel.
1225+
*
1226+
* <p>Note: This parameter is only valid when using an experimental host (set via {@code
1227+
* withExperimentalHost}).
1228+
*/
1229+
public CreateTransaction withUsingPlainTextChannel(boolean plainText) {
1230+
return withUsingPlainTextChannel(ValueProvider.StaticValueProvider.of(plainText));
1231+
}
1232+
11401233
@VisibleForTesting
11411234
CreateTransaction withServiceFactory(ServiceFactory<Spanner, SpannerOptions> serviceFactory) {
11421235
SpannerConfig config = getSpannerConfig();
@@ -1274,6 +1367,37 @@ public Write withEmulatorHost(String emulatorHost) {
12741367
return withEmulatorHost(ValueProvider.StaticValueProvider.of(emulatorHost));
12751368
}
12761369

1370+
/** Specifies the SpannerOptions experimental host (setExperimentalHost). */
1371+
public Write withExperimentalHost(ValueProvider<String> experimentalHost) {
1372+
SpannerConfig config = getSpannerConfig();
1373+
return withSpannerConfig(config.withExperimentalHost(experimentalHost));
1374+
}
1375+
1376+
public Write withExperimentalHost(String experimentalHost) {
1377+
return withExperimentalHost(ValueProvider.StaticValueProvider.of(experimentalHost));
1378+
}
1379+
1380+
/**
1381+
* Specifies whether to use plaintext channel.
1382+
*
1383+
* <p>Note: This parameter is only valid when using an experimental host (set via {@code
1384+
* withExperimentalHost}).
1385+
*/
1386+
public Write withUsingPlainTextChannel(ValueProvider<Boolean> plainText) {
1387+
SpannerConfig config = getSpannerConfig();
1388+
return withSpannerConfig(config.withUsingPlainTextChannel(plainText));
1389+
}
1390+
1391+
/**
1392+
* Specifies whether to use plaintext channel.
1393+
*
1394+
* <p>Note: This parameter is only valid when using an experimental host (set via {@code
1395+
* withExperimentalHost}).
1396+
*/
1397+
public Write withUsingPlainTextChannel(boolean plainText) {
1398+
return withUsingPlainTextChannel(ValueProvider.StaticValueProvider.of(plainText));
1399+
}
1400+
12771401
public Write withDialectView(PCollectionView<Dialect> dialect) {
12781402
return toBuilder().setDialectView(dialect).build();
12791403
}
@@ -1626,6 +1750,10 @@ public abstract static class ReadChangeStream
16261750

16271751
abstract @Nullable Duration getWatermarkRefreshRate();
16281752

1753+
abstract @Nullable ValueProvider<String> getExperimentalHost();
1754+
1755+
abstract @Nullable ValueProvider<Boolean> getPlainText();
1756+
16291757
abstract Builder toBuilder();
16301758

16311759
@AutoValue.Builder
@@ -1651,6 +1779,10 @@ abstract static class Builder {
16511779

16521780
abstract Builder setWatermarkRefreshRate(Duration refreshRate);
16531781

1782+
abstract Builder setExperimentalHost(ValueProvider<String> experimentalHost);
1783+
1784+
abstract Builder setPlainText(ValueProvider<Boolean> plainText);
1785+
16541786
abstract ReadChangeStream build();
16551787
}
16561788

@@ -1741,6 +1873,38 @@ public ReadChangeStream withWatermarkRefreshRate(Duration refreshRate) {
17411873
return toBuilder().setWatermarkRefreshRate(refreshRate).build();
17421874
}
17431875

1876+
/** Specifies the experimental host to set on SpannerOptions (setExperimentalHost). */
1877+
public ReadChangeStream withExperimentalHost(ValueProvider<String> experimentalHost) {
1878+
SpannerConfig config = getSpannerConfig();
1879+
return withSpannerConfig(config.withExperimentalHost(experimentalHost));
1880+
}
1881+
1882+
/** Specifies the experimental host to set on SpannerOptions (setExperimentalHost). */
1883+
public ReadChangeStream withExperimentalHost(String experimentalHost) {
1884+
return withExperimentalHost(ValueProvider.StaticValueProvider.of(experimentalHost));
1885+
}
1886+
1887+
/**
1888+
* Specifies whether to use plaintext channel.
1889+
*
1890+
* <p>Note: This parameter is only valid when using an experimental host (set via {@code
1891+
* withExperimentalHost}).
1892+
*/
1893+
public ReadChangeStream withUsingPlainTextChannel(ValueProvider<Boolean> plainText) {
1894+
SpannerConfig config = getSpannerConfig();
1895+
return withSpannerConfig(config.withUsingPlainTextChannel(plainText));
1896+
}
1897+
1898+
/**
1899+
* Specifies whether to use plaintext channel.
1900+
*
1901+
* <p>Note: This parameter is only valid when using an experimental host (set via {@code
1902+
* withExperimentalHost}).
1903+
*/
1904+
public ReadChangeStream withUsingPlainTextChannel(boolean plainText) {
1905+
return withUsingPlainTextChannel(ValueProvider.StaticValueProvider.of(plainText));
1906+
}
1907+
17441908
@Override
17451909
public PCollection<DataChangeRecord> expand(PBegin input) {
17461910
checkArgument(

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerTransformRegistrar.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,8 @@ public abstract static class CrossLanguageConfiguration {
7979
String projectId = "";
8080
@Nullable String host;
8181
@Nullable String emulatorHost;
82+
@Nullable String experimentalHost;
83+
@Nullable Boolean plainText;
8284

8385
public void setInstanceId(String instanceId) {
8486
this.instanceId = instanceId;
@@ -100,6 +102,14 @@ public void setEmulatorHost(@Nullable String emulatorHost) {
100102
this.emulatorHost = emulatorHost;
101103
}
102104

105+
public void setExperimentalHost(@Nullable String experimentalHost) {
106+
this.experimentalHost = experimentalHost;
107+
}
108+
109+
public void setPlainText(@Nullable Boolean plainText) {
110+
this.plainText = plainText;
111+
}
112+
103113
void checkMandatoryFields() {
104114
if (projectId.isEmpty()) {
105115
throw new IllegalArgumentException("projectId can't be empty");
@@ -233,6 +243,12 @@ public PTransform<PBegin, PCollection<Row>> buildExternal(
233243
if (configuration.emulatorHost != null) {
234244
readTransform = readTransform.withEmulatorHost(configuration.emulatorHost);
235245
}
246+
if (configuration.experimentalHost != null) {
247+
readTransform = readTransform.withExperimentalHost(configuration.experimentalHost);
248+
}
249+
if (configuration.plainText != null) {
250+
readTransform = readTransform.withUsingPlainTextChannel(configuration.plainText);
251+
}
236252
@Nullable TimestampBound timestampBound = configuration.getTimestampBound();
237253
if (timestampBound != null) {
238254
readTransform = readTransform.withTimestampBound(timestampBound);
@@ -371,6 +387,12 @@ public PTransform<PCollection<Row>, PDone> buildExternal(
371387
if (configuration.emulatorHost != null) {
372388
writeTransform = writeTransform.withEmulatorHost(configuration.emulatorHost);
373389
}
390+
if (configuration.experimentalHost != null) {
391+
writeTransform = writeTransform.withExperimentalHost(configuration.experimentalHost);
392+
}
393+
if (configuration.plainText != null) {
394+
writeTransform = writeTransform.withUsingPlainTextChannel(configuration.plainText);
395+
}
374396
if (configuration.commitDeadline != null) {
375397
writeTransform = writeTransform.withCommitDeadline(configuration.commitDeadline);
376398
}

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/MetadataSpannerConfigFactory.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,16 @@ public static SpannerConfig create(
7777
config = config.withEmulatorHost(StaticValueProvider.of(emulatorHost.get()));
7878
}
7979

80+
ValueProvider<String> experimentalHost = primaryConfig.getExperimentalHost();
81+
if (experimentalHost != null && experimentalHost.get() != null) {
82+
config = config.withExperimentalHost(experimentalHost.get());
83+
}
84+
85+
ValueProvider<Boolean> plainText = primaryConfig.getPlainText();
86+
if (plainText != null && plainText.get() != null) {
87+
config = config.withUsingPlainTextChannel(plainText.get());
88+
}
89+
8090
ValueProvider<Boolean> isLocalChannelProvider = primaryConfig.getIsLocalChannelProvider();
8191
if (isLocalChannelProvider != null) {
8292
config =

0 commit comments

Comments
 (0)