Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -631,6 +631,24 @@ public Read withMaxBufferElementCount(@Nullable Integer maxBufferElementCount) {
.build();
}

/**
 * Returns a new {@link BigtableIO.Read} that will skip large rows while reading. When enabled,
 * the reader obtains its row stream from the client's skip-large-rows callable instead of the
 * regular read-rows callable.
 *
 * <p>Does not modify this object.
 *
 * <p>This is incompatible with {@code withMaxBufferElementCount()}; do not set both.
 *
 * @param skipLargeRows whether to skip rows that exceed the service's size limit; {@code null}
 *     leaves the option unset (feature disabled)
 */
public Read withExperimentalSkipLargeRows(@Nullable Boolean skipLargeRows) {
  BigtableReadOptions bigtableReadOptions = getBigtableReadOptions();

  // Builds a copy of the read options with the flag applied; this object is left untouched.
  return toBuilder()
      .setBigtableReadOptions(
          bigtableReadOptions.toBuilder().setExperimentalSkipLargeRows(skipLargeRows).build())
      .build();
}

/**
* Returns a new {@link BigtableIO.Read} that will read only rows in the specified range.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@
})
abstract class BigtableReadOptions implements Serializable {

/** Returns the experimental skip large rows option value. */
abstract @Nullable Boolean getExperimentalSkipLargeRows();

/** Returns the table id. */
abstract ValueProvider<String> getTableId();

Expand Down Expand Up @@ -69,6 +72,8 @@ static BigtableReadOptions.Builder builder() {
@AutoValue.Builder
abstract static class Builder {

abstract Builder setExperimentalSkipLargeRows(Boolean experimentalSkipLargeRows);

abstract Builder setTableId(ValueProvider<String> tableId);

abstract Builder setRowFilter(ValueProvider<RowFilter> rowFilter);
Expand All @@ -86,6 +91,10 @@ abstract static class Builder {
abstract BigtableReadOptions build();
}

/**
 * Returns a copy of these read options with {@code experimentalSkipLargeRows} set to the given
 * value. Does not modify this object.
 */
BigtableReadOptions setExperimentalSkipLargeRows(@Nullable Boolean experimentalSkipLargeRows) {
  return toBuilder().setExperimentalSkipLargeRows(experimentalSkipLargeRows).build();
}

/**
 * Returns a copy of these read options with {@code maxBufferElementCount} set to the given
 * value. Does not modify this object.
 */
BigtableReadOptions setMaxBufferElementCount(@Nullable Integer maxBufferElementCount) {
  return toBuilder().setMaxBufferElementCount(maxBufferElementCount).build();
}
Expand Down Expand Up @@ -116,7 +125,10 @@ void populateDisplayData(DisplayData.Builder builder) {
.withLabel("Read Attempt Timeout"))
.addIfNotNull(
DisplayData.item("operationTimeout", getOperationTimeout())
.withLabel("Read Operation Timeout"));
.withLabel("Read Operation Timeout"))
.addIfNotNull(
DisplayData.item("experimentalSkipLargeRows", getExperimentalSkipLargeRows())
.withLabel("Skip Large Rows"));
}

void validate() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ static class BigtableReaderImpl implements Reader {
private ServerStream<Row> stream;

private boolean exhausted;
private BigtableReadOptions bigtableReadOptions;

@VisibleForTesting
BigtableReaderImpl(
Expand All @@ -150,13 +151,15 @@ static class BigtableReaderImpl implements Reader {
String instanceId,
String tableId,
List<ByteKeyRange> ranges,
@Nullable RowFilter rowFilter) {
@Nullable RowFilter rowFilter,
BigtableReadOptions bigtableReadOptions) {
this.client = client;
this.projectId = projectId;
this.instanceId = instanceId;
this.tableId = tableId;
this.ranges = ranges;
this.rowFilter = rowFilter;
this.bigtableReadOptions = bigtableReadOptions;
}

@Override
Expand All @@ -174,10 +177,18 @@ public boolean start() throws IOException {
query.filter(Filters.FILTERS.fromProto(rowFilter));
}
try {
stream =
client
.readRowsCallable(new BigtableRowProtoAdapter())
.call(query, GrpcCallContext.createDefault());
if (bigtableReadOptions != null
&& Boolean.TRUE.equals(bigtableReadOptions.getExperimentalSkipLargeRows())) {
stream =
client
.skipLargeRowsCallable(new BigtableRowProtoAdapter())
.call(query, GrpcCallContext.createDefault());
} else {
stream =
client
.readRowsCallable(new BigtableRowProtoAdapter())
.call(query, GrpcCallContext.createDefault());
}
Comment on lines +180 to +191
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Resolving of the feature should be done during pipeline construction not during execution

Also you should factor out the common code:

readRowsCallable = client.readRowsCallable(new BigtableRowProtoAdapter());
if (isLargeRowSkippingEnabled) {
  readRowsCallable = client.skipLargeRowsCallable(new BigtableRowProtoAdapter());
}
readRowsCallable.call(...
readRowsCallable.call(...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. regarding code suggestion -
    instead of having if-else,
    do we want to override the code later?

  2. Resolving of the feature should be done during pipeline construction not during execution
    initially, i had made this into a separate reader class itself.
    since this was an experimental option, it came out in the discussion that we don't want the overhead of maintaining this as a separate reader for new changes.

and I had to revert these changes - 50f7924

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you can pass in a unary callable when creating the BigtableReaderImpl instead of the client:

public Reader createReader(BigtableSource source) throws IOException {
if (source.getMaxBufferElementCount() != null) {
return BigtableSegmentReaderImpl.create(
client,
projectId,
instanceId,
source.getTableId().get(),
source.getRanges(),
source.getRowFilter(),
source.getMaxBufferElementCount());
} else {
return new BigtableReaderImpl(
client,
projectId,
instanceId,
source.getTableId().get(),
source.getRanges(),
source.getRowFilter());
}
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, I meant keep the unary callable where it is, but resolve unset value in the settings. ie this logic:

bigtableReadOptions != null
            && Boolean.TRUE.equals(bigtableReadOptions.getExperimentalSkipLargeRows()

What unset means should be resolved during pipeline construction

results = stream.iterator();
serviceCallMetric.call("ok");
} catch (StatusRuntimeException e) {
Expand Down Expand Up @@ -667,7 +678,8 @@ public Reader createReader(BigtableSource source) throws IOException {
instanceId,
source.getTableId().get(),
source.getRanges(),
source.getRowFilter());
source.getRowFilter(),
source.getReadOptions());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,27 @@ public void testReadBuildsCorrectly() {
assertEquals(PORT_CONFIGURATOR, read.getBigtableConfig().getBigtableOptionsConfigurator());
}

/**
 * Verifies that {@code withExperimentalSkipLargeRows(true)} is recorded in the read options and
 * that it composes with the other {@code with*} configuration methods without clobbering them.
 */
@Test
public void testReadWithSkippingLargeRows() {
  BigtableIO.Read read =
      BigtableIO.read()
          .withBigtableOptions(BIGTABLE_OPTIONS)
          .withExperimentalSkipLargeRows(true)
          .withTableId("table")
          .withInstanceId("instance")
          .withProjectId("project")
          .withAppProfileId("app-profile")
          .withBigtableOptionsConfigurator(PORT_CONFIGURATOR);
  assertEquals("options_project", read.getBigtableOptions().getProjectId());
  assertEquals("options_instance", read.getBigtableOptions().getInstanceId());
  assertEquals("instance", read.getBigtableConfig().getInstanceId().get());
  assertEquals("project", read.getBigtableConfig().getProjectId().get());
  assertEquals("app-profile", read.getBigtableConfig().getAppProfileId().get());
  assertEquals("table", read.getTableId());
  // The getter returns a @Nullable Boolean; compare against Boolean.TRUE explicitly so the
  // Object-overload of assertEquals is chosen deliberately rather than via autoboxing.
  assertEquals(Boolean.TRUE, read.getBigtableReadOptions().getExperimentalSkipLargeRows());
  assertEquals(PORT_CONFIGURATOR, read.getBigtableConfig().getBigtableOptionsConfigurator());
}

@Test
public void testReadValidationFailsMissingTable() {
BigtableIO.Read read = BigtableIO.read().withBigtableOptions(BIGTABLE_OPTIONS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,27 @@ public void testE2EBigtableRead() {
checkLineageSourceMetric(r, tableId);
}

@Test
public void testE2EBigtableReadWithSkippingLargeRows() {
BigtableOptions.Builder bigtableOptionsBuilder =
new BigtableOptions.Builder().setProjectId(project).setInstanceId(options.getInstanceId());

final String tableId = "BigtableReadTest";
final long numRows = 1000L;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how does this test large row skipping?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this doesn't.
the main logic lies in the java-client. Apache beam implementation is only a wrapper to call that implementation. I couldn't figure out - how to test the large row skipping in a IT here - it's already being done in the java-client.

it came out in our discussion earlier, that we need a data integrity check where no data loss should happen.
hence, this is a check for data integrity - that if there isn't a large row, the feature still works as expected - reading all the rows.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you can do a bit better here

testE2EBigtableReadWithSkippingLargeRows() {
   //...
   // add an error injector to trigger large row logic
   ExperimentalOptions.addExperiment("bigtable_settings_override", InterceptorInjector.class.getName());
  //...
}
static class LargeRowErrorInterceptor implements ClientInterceptor {
        @Override
        public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
            return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {
                private boolean artificiallyClosed = false;
                private int numMsgs = 0;

                @Override
                public void start(Listener<RespT> responseListener, Metadata headers) {
                    super.start(new ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT>() {
                        @Override
                        public void onMessage(RespT message) {
                            if (++numMsgs > 10) {
                                artificiallyClosed = true;
                                delegate().onClose(
                                        Status.WHATEVER_ERROR_TRIGGERS_PAGING,
                                        new Metadata()
                                );
                                return;
                            }
                            super.onMessage(message);
                        }

                        @Override
                        public void onClose(Status status, Metadata trailers) {
                            if (!artificiallyClosed) {
                                super.onClose(status, trailers);
                            }
                        }
                    }, headers);
                }
            };
        }
    }
    public static class InterceptorInjector implements BiFunction<BigtableDataSettings.Builder, PipelineOptions,
            BigtableDataSettings.Builder> {
        @Override
        public BigtableDataSettings.Builder apply(BigtableDataSettings.Builder builder, PipelineOptions pipelineOptions) {
            InstantiatingGrpcChannelProvider.Builder transportChannelProvider = ((InstantiatingGrpcChannelProvider) builder.stubSettings()
                    .getTransportChannelProvider())
                            .toBuilder();
            ApiFunction<ManagedChannelBuilder, ManagedChannelBuilder> oldConf = transportChannelProvider.getChannelConfigurator();
            transportChannelProvider.setChannelConfigurator(b -> {
                if (oldConf != null) {
                    b = oldConf.apply(b);
                }
                return b.intercept(new LargeRowErrorInterceptor());
            });
            builder.stubSettings().setTransportChannelProvider(transportChannelProvider.build());
            return builder;
        }
    }


Pipeline p = Pipeline.create(options);
PCollection<Long> count =
p.apply(
BigtableIO.read()
.withBigtableOptions(bigtableOptionsBuilder)
.withTableId(tableId)
.withExperimentalSkipLargeRows(true))
.apply(Count.globally());
PAssert.thatSingleton(count).isEqualTo(numRows);
PipelineResult r = p.run();
checkLineageSourceMetric(r, tableId);
}

@Test
public void testE2EBigtableSegmentRead() {
tableAdminClient.createTable(CreateTableRequest.of(tableId).addFamily(COLUMN_FAMILY_NAME));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,8 @@ public void testRead() throws IOException {
bigtableDataSettings.getInstanceId(),
mockBigtableSource.getTableId().get(),
mockBigtableSource.getRanges(),
null);
null,
BigtableReadOptions.builder().setTableId(mockBigtableSource.getTableId()).build());

underTest.start();
Assert.assertEquals(expectedRow, underTest.getCurrentRow());
Expand All @@ -188,6 +189,69 @@ public void testRead() throws IOException {
verifyMetricWasSet("google.bigtable.v2.ReadRows", "ok", 1);
}

/**
 * This test ensures that when {@code experimentalSkipLargeRows} is enabled, the reader obtains
 * its stream via {@code BigtableDataClient#skipLargeRowsCallable} (rather than the regular
 * read-rows callable) and iterates the returned rows correctly.
 *
 * @throws IOException if the reader fails to start
 */
@Test
@SuppressWarnings("unchecked")
public void testReadWithSkippingLargeRows() throws IOException {
ByteKey start = ByteKey.copyFrom("a".getBytes(StandardCharsets.UTF_8));
ByteKey end = ByteKey.copyFrom("d".getBytes(StandardCharsets.UTF_8));
when(mockBigtableSource.getRanges()).thenReturn(Arrays.asList(ByteKeyRange.of(start, end)));
when(mockBigtableSource.getTableId()).thenReturn(StaticValueProvider.of(TABLE_ID));

Row expectedRowA = Row.newBuilder().setKey(ByteString.copyFromUtf8("a")).build();
// Row "b" is the large row the service skips, so only rows "a" and "c" are surfaced here.
Row expectedRowC = Row.newBuilder().setKey(ByteString.copyFromUtf8("c")).build();

// Set up iterator to be returned by ServerStream.iterator()
Iterator<Row> mockIterator = Mockito.mock(Iterator.class);
when(mockIterator.next()).thenReturn(expectedRowA).thenReturn(expectedRowC).thenReturn(null);
when(mockIterator.hasNext()).thenReturn(true).thenReturn(true).thenReturn(false);
// Set up ServerStream to be returned by callable.call(Query)
ServerStream<Row> mockRows = Mockito.mock(ServerStream.class);
when(mockRows.iterator()).thenReturn(mockIterator);
// Set up the skip-large-rows callable to be returned by the stub, then wire the data client's
// skipLargeRowsCallable() to hand it back — this is the path the reader must take when the
// experimental option is set.
ServerStreamingCallable<Query, Row> mockCallable = Mockito.mock(ServerStreamingCallable.class);
when(mockCallable.call(
any(com.google.cloud.bigtable.data.v2.models.Query.class), any(ApiCallContext.class)))
.thenReturn(mockRows);
when(mockStub.createSkipLargeRowsCallable(any(RowAdapter.class))).thenReturn(mockCallable);
ServerStreamingCallable<Query, Row> callable =
mockStub.createSkipLargeRowsCallable(new BigtableServiceImpl.BigtableRowProtoAdapter());

when(mockBigtableDataClient.skipLargeRowsCallable(any(RowAdapter.class))).thenReturn(callable);

BigtableService.Reader underTest =
new BigtableServiceImpl.BigtableReaderImpl(
mockBigtableDataClient,
bigtableDataSettings.getProjectId(),
bigtableDataSettings.getInstanceId(),
mockBigtableSource.getTableId().get(),
mockBigtableSource.getRanges(),
null,
BigtableReadOptions.builder()
.setExperimentalSkipLargeRows(true)
.setTableId(mockBigtableSource.getTableId())
.build());

// Walk the stream: expect exactly the two non-large rows, in order.
int rowCount = 0;
underTest.start();
Assert.assertEquals(expectedRowA, underTest.getCurrentRow());
rowCount++;
underTest.advance();
Assert.assertEquals(expectedRowC, underTest.getCurrentRow());
rowCount++;
Assert.assertFalse(underTest.advance());
Assert.assertEquals(2, rowCount);
verify(mockIterator, times(2)).next();
}

/**
* This test ensures that protobuf creation and interactions with {@link BigtableDataClient} work
* as expected. This test checks that a single row is returned from the future.
Expand Down
Loading