Skip to content

Commit 92913b7

Browse files
fix: Add limit to stream partitions query to avoid loading old cell versions (#37099)
1 parent 3464da5 commit 92913b7

File tree

2 files changed

+89
-2
lines changed

2 files changed

+89
-2
lines changed

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/MetadataTableDao.java

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -453,8 +453,7 @@ public List<StreamPartitionWithWatermark> readStreamPartitionsWithWatermark()
453453
* @return list of PartitionRecord of all StreamPartitions in the metadata table.
454454
*/
455455
public List<PartitionRecord> readAllStreamPartitions() throws InvalidProtocolBufferException {
456-
Query query = Query.create(tableId).prefix(getFullStreamPartitionPrefix());
457-
ServerStream<Row> rows = dataClient.readRows(query);
456+
ServerStream<Row> rows = readAllStreamPartitionRows();
458457
List<PartitionRecord> partitions = new ArrayList<>();
459458
for (Row row : rows) {
460459
Instant watermark = parseWatermarkFromRow(row);
@@ -817,4 +816,22 @@ void mutateRowWithHardTimeout(RowMutation rowMutation) {
817816
throw new RuntimeException(interruptedException);
818817
}
819818
}
819+
820+
/**
821+
* Reads the raw bigtable StreamPartition rows. This is separate from {@link
822+
* #readAllStreamPartitions()} only for testing purposes. {@link #readAllStreamPartitions()}
823+
* should be used for all usage outside this file.
824+
*
825+
* @return {@link ServerStream} of StreamPartition bigtable rows
826+
*/
827+
@VisibleForTesting
828+
ServerStream<Row> readAllStreamPartitionRows() {
829+
Query query =
830+
Query.create(tableId)
831+
.prefix(getFullStreamPartitionPrefix())
832+
// Add a cells per column filter to avoid loading old versions of watermark and token.
833+
// We only need the latest.
834+
.filter(FILTERS.limit().cellsPerColumn(1));
835+
return dataClient.readRows(query);
836+
}
820837
}

sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/MetadataTableDaoTest.java

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
*/
1818
package org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao;
1919

20+
import static org.apache.beam.sdk.io.gcp.bigtable.changestreams.encoder.MetadataTableEncoder.parseInitialContinuationTokens;
2021
import static org.apache.beam.sdk.io.gcp.bigtable.changestreams.encoder.MetadataTableEncoder.parseWatermarkFromRow;
2122
import static org.hamcrest.MatcherAssert.assertThat;
2223
import static org.hamcrest.Matchers.containsInAnyOrder;
@@ -31,6 +32,7 @@
3132
import static org.mockito.Mockito.when;
3233

3334
import com.google.api.core.ApiFuture;
35+
import com.google.api.gax.rpc.ServerStream;
3436
import com.google.cloud.bigtable.admin.v2.BigtableTableAdminClient;
3537
import com.google.cloud.bigtable.admin.v2.BigtableTableAdminSettings;
3638
import com.google.cloud.bigtable.data.v2.BigtableDataClient;
@@ -39,6 +41,7 @@
3941
import com.google.cloud.bigtable.data.v2.models.Filters;
4042
import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange;
4143
import com.google.cloud.bigtable.data.v2.models.Row;
44+
import com.google.cloud.bigtable.data.v2.models.RowCell;
4245
import com.google.cloud.bigtable.data.v2.models.RowMutation;
4346
import com.google.cloud.bigtable.emulator.v2.BigtableEmulatorRule;
4447
import com.google.protobuf.ByteString;
@@ -49,6 +52,7 @@
4952
import java.util.Collections;
5053
import java.util.HashMap;
5154
import java.util.List;
55+
import java.util.Map;
5256
import java.util.concurrent.ExecutionException;
5357
import java.util.concurrent.TimeUnit;
5458
import java.util.concurrent.TimeoutException;
@@ -57,6 +61,7 @@
5761
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.model.NewPartition;
5862
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.model.PartitionRecord;
5963
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.model.StreamPartitionWithWatermark;
64+
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.primitives.Longs;
6065
import org.joda.time.Duration;
6166
import org.joda.time.Instant;
6267
import org.junit.Before;
@@ -774,4 +779,69 @@ public void mutateRowWithHardTimeoutErrorHandling()
774779
RuntimeException.class,
775780
() -> daoWithMock.mutateRowWithHardTimeout(RowMutation.create("test", "test").deleteRow()));
776781
}
782+
783+
@Test
784+
public void readAllStreamPartitionRowsOnlyReadsLatestVersion()
785+
throws InvalidProtocolBufferException {
786+
ByteStringRange partition1 = ByteStringRange.create("A", "B");
787+
Instant watermark1 = Instant.now();
788+
PartitionRecord partitionRecord1 =
789+
new PartitionRecord(partition1, watermark1, "1", watermark1, Collections.emptyList(), null);
790+
metadataTableDao.lockAndRecordPartition(partitionRecord1);
791+
792+
ByteStringRange partition2 = ByteStringRange.create("B", "D");
793+
ChangeStreamContinuationToken partition2Token1 =
794+
ChangeStreamContinuationToken.create(ByteStringRange.create("B", "C"), "tokenBC");
795+
ChangeStreamContinuationToken partition2Token2 =
796+
ChangeStreamContinuationToken.create(ByteStringRange.create("C", "D"), "tokenCD");
797+
Instant watermark2 = Instant.now();
798+
PartitionRecord partitionRecord2 =
799+
new PartitionRecord(
800+
partition2,
801+
Arrays.asList(partition2Token1, partition2Token2),
802+
"2",
803+
watermark2,
804+
Collections.emptyList(),
805+
null);
806+
metadataTableDao.lockAndRecordPartition(partitionRecord2);
807+
808+
// Update the watermark of partition1
809+
Instant watermark3 = watermark2.plus(Duration.standardSeconds(10));
810+
ChangeStreamContinuationToken partition1Token1 =
811+
ChangeStreamContinuationToken.create(ByteStringRange.create("A", "B"), "token1");
812+
metadataTableDao.updateWatermark(partition1, watermark3, partition1Token1);
813+
Instant watermark4 = watermark3.plus(Duration.standardSeconds(10));
814+
ChangeStreamContinuationToken partition1Token2 =
815+
ChangeStreamContinuationToken.create(ByteStringRange.create("A", "B"), "token2");
816+
metadataTableDao.updateWatermark(partition1, watermark4, partition1Token2);
817+
818+
ServerStream<Row> rows = metadataTableDao.readAllStreamPartitionRows();
819+
Map<ByteString, Row> rowsByKey = new HashMap<>();
820+
for (Row row : rows) {
821+
rowsByKey.put(row.getKey(), row);
822+
}
823+
Row partition1Row =
824+
rowsByKey.get(metadataTableDao.convertPartitionToStreamPartitionRowKey(partition1));
825+
Row partition2Row =
826+
rowsByKey.get(metadataTableDao.convertPartitionToStreamPartitionRowKey(partition2));
827+
828+
List<ChangeStreamContinuationToken> initialTokens =
829+
parseInitialContinuationTokens(partition2Row);
830+
// Make sure we get all initial tokens back
831+
assertEquals(partition2Token1, initialTokens.get(0));
832+
assertEquals(partition2Token2, initialTokens.get(1));
833+
// check we only get one watermark and token version even though we've added multiple
834+
List<RowCell> watermarks =
835+
partition1Row.getCells(
836+
MetadataTableAdminDao.CF_WATERMARK, MetadataTableAdminDao.QUALIFIER_DEFAULT);
837+
assertEquals(1, watermarks.size());
838+
Instant parsedWatermark =
839+
Instant.ofEpochMilli(Longs.fromByteArray(watermarks.get(0).getValue().toByteArray()));
840+
assertEquals(watermark4, parsedWatermark);
841+
List<RowCell> tokens =
842+
partition1Row.getCells(
843+
MetadataTableAdminDao.CF_CONTINUATION_TOKEN, MetadataTableAdminDao.QUALIFIER_DEFAULT);
844+
assertEquals(1, tokens.size());
845+
assertEquals(partition1Token2.getToken(), tokens.get(0).getValue().toStringUtf8());
846+
}
777847
}

0 commit comments

Comments (0)