diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/MetadataTableDao.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/MetadataTableDao.java index ea7e4f14d057..35d8a55646ed 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/MetadataTableDao.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/MetadataTableDao.java @@ -453,8 +453,7 @@ public List readStreamPartitionsWithWatermark() * @return list of PartitionRecord of all StreamPartitions in the metadata table. */ public List readAllStreamPartitions() throws InvalidProtocolBufferException { - Query query = Query.create(tableId).prefix(getFullStreamPartitionPrefix()); - ServerStream rows = dataClient.readRows(query); + ServerStream rows = readAllStreamPartitionRows(); List partitions = new ArrayList<>(); for (Row row : rows) { Instant watermark = parseWatermarkFromRow(row); @@ -817,4 +816,22 @@ void mutateRowWithHardTimeout(RowMutation rowMutation) { throw new RuntimeException(interruptedException); } } + + /** + * Reads the raw bigtable StreamPartition rows. This is separate from {@link + * #readAllStreamPartitions()} only for testing purposes. {@link #readAllStreamPartitions()} + * should be used for all usage outside this file. + * + * @return {@link ServerStream} of StreamPartition bigtable rows + */ + @VisibleForTesting + ServerStream readAllStreamPartitionRows() { + Query query = + Query.create(tableId) + .prefix(getFullStreamPartitionPrefix()) + // Add a cells per column filter to avoid loading old versions of watermark and token. + // We only need the latest. + .filter(FILTERS.limit().cellsPerColumn(1)); + return dataClient.readRows(query); + } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/MetadataTableDaoTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/MetadataTableDaoTest.java index 867117b4d392..9c0fbcfec440 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/MetadataTableDaoTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/MetadataTableDaoTest.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao; +import static org.apache.beam.sdk.io.gcp.bigtable.changestreams.encoder.MetadataTableEncoder.parseInitialContinuationTokens; import static org.apache.beam.sdk.io.gcp.bigtable.changestreams.encoder.MetadataTableEncoder.parseWatermarkFromRow; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsInAnyOrder; @@ -31,6 +32,7 @@ import static org.mockito.Mockito.when; import com.google.api.core.ApiFuture; +import com.google.api.gax.rpc.ServerStream; import com.google.cloud.bigtable.admin.v2.BigtableTableAdminClient; import com.google.cloud.bigtable.admin.v2.BigtableTableAdminSettings; import com.google.cloud.bigtable.data.v2.BigtableDataClient; @@ -39,6 +41,7 @@ import com.google.cloud.bigtable.data.v2.models.Filters; import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange; import com.google.cloud.bigtable.data.v2.models.Row; +import com.google.cloud.bigtable.data.v2.models.RowCell; import com.google.cloud.bigtable.data.v2.models.RowMutation; import com.google.cloud.bigtable.emulator.v2.BigtableEmulatorRule; import com.google.protobuf.ByteString; @@ -49,6 +52,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -57,6 +61,7 @@ import org.apache.beam.sdk.io.gcp.bigtable.changestreams.model.NewPartition; import org.apache.beam.sdk.io.gcp.bigtable.changestreams.model.PartitionRecord; import org.apache.beam.sdk.io.gcp.bigtable.changestreams.model.StreamPartitionWithWatermark; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.primitives.Longs; import org.joda.time.Duration; import org.joda.time.Instant; import org.junit.Before; @@ -774,4 +779,69 @@ public void mutateRowWithHardTimeoutErrorHandling() RuntimeException.class, () -> daoWithMock.mutateRowWithHardTimeout(RowMutation.create("test", "test").deleteRow())); } + + @Test + public void readAllStreamPartitionRowsOnlyReadsLatestVersion() + throws InvalidProtocolBufferException { + ByteStringRange partition1 = ByteStringRange.create("A", "B"); + Instant watermark1 = Instant.now(); + PartitionRecord partitionRecord1 = + new PartitionRecord(partition1, watermark1, "1", watermark1, Collections.emptyList(), null); + metadataTableDao.lockAndRecordPartition(partitionRecord1); + + ByteStringRange partition2 = ByteStringRange.create("B", "D"); + ChangeStreamContinuationToken partition2Token1 = + ChangeStreamContinuationToken.create(ByteStringRange.create("B", "C"), "tokenBC"); + ChangeStreamContinuationToken partition2Token2 = + ChangeStreamContinuationToken.create(ByteStringRange.create("C", "D"), "tokenCD"); + Instant watermark2 = Instant.now(); + PartitionRecord partitionRecord2 = + new PartitionRecord( + partition2, + Arrays.asList(partition2Token1, partition2Token2), + "2", + watermark2, + Collections.emptyList(), + null); + metadataTableDao.lockAndRecordPartition(partitionRecord2); + + // Update the watermark of partition1 + Instant watermark3 = watermark2.plus(Duration.standardSeconds(10)); + ChangeStreamContinuationToken partition1Token1 = + ChangeStreamContinuationToken.create(ByteStringRange.create("A", "B"), "token1"); + metadataTableDao.updateWatermark(partition1, watermark3, partition1Token1); + Instant watermark4 = watermark3.plus(Duration.standardSeconds(10)); + ChangeStreamContinuationToken partition1Token2 = + ChangeStreamContinuationToken.create(ByteStringRange.create("A", "B"), "token2"); + metadataTableDao.updateWatermark(partition1, watermark4, partition1Token2); + + ServerStream rows = metadataTableDao.readAllStreamPartitionRows(); + Map rowsByKey = new HashMap<>(); + for (Row row : rows) { + rowsByKey.put(row.getKey(), row); + } + Row partition1Row = + rowsByKey.get(metadataTableDao.convertPartitionToStreamPartitionRowKey(partition1)); + Row partition2Row = + rowsByKey.get(metadataTableDao.convertPartitionToStreamPartitionRowKey(partition2)); + + List initialTokens = + parseInitialContinuationTokens(partition2Row); + // Make sure we get all initial tokens back + assertEquals(partition2Token1, initialTokens.get(0)); + assertEquals(partition2Token2, initialTokens.get(1)); + // check we only get one watermark and token version even though we've added multiple + List watermarks = + partition1Row.getCells( + MetadataTableAdminDao.CF_WATERMARK, MetadataTableAdminDao.QUALIFIER_DEFAULT); + assertEquals(1, watermarks.size()); + Instant parsedWatermark = + Instant.ofEpochMilli(Longs.fromByteArray(watermarks.get(0).getValue().toByteArray())); + assertEquals(watermark4, parsedWatermark); + List tokens = + partition1Row.getCells( + MetadataTableAdminDao.CF_CONTINUATION_TOKEN, MetadataTableAdminDao.QUALIFIER_DEFAULT); + assertEquals(1, tokens.size()); + assertEquals(partition1Token2.getToken(), tokens.get(0).getValue().toStringUtf8()); + } }