Skip to content

Commit a1223bc

Browse files
committed
fix: Add limit to stream partitions query to avoid loading old cell versions
1 parent cbcdf1b commit a1223bc

File tree

2 files changed

+89
-11
lines changed

2 files changed

+89
-11
lines changed

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/MetadataTableDao.java

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -453,8 +453,7 @@ public List<StreamPartitionWithWatermark> readStreamPartitionsWithWatermark()
453453
* @return list of PartitionRecord of all StreamPartitions in the metadata table.
454454
*/
455455
public List<PartitionRecord> readAllStreamPartitions() throws InvalidProtocolBufferException {
456-
Query query = Query.create(tableId).prefix(getFullStreamPartitionPrefix());
457-
ServerStream<Row> rows = dataClient.readRows(query);
456+
ServerStream<Row> rows = readAllStreamPartitionRows();
458457
List<PartitionRecord> partitions = new ArrayList<>();
459458
for (Row row : rows) {
460459
Instant watermark = parseWatermarkFromRow(row);
@@ -817,4 +816,22 @@ void mutateRowWithHardTimeout(RowMutation rowMutation) {
817816
throw new RuntimeException(interruptedException);
818817
}
819818
}
819+
820+
/**
821+
* Reads the raw bigtable StreamPartition rows. This is separate from {@link
822+
* #readAllStreamPartitions()} only for testing purposes. {@link #readAllStreamPartitions()}
823+
* should be used for all usage outside this file.
824+
*
825+
* @return {@link ServerStream} of StreamPartition bigtable rows
826+
*/
827+
@VisibleForTesting
828+
ServerStream<Row> readAllStreamPartitionRows() {
829+
Query query =
830+
Query.create(tableId)
831+
.prefix(getFullStreamPartitionPrefix())
832+
// Add a cells per column filter to avoid loading old versions of watermark and token.
833+
// We only need the latest.
834+
.filter(FILTERS.limit().cellsPerColumn(1));
835+
return dataClient.readRows(query);
836+
}
820837
}

sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/MetadataTableDaoTest.java

Lines changed: 70 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
*/
1818
package org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao;
1919

20+
import static org.apache.beam.sdk.io.gcp.bigtable.changestreams.encoder.MetadataTableEncoder.parseInitialContinuationTokens;
2021
import static org.apache.beam.sdk.io.gcp.bigtable.changestreams.encoder.MetadataTableEncoder.parseWatermarkFromRow;
2122
import static org.hamcrest.MatcherAssert.assertThat;
2223
import static org.hamcrest.Matchers.containsInAnyOrder;
@@ -31,24 +32,18 @@
3132
import static org.mockito.Mockito.when;
3233

3334
import com.google.api.core.ApiFuture;
35+
import com.google.api.gax.rpc.ServerStream;
3436
import com.google.cloud.bigtable.admin.v2.BigtableTableAdminClient;
3537
import com.google.cloud.bigtable.admin.v2.BigtableTableAdminSettings;
3638
import com.google.cloud.bigtable.data.v2.BigtableDataClient;
3739
import com.google.cloud.bigtable.data.v2.BigtableDataSettings;
38-
import com.google.cloud.bigtable.data.v2.models.ChangeStreamContinuationToken;
39-
import com.google.cloud.bigtable.data.v2.models.Filters;
40+
import com.google.cloud.bigtable.data.v2.models.*;
4041
import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange;
41-
import com.google.cloud.bigtable.data.v2.models.Row;
42-
import com.google.cloud.bigtable.data.v2.models.RowMutation;
4342
import com.google.cloud.bigtable.emulator.v2.BigtableEmulatorRule;
4443
import com.google.protobuf.ByteString;
4544
import com.google.protobuf.InvalidProtocolBufferException;
4645
import java.io.IOException;
47-
import java.util.ArrayList;
48-
import java.util.Arrays;
49-
import java.util.Collections;
50-
import java.util.HashMap;
51-
import java.util.List;
46+
import java.util.*;
5247
import java.util.concurrent.ExecutionException;
5348
import java.util.concurrent.TimeUnit;
5449
import java.util.concurrent.TimeoutException;
@@ -57,6 +52,7 @@
5752
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.model.NewPartition;
5853
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.model.PartitionRecord;
5954
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.model.StreamPartitionWithWatermark;
55+
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.primitives.Longs;
6056
import org.joda.time.Duration;
6157
import org.joda.time.Instant;
6258
import org.junit.Before;
@@ -774,4 +770,69 @@ public void mutateRowWithHardTimeoutErrorHandling()
774770
RuntimeException.class,
775771
() -> daoWithMock.mutateRowWithHardTimeout(RowMutation.create("test", "test").deleteRow()));
776772
}
773+
774+
@Test
775+
public void readAllStreamPartitionRowsOnlyReadsLatestVersion()
776+
throws InvalidProtocolBufferException {
777+
ByteStringRange partition1 = ByteStringRange.create("A", "B");
778+
Instant watermark1 = Instant.now();
779+
PartitionRecord partitionRecord1 =
780+
new PartitionRecord(partition1, watermark1, "1", watermark1, Collections.emptyList(), null);
781+
metadataTableDao.lockAndRecordPartition(partitionRecord1);
782+
783+
ByteStringRange partition2 = ByteStringRange.create("B", "D");
784+
ChangeStreamContinuationToken partition2Token1 =
785+
ChangeStreamContinuationToken.create(ByteStringRange.create("B", "C"), "tokenBC");
786+
ChangeStreamContinuationToken partition2Token2 =
787+
ChangeStreamContinuationToken.create(ByteStringRange.create("C", "D"), "tokenCD");
788+
Instant watermark2 = Instant.now();
789+
PartitionRecord partitionRecord2 =
790+
new PartitionRecord(
791+
partition2,
792+
Arrays.asList(partition2Token1, partition2Token2),
793+
"2",
794+
watermark2,
795+
Collections.emptyList(),
796+
null);
797+
metadataTableDao.lockAndRecordPartition(partitionRecord2);
798+
799+
// Update the watermark of partition1
800+
Instant watermark3 = watermark2.plus(Duration.standardSeconds(10));
801+
ChangeStreamContinuationToken partition1Token1 =
802+
ChangeStreamContinuationToken.create(ByteStringRange.create("A", "B"), "token1");
803+
metadataTableDao.updateWatermark(partition1, watermark3, partition1Token1);
804+
Instant watermark4 = watermark3.plus(Duration.standardSeconds(10));
805+
ChangeStreamContinuationToken partition1Token2 =
806+
ChangeStreamContinuationToken.create(ByteStringRange.create("A", "B"), "token2");
807+
metadataTableDao.updateWatermark(partition1, watermark4, partition1Token2);
808+
809+
ServerStream<Row> rows = metadataTableDao.readAllStreamPartitionRows();
810+
Map<ByteString, Row> rowsByKey = new HashMap<>();
811+
for (Row row : rows) {
812+
rowsByKey.put(row.getKey(), row);
813+
}
814+
Row partition1Row =
815+
rowsByKey.get(metadataTableDao.convertPartitionToStreamPartitionRowKey(partition1));
816+
Row partition2Row =
817+
rowsByKey.get(metadataTableDao.convertPartitionToStreamPartitionRowKey(partition2));
818+
819+
List<ChangeStreamContinuationToken> initialTokens =
820+
parseInitialContinuationTokens(partition2Row);
821+
// Make sure we get all initial tokens back
822+
assertEquals(partition2Token1, initialTokens.get(0));
823+
assertEquals(partition2Token2, initialTokens.get(1));
824+
// check we only get one watermark and token version even though we've added multiple
825+
List<RowCell> watermarks =
826+
partition1Row.getCells(
827+
MetadataTableAdminDao.CF_WATERMARK, MetadataTableAdminDao.QUALIFIER_DEFAULT);
828+
assertEquals(1, watermarks.size());
829+
Instant parsedWatermark =
830+
Instant.ofEpochMilli(Longs.fromByteArray(watermarks.get(0).getValue().toByteArray()));
831+
assertEquals(watermark4, parsedWatermark);
832+
List<RowCell> tokens =
833+
partition1Row.getCells(
834+
MetadataTableAdminDao.CF_CONTINUATION_TOKEN, MetadataTableAdminDao.QUALIFIER_DEFAULT);
835+
assertEquals(1, tokens.size());
836+
assertEquals(partition1Token2.getToken(), tokens.get(0).getValue().toStringUtf8());
837+
}
777838
}

0 commit comments

Comments
 (0)