Skip to content

Commit cd7f82e

Browse files
feat: skip large rows (#2482)
Tasks remaining - - [ ] make changes in the read request API which skips for large rows/internally calls readLargeRowsCallable() - [ ] expose large rows rowkeys in sidechannel/dlq/some other method - which can be exposed to client Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly: - [ ] Make sure to open an issue as a [bug/issue](https://togithub.com/googleapis/java-bigtable/issues/new/choose) before writing your code! That way we can discuss the change, evaluate designs, and agree on the general idea - [ ] Ensure the tests and linter pass - [ ] Code coverage does not decrease (if any source code was changed) - [ ] Appropriate docs were updated (if necessary) - [ ] Rollback plan is reviewed and LGTMed - [ ] All new data plane features have a completed end to end testing plan Fixes #<issue_number_goes_here> ☕️ If you write sample code, please follow the [samples format]( https://togithub.com/GoogleCloudPlatform/java-docs-samples/blob/main/SAMPLE_FORMAT.md).
1 parent 0a1fbb4 commit cd7f82e

File tree

9 files changed

+1009
-1
lines changed

9 files changed

+1009
-1
lines changed

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/BigtableDataClient.java

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1279,6 +1279,53 @@ public ServerStreamingCallable<Query, Row> readRowsCallable() {
12791279
return stub.readRowsCallable();
12801280
}
12811281

1282+
  /**
   * Streams back the results of the read query &amp; omits large rows. The returned callable object
   * allows for customization of api invocation.
   *
   * <p>Rows that the service reports as too large are dropped from the stream rather than failing
   * the read. NOTE(review): how the skipped row keys are surfaced to the caller (side channel,
   * DLQ, etc.) is not visible from this method — see the stub implementation.
   *
   * <p>Sample code:
   *
   * <pre>{@code
   * try (BigtableDataClient bigtableDataClient = BigtableDataClient.create("[PROJECT]", "[INSTANCE]")) {
   *   String tableId = "[TABLE]";
   *
   *   Query query = Query.create(tableId)
   *          .range("[START KEY]", "[END KEY]")
   *          .filter(FILTERS.qualifier().regex("[COLUMN PREFIX].*"));
   *
   *   // Iterator style
   *   try {
   *     for(Row row : bigtableDataClient.skipLargeRowsCallable().call(query)) {
   *       // Do something with row
   *     }
   *   } catch (NotFoundException e) {
   *     System.out.println("Tried to read a non-existent table");
   *   } catch (RuntimeException e) {
   *     e.printStackTrace();
   *   }
   *
   *   // Sync style
   *   try {
   *     List<Row> rows = bigtableDataClient.skipLargeRowsCallable().all().call(query);
   *   } catch (NotFoundException e) {
   *     System.out.println("Tried to read a non-existent table");
   *   } catch (RuntimeException e) {
   *     e.printStackTrace();
   *   }
   *
   *   // etc
   * }
   * }</pre>
   *
   * @see ServerStreamingCallable For call styles.
   * @see Query For query options.
   * @see com.google.cloud.bigtable.data.v2.models.Filters For the filter building DSL.
   */
  @InternalApi("only to be used by Bigtable beam connector")
  public ServerStreamingCallable<Query, Row> skipLargeRowsCallable() {
    // Thin delegate: all chain construction lives in EnhancedBigtableStub.
    return stub.skipLargeRowsCallable();
  }
1328+
12821329
/**
12831330
* Streams back the results of the query. This callable allows for customization of the logical
12841331
* representation of a row. It's meant for advanced use cases.

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/RowSetUtil.java

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,36 @@
4949
public final class RowSetUtil {
5050
private RowSetUtil() {}
5151

52+
  /**
   * Removes the {@code excludePoint} row key from the {@code rowSet}.
   *
   * <p>An empty {@code RowSet} (no keys and no ranges) denotes a full-table scan, so it is
   * replaced by the two open-ended ranges on either side of {@code excludePoint}. Point reads
   * equal to {@code excludePoint} are dropped, and every range that contains it is split around
   * it by {@code splitOnLargeRowKey}.
   *
   * @param rowSet the original request row set; not mutated
   * @param excludePoint the row key to erase (e.g. a row too large to read)
   * @return the filtered {@code RowSet}, or {@code null} when nothing remains after the erasure —
   *     NOTE(review): callers must handle the null return; confirm this is intentional rather
   *     than returning an empty RowSet
   */
  public static RowSet eraseLargeRow(RowSet rowSet, ByteString excludePoint) {

    RowSet.Builder newRowSet = RowSet.newBuilder();

    if (rowSet.getRowKeysList().isEmpty() && rowSet.getRowRangesList().isEmpty()) {
      // Empty RowSet means "read everything":
      // querying range (, excludePoint) and (excludePoint, )
      newRowSet.addRowRanges(RowRange.newBuilder().setEndKeyOpen(excludePoint).build());
      newRowSet.addRowRanges(RowRange.newBuilder().setStartKeyOpen(excludePoint).build());
    }

    // remove large row key from point reads
    rowSet.getRowKeysList().stream()
        .filter(k -> !k.equals(excludePoint))
        .forEach(newRowSet::addRowKeys);

    // Handle ranges: keep each range's parts that lie strictly before/after the large row key
    for (RowRange rowRange : rowSet.getRowRangesList()) {
      List<RowRange> afterSplit = splitOnLargeRowKey(rowRange, excludePoint);
      if (afterSplit != null && !afterSplit.isEmpty()) {
        afterSplit.forEach(newRowSet::addRowRanges);
      }
    }

    // Everything was erased — signal "nothing left to read" to the caller.
    if (newRowSet.getRowKeysList().isEmpty() && newRowSet.getRowRangesList().isEmpty()) {
      return null;
    }
    return newRowSet.build();
  }
81+
5282
/**
5383
* Removes all the keys and range parts that fall on or before the splitPoint.
5484
*
@@ -125,6 +155,40 @@ private static RowRange truncateRange(RowRange range, ByteString split, boolean
125155
return newRange.build();
126156
}
127157

158+
  /**
   * Splits {@code range} around {@code largeRowKey}, returning the portion(s) of the range that
   * exclude that key. A range strictly left or right of the key is returned unchanged; a range
   * containing the key yields up to two sub-ranges, both open at {@code largeRowKey}.
   *
   * <p>NOTE(review): the comparisons below use the raw boundary values extracted by
   * {@code StartPoint}/{@code EndPoint}. This assumes unbounded boundaries are represented by
   * sentinel values that order correctly under {@code ByteStringComparator} (an unbounded end
   * must not compare "less than" the key) — confirm against the StartPoint/EndPoint definitions.
   */
  private static List<RowRange> splitOnLargeRowKey(RowRange range, ByteString largeRowKey) {
    List<RowRange> rowRanges = new ArrayList<>();

    ByteString startKey = StartPoint.extract(range).value;
    ByteString endKey = EndPoint.extract(range).value;

    // if end key is on the left of large row key, don't split
    if (ByteStringComparator.INSTANCE.compare(endKey, largeRowKey) < 0) {
      rowRanges.add(range);
      return rowRanges;
    }

    // if start key is on the right of the large row key, don't split
    if (ByteStringComparator.INSTANCE.compare(startKey, largeRowKey) > 0) {
      rowRanges.add(range);
      return rowRanges;
    }

    // if start key is on the left of the large row key, set the end key to be large row key open
    // (keeps [start, largeRowKey) — builder retains the original start boundary)
    if (ByteStringComparator.INSTANCE.compare(startKey, largeRowKey) < 0) {
      RowRange beforeSplit = range.toBuilder().setEndKeyOpen(largeRowKey).build();
      rowRanges.add(beforeSplit);
    }

    // if the end key is on the right of the large row key, set the start key to be large row key
    // open (keeps (largeRowKey, end] — builder retains the original end boundary)
    if (ByteStringComparator.INSTANCE.compare(endKey, largeRowKey) > 0) {
      RowRange afterSplit = range.toBuilder().setStartKeyOpen(largeRowKey).build();
      rowRanges.add(afterSplit);
    }
    // Note: a range that exactly equals the single key [largeRowKey, largeRowKey] produces an
    // empty list here; the caller drops empty results.
    return rowRanges;
  }
191+
128192
/**
129193
* Splits the provided {@link RowSet} into segments partitioned by the provided {@code
130194
* splitPoints}. The split points will be treated as start keys of the segments. The primary

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@
110110
import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsPartialErrorRetryAlgorithm;
111111
import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsRetryingCallable;
112112
import com.google.cloud.bigtable.data.v2.stub.readrows.FilterMarkerRowsCallable;
113+
import com.google.cloud.bigtable.data.v2.stub.readrows.LargeReadRowsResumptionStrategy;
113114
import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsBatchingDescriptor;
114115
import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsFirstCallable;
115116
import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsResumptionStrategy;
@@ -176,6 +177,9 @@ public class EnhancedBigtableStub implements AutoCloseable {
176177
private final DynamicFlowControlStats bulkMutationDynamicFlowControlStats;
177178

178179
private final ServerStreamingCallable<Query, Row> readRowsCallable;
180+
181+
private final ServerStreamingCallable<Query, Row> skipLargeRowsCallable;
182+
179183
private final UnaryCallable<Query, Row> readRowCallable;
180184
private final UnaryCallable<Query, List<Row>> bulkReadRowsCallable;
181185
@Deprecated private final UnaryCallable<String, List<KeyOffset>> sampleRowKeysCallable;
@@ -304,6 +308,7 @@ public EnhancedBigtableStub(
304308
this.bulkMutationDynamicFlowControlStats = new DynamicFlowControlStats();
305309

306310
readRowsCallable = createReadRowsCallable(new DefaultRowAdapter());
311+
skipLargeRowsCallable = createSkipLargeRowsCallable(new DefaultRowAdapter());
307312
readRowCallable = createReadRowCallable(new DefaultRowAdapter());
308313
bulkReadRowsCallable = createBulkReadRowsCallable(new DefaultRowAdapter());
309314
sampleRowKeysCallable = createSampleRowKeysCallable();
@@ -445,6 +450,7 @@ private <ReqT, RowT> ServerStreamingCallable<ReadRowsRequest, RowT> createReadRo
445450
return createReadRowsBaseCallable(
446451
readRowsSettings, rowAdapter, new ReadRowsResumptionStrategy<RowT>(rowAdapter));
447452
}
453+
448454
/**
449455
* Creates a callable chain to handle ReadRows RPCs. The chain will:
450456
*
@@ -515,6 +521,96 @@ private <ReqT, RowT> ServerStreamingCallable<ReadRowsRequest, RowT> createReadRo
515521
return new FilterMarkerRowsCallable<>(retrying2, rowAdapter);
516522
}
517523

524+
  /**
   * Creates a callable chain to handle streaming ReadRows RPCs. This chain skips the large rows
   * internally. The chain will:
   *
   * <ul>
   *   <li>Convert a {@link Query} into a {@link com.google.bigtable.v2.ReadRowsRequest}.
   *   <li>Dispatch the RPC with {@link ReadRowsRequest}.
   *   <li>Upon receiving the response stream, it will merge the {@link
   *       com.google.bigtable.v2.ReadRowsResponse.CellChunk}s in logical rows. The actual row
   *       implementation can be configured in by the {@code rowAdapter} parameter.
   *   <li>Add bigtable tracer for tracking bigtable specific metrics.
   *   <li>Retry/resume on failure (retries for retryable error codes, connection errors and skip
   *       large row keys)
   *   <li>Filter out marker rows.
   *   <li>Add tracing &amp; metrics.
   * </ul>
   *
   * <p>Mirrors {@code createReadRowsCallable}, except the resumption strategy is
   * {@link LargeReadRowsResumptionStrategy} and retries go through {@code largeRowWithRetries}.
   */
  private <ReqT, RowT> ServerStreamingCallable<Query, RowT> createSkipLargeRowsCallable(
      RowAdapter<RowT> rowAdapter) {

    // Reuse the user-configured ReadRows settings (retry codes, timeouts) for this chain.
    ServerStreamingCallSettings<ReqT, Row> readRowsSettings =
        (ServerStreamingCallSettings<ReqT, Row>) settings.readRowsSettings();

    // Innermost callable: raw gRPC ReadRows with routing params extracted per request.
    ServerStreamingCallable<ReadRowsRequest, ReadRowsResponse> base =
        GrpcRawCallableFactory.createServerStreamingCallable(
            GrpcCallSettings.<ReadRowsRequest, ReadRowsResponse>newBuilder()
                .setMethodDescriptor(BigtableGrpc.getReadRowsMethod())
                .setParamsExtractor(
                    r ->
                        composeRequestParams(
                            r.getAppProfileId(), r.getTableName(), r.getAuthorizedViewName()))
                .build(),
            readRowsSettings.getRetryableCodes());

    ServerStreamingCallable<ReadRowsRequest, ReadRowsResponse> withStatsHeaders =
        new StatsHeadersServerStreamingCallable<>(base);

    // Sometimes ReadRows connections are disconnected via an RST frame. This error is transient and
    // should be treated similar to UNAVAILABLE. However, this exception has an INTERNAL error code
    // which by default is not retryable. Convert the exception so it can be retried in the client.
    ServerStreamingCallable<ReadRowsRequest, ReadRowsResponse> convertException =
        new ConvertExceptionCallable<>(withStatsHeaders);

    // Merge CellChunks into logical rows via the caller-supplied adapter.
    ServerStreamingCallable<ReadRowsRequest, RowT> merging =
        new RowMergingCallable<>(convertException, rowAdapter);

    // Copy settings for the middle ReadRowsRequest -> RowT callable (as opposed to the inner
    // ReadRowsRequest -> ReadRowsResponse callable).
    // We override the resumption strategy to use LargeReadRowsResumptionStrategy here (which skips
    // the large rows) instead of ReadRowResumptionStrategy
    ServerStreamingCallSettings<ReadRowsRequest, RowT> innerSettings =
        ServerStreamingCallSettings.<ReadRowsRequest, RowT>newBuilder()
            .setResumptionStrategy(new LargeReadRowsResumptionStrategy<>(rowAdapter))
            .setRetryableCodes(readRowsSettings.getRetryableCodes())
            .setRetrySettings(readRowsSettings.getRetrySettings())
            .setIdleTimeout(readRowsSettings.getIdleTimeout())
            .setWaitTimeout(readRowsSettings.getWaitTimeout())
            .build();

    // Enforce idle/wait timeouts on the merged-row stream.
    ServerStreamingCallable<ReadRowsRequest, RowT> watched =
        Callables.watched(merging, innerSettings, clientContext);

    ServerStreamingCallable<ReadRowsRequest, RowT> withBigtableTracer =
        new BigtableTracerStreamingCallable<>(watched);

    // Retry logic is split into 2 parts to workaround a rare edge case described in
    // ReadRowsRetryCompletedCallable
    ServerStreamingCallable<ReadRowsRequest, RowT> retrying1 =
        new ReadRowsRetryCompletedCallable<>(withBigtableTracer);

    // Large-row-aware retry layer (also handles routing cookies when enabled).
    ServerStreamingCallable<ReadRowsRequest, RowT> retrying2 =
        largeRowWithRetries(retrying1, innerSettings);

    ServerStreamingCallable<ReadRowsRequest, RowT> readRowsCallable =
        new FilterMarkerRowsCallable<>(retrying2, rowAdapter);

    // Outermost user-facing layer: Query -> ReadRowsRequest conversion.
    ServerStreamingCallable<Query, RowT> readRowsUserCallable =
        new ReadRowsUserCallable<>(readRowsCallable, requestContext);

    // NOTE(review): shares the "ReadRows" span name with the plain readRows chain — confirm this
    // is intended rather than a dedicated span for the skip-large-rows variant.
    SpanName span = getSpanName("ReadRows");
    ServerStreamingCallable<Query, RowT> traced =
        new TracedServerStreamingCallable<>(
            readRowsUserCallable, clientContext.getTracerFactory(), span);

    return traced.withDefaultCallContext(
        clientContext
            .getDefaultCallContext()
            .withRetrySettings(readRowsSettings.getRetrySettings()));
  }
613+
518614
/**
519615
* Creates a callable chain to handle bulk ReadRows RPCs. This is meant to be used in ReadRows
520616
* batcher. The chain will:
@@ -1282,6 +1378,22 @@ private <RequestT, ResponseT> ServerStreamingCallable<RequestT, ResponseT> withR
12821378
return retrying;
12831379
}
12841380

1381+
private <RequestT, ResponseT> ServerStreamingCallable<RequestT, ResponseT> largeRowWithRetries(
1382+
ServerStreamingCallable<RequestT, ResponseT> innerCallable,
1383+
ServerStreamingCallSettings<RequestT, ResponseT> serverStreamingCallSettings) {
1384+
1385+
// Retrying algorithm in retryingForLargeRows also takes RetryInfo into consideration, so we
1386+
// skip the check for settings.getEnableRetryInfo here
1387+
ServerStreamingCallable<RequestT, ResponseT> retrying;
1388+
retrying =
1389+
com.google.cloud.bigtable.gaxx.retrying.Callables.retryingForLargeRows(
1390+
innerCallable, serverStreamingCallSettings, clientContext);
1391+
if (settings.getEnableRoutingCookie()) {
1392+
return new CookiesServerStreamingCallable<>(retrying);
1393+
}
1394+
return retrying;
1395+
}
1396+
12851397
// </editor-fold>
12861398

12871399
// <editor-fold desc="Callable accessors">
@@ -1290,6 +1402,11 @@ public ServerStreamingCallable<Query, Row> readRowsCallable() {
12901402
return readRowsCallable;
12911403
}
12921404

1405+
  /**
   * Returns a streaming read rows callable that skips large rows.
   *
   * <p>Accessor for the chain built once in the constructor by {@code
   * createSkipLargeRowsCallable}; the instance is shared across calls.
   */
  public ServerStreamingCallable<Query, Row> skipLargeRowsCallable() {
    return skipLargeRowsCallable;
  }
1409+
12931410
/** Return a point read callable */
12941411
public UnaryCallable<Query, Row> readRowCallable() {
12951412
return readRowCallable;

0 commit comments

Comments
 (0)