Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -631,6 +631,22 @@ public Read withMaxBufferElementCount(@Nullable Integer maxBufferElementCount) {
.build();
}

/**
 * Returns a new {@link BigtableIO.Read} that will skip large rows (>256MB) while reading.
 *
 * <p>This switches the underlying {@code BigtableIO} reader implementation to {@code
 * BigtableReaderWithExperimentalOptions}, which uses the client's skip-large-rows callable.
 *
 * <p>Does not modify this object.
 *
 * @param skipLargeRows whether to enable the experimental skip-large-rows read path; may be
 *     {@code null} to leave the option unset
 */
public Read withExperimentSkipLargeRows(@Nullable Boolean skipLargeRows) {
BigtableReadOptions bigtableReadOptions = getBigtableReadOptions();

return toBuilder()
.setBigtableReadOptions(
bigtableReadOptions.toBuilder().setExperimentalSkipLargeRows(skipLargeRows).build())
.build();
}

/**
* Returns a new {@link BigtableIO.Read} that will read only rows in the specified range.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@
})
abstract class BigtableReadOptions implements Serializable {

/** Returns the experimental skip-large-rows option value, or {@code null} if unset. */
abstract @Nullable Boolean getExperimentalSkipLargeRows();

/** Returns the table id. */
abstract ValueProvider<String> getTableId();

Expand Down Expand Up @@ -69,6 +72,8 @@ static BigtableReadOptions.Builder builder() {
@AutoValue.Builder
abstract static class Builder {

/** Sets whether the experimental skip-large-rows read path is enabled. */
abstract Builder setExperimentalSkipLargeRows(Boolean experimentalSkipLargeRows);

abstract Builder setTableId(ValueProvider<String> tableId);

abstract Builder setRowFilter(ValueProvider<RowFilter> rowFilter);
Expand All @@ -86,6 +91,10 @@ abstract static class Builder {
abstract BigtableReadOptions build();
}

/** Returns a copy of these options with the experimental skip-large-rows flag set. */
BigtableReadOptions setExperimentalSkipLargeRows(@Nullable Boolean experimentalSkipLargeRows) {
return toBuilder().setExperimentalSkipLargeRows(experimentalSkipLargeRows).build();
}

/** Returns a copy of these options with the max buffer element count set. */
BigtableReadOptions setMaxBufferElementCount(@Nullable Integer maxBufferElementCount) {
return toBuilder().setMaxBufferElementCount(maxBufferElementCount).build();
}
Expand Down Expand Up @@ -116,7 +125,10 @@ void populateDisplayData(DisplayData.Builder builder) {
.withLabel("Read Attempt Timeout"))
.addIfNotNull(
DisplayData.item("operationTimeout", getOperationTimeout())
.withLabel("Read Operation Timeout"));
.withLabel("Read Operation Timeout"))
.addIfNotNull(
DisplayData.item("experimentalSkipLargeRows", getExperimentalSkipLargeRows())
.withLabel("Skip Large Rows"));
}

void validate() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,100 @@ public BigtableWriterImpl openForWriting(BigtableWriteOptions writeOptions) {
writeOptions.getCloseWaitTimeout());
}

@VisibleForTesting
static class BigtableReaderWithExperimentalOptions implements Reader {
private final BigtableDataClient client;

private final String projectId;
private final String instanceId;
private final String tableId;

private final List<ByteKeyRange> ranges;
private final RowFilter rowFilter;
private Iterator<Row> results;

private Row currentRow;

private ServerStream<Row> stream;

private boolean exhausted;

@VisibleForTesting
BigtableReaderWithExperimentalOptions(
BigtableDataClient client,
String projectId,
String instanceId,
String tableId,
List<ByteKeyRange> ranges,
@Nullable RowFilter rowFilter) {
this.client = client;
this.projectId = projectId;
this.instanceId = instanceId;
this.tableId = tableId;
this.ranges = ranges;
this.rowFilter = rowFilter;
}

@Override
public boolean start() throws IOException {
ServiceCallMetric serviceCallMetric = createCallMetric(projectId, instanceId, tableId);

Query query = Query.create(tableId);
for (ByteKeyRange sourceRange : ranges) {
query.range(
ByteString.copyFrom(sourceRange.getStartKey().getValue()),
ByteString.copyFrom(sourceRange.getEndKey().getValue()));
}

if (rowFilter != null) {
query.filter(Filters.FILTERS.fromProto(rowFilter));
}
try {
stream =
client
.skipLargeRowsCallable(new BigtableRowProtoAdapter())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I forget, does this throw an exception with the large rows, or just silently swallow them?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it swallows them & returns the next non-large row

.call(query, GrpcCallContext.createDefault());
results = stream.iterator();
serviceCallMetric.call("ok");
} catch (StatusRuntimeException e) {
serviceCallMetric.call(e.getStatus().getCode().toString());
throw e;
}
return advance();
}

@Override
public boolean advance() throws IOException {
if (results.hasNext()) {
currentRow = results.next();
return true;
}
exhausted = true;
return false;
}

@Override
public Row getCurrentRow() throws NoSuchElementException {
if (currentRow == null) {
throw new NoSuchElementException();
}
return currentRow;
}

@Override
public void close() {
if (!exhausted) {
stream.cancel();
exhausted = true;
}
}

@Override
public void reportLineage() {
Lineage.getSources().add("bigtable", ImmutableList.of(projectId, instanceId, tableId));
}
}

@VisibleForTesting
static class BigtableReaderImpl implements Reader {
private final BigtableDataClient client;
Expand Down Expand Up @@ -660,6 +754,14 @@ public Reader createReader(BigtableSource source) throws IOException {
source.getRanges(),
source.getRowFilter(),
source.getMaxBufferElementCount());
} else if (source.getReadOptions().getExperimentalSkipLargeRows()) {
return new BigtableReaderWithExperimentalOptions(
client,
projectId,
instanceId,
source.getTableId().get(),
source.getRanges(),
source.getRowFilter());
} else {
return new BigtableReaderImpl(
client,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,27 @@ public void testReadBuildsCorrectly() {
assertEquals(PORT_CONFIGURATOR, read.getBigtableConfig().getBigtableOptionsConfigurator());
}

/**
 * Tests that {@code withExperimentSkipLargeRows(true)} is recorded on the read options and does
 * not disturb any of the other configuration set on {@link BigtableIO.Read}.
 */
@Test
public void testReadWithSkippingLargeRows() {
BigtableIO.Read read =
BigtableIO.read()
.withBigtableOptions(BIGTABLE_OPTIONS)
.withExperimentSkipLargeRows(true)
.withTableId("table")
.withInstanceId("instance")
.withProjectId("project")
.withAppProfileId("app-profile")
.withBigtableOptionsConfigurator(PORT_CONFIGURATOR);
assertEquals("options_project", read.getBigtableOptions().getProjectId());
assertEquals("options_instance", read.getBigtableOptions().getInstanceId());
assertEquals("instance", read.getBigtableConfig().getInstanceId().get());
assertEquals("project", read.getBigtableConfig().getProjectId().get());
assertEquals("app-profile", read.getBigtableConfig().getAppProfileId().get());
assertEquals("table", read.getTableId());
assertEquals(true, read.getBigtableReadOptions().getExperimentalSkipLargeRows());
assertEquals(PORT_CONFIGURATOR, read.getBigtableConfig().getBigtableOptionsConfigurator());
}

@Test
public void testReadValidationFailsMissingTable() {
BigtableIO.Read read = BigtableIO.read().withBigtableOptions(BIGTABLE_OPTIONS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,65 @@ public void testRead() throws IOException {
verifyMetricWasSet("google.bigtable.v2.ReadRows", "ok", 1);
}

/**
 * This test ensures that protobuf creation and interactions with {@link BigtableDataClient} work
 * as expected when the experimental skip-large-rows read path is used.
 *
 * <p>Row "b" is treated as a large row that the skip-large-rows callable silently drops, so the
 * reader must surface only rows "a" and "c".
 *
 * @throws IOException if the reader fails to start or advance
 */
@Test
@SuppressWarnings("unchecked")
public void testReadWithSkippingLargeRows() throws IOException {
  ByteKey start = ByteKey.copyFrom("a".getBytes(StandardCharsets.UTF_8));
  ByteKey end = ByteKey.copyFrom("d".getBytes(StandardCharsets.UTF_8));
  when(mockBigtableSource.getRanges()).thenReturn(Arrays.asList(ByteKeyRange.of(start, end)));
  when(mockBigtableSource.getTableId()).thenReturn(StaticValueProvider.of(TABLE_ID));

  Row expectedRowA = Row.newBuilder().setKey(ByteString.copyFromUtf8("a")).build();
  // Row "b" is a large row the service skips, so it never reaches the iterator.
  Row expectedRowC = Row.newBuilder().setKey(ByteString.copyFromUtf8("c")).build();

  // Set up iterator to be returned by ServerStream.iterator(): exactly two rows.
  Iterator<Row> mockIterator = Mockito.mock(Iterator.class);
  when(mockIterator.hasNext()).thenReturn(true).thenReturn(true).thenReturn(false);
  when(mockIterator.next()).thenReturn(expectedRowA).thenReturn(expectedRowC);
  // Set up ServerStream to be returned by callable.call(Query, ApiCallContext).
  ServerStream<Row> mockRows = Mockito.mock(ServerStream.class);
  when(mockRows.iterator()).thenReturn(mockIterator);
  // Set up the callable returned by stub.createSkipLargeRowsCallable().
  ServerStreamingCallable<Query, Row> mockCallable = Mockito.mock(ServerStreamingCallable.class);
  when(mockCallable.call(
          any(com.google.cloud.bigtable.data.v2.models.Query.class), any(ApiCallContext.class)))
      .thenReturn(mockRows);
  when(mockStub.createSkipLargeRowsCallable(any(RowAdapter.class))).thenReturn(mockCallable);
  ServerStreamingCallable<Query, Row> callable =
      mockStub.createSkipLargeRowsCallable(new BigtableServiceImpl.BigtableRowProtoAdapter());

  when(mockBigtableDataClient.skipLargeRowsCallable(any(RowAdapter.class))).thenReturn(callable);

  BigtableService.Reader underTest =
      new BigtableServiceImpl.BigtableReaderWithExperimentalOptions(
          mockBigtableDataClient,
          bigtableDataSettings.getProjectId(),
          bigtableDataSettings.getInstanceId(),
          mockBigtableSource.getTableId().get(),
          mockBigtableSource.getRanges(),
          null);

  // Assert (rather than ignore) the boolean results of start()/advance(): the reader must
  // report a row available exactly twice and then report exhaustion.
  Assert.assertTrue(underTest.start());
  Assert.assertEquals(expectedRowA, underTest.getCurrentRow());
  Assert.assertTrue(underTest.advance());
  Assert.assertEquals(expectedRowC, underTest.getCurrentRow());
  Assert.assertFalse(underTest.advance());
  verify(mockIterator, times(2)).next();
}

/**
* This test ensures that protobuf creation and interactions with {@link BigtableDataClient} work
* as expected. This test checks that a single row is returned from the future.
Expand Down
Loading