Skip to content

Commit 065a7bd

Browse files
committed
address comments: stop on childpartition records, naming, restructure for clarity, reduce race
1 parent dc9179d commit 065a7bd

File tree

2 files changed

+141
-18
lines changed

2 files changed

+141
-18
lines changed

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java

Lines changed: 35 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -176,11 +176,6 @@ public ProcessContinuation run(
176176
ManualWatermarkEstimator<Instant> watermarkEstimator,
177177
BundleFinalizer bundleFinalizer) {
178178
final String token = partition.getPartitionToken();
179-
final Timestamp startTimestamp = tracker.currentRestriction().getFrom();
180-
final Timestamp endTimestamp = partition.getEndTimestamp();
181-
final boolean readToEndTimestamp = !endTimestamp.equals(MAX_INCLUSIVE_END_AT);
182-
final Timestamp changeStreamQueryEndTimestamp =
183-
readToEndTimestamp ? endTimestamp : getNextReadChangeStreamEndTimestamp();
184179

185180
// TODO: Potentially we can avoid this fetch, by enriching the runningAt timestamp when the
186181
// ReadChangeStreamPartitionDoFn#processElement is called
@@ -196,7 +191,17 @@ public ProcessContinuation run(
196191
RestrictionInterrupter<Timestamp> interrupter =
197192
RestrictionInterrupter.withSoftTimeout(RESTRICTION_TRACKER_TIMEOUT);
198193

199-
boolean stopAfterQuerySucceeds = readToEndTimestamp;
194+
final Timestamp startTimestamp = tracker.currentRestriction().getFrom();
195+
final Timestamp endTimestamp = partition.getEndTimestamp();
196+
final boolean isBoundedRestriction = !endTimestamp.equals(MAX_INCLUSIVE_END_AT);
197+
final Timestamp changeStreamQueryEndTimestamp =
198+
isBoundedRestriction ? endTimestamp : getNextReadChangeStreamEndTimestamp();
199+
200+
// Once the changeStreamQuery completes we may need to resume reading from the partition if we
201+
// had an unbounded restriction for which we set an arbitrary query end timestamp and for which
202+
// we didn't encounter any indications that the partition is done (explicit end records or
203+
// exceptions about being out of timestamp range).
204+
boolean stopAfterQuerySucceeds = isBoundedRestriction;
200205
try (ChangeStreamResultSet resultSet =
201206
changeStreamDao.changeStreamQuery(
202207
token, startTimestamp, changeStreamQueryEndTimestamp, partition.getHeartbeatMillis())) {
@@ -233,6 +238,10 @@ public ProcessContinuation run(
233238
tracker,
234239
interrupter,
235240
watermarkEstimator);
241+
// Child Partition records indicate that the partition has ended. There may be
242+
// additional ChildPartitionRecords but they will share the same timestamp and
243+
// will be returned by the query and processed if it finishes successfully.
244+
stopAfterQuerySucceeds = true;
236245
} else if (record instanceof PartitionStartRecord) {
237246
maybeContinuation =
238247
partitionStartRecordAction.run(
@@ -303,20 +312,32 @@ public ProcessContinuation run(
303312

304313
LOG.debug(
305314
"[{}] change stream completed successfully up to {}", token, changeStreamQueryEndTimestamp);
306-
Timestamp claimTimestamp =
307-
stopAfterQuerySucceeds ? endTimestamp : changeStreamQueryEndTimestamp;
308-
if (!tracker.tryClaim(claimTimestamp)) {
309-
return ProcessContinuation.stop();
310-
}
311-
bundleFinalizer.afterBundleCommit(
312-
Instant.now().plus(BUNDLE_FINALIZER_TIMEOUT),
313-
updateWatermarkCallback(token, watermarkEstimator));
314315

315316
if (!stopAfterQuerySucceeds) {
317+
// Records stopped being returned for the query due to our artificial query end timestamp but
318+
// we want to continue processing the partition, resuming from changeStreamQueryEndTimestamp.
319+
if (!tracker.tryClaim(changeStreamQueryEndTimestamp)) {
320+
return ProcessContinuation.stop();
321+
}
322+
bundleFinalizer.afterBundleCommit(
323+
Instant.now().plus(BUNDLE_FINALIZER_TIMEOUT),
324+
updateWatermarkCallback(token, watermarkEstimator));
316325
LOG.debug("[{}] Rescheduling partition to resume reading", token);
317326
return ProcessContinuation.resume();
318327
}
319328

329+
// Otherwise we have finished processing the partition, either due to:
330+
// 1. reading to the bounded restriction end timestamp
331+
// 2. encountering a ChildPartitionRecord or EndPartitionRecord indicating there are no more
332+
// elements in the partition
333+
// 3. encountering a exception indicating the start timestamp is out of bounds of the
334+
// partition
335+
// We claim the restriction completely to satisfy internal sanity checks and do not reschedule
336+
// the restriction.
337+
if (!tracker.tryClaim(endTimestamp)) {
338+
return ProcessContinuation.stop();
339+
}
340+
320341
LOG.debug("[{}] Finishing partition", token);
321342
// TODO: This should be performed after the commit succeeds. Since bundle finalizers are not
322343
// guaranteed to be called, this needs to be performed in a subsequent fused stage.

sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java

Lines changed: 106 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -558,14 +558,14 @@ public void testQueryChangeStreamWithPartitionEndRecordBoundedRestriction() {
558558
eq(restrictionTracker),
559559
any(RestrictionInterrupter.class),
560560
eq(watermarkEstimator));
561-
verify(partitionMetadataDao).updateWatermark(PARTITION_TOKEN, WATERMARK_TIMESTAMP);
562561
verify(restrictionTracker).tryClaim(PARTITION_END_TIMESTAMP);
563562

564563
verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any(), any());
565564
verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), any());
566565
verify(childPartitionsRecordAction, never()).run(any(), any(), any(), any(), any());
567566
verify(partitionStartRecordAction, never()).run(any(), any(), any(), any(), any());
568567
verify(partitionEventRecordAction, never()).run(any(), any(), any(), any(), any());
568+
verify(partitionMetadataDao, never()).updateWatermark(any(), any());
569569
}
570570

571571
@Test
@@ -611,14 +611,14 @@ public void testQueryChangeStreamWithPartitionEndRecordUnboundedRestriction() {
611611
eq(restrictionTracker),
612612
any(RestrictionInterrupter.class),
613613
eq(watermarkEstimator));
614-
verify(partitionMetadataDao).updateWatermark(PARTITION_TOKEN, WATERMARK_TIMESTAMP);
615614
verify(restrictionTracker).tryClaim(MAX_INCLUSIVE_END_AT);
616615

617616
verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any(), any());
618617
verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), any());
619618
verify(childPartitionsRecordAction, never()).run(any(), any(), any(), any(), any());
620619
verify(partitionStartRecordAction, never()).run(any(), any(), any(), any(), any());
621620
verify(partitionEventRecordAction, never()).run(any(), any(), any(), any(), any());
621+
verify(partitionMetadataDao, never()).updateWatermark(any(), any());
622622
}
623623

624624
@Test
@@ -771,14 +771,14 @@ public void testQueryChangeStreamWithOutOfRangeErrorOnUnboundedPartition() {
771771
verify(restrictionTracker).tryClaim(MAX_INCLUSIVE_END_AT);
772772
verify(partitionMetadataDao).updateToFinished(PARTITION_TOKEN);
773773
verify(metrics).decActivePartitionReadCounter();
774-
verify(partitionMetadataDao).updateWatermark(PARTITION_TOKEN, WATERMARK_TIMESTAMP);
775774

776775
verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any(), any());
777776
verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), any());
778777
verify(childPartitionsRecordAction, never()).run(any(), any(), any(), any(), any());
779778
verify(partitionStartRecordAction, never()).run(any(), any(), any(), any(), any());
780779
verify(partitionEndRecordAction, never()).run(any(), any(), any(), any(), any());
781780
verify(partitionEventRecordAction, never()).run(any(), any(), any(), any(), any());
781+
verify(partitionMetadataDao, never()).updateWatermark(any(), any());
782782
}
783783

784784
// Out of range indicates that we're beyond the end of the partition and should stop
@@ -804,14 +804,116 @@ public void testQueryChangeStreamWithOutOfRangeErrorOnBoundedPartition() {
804804
verify(restrictionTracker).tryClaim(PARTITION_END_TIMESTAMP);
805805
verify(partitionMetadataDao).updateToFinished(PARTITION_TOKEN);
806806
verify(metrics).decActivePartitionReadCounter();
807-
verify(partitionMetadataDao).updateWatermark(PARTITION_TOKEN, WATERMARK_TIMESTAMP);
808807

809808
verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any(), any());
810809
verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), any());
811810
verify(childPartitionsRecordAction, never()).run(any(), any(), any(), any(), any());
812811
verify(partitionStartRecordAction, never()).run(any(), any(), any(), any(), any());
813812
verify(partitionEndRecordAction, never()).run(any(), any(), any(), any(), any());
814813
verify(partitionEventRecordAction, never()).run(any(), any(), any(), any(), any());
814+
verify(partitionMetadataDao, never()).updateWatermark(any(), any());
815+
}
816+
817+
@Test
818+
public void testQueryChangeStreamWithChildPartitionsRecordBoundedRestriction() {
819+
final ChangeStreamResultSetMetadata resultSetMetadata =
820+
mock(ChangeStreamResultSetMetadata.class);
821+
final ChangeStreamResultSet resultSet = mock(ChangeStreamResultSet.class);
822+
final ChildPartitionsRecord record1 = mock(ChildPartitionsRecord.class);
823+
when(record1.getRecordTimestamp()).thenReturn(RECORD_TIMESTAMP);
824+
when(changeStreamDao.changeStreamQuery(
825+
PARTITION_TOKEN,
826+
PARTITION_START_TIMESTAMP,
827+
PARTITION_END_TIMESTAMP,
828+
PARTITION_HEARTBEAT_MILLIS))
829+
.thenReturn(resultSet);
830+
when(resultSet.next()).thenReturn(true, false);
831+
when(resultSet.getMetadata()).thenReturn(resultSetMetadata);
832+
when(changeStreamRecordMapper.toChangeStreamRecords(partition, resultSet, resultSetMetadata))
833+
.thenReturn(Arrays.asList(record1));
834+
when(childPartitionsRecordAction.run(
835+
eq(partition),
836+
eq(record1),
837+
eq(restrictionTracker),
838+
any(RestrictionInterrupter.class),
839+
eq(watermarkEstimator)))
840+
.thenReturn(Optional.empty());
841+
when(watermarkEstimator.currentWatermark()).thenReturn(WATERMARK);
842+
when(restrictionTracker.tryClaim(any(Timestamp.class))).thenReturn(true);
843+
844+
final ProcessContinuation result =
845+
action.run(
846+
partition, restrictionTracker, outputReceiver, watermarkEstimator, bundleFinalizer);
847+
848+
assertEquals(ProcessContinuation.stop(), result);
849+
verify(childPartitionsRecordAction)
850+
.run(
851+
eq(partition),
852+
eq(record1),
853+
eq(restrictionTracker),
854+
any(RestrictionInterrupter.class),
855+
eq(watermarkEstimator));
856+
verify(restrictionTracker).tryClaim(PARTITION_END_TIMESTAMP);
857+
858+
verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any(), any());
859+
verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), any());
860+
verify(partitionStartRecordAction, never()).run(any(), any(), any(), any(), any());
861+
verify(partitionEndRecordAction, never()).run(any(), any(), any(), any(), any());
862+
verify(partitionEventRecordAction, never()).run(any(), any(), any(), any(), any());
863+
verify(partitionMetadataDao, never()).updateWatermark(any(), any());
864+
}
865+
866+
@Test
867+
public void testQueryChangeStreamWithChildPartitionsRecordUnboundedRestriction() {
868+
setupUnboundedPartition();
869+
870+
final ChangeStreamResultSetMetadata resultSetMetadata =
871+
mock(ChangeStreamResultSetMetadata.class);
872+
final ChangeStreamResultSet resultSet = mock(ChangeStreamResultSet.class);
873+
final ChildPartitionsRecord record1 = mock(ChildPartitionsRecord.class);
874+
when(record1.getRecordTimestamp()).thenReturn(RECORD_TIMESTAMP);
875+
final ArgumentCaptor<Timestamp> timestampCaptor = ArgumentCaptor.forClass(Timestamp.class);
876+
when(changeStreamDao.changeStreamQuery(
877+
eq(PARTITION_TOKEN),
878+
eq(PARTITION_START_TIMESTAMP),
879+
timestampCaptor.capture(),
880+
eq(PARTITION_HEARTBEAT_MILLIS)))
881+
.thenReturn(resultSet);
882+
when(resultSet.next()).thenReturn(true, false);
883+
when(resultSet.getMetadata()).thenReturn(resultSetMetadata);
884+
when(changeStreamRecordMapper.toChangeStreamRecords(partition, resultSet, resultSetMetadata))
885+
.thenReturn(Arrays.asList(record1));
886+
when(childPartitionsRecordAction.run(
887+
eq(partition),
888+
eq(record1),
889+
eq(restrictionTracker),
890+
any(RestrictionInterrupter.class),
891+
eq(watermarkEstimator)))
892+
.thenReturn(Optional.empty());
893+
when(watermarkEstimator.currentWatermark()).thenReturn(WATERMARK);
894+
when(restrictionTracker.tryClaim(any(Timestamp.class))).thenReturn(true);
895+
896+
final ProcessContinuation result =
897+
action.run(
898+
partition, restrictionTracker, outputReceiver, watermarkEstimator, bundleFinalizer);
899+
900+
assertEquals(ProcessContinuation.stop(), result);
901+
assertNotEquals(MAX_INCLUSIVE_END_AT, timestampCaptor.getValue());
902+
verify(childPartitionsRecordAction)
903+
.run(
904+
eq(partition),
905+
eq(record1),
906+
eq(restrictionTracker),
907+
any(RestrictionInterrupter.class),
908+
eq(watermarkEstimator));
909+
verify(restrictionTracker).tryClaim(MAX_INCLUSIVE_END_AT);
910+
911+
verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any(), any());
912+
verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), any());
913+
verify(partitionStartRecordAction, never()).run(any(), any(), any(), any(), any());
914+
verify(partitionEndRecordAction, never()).run(any(), any(), any(), any(), any());
915+
verify(partitionEventRecordAction, never()).run(any(), any(), any(), any(), any());
916+
verify(partitionMetadataDao, never()).updateWatermark(any(), any());
815917
}
816918

817919
private static class BundleFinalizerStub implements BundleFinalizer {

0 commit comments

Comments
 (0)