Skip to content

Commit 070a41e

Browse files
authored
fix endtimestamp issue (#35544)
1 parent 2d5beae commit 070a41e

File tree

1 file changed

+6
-5
lines changed

1 file changed

+6
-5
lines changed

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -178,10 +178,11 @@ public ProcessContinuation run(
178178
BundleFinalizer bundleFinalizer) {
179179
final String token = partition.getPartitionToken();
180180
final Timestamp startTimestamp = tracker.currentRestriction().getFrom();
181+
final Timestamp endTimestamp = partition.getEndTimestamp();
181182
final Timestamp changeStreamQueryEndTimestamp =
182-
partition.getEndTimestamp().equals(MAX_INCLUSIVE_END_AT)
183+
endTimestamp.equals(MAX_INCLUSIVE_END_AT)
183184
? getNextReadChangeStreamEndTimestamp()
184-
: partition.getEndTimestamp();
185+
: endTimestamp;
185186

186187
// TODO: Potentially we can avoid this fetch, by enriching the runningAt timestamp when the
187188
// ReadChangeStreamPartitionDoFn#processElement is called
@@ -287,7 +288,7 @@ public ProcessContinuation run(
287288
"[{}] query change stream is out of range for {} to {}, finishing stream.",
288289
token,
289290
startTimestamp,
290-
changeStreamQueryEndTimestamp,
291+
endTimestamp,
291292
e);
292293
} else {
293294
throw e;
@@ -297,13 +298,13 @@ public ProcessContinuation run(
297298
"[{}] query change stream had exception processing range {} to {}.",
298299
token,
299300
startTimestamp,
300-
changeStreamQueryEndTimestamp,
301+
endTimestamp,
301302
e);
302303
throw e;
303304
}
304305

305306
LOG.debug("[{}] change stream completed successfully", token);
306-
if (tracker.tryClaim(changeStreamQueryEndTimestamp)) {
307+
if (tracker.tryClaim(endTimestamp)) {
307308
LOG.debug("[{}] Finishing partition", token);
308309
partitionMetadataDao.updateToFinished(token);
309310
metrics.decActivePartitionReadCounter();

0 commit comments

Comments
 (0)