Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,6 @@ public class QueryChangeStreamAction {
* @return a {@link ProcessContinuation#stop()} if a record timestamp could not be claimed or if
* the partition processing has finished
*/
@SuppressWarnings("nullness")
@VisibleForTesting
public ProcessContinuation run(
PartitionMetadata partition,
Expand All @@ -179,10 +178,9 @@ public ProcessContinuation run(
final String token = partition.getPartitionToken();
final Timestamp startTimestamp = tracker.currentRestriction().getFrom();
final Timestamp endTimestamp = partition.getEndTimestamp();
Copy link

@tianz101 tianz101 Jan 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We would like to get the right behavior agreed through this comment first.
It may need multiple PR's to fix.

Here endTs from the partition can be

  1. bounded (i.e., a specific end ts specified by user) or
  2. unbounded(i.e., MAX_INCLUSIVE_END_AT).

If bounded,
1.1) For change stream V1, we can use the bounded endTs to query.
1.2) For change stream V2, we can not use the bounded endTs to query as validation may fail endTs <= max(startTs, now) + 30m and return exception. So we should use min(endTs, now+2m) to query.
We can unify 1.1) and 1.2) as min(endTs, now+2m) to query.

If unbounded,
2.1) For change stream V1, we can use the unbounded endTs to query.
2.2) For change stream V2 we can not use the unbounded endTs to query as validation may fail endTs <= max(startTs, now) + 30min and return exception. So we should use now+2m instead.
We can unify 2.1) and 2.2) as min(endTs, now+2m) to query as well.

So in summary, we do not need to know v1 or v2, and consistently use min(endTs, now+2m) to query no matter v1 or v2, bounded or unbounded.

Feel free to poke a hole if this comment is not accurate. But I agree that unify to min(endTs, now+2m) can be risky as it changes V1 existing behaviors and may need more testing and more follow up PR's.

final boolean readToEndTimestamp = !endTimestamp.equals(MAX_INCLUSIVE_END_AT);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about renaming to isBoundedQuery?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Went with isBoundedRestriction because the query is bounded in either case.

final Timestamp changeStreamQueryEndTimestamp =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the changeStreamQueryEndTimestamp is only used in line 202, can we move it right before line 202 so we can avoid some cases that the changeStreamQueryEndTimestamp is set in the past if we meant to set it in the future.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This feels like a good idea and it can reduce the window between getting the queryEndTs and using the queryEndTs.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

endTimestamp.equals(MAX_INCLUSIVE_END_AT)
? getNextReadChangeStreamEndTimestamp()
: endTimestamp;
readToEndTimestamp ? endTimestamp : getNextReadChangeStreamEndTimestamp();
Copy link

@tianz101 tianz101 Jan 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry I just updated my comments to try to unify the queryEndTs = min(endTs, now+2m) no matter bounded or unbounded, no matter v1 or v2.
Now the question is how to detect end of the partition.

  1. If bounded endTs,then if (queryEndTs >= endTs), then it is ended.
  2. If unbounded endTs, maybe the line 281 logic is an indicator of the partition ended.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a way here to determine if v1 is being used and to just continue using endTimestamp instead of now+2m in that case?

Alternatively I've updated the logic below so that we treat the childpartition records as indicating the end of the partition as well which appears true from the documentation.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry Sam, I tried to correct my comments to reflect the latest accurately.

Copy link

@tianz101 tianz101 Jan 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This PR that Xue is working on can get v1 and v2 indicator #36667.
I will let you and Xue decide how much to address in this PR. Basically we should keep v1 behavior unchanged
and v2 use now+2m always to minimize the risks of regression.
If isMutableChangeStream(), then it is v2, otherwise it is v1. It queries partition_mode to figure out v1 or v2.


// TODO: Potentially we can avoid this fetch, by enriching the runningAt timestamp when the
// ReadChangeStreamPartitionDoFn#processElement is called
Expand All @@ -198,6 +196,7 @@ public ProcessContinuation run(
RestrictionInterrupter<Timestamp> interrupter =
RestrictionInterrupter.withSoftTimeout(RESTRICTION_TRACKER_TIMEOUT);

boolean stopAfterQuerySucceeds = readToEndTimestamp;
try (ChangeStreamResultSet resultSet =
changeStreamDao.changeStreamQuery(
token, startTimestamp, changeStreamQueryEndTimestamp, partition.getHeartbeatMillis())) {
Expand Down Expand Up @@ -250,6 +249,9 @@ public ProcessContinuation run(
tracker,
interrupter,
watermarkEstimator);
// The PartitionEndRecord indicates that there are no more records expected
// for this partition.
stopAfterQuerySucceeds = true;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like we don't need it here. Only change stream v2 has PartitionEndRecord, while change stream v1 has ChildPartitionRecord (might be more than one), which also indicate the partition ends.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See below, I need the variable to determine whether the query made from (start, now+2m) stopped due to reaching now+2m or due to being done. Since if it's just because it reached the artificial end timestamp we need to reschedule.

For v1, it sounds like this approach is not sufficient. I believe the change to the artificial end timestamp was made for v2 change streams. Is it possible to see whether v1 or v2 is being used and we can use changeStreamQueryEndTimestamp=endTimestamp for v1?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking at https://docs.cloud.google.com/spanner/docs/change-streams/details#query it seems that ChildPartitionRecords always indicate the end of a query too and that all of such records will have the same timestamp. I've updated the logic to set stopAfterQuerySucceeds in that case too.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Following on this comment, if we add stopAfterQuerySucceeds = true for both ChildPartitionRecords and PartitionEndRecord, then we have assumption relying on that spanner always returns both records in the end of partition. Should we not make this assumption and rely on OutOfRange as the indicator of partition end? Thanks!

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

} else if (record instanceof PartitionEventRecord) {
maybeContinuation =
partitionEventRecordAction.run(
Expand All @@ -272,27 +274,23 @@ public ProcessContinuation run(
}
}
}
bundleFinalizer.afterBundleCommit(
Instant.now().plus(BUNDLE_FINALIZER_TIMEOUT),
updateWatermarkCallback(token, watermarkEstimator));

} catch (SpannerException e) {
/*
If there is a split when a partition is supposed to be finished, the residual will try
to perform a change stream query for an out of range interval. We ignore this error
here, and the residual should be able to claim the end of the timestamp range, finishing
the partition.
*/
if (isTimestampOutOfRange(e)) {
LOG.info(
"[{}] query change stream is out of range for {} to {}, finishing stream.",
token,
startTimestamp,
endTimestamp,
e);
} else {
if (!isTimestampOutOfRange(e)) {
throw e;
}
LOG.info(
"[{}] query change stream is out of range for {} to {}, finishing stream.",
token,
startTimestamp,
endTimestamp,
e);
stopAfterQuerySucceeds = true;
} catch (Exception e) {
LOG.error(
"[{}] query change stream had exception processing range {} to {}.",
Expand All @@ -303,13 +301,28 @@ public ProcessContinuation run(
throw e;
}

LOG.debug("[{}] change stream completed successfully", token);
if (tracker.tryClaim(endTimestamp)) {
LOG.debug("[{}] Finishing partition", token);
partitionMetadataDao.updateToFinished(token);
metrics.decActivePartitionReadCounter();
LOG.info("[{}] After attempting to finish the partition", token);
LOG.debug(
"[{}] change stream completed successfully up to {}", token, changeStreamQueryEndTimestamp);
Timestamp claimTimestamp =
stopAfterQuerySucceeds ? endTimestamp : changeStreamQueryEndTimestamp;
if (!tracker.tryClaim(claimTimestamp)) {
return ProcessContinuation.stop();
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please verify if my understand is correct. Thanks
If a query finished without any interruptions, the tryClaim will always return true in line 308 no matter for bounded or unbounded query.

For bounded query line 315 return false and it is marked as finished later.
For unbounded query, line 315 return false, the work get rescheduled, the new query got out of range error and stopAfterQuerySucceeds is set to true and later the partition is marked as finished.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If a query finished without any interruptions, the tryClaim will always return true in line 308 no matter for bounded or unbounded query.

It is possible that the runner splits the processing during the completion of the query and this tryClaim call, that is the case that tryClaim here would return false. Otherwise this marks at what point we have processed this partition up to either the end of the range or the end of what we queried. one note of interest is that if we are going stop processing due to internal SDF checks we need to claim up to the endTimestamp as otherwise it considers it data loss.

For bounded query line 315 return false and it is marked as finished later.
For unbounded query, line 315 return false, the work get rescheduled, the new query got out of range error and stopAfterQuerySucceeds is set to true and later the partition is marked as finished.

stopAfterQuerySucceeds is true either due to

  1. being a bounded partition (which we queried to completion).
  2. we got record indicating that the partition ended (I was using PartitionEndRecord but sounds like maybe that isn't enough for v1 change streams).
  3. we got a out-of-range error indicating the partition ended

If we don't want to stop ie stopAfterQuerySucceed=false, we indicate the SDF should resume with return value and above we claimed only to changeStreamQueryEndTimestamp. The SDF will resume from changeStreamQueryEndTimestamp as the start timestamp.

If we want to stop, we indicate the SDF should stop with the return value and we claimed up to endTimestamp above. The partition will not be rescheduled.

A difficulty that this code is trying to solve is that I don't see an easy way to determine when a query initiated from an unbounded range completes whether it completed due to

  1. reaching the changeStreamQueryEndTimestamp (in which case we want to resume)
  2. it completed due to the partition being done

This is why I was trying to explicitly track the partition done conditions via the stopAfterQuerySucceeds boolean.

bundleFinalizer.afterBundleCommit(
Instant.now().plus(BUNDLE_FINALIZER_TIMEOUT),
updateWatermarkCallback(token, watermarkEstimator));

if (!stopAfterQuerySucceeds) {
LOG.debug("[{}] Rescheduling partition to resume reading", token);
return ProcessContinuation.resume();
}

LOG.debug("[{}] Finishing partition", token);
// TODO: This should be performed after the commit succeeds. Since bundle finalizers are not
// guaranteed to be called, this needs to be performed in a subsequent fused stage.
partitionMetadataDao.updateToFinished(token);
metrics.decActivePartitionReadCounter();
LOG.info("[{}] After attempting to finish the partition", token);
return ProcessContinuation.stop();
}

Expand Down Expand Up @@ -339,8 +352,8 @@ private boolean isTimestampOutOfRange(SpannerException e) {
}

// Return (now + 2 mins) as the end timestamp for reading change streams. This is only used if
// users want to run the connector forever. This approach works because Google Dataflow
// checkpoints every 5s or 5MB output provided and the change stream query has deadline for 1 min.
// users want to run the connector forever. If the end timestamp is reached, we will resume
// processing from that timestamp on a subsequent DoFn execution.
private Timestamp getNextReadChangeStreamEndTimestamp() {
final Timestamp current = Timestamp.now();
return Timestamp.ofTimeSecondsAndNanos(current.getSeconds() + 2 * 60, current.getNanos());
Expand Down
Loading
Loading