Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -453,8 +453,7 @@ public List<StreamPartitionWithWatermark> readStreamPartitionsWithWatermark()
* @return list of PartitionRecord of all StreamPartitions in the metadata table.
*/
public List<PartitionRecord> readAllStreamPartitions() throws InvalidProtocolBufferException {
Query query = Query.create(tableId).prefix(getFullStreamPartitionPrefix());
ServerStream<Row> rows = dataClient.readRows(query);
ServerStream<Row> rows = readAllStreamPartitionRows();
List<PartitionRecord> partitions = new ArrayList<>();
for (Row row : rows) {
Instant watermark = parseWatermarkFromRow(row);
Expand Down Expand Up @@ -817,4 +816,22 @@ void mutateRowWithHardTimeout(RowMutation rowMutation) {
throw new RuntimeException(interruptedException);
}
}

/**
 * Fetches the raw StreamPartition rows from the metadata table. This exists separately from
 * {@link #readAllStreamPartitions()} solely so tests can inspect the raw rows; every caller
 * outside this file should go through {@link #readAllStreamPartitions()} instead.
 *
 * @return {@link ServerStream} of StreamPartition bigtable rows
 */
@VisibleForTesting
ServerStream<Row> readAllStreamPartitionRows() {
  // Only the newest cell per column matters here; filtering to one version avoids
  // pulling stale watermark and continuation-token cells over the wire.
  Query streamPartitionQuery =
      Query.create(tableId)
          .prefix(getFullStreamPartitionPrefix())
          .filter(FILTERS.limit().cellsPerColumn(1));
  return dataClient.readRows(streamPartitionQuery);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao;

import static org.apache.beam.sdk.io.gcp.bigtable.changestreams.encoder.MetadataTableEncoder.parseInitialContinuationTokens;
import static org.apache.beam.sdk.io.gcp.bigtable.changestreams.encoder.MetadataTableEncoder.parseWatermarkFromRow;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
Expand All @@ -31,6 +32,7 @@
import static org.mockito.Mockito.when;

import com.google.api.core.ApiFuture;
import com.google.api.gax.rpc.ServerStream;
import com.google.cloud.bigtable.admin.v2.BigtableTableAdminClient;
import com.google.cloud.bigtable.admin.v2.BigtableTableAdminSettings;
import com.google.cloud.bigtable.data.v2.BigtableDataClient;
Expand All @@ -39,6 +41,7 @@
import com.google.cloud.bigtable.data.v2.models.Filters;
import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange;
import com.google.cloud.bigtable.data.v2.models.Row;
import com.google.cloud.bigtable.data.v2.models.RowCell;
import com.google.cloud.bigtable.data.v2.models.RowMutation;
import com.google.cloud.bigtable.emulator.v2.BigtableEmulatorRule;
import com.google.protobuf.ByteString;
Expand All @@ -49,6 +52,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand All @@ -57,6 +61,7 @@
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.model.NewPartition;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.model.PartitionRecord;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.model.StreamPartitionWithWatermark;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.primitives.Longs;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Before;
Expand Down Expand Up @@ -774,4 +779,69 @@ public void mutateRowWithHardTimeoutErrorHandling()
RuntimeException.class,
() -> daoWithMock.mutateRowWithHardTimeout(RowMutation.create("test", "test").deleteRow()));
}

@Test
public void readAllStreamPartitionRowsOnlyReadsLatestVersion()
    throws InvalidProtocolBufferException {
  // Lock a first partition [A, B) with an initial watermark and no continuation tokens.
  ByteStringRange rangeAB = ByteStringRange.create("A", "B");
  Instant initialWatermarkAB = Instant.now();
  PartitionRecord recordAB =
      new PartitionRecord(
          rangeAB, initialWatermarkAB, "1", initialWatermarkAB, Collections.emptyList(), null);
  metadataTableDao.lockAndRecordPartition(recordAB);

  // Lock a second partition [B, D) carrying two initial continuation tokens.
  ByteStringRange rangeBD = ByteStringRange.create("B", "D");
  ChangeStreamContinuationToken tokenBC =
      ChangeStreamContinuationToken.create(ByteStringRange.create("B", "C"), "tokenBC");
  ChangeStreamContinuationToken tokenCD =
      ChangeStreamContinuationToken.create(ByteStringRange.create("C", "D"), "tokenCD");
  Instant initialWatermarkBD = Instant.now();
  PartitionRecord recordBD =
      new PartitionRecord(
          rangeBD,
          Arrays.asList(tokenBC, tokenCD),
          "2",
          initialWatermarkBD,
          Collections.emptyList(),
          null);
  metadataTableDao.lockAndRecordPartition(recordBD);

  // Advance the watermark of [A, B) twice so multiple cell versions exist for
  // both the watermark and the continuation-token columns.
  Instant firstAdvancedWatermark = initialWatermarkBD.plus(Duration.standardSeconds(10));
  ChangeStreamContinuationToken firstUpdateToken =
      ChangeStreamContinuationToken.create(ByteStringRange.create("A", "B"), "token1");
  metadataTableDao.updateWatermark(rangeAB, firstAdvancedWatermark, firstUpdateToken);
  Instant secondAdvancedWatermark = firstAdvancedWatermark.plus(Duration.standardSeconds(10));
  ChangeStreamContinuationToken secondUpdateToken =
      ChangeStreamContinuationToken.create(ByteStringRange.create("A", "B"), "token2");
  metadataTableDao.updateWatermark(rangeAB, secondAdvancedWatermark, secondUpdateToken);

  // Read the raw rows back and index them by row key for easy lookup.
  ServerStream<Row> rawRows = metadataTableDao.readAllStreamPartitionRows();
  Map<ByteString, Row> rowByKey = new HashMap<>();
  for (Row rawRow : rawRows) {
    rowByKey.put(rawRow.getKey(), rawRow);
  }
  Row rowAB =
      rowByKey.get(metadataTableDao.convertPartitionToStreamPartitionRowKey(rangeAB));
  Row rowBD =
      rowByKey.get(metadataTableDao.convertPartitionToStreamPartitionRowKey(rangeBD));

  // Every initial continuation token of [B, D) must survive the cells-per-column filter.
  List<ChangeStreamContinuationToken> initialTokens = parseInitialContinuationTokens(rowBD);
  assertEquals(tokenBC, initialTokens.get(0));
  assertEquals(tokenCD, initialTokens.get(1));

  // Despite two watermark updates, only the single latest watermark cell should come back.
  List<RowCell> watermarkCells =
      rowAB.getCells(
          MetadataTableAdminDao.CF_WATERMARK, MetadataTableAdminDao.QUALIFIER_DEFAULT);
  assertEquals(1, watermarkCells.size());
  Instant latestWatermark =
      Instant.ofEpochMilli(Longs.fromByteArray(watermarkCells.get(0).getValue().toByteArray()));
  assertEquals(secondAdvancedWatermark, latestWatermark);

  // Likewise, only the single latest continuation-token cell should be returned.
  List<RowCell> tokenCells =
      rowAB.getCells(
          MetadataTableAdminDao.CF_CONTINUATION_TOKEN, MetadataTableAdminDao.QUALIFIER_DEFAULT);
  assertEquals(1, tokenCells.size());
  assertEquals(secondUpdateToken.getToken(), tokenCells.get(0).getValue().toStringUtf8());
}
}
Loading