diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java index 3176abd9f247..8da9f3d09515 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java @@ -168,7 +168,6 @@ public class QueryChangeStreamAction { * @return a {@link ProcessContinuation#stop()} if a record timestamp could not be claimed or if * the partition processing has finished */ - @SuppressWarnings("nullness") @VisibleForTesting public ProcessContinuation run( PartitionMetadata partition, @@ -177,12 +176,6 @@ public ProcessContinuation run( ManualWatermarkEstimator watermarkEstimator, BundleFinalizer bundleFinalizer) { final String token = partition.getPartitionToken(); - final Timestamp startTimestamp = tracker.currentRestriction().getFrom(); - final Timestamp endTimestamp = partition.getEndTimestamp(); - final Timestamp changeStreamQueryEndTimestamp = - endTimestamp.equals(MAX_INCLUSIVE_END_AT) - ? getNextReadChangeStreamEndTimestamp() - : endTimestamp; // TODO: Potentially we can avoid this fetch, by enriching the runningAt timestamp when the // ReadChangeStreamPartitionDoFn#processElement is called @@ -198,6 +191,17 @@ public ProcessContinuation run( RestrictionInterrupter interrupter = RestrictionInterrupter.withSoftTimeout(RESTRICTION_TRACKER_TIMEOUT); + final Timestamp startTimestamp = tracker.currentRestriction().getFrom(); + final Timestamp endTimestamp = partition.getEndTimestamp(); + final boolean isBoundedRestriction = !endTimestamp.equals(MAX_INCLUSIVE_END_AT); + final Timestamp changeStreamQueryEndTimestamp = + isBoundedRestriction ? endTimestamp : getNextReadChangeStreamEndTimestamp(); + + // Once the changeStreamQuery completes we may need to resume reading from the partition if we + // had an unbounded restriction for which we set an arbitrary query end timestamp and for which + // we didn't encounter any indications that the partition is done (explicit end records or + // exceptions about being out of timestamp range). + boolean stopAfterQuerySucceeds = isBoundedRestriction; try (ChangeStreamResultSet resultSet = changeStreamDao.changeStreamQuery( token, startTimestamp, changeStreamQueryEndTimestamp, partition.getHeartbeatMillis())) { @@ -234,6 +238,10 @@ public ProcessContinuation run( tracker, interrupter, watermarkEstimator); + // Child Partition records indicate that the partition has ended. There may be + // additional ChildPartitionRecords but they will share the same timestamp and + // will be returned by the query and processed if it finishes successfully. + stopAfterQuerySucceeds = true; } else if (record instanceof PartitionStartRecord) { maybeContinuation = partitionStartRecordAction.run( @@ -250,6 +258,9 @@ public ProcessContinuation run( tracker, interrupter, watermarkEstimator); + // The PartitionEndRecord indicates that there are no more records expected + // for this partition. + stopAfterQuerySucceeds = true; } else if (record instanceof PartitionEventRecord) { maybeContinuation = partitionEventRecordAction.run( @@ -272,10 +283,6 @@ public ProcessContinuation run( } } } - bundleFinalizer.afterBundleCommit( - Instant.now().plus(BUNDLE_FINALIZER_TIMEOUT), - updateWatermarkCallback(token, watermarkEstimator)); - } catch (SpannerException e) { /* If there is a split when a partition is supposed to be finished, the residual will try @@ -283,16 +290,16 @@ public ProcessContinuation run( here, and the residual should be able to claim the end of the timestamp range, finishing the partition. */ - if (isTimestampOutOfRange(e)) { - LOG.info( - "[{}] query change stream is out of range for {} to {}, finishing stream.", - token, - startTimestamp, - endTimestamp, - e); - } else { + if (!isTimestampOutOfRange(e)) { throw e; } + LOG.info( + "[{}] query change stream is out of range for {} to {}, finishing stream.", + token, + startTimestamp, + endTimestamp, + e); + stopAfterQuerySucceeds = true; } catch (Exception e) { LOG.error( "[{}] query change stream had exception processing range {} to {}.", @@ -303,13 +310,40 @@ public ProcessContinuation run( throw e; } - LOG.debug("[{}] change stream completed successfully", token); - if (tracker.tryClaim(endTimestamp)) { - LOG.debug("[{}] Finishing partition", token); - partitionMetadataDao.updateToFinished(token); - metrics.decActivePartitionReadCounter(); - LOG.info("[{}] After attempting to finish the partition", token); + LOG.debug( + "[{}] change stream completed successfully up to {}", token, changeStreamQueryEndTimestamp); + + if (!stopAfterQuerySucceeds) { + // Records stopped being returned for the query due to our artificial query end timestamp but + // we want to continue processing the partition, resuming from changeStreamQueryEndTimestamp. + if (!tracker.tryClaim(changeStreamQueryEndTimestamp)) { + return ProcessContinuation.stop(); + } + bundleFinalizer.afterBundleCommit( + Instant.now().plus(BUNDLE_FINALIZER_TIMEOUT), + updateWatermarkCallback(token, watermarkEstimator)); + LOG.debug("[{}] Rescheduling partition to resume reading", token); + return ProcessContinuation.resume(); } + + // Otherwise we have finished processing the partition, either due to: + // 1. reading to the bounded restriction end timestamp + // 2. encountering a ChildPartitionRecord or EndPartitionRecord indicating there are no more + // elements in the partition + // 3. encountering a exception indicating the start timestamp is out of bounds of the + // partition + // We claim the restriction completely to satisfy internal sanity checks and do not reschedule + // the restriction. + if (!tracker.tryClaim(endTimestamp)) { + return ProcessContinuation.stop(); + } + + LOG.debug("[{}] Finishing partition", token); + // TODO: This should be performed after the commit succeeds. Since bundle finalizers are not + // guaranteed to be called, this needs to be performed in a subsequent fused stage. + partitionMetadataDao.updateToFinished(token); + metrics.decActivePartitionReadCounter(); + LOG.info("[{}] After attempting to finish the partition", token); return ProcessContinuation.stop(); } @@ -339,8 +373,8 @@ private boolean isTimestampOutOfRange(SpannerException e) { } // Return (now + 2 mins) as the end timestamp for reading change streams. This is only used if - // users want to run the connector forever. This approach works because Google Dataflow - // checkpoints every 5s or 5MB output provided and the change stream query has deadline for 1 min. + // users want to run the connector forever. If the end timestamp is reached, we will resume + // processing from that timestamp on a subsequent DoFn execution. private Timestamp getNextReadChangeStreamEndTimestamp() { final Timestamp current = Timestamp.now(); return Timestamp.ofTimeSecondsAndNanos(current.getSeconds() + 2 * 60, current.getNanos()); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java index 21f5a888b14b..cf4c047025c4 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java @@ -17,8 +17,10 @@ */ package org.apache.beam.sdk.io.gcp.spanner.changestreams.action; +import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.MAX_INCLUSIVE_END_AT; import static org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata.State.SCHEDULED; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; @@ -27,6 +29,8 @@ import static org.mockito.Mockito.when; import com.google.cloud.Timestamp; +import com.google.cloud.spanner.ErrorCode; +import com.google.cloud.spanner.SpannerExceptionFactory; import com.google.cloud.spanner.Struct; import java.util.Arrays; import java.util.Optional; @@ -55,10 +59,12 @@ import org.joda.time.Instant; import org.junit.Before; import org.junit.Test; +import org.mockito.ArgumentCaptor; public class QueryChangeStreamActionTest { private static final String PARTITION_TOKEN = "partitionToken"; private static final Timestamp PARTITION_START_TIMESTAMP = Timestamp.ofTimeMicroseconds(10L); + private static final Timestamp RECORD_TIMESTAMP = Timestamp.ofTimeMicroseconds(20L); private static final Timestamp PARTITION_END_TIMESTAMP = Timestamp.ofTimeMicroseconds(30L); private static final long PARTITION_HEARTBEAT_MILLIS = 30_000L; private static final Instant WATERMARK = Instant.now(); @@ -136,6 +142,22 @@ public void setUp() throws Exception { when(partitionMetadataMapper.from(row)).thenReturn(partition); } + void setupUnboundedPartition() { + partition = + PartitionMetadata.newBuilder() + .setPartitionToken(PARTITION_TOKEN) + .setParentTokens(Sets.newHashSet("parentToken")) + .setStartTimestamp(PARTITION_START_TIMESTAMP) + .setEndTimestamp(MAX_INCLUSIVE_END_AT) + .setHeartbeatMillis(PARTITION_HEARTBEAT_MILLIS) + .setState(SCHEDULED) + .setWatermark(WATERMARK_TIMESTAMP) + .setScheduledAt(Timestamp.now()) + .build(); + when(partitionMetadataMapper.from(any())).thenReturn(partition); + when(restriction.getTo()).thenReturn(MAX_INCLUSIVE_END_AT); + } + @Test public void testQueryChangeStreamWithDataChangeRecord() { final Struct rowAsStruct = mock(Struct.class); @@ -145,7 +167,7 @@ public void testQueryChangeStreamWithDataChangeRecord() { final DataChangeRecord record1 = mock(DataChangeRecord.class); final DataChangeRecord record2 = mock(DataChangeRecord.class); when(record1.getRecordTimestamp()).thenReturn(PARTITION_START_TIMESTAMP); - when(record2.getRecordTimestamp()).thenReturn(PARTITION_START_TIMESTAMP); + when(record2.getRecordTimestamp()).thenReturn(RECORD_TIMESTAMP); when(changeStreamDao.changeStreamQuery( PARTITION_TOKEN, PARTITION_START_TIMESTAMP, @@ -214,8 +236,8 @@ public void testQueryChangeStreamWithHeartbeatRecord() { final ChangeStreamResultSet resultSet = mock(ChangeStreamResultSet.class); final HeartbeatRecord record1 = mock(HeartbeatRecord.class); final HeartbeatRecord record2 = mock(HeartbeatRecord.class); - when(record1.getRecordTimestamp()).thenReturn(PARTITION_START_TIMESTAMP); - when(record2.getRecordTimestamp()).thenReturn(PARTITION_START_TIMESTAMP); + when(record1.getRecordTimestamp()).thenReturn(RECORD_TIMESTAMP); + when(record2.getRecordTimestamp()).thenReturn(RECORD_TIMESTAMP); when(changeStreamDao.changeStreamQuery( PARTITION_TOKEN, PARTITION_START_TIMESTAMP, @@ -498,19 +520,19 @@ public void testQueryChangeStreamWithRestrictionFromAfterPartitionStartForPartit } @Test - public void testQueryChangeStreamWithPartitionEndRecord() { + public void testQueryChangeStreamWithPartitionEndRecordBoundedRestriction() { final ChangeStreamResultSetMetadata resultSetMetadata = mock(ChangeStreamResultSetMetadata.class); final ChangeStreamResultSet resultSet = mock(ChangeStreamResultSet.class); final PartitionEndRecord record1 = mock(PartitionEndRecord.class); - when(record1.getRecordTimestamp()).thenReturn(PARTITION_END_TIMESTAMP); + when(record1.getRecordTimestamp()).thenReturn(RECORD_TIMESTAMP); when(changeStreamDao.changeStreamQuery( PARTITION_TOKEN, PARTITION_START_TIMESTAMP, PARTITION_END_TIMESTAMP, PARTITION_HEARTBEAT_MILLIS)) .thenReturn(resultSet); - when(resultSet.next()).thenReturn(true); + when(resultSet.next()).thenReturn(true, false); when(resultSet.getMetadata()).thenReturn(resultSetMetadata); when(changeStreamRecordMapper.toChangeStreamRecords(partition, resultSet, resultSetMetadata)) .thenReturn(Arrays.asList(record1)); @@ -520,8 +542,9 @@ public void testQueryChangeStreamWithPartitionEndRecord() { eq(restrictionTracker), any(RestrictionInterrupter.class), eq(watermarkEstimator))) - .thenReturn(Optional.of(ProcessContinuation.stop())); + .thenReturn(Optional.empty()); when(watermarkEstimator.currentWatermark()).thenReturn(WATERMARK); + when(restrictionTracker.tryClaim(any(Timestamp.class))).thenReturn(true); final ProcessContinuation result = action.run( @@ -535,14 +558,67 @@ public void testQueryChangeStreamWithPartitionEndRecord() { eq(restrictionTracker), any(RestrictionInterrupter.class), eq(watermarkEstimator)); - verify(partitionMetadataDao).updateWatermark(PARTITION_TOKEN, WATERMARK_TIMESTAMP); + verify(restrictionTracker).tryClaim(PARTITION_END_TIMESTAMP); verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any(), any()); verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), any()); verify(childPartitionsRecordAction, never()).run(any(), any(), any(), any(), any()); verify(partitionStartRecordAction, never()).run(any(), any(), any(), any(), any()); verify(partitionEventRecordAction, never()).run(any(), any(), any(), any(), any()); - verify(restrictionTracker, never()).tryClaim(any()); + verify(partitionMetadataDao, never()).updateWatermark(any(), any()); + } + + @Test + public void testQueryChangeStreamWithPartitionEndRecordUnboundedRestriction() { + setupUnboundedPartition(); + + final ChangeStreamResultSetMetadata resultSetMetadata = + mock(ChangeStreamResultSetMetadata.class); + final ChangeStreamResultSet resultSet = mock(ChangeStreamResultSet.class); + final PartitionEndRecord record1 = mock(PartitionEndRecord.class); + when(record1.getRecordTimestamp()).thenReturn(RECORD_TIMESTAMP); + final ArgumentCaptor timestampCaptor = ArgumentCaptor.forClass(Timestamp.class); + when(changeStreamDao.changeStreamQuery( + eq(PARTITION_TOKEN), + eq(PARTITION_START_TIMESTAMP), + timestampCaptor.capture(), + eq(PARTITION_HEARTBEAT_MILLIS))) + .thenReturn(resultSet); + when(resultSet.next()).thenReturn(true, false); + when(resultSet.getMetadata()).thenReturn(resultSetMetadata); + when(changeStreamRecordMapper.toChangeStreamRecords(partition, resultSet, resultSetMetadata)) + .thenReturn(Arrays.asList(record1)); + when(partitionEndRecordAction.run( + eq(partition), + eq(record1), + eq(restrictionTracker), + any(RestrictionInterrupter.class), + eq(watermarkEstimator))) + .thenReturn(Optional.empty()); + when(watermarkEstimator.currentWatermark()).thenReturn(WATERMARK); + when(restrictionTracker.tryClaim(any(Timestamp.class))).thenReturn(true); + + final ProcessContinuation result = + action.run( + partition, restrictionTracker, outputReceiver, watermarkEstimator, bundleFinalizer); + + assertEquals(ProcessContinuation.stop(), result); + assertNotEquals(MAX_INCLUSIVE_END_AT, timestampCaptor.getValue()); + verify(partitionEndRecordAction) + .run( + eq(partition), + eq(record1), + eq(restrictionTracker), + any(RestrictionInterrupter.class), + eq(watermarkEstimator)); + verify(restrictionTracker).tryClaim(MAX_INCLUSIVE_END_AT); + + verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any(), any()); + verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), any()); + verify(childPartitionsRecordAction, never()).run(any(), any(), any(), any(), any()); + verify(partitionStartRecordAction, never()).run(any(), any(), any(), any(), any()); + verify(partitionEventRecordAction, never()).run(any(), any(), any(), any(), any()); + verify(partitionMetadataDao, never()).updateWatermark(any(), any()); } @Test @@ -611,8 +687,90 @@ public void testQueryChangeStreamWithStreamFinished() { partition, restrictionTracker, outputReceiver, watermarkEstimator, bundleFinalizer); assertEquals(ProcessContinuation.stop(), result); + verify(partitionMetadataDao).updateToFinished(PARTITION_TOKEN); + verify(metrics).decActivePartitionReadCounter(); + + verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any(), any()); + verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), any()); + verify(childPartitionsRecordAction, never()).run(any(), any(), any(), any(), any()); + verify(partitionStartRecordAction, never()).run(any(), any(), any(), any(), any()); + verify(partitionEndRecordAction, never()).run(any(), any(), any(), any(), any()); + verify(partitionEventRecordAction, never()).run(any(), any(), any(), any(), any()); + } + + @Test + public void testQueryChangeStreamFinishedWithResume() { + partition = + PartitionMetadata.newBuilder() + .setPartitionToken(PARTITION_TOKEN) + .setParentTokens(Sets.newHashSet("parentToken")) + .setStartTimestamp(PARTITION_START_TIMESTAMP) + .setEndTimestamp(MAX_INCLUSIVE_END_AT) + .setHeartbeatMillis(PARTITION_HEARTBEAT_MILLIS) + .setState(SCHEDULED) + .setWatermark(WATERMARK_TIMESTAMP) + .setScheduledAt(Timestamp.now()) + .build(); + when(partitionMetadataMapper.from(any())).thenReturn(partition); + + final ChangeStreamResultSet changeStreamResultSet = mock(ChangeStreamResultSet.class); + final ArgumentCaptor timestampCaptor = ArgumentCaptor.forClass(Timestamp.class); + when(changeStreamDao.changeStreamQuery( + eq(PARTITION_TOKEN), + eq(PARTITION_START_TIMESTAMP), + timestampCaptor.capture(), + eq(PARTITION_HEARTBEAT_MILLIS))) + .thenReturn(changeStreamResultSet); + when(changeStreamResultSet.next()).thenReturn(false); + when(watermarkEstimator.currentWatermark()).thenReturn(WATERMARK); + when(restrictionTracker.tryClaim(any(Timestamp.class))).thenReturn(true); + + final ProcessContinuation result = + action.run( + partition, restrictionTracker, outputReceiver, watermarkEstimator, bundleFinalizer); + assertEquals(ProcessContinuation.resume(), result); + assertNotEquals(MAX_INCLUSIVE_END_AT, timestampCaptor.getValue()); + + verify(restrictionTracker).tryClaim(timestampCaptor.getValue()); verify(partitionMetadataDao).updateWatermark(PARTITION_TOKEN, WATERMARK_TIMESTAMP); + verify(partitionMetadataDao, never()).updateToFinished(PARTITION_TOKEN); + verify(metrics, never()).decActivePartitionReadCounter(); + + verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any(), any()); + verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), any()); + verify(childPartitionsRecordAction, never()).run(any(), any(), any(), any(), any()); + verify(partitionStartRecordAction, never()).run(any(), any(), any(), any(), any()); + verify(partitionEndRecordAction, never()).run(any(), any(), any(), any(), any()); + verify(partitionEventRecordAction, never()).run(any(), any(), any(), any(), any()); + } + + // Out of range indicates that we're beyond the end of the partition and should stop + // processing. + @Test + public void testQueryChangeStreamWithOutOfRangeErrorOnUnboundedPartition() { + setupUnboundedPartition(); + + final ArgumentCaptor timestampCaptor = ArgumentCaptor.forClass(Timestamp.class); + when(changeStreamDao.changeStreamQuery( + eq(PARTITION_TOKEN), + eq(PARTITION_START_TIMESTAMP), + timestampCaptor.capture(), + eq(PARTITION_HEARTBEAT_MILLIS))) + .thenThrow( + SpannerExceptionFactory.newSpannerException( + ErrorCode.OUT_OF_RANGE, "Specified start_timestamp is invalid")); + when(watermarkEstimator.currentWatermark()).thenReturn(WATERMARK); + when(restrictionTracker.tryClaim(any(Timestamp.class))).thenReturn(true); + + final ProcessContinuation result = + action.run( + partition, restrictionTracker, outputReceiver, watermarkEstimator, bundleFinalizer); + assertEquals(ProcessContinuation.stop(), result); + assertNotEquals(MAX_INCLUSIVE_END_AT, timestampCaptor.getValue()); + + verify(restrictionTracker).tryClaim(MAX_INCLUSIVE_END_AT); verify(partitionMetadataDao).updateToFinished(PARTITION_TOKEN); + verify(metrics).decActivePartitionReadCounter(); verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any(), any()); verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), any()); @@ -620,6 +778,142 @@ public void testQueryChangeStreamWithStreamFinished() { verify(partitionStartRecordAction, never()).run(any(), any(), any(), any(), any()); verify(partitionEndRecordAction, never()).run(any(), any(), any(), any(), any()); verify(partitionEventRecordAction, never()).run(any(), any(), any(), any(), any()); + verify(partitionMetadataDao, never()).updateWatermark(any(), any()); + } + + // Out of range indicates that we're beyond the end of the partition and should stop + // processing. + @Test + public void testQueryChangeStreamWithOutOfRangeErrorOnBoundedPartition() { + when(changeStreamDao.changeStreamQuery( + eq(PARTITION_TOKEN), + eq(PARTITION_START_TIMESTAMP), + eq(PARTITION_END_TIMESTAMP), + eq(PARTITION_HEARTBEAT_MILLIS))) + .thenThrow( + SpannerExceptionFactory.newSpannerException( + ErrorCode.OUT_OF_RANGE, "Specified start_timestamp is invalid")); + when(watermarkEstimator.currentWatermark()).thenReturn(WATERMARK); + when(restrictionTracker.tryClaim(any(Timestamp.class))).thenReturn(true); + + final ProcessContinuation result = + action.run( + partition, restrictionTracker, outputReceiver, watermarkEstimator, bundleFinalizer); + assertEquals(ProcessContinuation.stop(), result); + + verify(restrictionTracker).tryClaim(PARTITION_END_TIMESTAMP); + verify(partitionMetadataDao).updateToFinished(PARTITION_TOKEN); + verify(metrics).decActivePartitionReadCounter(); + + verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any(), any()); + verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), any()); + verify(childPartitionsRecordAction, never()).run(any(), any(), any(), any(), any()); + verify(partitionStartRecordAction, never()).run(any(), any(), any(), any(), any()); + verify(partitionEndRecordAction, never()).run(any(), any(), any(), any(), any()); + verify(partitionEventRecordAction, never()).run(any(), any(), any(), any(), any()); + verify(partitionMetadataDao, never()).updateWatermark(any(), any()); + } + + @Test + public void testQueryChangeStreamWithChildPartitionsRecordBoundedRestriction() { + final ChangeStreamResultSetMetadata resultSetMetadata = + mock(ChangeStreamResultSetMetadata.class); + final ChangeStreamResultSet resultSet = mock(ChangeStreamResultSet.class); + final ChildPartitionsRecord record1 = mock(ChildPartitionsRecord.class); + when(record1.getRecordTimestamp()).thenReturn(RECORD_TIMESTAMP); + when(changeStreamDao.changeStreamQuery( + PARTITION_TOKEN, + PARTITION_START_TIMESTAMP, + PARTITION_END_TIMESTAMP, + PARTITION_HEARTBEAT_MILLIS)) + .thenReturn(resultSet); + when(resultSet.next()).thenReturn(true, false); + when(resultSet.getMetadata()).thenReturn(resultSetMetadata); + when(changeStreamRecordMapper.toChangeStreamRecords(partition, resultSet, resultSetMetadata)) + .thenReturn(Arrays.asList(record1)); + when(childPartitionsRecordAction.run( + eq(partition), + eq(record1), + eq(restrictionTracker), + any(RestrictionInterrupter.class), + eq(watermarkEstimator))) + .thenReturn(Optional.empty()); + when(watermarkEstimator.currentWatermark()).thenReturn(WATERMARK); + when(restrictionTracker.tryClaim(any(Timestamp.class))).thenReturn(true); + + final ProcessContinuation result = + action.run( + partition, restrictionTracker, outputReceiver, watermarkEstimator, bundleFinalizer); + + assertEquals(ProcessContinuation.stop(), result); + verify(childPartitionsRecordAction) + .run( + eq(partition), + eq(record1), + eq(restrictionTracker), + any(RestrictionInterrupter.class), + eq(watermarkEstimator)); + verify(restrictionTracker).tryClaim(PARTITION_END_TIMESTAMP); + + verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any(), any()); + verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), any()); + verify(partitionStartRecordAction, never()).run(any(), any(), any(), any(), any()); + verify(partitionEndRecordAction, never()).run(any(), any(), any(), any(), any()); + verify(partitionEventRecordAction, never()).run(any(), any(), any(), any(), any()); + verify(partitionMetadataDao, never()).updateWatermark(any(), any()); + } + + @Test + public void testQueryChangeStreamWithChildPartitionsRecordUnboundedRestriction() { + setupUnboundedPartition(); + + final ChangeStreamResultSetMetadata resultSetMetadata = + mock(ChangeStreamResultSetMetadata.class); + final ChangeStreamResultSet resultSet = mock(ChangeStreamResultSet.class); + final ChildPartitionsRecord record1 = mock(ChildPartitionsRecord.class); + when(record1.getRecordTimestamp()).thenReturn(RECORD_TIMESTAMP); + final ArgumentCaptor timestampCaptor = ArgumentCaptor.forClass(Timestamp.class); + when(changeStreamDao.changeStreamQuery( + eq(PARTITION_TOKEN), + eq(PARTITION_START_TIMESTAMP), + timestampCaptor.capture(), + eq(PARTITION_HEARTBEAT_MILLIS))) + .thenReturn(resultSet); + when(resultSet.next()).thenReturn(true, false); + when(resultSet.getMetadata()).thenReturn(resultSetMetadata); + when(changeStreamRecordMapper.toChangeStreamRecords(partition, resultSet, resultSetMetadata)) + .thenReturn(Arrays.asList(record1)); + when(childPartitionsRecordAction.run( + eq(partition), + eq(record1), + eq(restrictionTracker), + any(RestrictionInterrupter.class), + eq(watermarkEstimator))) + .thenReturn(Optional.empty()); + when(watermarkEstimator.currentWatermark()).thenReturn(WATERMARK); + when(restrictionTracker.tryClaim(any(Timestamp.class))).thenReturn(true); + + final ProcessContinuation result = + action.run( + partition, restrictionTracker, outputReceiver, watermarkEstimator, bundleFinalizer); + + assertEquals(ProcessContinuation.stop(), result); + assertNotEquals(MAX_INCLUSIVE_END_AT, timestampCaptor.getValue()); + verify(childPartitionsRecordAction) + .run( + eq(partition), + eq(record1), + eq(restrictionTracker), + any(RestrictionInterrupter.class), + eq(watermarkEstimator)); + verify(restrictionTracker).tryClaim(MAX_INCLUSIVE_END_AT); + + verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any(), any()); + verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), any()); + verify(partitionStartRecordAction, never()).run(any(), any(), any(), any(), any()); + verify(partitionEndRecordAction, never()).run(any(), any(), any(), any(), any()); + verify(partitionEventRecordAction, never()).run(any(), any(), any(), any(), any()); + verify(partitionMetadataDao, never()).updateWatermark(any(), any()); } private static class BundleFinalizerStub implements BundleFinalizer {