
Commit 5d1163c

Update ChangeStreamDao to query a different TVF for PostgreSQL based on the change stream partition mode.
For a MUTABLE_KEY_RANGE change stream, use read_proto_bytes_; otherwise, use read_json_.
1 parent 01e1cf6 commit 5d1163c
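
The selection described in the commit message can be pictured with a short, purely illustrative Java sketch. The ChangeStreamDao change itself is in one of the other four files and is not shown in this excerpt, so the class and method names below are assumptions; only the two TVF prefixes come from the commit message, and the spanner. schema qualifier follows Cloud Spanner's PostgreSQL change stream TVF naming.

/** Hypothetical illustration; everything except the two TVF prefixes is an assumption. */
final class ChangeStreamTvfNames {
  /** Picks the PostgreSQL TVF to query for a given change stream. */
  static String tvfName(String changeStreamName, boolean isMutableChangeStream) {
    // Per the commit message: MUTABLE_KEY_RANGE streams are read through the
    // proto-bytes TVF, any other partition mode keeps using the JSON TVF.
    String prefix = isMutableChangeStream ? "read_proto_bytes_" : "read_json_";
    // PostgreSQL-dialect change stream TVFs are named after the stream and live
    // in the "spanner" schema.
    return "spanner." + prefix + changeStreamName;
  }
}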

File tree

5 files changed: +368 -114 lines changed

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java

Lines changed: 127 additions & 49 deletions
@@ -46,6 +46,8 @@
 import com.google.cloud.spanner.Options;
 import com.google.cloud.spanner.Options.RpcPriority;
 import com.google.cloud.spanner.PartitionOptions;
+import com.google.cloud.spanner.ReadOnlyTransaction;
+import com.google.cloud.spanner.ResultSet;
 import com.google.cloud.spanner.Spanner;
 import com.google.cloud.spanner.SpannerException;
 import com.google.cloud.spanner.SpannerOptions;
@@ -177,10 +179,10 @@
  * .withQuery("SELECT id, name, email FROM users"));
  * }</pre>
  *
- * <p>Reads by default use the <a
- * href="https://cloud.google.com/spanner/docs/reads#read_data_in_parallel">PartitionQuery API</a>
- * which enforces some limitations on the type of queries that can be used so that the data can be
- * read in parallel. If the query is not supported by the PartitionQuery API, then you can specify a
+ * <p>Reads by default use the <a href=
+ * "https://cloud.google.com/spanner/docs/reads#read_data_in_parallel">PartitionQuery API</a> which
+ * enforces some limitations on the type of queries that can be used so that the data can be read in
+ * parallel. If the query is not supported by the PartitionQuery API, then you can specify a
  * non-partitioned read by setting {@link Read#withBatching(boolean) withBatching(false)}. If the
  * amount of data being read by a non-partitioned read is very large, it may be useful to add a
  * {@link Reshuffle#viaRandomKey()} transform on the output so that the downstream transforms can
@@ -191,33 +193,33 @@
  *
  * <pre>{@code
  * PCollection<Struct> rows = p.apply(
- * SpannerIO.read()
- * .withInstanceId(instanceId)
- * .withDatabaseId(dbId)
- * .withTable("users")
- * .withColumns("id", "name", "email"));
+ * SpannerIO.read()
+ * .withInstanceId(instanceId)
+ * .withDatabaseId(dbId)
+ * .withTable("users")
+ * .withColumns("id", "name", "email"));
  * }</pre>
  *
  * <p>To read using an <strong>Index</strong>, specify the index name using {@link
  * Read#withIndex(String)}.
  *
  * <pre>{@code
  * PCollection<Struct> rows = p.apply(
- * SpannerIO.read()
- * .withInstanceId(instanceId)
- * .withDatabaseId(dbId)
- * .withTable("users")
- * .withIndex("users_by_name")
- * .withColumns("id", "name", "email"));
+ * SpannerIO.read()
+ * .withInstanceId(instanceId)
+ * .withDatabaseId(dbId)
+ * .withTable("users")
+ * .withIndex("users_by_name")
+ * .withColumns("id", "name", "email"));
  * }</pre>
  *
  * <h4>Read consistency</h4>
  *
  * <p>The transform is guaranteed to be executed on a consistent snapshot of data, utilizing the
  * power of read only transactions. Staleness of data can be controlled using {@link
- * Read#withTimestampBound} or {@link Read#withTimestamp(Timestamp)} methods. <a
- * href="https://cloud.google.com/spanner/docs/transactions#read-only_transactions">Read more</a>
- * about transactions in Cloud Spanner.
+ * Read#withTimestampBound} or {@link Read#withTimestamp(Timestamp)} methods. <a href=
+ * "https://cloud.google.com/spanner/docs/transactions#read-only_transactions">Read more</a> about
+ * transactions in Cloud Spanner.
  *
  * <p>It is possible to read several {@link PCollection PCollections} within a single transaction.
  * Apply {@link SpannerIO#createTransaction()} transform, that lazily creates a transaction. The
@@ -303,8 +305,8 @@
  * withMaxNumMutations()} or {@link Write#withMaxNumRows(long) withMaxNumRows()}. Setting either to
  * a small value or zero disables batching.
  *
- * <p>Note that the <a
- * href="https://cloud.google.com/spanner/quotas#limits_for_creating_reading_updating_and_deleting_data">maximum
+ * <p>Note that the <a href=
+ * "https://cloud.google.com/spanner/quotas#limits_for_creating_reading_updating_and_deleting_data">maximum
  * size of a single transaction</a> is 20,000 mutated cells - including cells in indexes. If you
  * have a large number of indexes and are getting exceptions with message: <tt>INVALID_ARGUMENT: The
  * transaction contains too many mutations</tt> you will need to specify a smaller number of {@code
@@ -435,9 +437,8 @@
  * <h3>Updates to the I/O connector code</h3>
  *
  * For any significant significant updates to this I/O connector, please consider involving
- * corresponding code reviewers mentioned <a
- * href="https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/OWNERS">
- * here</a>.
+ * corresponding code reviewers mentioned <a href=
+ * "https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/OWNERS"> here</a>.
  */
 @SuppressWarnings({
   "nullness" // TODO(https://github.com/apache/beam/issues/20497)
@@ -455,8 +456,10 @@ public class SpannerIO {
 private static final int DEFAULT_GROUPING_FACTOR = 1000;

 // Size of caches for read/write ServiceCallMetric objects .
-// This is a reasonable limit, as for reads, each worker will process very few different table
-// read requests, and for writes, batching will ensure that write operations for the same
+// This is a reasonable limit, as for reads, each worker will process very few
+// different table
+// read requests, and for writes, batching will ensure that write operations for
+// the same
 // table occur at the same time (within a bundle).
 static final int METRICS_CACHE_SIZE = 100;

@@ -673,10 +676,10 @@ public ReadAll withTimestampBound(TimestampBound timestampBound) {
 }

 /**
- * By default the <a
- * href="https://cloud.google.com/spanner/docs/reads#read_data_in_parallel">PartitionQuery
- * API</a> is used to read data from Cloud Spanner. It is useful to disable batching when the
- * underlying query is not root-partitionable.
+ * By default the <a href=
+ * "https://cloud.google.com/spanner/docs/reads#read_data_in_parallel">PartitionQuery API</a> is
+ * used to read data from Cloud Spanner. It is useful to disable batching when the underlying
+ * query is not root-partitionable.
  */
 public ReadAll withBatching(boolean batching) {
 return toBuilder().setBatching(batching).build();
@@ -968,8 +971,8 @@ public Read withIndex(String index) {
 }

 /**
- * Note that {@link PartitionOptions} are currently ignored. See <a
- * href="https://cloud.google.com/spanner/docs/reference/rpc/google.spanner.v1#google.spanner.v1.PartitionOptions">
+ * Note that {@link PartitionOptions} are currently ignored. See <a href=
+ * "https://cloud.google.com/spanner/docs/reference/rpc/google.spanner.v1#google.spanner.v1.PartitionOptions">
  * PartitionOptions in RPC documents</a>
  */
 public Read withPartitionOptions(PartitionOptions partitionOptions) {
@@ -1420,8 +1423,8 @@ public Write withCommitDeadline(Duration commitDeadline) {
 /**
  * Specifies max commit delay for the Commit API call for throughput optimized writes. If not
  * set, Spanner might set a small delay if it thinks that will amortize the cost of the writes.
- * For more information about the feature, <a
- * href="https://cloud.google.com/spanner/docs/throughput-optimized-writes#default-behavior">see
+ * For more information about the feature, <a href=
+ * "https://cloud.google.com/spanner/docs/throughput-optimized-writes#default-behavior">see
  * documentation</a>
  */
 public Write withMaxCommitDelay(long millis) {
@@ -1540,7 +1543,8 @@ private void populateDisplayDataWithParamaters(DisplayData.Builder builder) {
 builder.add(
 DisplayData.item("maxNumRows", getMaxNumRows())
 .withLabel("Max number of rows in each batch"));
-// Grouping factor default value depends on whether it is a batch or streaming pipeline.
+// Grouping factor default value depends on whether it is a batch or streaming
+// pipeline.
 // This function is not aware of that state, so use 'DEFAULT' if unset.
 builder.add(
 DisplayData.item(
@@ -1989,7 +1993,8 @@ && getInclusiveStartAt().toSqlTimestamp().after(getInclusiveEndAt().toSqlTimesta
 .orElse(PartitionMetadataTableNames.generateRandom(partitionMetadataDatabaseId));
 final String changeStreamName = getChangeStreamName();
 final Timestamp startTimestamp = getInclusiveStartAt();
-// Uses (Timestamp.MAX - 1ns) at max for end timestamp to indicate this connector is expected
+// Uses (Timestamp.MAX - 1ns) at max for end timestamp to indicate this
+// connector is expected
 // to run forever.
 final Timestamp endTimestamp =
 getInclusiveEndAt().compareTo(MAX_INCLUSIVE_END_AT) > 0
@@ -1998,6 +2003,11 @@ && getInclusiveStartAt().toSqlTimestamp().after(getInclusiveEndAt().toSqlTimesta
 final MapperFactory mapperFactory = new MapperFactory(changeStreamDatabaseDialect);
 final ChangeStreamMetrics metrics = new ChangeStreamMetrics();
 final RpcPriority rpcPriority = MoreObjects.firstNonNull(getRpcPriority(), RpcPriority.HIGH);
+final SpannerAccessor spannerAccessor =
+    SpannerAccessor.getOrCreate(changeStreamSpannerConfig);
+final boolean isMutableChangeStream =
+    isMutableChangeStream(
+        spannerAccessor.getDatabaseClient(), changeStreamDatabaseDialect, changeStreamName);
 final DaoFactory daoFactory =
 new DaoFactory(
 changeStreamSpannerConfig,
@@ -2007,7 +2017,8 @@ && getInclusiveStartAt().toSqlTimestamp().after(getInclusiveEndAt().toSqlTimesta
 rpcPriority,
 input.getPipeline().getOptions().getJobName(),
 changeStreamDatabaseDialect,
-metadataDatabaseDialect);
+metadataDatabaseDialect,
+isMutableChangeStream);
 final ActionFactory actionFactory = new ActionFactory();

 final Duration watermarkRefreshRate =
@@ -2097,7 +2108,8 @@ static SpannerConfig buildSpannerConfigWithCredential(
 return spannerConfig;
 }

-private static Dialect getDialect(SpannerConfig spannerConfig, PipelineOptions pipelineOptions) {
+protected static Dialect getDialect(
+    SpannerConfig spannerConfig, PipelineOptions pipelineOptions) {
 // Allow passing the credential from pipeline options to the getDialect() call.
 SpannerConfig spannerConfigWithCredential =
 buildSpannerConfigWithCredential(spannerConfig, pipelineOptions);
@@ -2198,7 +2210,8 @@ private synchronized void sortAndOutputBatches(OutputReceiver<Iterable<MutationG
 }

 if (maxSortableNumMutations == maxBatchNumMutations) {
-// no grouping is occurring, no need to sort and make batches, just output what we have.
+// no grouping is occurring, no need to sort and make batches, just output what
+// we have.
 outputBatch(out, 0, mutationsToSort.size());
 return;
 }
@@ -2279,7 +2292,8 @@ public synchronized void processElement(
 }
 }

-// Container class to store a MutationGroup, its sortable encoded key and its statistics.
+// Container class to store a MutationGroup, its sortable encoded key and its
+// statistics.
 private static final class MutationGroupContainer
 implements Comparable<MutationGroupContainer> {

@@ -2308,7 +2322,8 @@ public int compareTo(MutationGroupContainer o) {
 }
 }

-// TODO(https://github.com/apache/beam/issues/18203): Remove this when FinishBundle has added
+// TODO(https://github.com/apache/beam/issues/18203): Remove this when
+// FinishBundle has added
 // support for an {@link OutputReceiver}
 private static class OutputReceiverForFinishBundle
 implements OutputReceiver<Iterable<MutationGroup>> {
@@ -2409,9 +2424,11 @@ static class WriteToSpannerFn extends DoFn<Iterable<MutationGroup>, Void> {
 private final SpannerConfig spannerConfig;
 private final FailureMode failureMode;

-// SpannerAccessor can not be serialized so must be initialized at runtime in setup().
+// SpannerAccessor can not be serialized so must be initialized at runtime in
+// setup().
 private transient SpannerAccessor spannerAccessor;
-// resolved at runtime for metrics report purpose. SpannerConfig may not have projectId set.
+// resolved at runtime for metrics report purpose. SpannerConfig may not have
+// projectId set.
 private transient String projectId;
 /* Number of times an aborted write to spanner could be retried */
 private static final int ABORTED_RETRY_ATTEMPTS = 5;
@@ -2420,7 +2437,10 @@ static class WriteToSpannerFn extends DoFn<Iterable<MutationGroup>, Void> {
 "Transaction aborted. "
 + "Database schema probably changed during transaction, retry may succeed.";

-/* Error string in Aborted exception for concurrent transaction in Spanner Emulator */
+/*
+ * Error string in Aborted exception for concurrent transaction in Spanner
+ * Emulator
+ */
 private final String emulatorErrorString =
 "The emulator only supports one transaction at a time.";

@@ -2472,8 +2492,10 @@ public void setup() {
 .withMaxCumulativeBackoff(spannerConfig.getMaxCumulativeBackoff().get())
 .withInitialBackoff(spannerConfig.getMaxCumulativeBackoff().get().dividedBy(60));

-// Use a LoadingCache for metrics as there can be different tables being written to which
-// result in different service call metrics labels. ServiceCallMetric items are created
+// Use a LoadingCache for metrics as there can be different tables being written
+// to which
+// result in different service call metrics labels. ServiceCallMetric items are
+// created
 // on-demand and added to the cache.
 writeMetricsByTableName =
 CacheBuilder.newBuilder()
@@ -2535,9 +2557,10 @@ public void processElement(ProcessContext c) throws Exception {
 }

 /*
-Spanner aborts all inflight transactions during a schema change. Client is expected
-to retry silently. These must not be counted against retry backoff.
-*/
+ * Spanner aborts all inflight transactions during a schema change. Client is
+ * expected
+ * to retry silently. These must not be counted against retry backoff.
+ */
 private void spannerWriteWithRetryIfSchemaChange(List<Mutation> batch) throws SpannerException {
 Set<String> tableNames = batch.stream().map(Mutation::getTable).collect(Collectors.toSet());
 for (int retry = 1; ; retry++) {
@@ -2623,7 +2646,8 @@ private void writeMutations(Iterable<Mutation> mutationIterable)

 while (true) {
 Stopwatch timer = Stopwatch.createStarted();
-// loop is broken on success, timeout backoff/retry attempts exceeded, or other failure.
+// loop is broken on success, timeout backoff/retry attempts exceeded, or other
+// failure.
 try {
 spannerWriteWithRetryIfSchemaChange(mutations);
 spannerWriteSuccess.inc();
@@ -2688,4 +2712,58 @@ static String resolveSpannerProjectId(SpannerConfig config) {
 ? SpannerOptions.getDefaultProjectId()
 : config.getProjectId().get();
 }
+
+  @VisibleForTesting
+  static boolean isMutableChangeStream(
+      DatabaseClient databaseClient, Dialect dialect, String changeStreamName) {
+    String fetchedPartitionMode = fetchPartitionMode(databaseClient, dialect, changeStreamName);
+    if (fetchedPartitionMode.isEmpty()
+        || fetchedPartitionMode.equalsIgnoreCase("IMMUTABLE_KEY_RANGE")) {
+      return false;
+    }
+    return true;
+  }
+
+  private static String fetchPartitionMode(
+      DatabaseClient databaseClient, Dialect dialect, String changeStreamName) {
+    try (ReadOnlyTransaction tx = databaseClient.readOnlyTransaction()) {
+      Statement statement;
+      if (dialect == Dialect.POSTGRESQL) {
+        statement =
+            Statement.newBuilder(
+                    "select option_value\n"
+                        + "from information_schema.change_stream_options\n"
+                        + "where change_stream_name = $1 and option_name = 'partition_mode'")
+                .bind("p1")
+                .to(changeStreamName)
+                .build();
+      } else {
+        statement =
+            Statement.newBuilder(
+                    "select option_value\n"
+                        + "from information_schema.change_stream_options\n"
+                        + "where change_stream_name = @changeStreamName and option_name = 'partition_mode'")
+                .bind("changeStreamName")
+                .to(changeStreamName)
+                .build();
+      }
+      ResultSet resultSet = tx.executeQuery(statement);
+      while (resultSet.next()) {
+        String value = resultSet.getString(0);
+        if (value != null) {
+          return value;
+        }
+      }
+      return "";
+    } catch (RuntimeException e) {
+      // Log the failure (with stack trace) but rethrow so the caller still observes
+      // the error.
+      LOG.warn(
+          "Failed to fetch partition_mode for change stream '{}', dialect={} - will propagate exception",
+          changeStreamName,
+          dialect,
+          e);
+      throw e;
+    }
+  }
 }
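
The new fetchPartitionMode() helper added at the end of this diff drives the TVF choice: it reads the change stream's partition_mode option from information_schema, and isMutableChangeStream() treats anything other than an empty value or IMMUTABLE_KEY_RANGE as mutable. For reference, a minimal standalone sketch of the same lookup, assuming a GoogleSQL-dialect database, an existing DatabaseClient, and a placeholder stream name:

import com.google.cloud.spanner.DatabaseClient;
import com.google.cloud.spanner.ResultSet;
import com.google.cloud.spanner.Statement;

final class PartitionModeLookup {
  /** Returns the stream's partition_mode option value, or "" if it is not set. */
  static String partitionMode(DatabaseClient client, String changeStreamName) {
    Statement statement =
        Statement.newBuilder(
                "SELECT option_value "
                    + "FROM information_schema.change_stream_options "
                    + "WHERE change_stream_name = @name AND option_name = 'partition_mode'")
            .bind("name")
            .to(changeStreamName)
            .build();
    // A single-use read is enough for a one-off check; the connector itself uses a
    // read-only transaction.
    try (ResultSet rs = client.singleUse().executeQuery(statement)) {
      while (rs.next()) {
        if (!rs.isNull(0)) {
          return rs.getString(0);
        }
      }
    }
    return "";
  }
}

Under that convention, an empty result or IMMUTABLE_KEY_RANGE keeps the connector on the read_json_ TVF, while MUTABLE_KEY_RANGE switches it to read_proto_bytes_.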

0 commit comments
