feat:large-row-skip-in-bigtable | added experimental options to skip …#34245
feat:large-row-skip-in-bigtable | added experimental options to skip …#34245sarthakbhutani wants to merge 6 commits intoapache:masterfrom
Conversation
…large rows while reading from bigtable
|
Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment |
...a/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
Outdated
Show resolved
Hide resolved
...a/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
Show resolved
Hide resolved
...a/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
Show resolved
Hide resolved
| try { | ||
| stream = | ||
| client | ||
| .skipLargeRowsCallable(new BigtableRowProtoAdapter()) |
There was a problem hiding this comment.
I forget, does this throw an exception with the large rows, or just silently swallow them?
There was a problem hiding this comment.
it swallows them & returns the next non-large row
|
Retest this please |
|
Assigning reviewers. If you would like to opt out of this review, comment R: @Abacn for label java. Available commands:
The PR bot will only process comments in the main thread (not review comments). |
...a/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
Show resolved
Hide resolved
| if (bigtableReadOptions != null | ||
| && Boolean.TRUE.equals(bigtableReadOptions.getExperimentalSkipLargeRows())) { | ||
| stream = | ||
| client | ||
| .skipLargeRowsCallable(new BigtableRowProtoAdapter()) | ||
| .call(query, GrpcCallContext.createDefault()); | ||
| } else { | ||
| stream = | ||
| client | ||
| .readRowsCallable(new BigtableRowProtoAdapter()) | ||
| .call(query, GrpcCallContext.createDefault()); | ||
| } |
There was a problem hiding this comment.
Resolving of the feature should be done during pipeline construction not during execution
Also you should factor out the common code:
readRowsCallable = client.skipLargeRowsCallable(new BigtableRowProtoAdapter())
if (isLargeRowSkippingEnabled) {
readRowsCallable = client.readRowsCallable(new BigtableRowProtoAdapter());
}
readRowsCallable.call(...There was a problem hiding this comment.
-
regarding code suggestion -
instead of having if-else,
do we want to override the code later? -
Resolving of the feature should be done during pipeline construction not during execution
initially, i had made this into a separate reader class itself.
since this was a experimental option, It came out in the discussion that we dont want the overhead of maintaining this as a separate reader for new changes.
and I had to revert these changes - 50f7924
There was a problem hiding this comment.
I think you can pass in a unary callable when creating the BigtableReaderImpl instead of the client:
There was a problem hiding this comment.
No, I meant keep the unary callable where it is, but resolve unset value in the settings. ie this logic:
bigtableReadOptions != null
&& Boolean.TRUE.equals(bigtableReadOptions.getExperimentalSkipLargeRows()
What unset means should be resolved during pipeline construction
| new BigtableOptions.Builder().setProjectId(project).setInstanceId(options.getInstanceId()); | ||
|
|
||
| final String tableId = "BigtableReadTest"; | ||
| final long numRows = 1000L; |
There was a problem hiding this comment.
how does this test large row skipping?
There was a problem hiding this comment.
this doesn't.
the main logic lies in the java-client. Apache beam implementation is only a wrapper to call that implementation. I couldn't figure out - how to test the large row skipping in a IT here - it's already being done in the java-client.
it came out in our discussion earlier, that we need a data integrity check where no data loss should happen.
hence, this is a check for data integrity - that if there isn't a large row, the feature still works as expected - reading all the rows.
There was a problem hiding this comment.
I think you can do a bit better here
testE2EBigtableReadWithSkippingLargeRows() {
//...
// add an error injector to trigger large row logic
ExperimentalOptions.addExperiment("bigtable_settings_override", InterceptorInjector.class.getName());
//...
}
static class LargeRowErrorInterceptor implements ClientInterceptor {
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {
private boolean artificiallyClosed = false;
private int numMsgs = 0;
@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
super.start(new ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT>() {
@Override
public void onMessage(RespT message) {
if (++numMsgs > 10) {
artificiallyClosed = true;
delegate().onClose(
Status.WHATEVER_ERROR_TRIGGERS_PAGING,
new Metadata()
);
return;
}
super.onMessage(message);
}
@Override
public void onClose(Status status, Metadata trailers) {
if (!artificiallyClosed) {
super.onClose(status, trailers);
}
}
}, headers);
}
};
}
}
public static class InterceptorInjector implements BiFunction<BigtableDataSettings.Builder, PipelineOptions,
BigtableDataSettings.Builder> {
@Override
public BigtableDataSettings.Builder apply(BigtableDataSettings.Builder builder, PipelineOptions pipelineOptions) {
InstantiatingGrpcChannelProvider.Builder transportChannelProvider = ((InstantiatingGrpcChannelProvider) builder.stubSettings()
.getTransportChannelProvider())
.toBuilder();
ApiFunction<ManagedChannelBuilder, ManagedChannelBuilder> oldConf = transportChannelProvider.getChannelConfigurator();
transportChannelProvider.setChannelConfigurator(b -> {
if (oldConf!=null) {
b = oldConf.apply(b);
}
return b.intercept(new LargeRowErrorInterceptor());
});
return null;
}
}|
Assigning new set of reviewers because Pr has gone too long without review. If you would like to opt out of this review, comment R: @robertwb for label java. Available commands:
|
|
Reminder, please take a look at this pr: @robertwb @johnjcasey |
|
Assigning new set of reviewers because Pr has gone too long without review. If you would like to opt out of this review, comment R: @Abacn for label java. Available commands:
|
|
I was occupied with other work, hence, couldn't take this up earlier. |
|
Reminder, please take a look at this pr: @Abacn @jrmccluskey |
|
Assigning new set of reviewers because Pr has gone too long without review. If you would like to opt out of this review, comment R: @robertwb for label java. Available commands:
|
|
Reminder, please take a look at this pr: @robertwb @kennknowles |
|
Assigning new set of reviewers because Pr has gone too long without review. If you would like to opt out of this review, comment R: @Abacn for label java. Available commands:
|
|
stop reviewer notification |
|
waiting on author |
|
waiting on author |
|
This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@beam.apache.org list. Thank you for your contributions. |
|
This pull request has been closed due to lack of activity. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time. |

…large rows while reading from bigtable
Please add a meaningful description for your change here
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, commentfixes #<ISSUE NUMBER>instead.CHANGES.mdwith noteworthy changes.See the Contributor Guide for more tips on how to make review process smoother.
To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.