Skip to content

Commit 47bab7e

Browse files
authored
[SpannerIO] Add low-latency configuration in Spanner Change Streams (#37718)
When enabled, low latency mode will stop SDF polling after 1 second or the first heartbeat response received with 100ms heartbeat latency configured. This reduces e2e processing latency by completing bundles faster which is necessary for messages to progress to the next fused stage in runners such as Dataflow.
1 parent 5c5c710 commit 47bab7e

File tree

11 files changed

+382
-57
lines changed

11 files changed

+382
-57
lines changed

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java

Lines changed: 68 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,14 @@
2020
import static java.util.stream.Collectors.toList;
2121
import static org.apache.beam.sdk.io.gcp.spanner.MutationUtils.isPointDelete;
2222
import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.DEFAULT_CHANGE_STREAM_NAME;
23+
import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.DEFAULT_HEARTBEAT_MILLIS;
2324
import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.DEFAULT_INCLUSIVE_END_AT;
2425
import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.DEFAULT_INCLUSIVE_START_AT;
26+
import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.DEFAULT_REAL_TIME_CHECKPOINT_INTERVAL;
2527
import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.DEFAULT_RPC_PRIORITY;
2628
import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.DEFAULT_WATERMARK_REFRESH_RATE;
29+
import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.LOW_LATENCY_DEFAULT_HEARTBEAT_MILLIS;
30+
import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.LOW_LATENCY_REAL_TIME_CHECKPOINT_INTERVAL;
2731
import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.MAX_INCLUSIVE_END_AT;
2832
import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.THROUGHPUT_WINDOW_SECONDS;
2933
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
@@ -537,6 +541,9 @@ public static ReadChangeStream readChangeStream() {
537541
.setRpcPriority(DEFAULT_RPC_PRIORITY)
538542
.setInclusiveStartAt(DEFAULT_INCLUSIVE_START_AT)
539543
.setInclusiveEndAt(DEFAULT_INCLUSIVE_END_AT)
544+
.setRealTimeCheckpointInterval(DEFAULT_REAL_TIME_CHECKPOINT_INTERVAL)
545+
.setHeartbeatMillis(DEFAULT_HEARTBEAT_MILLIS)
546+
.setCancelQueryOnHeartbeat(false)
540547
.build();
541548
}
542549

@@ -1761,6 +1768,12 @@ public abstract static class ReadChangeStream
17611768

17621769
abstract @Nullable ValueProvider<Boolean> getPlainText();
17631770

1771+
abstract Duration getRealTimeCheckpointInterval();
1772+
1773+
abstract int getHeartbeatMillis();
1774+
1775+
abstract boolean getCancelQueryOnHeartbeat();
1776+
17641777
abstract Builder toBuilder();
17651778

17661779
@AutoValue.Builder
@@ -1790,6 +1803,18 @@ abstract static class Builder {
17901803

17911804
abstract Builder setPlainText(ValueProvider<Boolean> plainText);
17921805

1806+
/**
1807+
* When caught up to real-time, checkpoint processing of change stream this often. This sets a
1808+
* bound on latency of processing if a steady trickle of elements prevents the heartbeat
1809+
* interval from triggering.
1810+
*/
1811+
abstract Builder setRealTimeCheckpointInterval(Duration realTimeCheckpointInterval);
1812+
1813+
/** Heartbeat interval for all change stream queries. */
1814+
abstract Builder setHeartbeatMillis(int heartbeatMillis);
1815+
1816+
abstract Builder setCancelQueryOnHeartbeat(boolean cancelQueryOnHeartbeat);
1817+
17931818
abstract ReadChangeStream build();
17941819
}
17951820

@@ -1912,6 +1937,37 @@ public ReadChangeStream withUsingPlainTextChannel(boolean plainText) {
19121937
return withUsingPlainTextChannel(ValueProvider.StaticValueProvider.of(plainText));
19131938
}
19141939

1940+
/**
1941+
* Configures low latency experiment for readChangeStream transform. Example usage:
1942+
*
1943+
* <pre>{@code
1944+
* PCollection<Struct> rows = p.apply(
1945+
* SpannerIO.readChangeStream()
1946+
* .withSpannerConfig(
1947+
* SpannerConfig.create()
1948+
* .withProjectId(projectId)
1949+
* .withInstanceId(instanceId)
1950+
* .withDatabaseId(dbId))
1951+
* .withChangeStreamName(changeStreamName)
1952+
* .withMetadataInstance(metadataInstanceId)
1953+
* .withMetadataDatabase(metadataDatabase)
1954+
* .withInclusiveStartAt(Timestamp.now()))
1955+
* .withLowLatency();
1956+
* }</pre>
1957+
*/
1958+
public ReadChangeStream withLowLatency() {
1959+
// Set both the realtime end timestamp and the heartbeat interval.
1960+
// Heartbeats might not trigger if data arrives continuously (e.g. every 50ms),
1961+
// which could delay the bundle completion up to the runner's default split time (often 5s).
1962+
// Since end-to-end processing requires the bundle to finish and commit,
1963+
// adding a realtime end timeout of 1s bounds this delay and improves latency.
1964+
return toBuilder()
1965+
.setHeartbeatMillis(LOW_LATENCY_DEFAULT_HEARTBEAT_MILLIS)
1966+
.setCancelQueryOnHeartbeat(true)
1967+
.setRealTimeCheckpointInterval(LOW_LATENCY_REAL_TIME_CHECKPOINT_INTERVAL)
1968+
.build();
1969+
}
1970+
19151971
@Override
19161972
public PCollection<DataChangeRecord> expand(PBegin input) {
19171973
checkArgument(
@@ -2018,13 +2074,23 @@ && getInclusiveStartAt().toSqlTimestamp().after(getInclusiveEndAt().toSqlTimesta
20182074
MoreObjects.firstNonNull(getWatermarkRefreshRate(), DEFAULT_WATERMARK_REFRESH_RATE);
20192075
final CacheFactory cacheFactory = new CacheFactory(daoFactory, watermarkRefreshRate);
20202076

2077+
final long heartbeatMillis = getHeartbeatMillis();
2078+
20212079
final InitializeDoFn initializeDoFn =
2022-
new InitializeDoFn(daoFactory, mapperFactory, startTimestamp, endTimestamp);
2080+
new InitializeDoFn(
2081+
daoFactory, mapperFactory, startTimestamp, endTimestamp, heartbeatMillis);
20232082
final DetectNewPartitionsDoFn detectNewPartitionsDoFn =
20242083
new DetectNewPartitionsDoFn(
20252084
daoFactory, mapperFactory, actionFactory, cacheFactory, metrics);
2085+
20262086
final ReadChangeStreamPartitionDoFn readChangeStreamPartitionDoFn =
2027-
new ReadChangeStreamPartitionDoFn(daoFactory, mapperFactory, actionFactory, metrics);
2087+
new ReadChangeStreamPartitionDoFn(
2088+
daoFactory,
2089+
mapperFactory,
2090+
actionFactory,
2091+
metrics,
2092+
getRealTimeCheckpointInterval(),
2093+
getCancelQueryOnHeartbeat());
20282094
final PostProcessingMetricsDoFn postProcessingMetricsDoFn =
20292095
new PostProcessingMetricsDoFn(metrics);
20302096

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/ChangeStreamsConstants.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,15 @@ public class ChangeStreamsConstants {
4949
*/
5050
public static final Timestamp DEFAULT_INCLUSIVE_END_AT = MAX_INCLUSIVE_END_AT;
5151

52+
public static final Duration DEFAULT_REAL_TIME_CHECKPOINT_INTERVAL = Duration.standardMinutes(2);
53+
54+
public static final int DEFAULT_HEARTBEAT_MILLIS = 2000;
55+
56+
public static final int LOW_LATENCY_DEFAULT_HEARTBEAT_MILLIS = 100;
57+
58+
public static final Duration LOW_LATENCY_REAL_TIME_CHECKPOINT_INTERVAL =
59+
Duration.standardSeconds(1);
60+
5261
/** The default priority for a change stream query is {@link RpcPriority#HIGH}. */
5362
public static final RpcPriority DEFAULT_RPC_PRIORITY = RpcPriority.HIGH;
5463

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ActionFactory.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -71,9 +71,10 @@ public synchronized DataChangeRecordAction dataChangeRecordAction(
7171
* @param metrics metrics gathering class
7272
* @return singleton instance of the {@link HeartbeatRecordAction}
7373
*/
74-
public synchronized HeartbeatRecordAction heartbeatRecordAction(ChangeStreamMetrics metrics) {
74+
public synchronized HeartbeatRecordAction heartbeatRecordAction(
75+
ChangeStreamMetrics metrics, boolean cancelQueryOnHeartbeat) {
7576
if (heartbeatRecordActionInstance == null) {
76-
heartbeatRecordActionInstance = new HeartbeatRecordAction(metrics);
77+
heartbeatRecordActionInstance = new HeartbeatRecordAction(metrics, cancelQueryOnHeartbeat);
7778
}
7879
return heartbeatRecordActionInstance;
7980
}
@@ -174,6 +175,7 @@ public synchronized PartitionEventRecordAction partitionEventRecordAction(
174175
* @param partitionEventRecordAction action class to process {@link
175176
* org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionEventRecord}s
176177
* @param metrics metrics gathering class
178+
* @param realTimeCheckpointInterval the duration added to current time for the end timestamp
177179
* @return single instance of the {@link QueryChangeStreamAction}
178180
*/
179181
public synchronized QueryChangeStreamAction queryChangeStreamAction(
@@ -188,7 +190,8 @@ public synchronized QueryChangeStreamAction queryChangeStreamAction(
188190
PartitionEndRecordAction partitionEndRecordAction,
189191
PartitionEventRecordAction partitionEventRecordAction,
190192
ChangeStreamMetrics metrics,
191-
boolean isMutableChangeStream) {
193+
boolean isMutableChangeStream,
194+
Duration realTimeCheckpointInterval) {
192195
if (queryChangeStreamActionInstance == null) {
193196
queryChangeStreamActionInstance =
194197
new QueryChangeStreamAction(
@@ -203,7 +206,8 @@ public synchronized QueryChangeStreamAction queryChangeStreamAction(
203206
partitionEndRecordAction,
204207
partitionEventRecordAction,
205208
metrics,
206-
isMutableChangeStream);
209+
isMutableChangeStream,
210+
realTimeCheckpointInterval);
207211
}
208212
return queryChangeStreamActionInstance;
209213
}

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/HeartbeatRecordAction.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,14 +41,16 @@
4141
public class HeartbeatRecordAction {
4242
private static final Logger LOG = LoggerFactory.getLogger(HeartbeatRecordAction.class);
4343
private final ChangeStreamMetrics metrics;
44+
private final boolean cancelQueryOnHeartbeat;
4445

4546
/**
4647
* Constructs an action class for handling {@link HeartbeatRecord}s.
4748
*
4849
* @param metrics metrics gathering class
4950
*/
50-
HeartbeatRecordAction(ChangeStreamMetrics metrics) {
51+
HeartbeatRecordAction(ChangeStreamMetrics metrics, boolean cancelQueryOnHeartbeat) {
5152
this.metrics = metrics;
53+
this.cancelQueryOnHeartbeat = cancelQueryOnHeartbeat;
5254
}
5355

5456
/**
@@ -76,7 +78,8 @@ public Optional<ProcessContinuation> run(
7678
HeartbeatRecord record,
7779
RestrictionTracker<TimestampRange, Timestamp> tracker,
7880
RestrictionInterrupter<Timestamp> interrupter,
79-
ManualWatermarkEstimator<Instant> watermarkEstimator) {
81+
ManualWatermarkEstimator<Instant> watermarkEstimator,
82+
Timestamp endTimestamp) {
8083

8184
final String token = partition.getPartitionToken();
8285
LOG.debug("[{}] Processing heartbeat record {}", token, record);
@@ -96,6 +99,11 @@ public Optional<ProcessContinuation> run(
9699
watermarkEstimator.setWatermark(timestampInstant);
97100

98101
LOG.debug("[{}] Heartbeat record action completed successfully", token);
99-
return Optional.empty();
102+
if (timestamp.equals(endTimestamp)) {
103+
// this is probably last element in query, let it finish query
104+
return Optional.empty();
105+
}
106+
// no new data, finish reading data
107+
return cancelQueryOnHeartbeat ? Optional.empty() : Optional.of(ProcessContinuation.resume());
100108
}
101109
}

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ public class QueryChangeStreamAction {
9191
private final PartitionEventRecordAction partitionEventRecordAction;
9292
private final ChangeStreamMetrics metrics;
9393
private final boolean isMutableChangeStream;
94+
private final Duration realTimeCheckpointInterval;
9495

9596
/**
9697
* Constructs an action class for performing a change stream query for a given partition.
@@ -109,6 +110,7 @@ public class QueryChangeStreamAction {
109110
* @param partitionEventRecordAction action class to process {@link PartitionEventRecord}s
110111
* @param metrics metrics gathering class
111112
* @param isMutableChangeStream whether the change stream is mutable or not
113+
* @param realTimeCheckpointInterval duration to add to current time
112114
*/
113115
QueryChangeStreamAction(
114116
ChangeStreamDao changeStreamDao,
@@ -122,7 +124,8 @@ public class QueryChangeStreamAction {
122124
PartitionEndRecordAction partitionEndRecordAction,
123125
PartitionEventRecordAction partitionEventRecordAction,
124126
ChangeStreamMetrics metrics,
125-
boolean isMutableChangeStream) {
127+
boolean isMutableChangeStream,
128+
Duration realTimeCheckpointInterval) {
126129
this.changeStreamDao = changeStreamDao;
127130
this.partitionMetadataDao = partitionMetadataDao;
128131
this.changeStreamRecordMapper = changeStreamRecordMapper;
@@ -135,6 +138,7 @@ public class QueryChangeStreamAction {
135138
this.partitionEventRecordAction = partitionEventRecordAction;
136139
this.metrics = metrics;
137140
this.isMutableChangeStream = isMutableChangeStream;
141+
this.realTimeCheckpointInterval = realTimeCheckpointInterval;
138142
}
139143

140144
/**
@@ -244,7 +248,8 @@ public ProcessContinuation run(
244248
(HeartbeatRecord) record,
245249
tracker,
246250
interrupter,
247-
watermarkEstimator);
251+
watermarkEstimator,
252+
endTimestamp);
248253
} else if (record instanceof ChildPartitionsRecord) {
249254
maybeContinuation =
250255
childPartitionsRecordAction.run(
@@ -387,12 +392,12 @@ private boolean isTimestampOutOfRange(SpannerException e) {
387392
&& e.getMessage().contains(OUT_OF_RANGE_ERROR_MESSAGE);
388393
}
389394

390-
// Return (now + 2 mins) as the end timestamp for reading change streams. This is only used if
391-
// users want to run the connector forever. If the end timestamp is reached, we will resume
392-
// processing from that timestamp on a subsequent DoFn execution.
395+
// Return (now + config duration) as the end timestamp for reading change streams. This is only
396+
// used if users want to run the connector forever. If the end timestamp is reached, we
397+
// will resume processing from that timestamp on a subsequent DoFn execution.
393398
private Timestamp getNextReadChangeStreamEndTimestamp() {
394-
final Timestamp current = Timestamp.now();
395-
return Timestamp.ofTimeSecondsAndNanos(current.getSeconds() + 2 * 60, current.getNanos());
399+
return Timestamp.ofTimeMicroseconds(
400+
Instant.now().plus(realTimeCheckpointInterval).getMillis() * 1000L);
396401
}
397402

398403
// For Mutable Change Stream bounded queries, update the query end timestamp to be within 2

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/InitializeDoFn.java

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,7 @@ public class InitializeDoFn extends DoFn<byte[], PartitionMetadata> implements S
3636

3737
private static final long serialVersionUID = -8921188388649003102L;
3838

39-
/** Heartbeat interval for all change stream queries will be of 2 seconds. */
40-
// Be careful when changing this interval, as it needs to be less than the checkpointing interval
41-
// in Dataflow. Otherwise, if there are no records within checkpoint intervals, the consuming of
42-
// a change stream query might get stuck.
43-
private static final long DEFAULT_HEARTBEAT_MILLIS = 2000;
39+
private final long heartbeatMillis;
4440

4541
private final DaoFactory daoFactory;
4642
private final MapperFactory mapperFactory;
@@ -53,11 +49,13 @@ public InitializeDoFn(
5349
DaoFactory daoFactory,
5450
MapperFactory mapperFactory,
5551
com.google.cloud.Timestamp startTimestamp,
56-
com.google.cloud.Timestamp endTimestamp) {
52+
com.google.cloud.Timestamp endTimestamp,
53+
long heartbeatMillis) {
5754
this.daoFactory = daoFactory;
5855
this.mapperFactory = mapperFactory;
5956
this.startTimestamp = startTimestamp;
6057
this.endTimestamp = endTimestamp;
58+
this.heartbeatMillis = heartbeatMillis;
6159
}
6260

6361
@ProcessElement
@@ -88,7 +86,7 @@ private void createFakeParentPartition() {
8886
.setPartitionToken(InitialPartition.PARTITION_TOKEN)
8987
.setStartTimestamp(startTimestamp)
9088
.setEndTimestamp(endTimestamp)
91-
.setHeartbeatMillis(DEFAULT_HEARTBEAT_MILLIS)
89+
.setHeartbeatMillis(heartbeatMillis)
9290
.setState(State.CREATED)
9391
.setWatermark(startTimestamp)
9492
.build();

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFn.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -74,12 +74,15 @@ public class ReadChangeStreamPartitionDoFn extends DoFn<PartitionMetadata, DataC
7474
private final ActionFactory actionFactory;
7575
private final ChangeStreamMetrics metrics;
7676
private final boolean isMutableChangeStream;
77+
private final boolean cancelQueryOnHeartbeat;
7778
/**
7879
* Needs to be set through the {@link
7980
* ReadChangeStreamPartitionDoFn#setThroughputEstimator(BytesThroughputEstimator)} call.
8081
*/
8182
private ThroughputEstimator<DataChangeRecord> throughputEstimator;
8283

84+
private final Duration realTimeCheckpointInterval;
85+
8386
private transient QueryChangeStreamAction queryChangeStreamAction;
8487

8588
/**
@@ -95,17 +98,23 @@ public class ReadChangeStreamPartitionDoFn extends DoFn<PartitionMetadata, DataC
9598
* @param mapperFactory the {@link MapperFactory} to construct {@link ChangeStreamRecordMapper}s
9699
* @param actionFactory the {@link ActionFactory} to construct actions
97100
* @param metrics the {@link ChangeStreamMetrics} to emit partition related metrics
101+
* @param realTimeCheckpointInterval duration to be used for the next end timestamp
102+
* @param cancelQueryOnHeartbeat flag to improve low latency checkpointing
98103
*/
99104
public ReadChangeStreamPartitionDoFn(
100105
DaoFactory daoFactory,
101106
MapperFactory mapperFactory,
102107
ActionFactory actionFactory,
103-
ChangeStreamMetrics metrics) {
108+
ChangeStreamMetrics metrics,
109+
Duration realTimeCheckpointInterval,
110+
boolean cancelQueryOnHeartbeat) {
104111
this.daoFactory = daoFactory;
105-
this.mapperFactory = mapperFactory;
106112
this.actionFactory = actionFactory;
113+
this.mapperFactory = mapperFactory;
107114
this.metrics = metrics;
108115
this.isMutableChangeStream = daoFactory.isMutableChangeStream();
116+
this.realTimeCheckpointInterval = realTimeCheckpointInterval;
117+
this.cancelQueryOnHeartbeat = cancelQueryOnHeartbeat;
109118
this.throughputEstimator = new NullThroughputEstimator<>();
110119
}
111120

@@ -195,7 +204,7 @@ public void setup() {
195204
final DataChangeRecordAction dataChangeRecordAction =
196205
actionFactory.dataChangeRecordAction(throughputEstimator);
197206
final HeartbeatRecordAction heartbeatRecordAction =
198-
actionFactory.heartbeatRecordAction(metrics);
207+
actionFactory.heartbeatRecordAction(metrics, cancelQueryOnHeartbeat);
199208
final ChildPartitionsRecordAction childPartitionsRecordAction =
200209
actionFactory.childPartitionsRecordAction(partitionMetadataDao, metrics);
201210
final PartitionStartRecordAction partitionStartRecordAction =
@@ -218,7 +227,8 @@ public void setup() {
218227
partitionEndRecordAction,
219228
partitionEventRecordAction,
220229
metrics,
221-
isMutableChangeStream);
230+
isMutableChangeStream,
231+
realTimeCheckpointInterval);
222232
}
223233

224234
/**

0 commit comments

Comments
 (0)