1717 */
1818package org .apache .beam .sdk .io .gcp .spanner .changestreams .action ;
1919
20+ import static org .apache .beam .sdk .io .gcp .spanner .changestreams .ChangeStreamsConstants .MAX_INCLUSIVE_END_AT ;
21+
2022import com .google .cloud .Timestamp ;
2123import com .google .cloud .spanner .ErrorCode ;
2224import com .google .cloud .spanner .SpannerException ;
@@ -160,7 +162,10 @@ public ProcessContinuation run(
160162 BundleFinalizer bundleFinalizer ) {
161163 final String token = partition .getPartitionToken ();
162164 final Timestamp startTimestamp = tracker .currentRestriction ().getFrom ();
163- final Timestamp endTimestamp = partition .getEndTimestamp ();
165+ final Timestamp changeStreamQueryEndTimestamp =
166+ partition .getEndTimestamp ().equals (MAX_INCLUSIVE_END_AT )
167+ ? getNextReadChangeStreamEndTimestamp ()
168+ : partition .getEndTimestamp ();
164169
165170 // TODO: Potentially we can avoid this fetch, by enriching the runningAt timestamp when the
166171 // ReadChangeStreamPartitionDoFn#processElement is called
@@ -178,7 +183,7 @@ public ProcessContinuation run(
178183
179184 try (ChangeStreamResultSet resultSet =
180185 changeStreamDao .changeStreamQuery (
181- token , startTimestamp , endTimestamp , partition .getHeartbeatMillis ())) {
186+ token , startTimestamp , changeStreamQueryEndTimestamp , partition .getHeartbeatMillis ())) {
182187
183188 metrics .incQueryCounter ();
184189 while (resultSet .next ()) {
@@ -243,7 +248,7 @@ public ProcessContinuation run(
243248 "[{}] query change stream is out of range for {} to {}, finishing stream." ,
244249 token ,
245250 startTimestamp ,
246- endTimestamp ,
251+ changeStreamQueryEndTimestamp ,
247252 e );
248253 } else {
249254 throw e ;
@@ -253,13 +258,13 @@ public ProcessContinuation run(
253258 "[{}] query change stream had exception processing range {} to {}." ,
254259 token ,
255260 startTimestamp ,
256- endTimestamp ,
261+ changeStreamQueryEndTimestamp ,
257262 e );
258263 throw e ;
259264 }
260265
261266 LOG .debug ("[{}] change stream completed successfully" , token );
262- if (tracker .tryClaim (endTimestamp )) {
267+ if (tracker .tryClaim (changeStreamQueryEndTimestamp )) {
263268 LOG .debug ("[{}] Finishing partition" , token );
264269 partitionMetadataDao .updateToFinished (token );
265270 metrics .decActivePartitionReadCounter ();
@@ -292,4 +297,12 @@ private boolean isTimestampOutOfRange(SpannerException e) {
292297 && e .getMessage () != null
293298 && e .getMessage ().contains (OUT_OF_RANGE_ERROR_MESSAGE );
294299 }
300+
301+ // Return (now + 2 mins) as the end timestamp for reading change streams. This is only used if
302+ // users want to run the connector forever. This approach works because Google Dataflow
303+ // checkpoints every 5s or 5MB output provided and the change stream query has deadline for 1 min.
304+ private Timestamp getNextReadChangeStreamEndTimestamp () {
305+ final Timestamp current = Timestamp .now ();
306+ return Timestamp .ofTimeSecondsAndNanos (current .getSeconds () + 2 * 60 , current .getNanos ());
307+ }
295308}
0 commit comments