Skip to content

Commit 50c0a51

Browse files
Revert "feat: all setting timeouts for batchers + fix handling of timeouts for point reads (#861)" (#875)
This reverts commit c145ceb.
1 parent c145ceb commit 50c0a51

File tree

10 files changed

+69
-255
lines changed

10 files changed

+69
-255
lines changed

google-cloud-bigtable/clirr-ignored-differences.xml

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,4 @@
2323
<differenceType>8001</differenceType>
2424
<className>com/google/cloud/bigtable/gaxx/tracing/WrappedTracerFactory*</className>
2525
</difference>
26-
<difference>
27-
<!-- change method args is ok because EnhancedBigtableStub is InternalApi -->
28-
<differenceType>7004</differenceType>
29-
<className>com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub</className>
30-
<method>*</method>
31-
</difference>
32-
</differences>
26+
</differences>

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/BigtableDataClient.java

Lines changed: 3 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import com.google.api.core.BetaApi;
2424
import com.google.api.core.InternalApi;
2525
import com.google.api.gax.batching.Batcher;
26-
import com.google.api.gax.grpc.GrpcCallContext;
2726
import com.google.api.gax.rpc.ApiExceptions;
2827
import com.google.api.gax.rpc.ResponseObserver;
2928
import com.google.api.gax.rpc.ServerStream;
@@ -1074,40 +1073,7 @@ public void bulkMutateRows(BulkMutation mutation) {
10741073
*/
10751074
@BetaApi("This surface is likely to change as the batching surface evolves.")
10761075
public Batcher<RowMutationEntry, Void> newBulkMutationBatcher(@Nonnull String tableId) {
1077-
return newBulkMutationBatcher(tableId, null);
1078-
}
1079-
1080-
/**
1081-
* Mutates multiple rows in a batch. Each individual row is mutated atomically as in MutateRow,
1082-
* but the entire batch is not executed atomically. The returned Batcher instance is not
1083-
* threadsafe, it can only be used from single thread. This method allows customization of the
1084-
* underlying RPCs by passing in a {@link com.google.api.gax.grpc.GrpcCallContext}. The same
1085-
* context will be reused for all batches. This can be used to customize things like per attempt
1086-
* timeouts.
1087-
*
1088-
* <p>Sample Code:
1089-
*
1090-
* <pre>{@code
1091-
* try (BigtableDataClient bigtableDataClient = BigtableDataClient.create("[PROJECT]", "[INSTANCE]")) {
1092-
* try (Batcher<RowMutationEntry, Void> batcher = bigtableDataClient.newBulkMutationBatcher("[TABLE]", GrpcCallContext.createDefault().withTimeout(Duration.ofSeconds(10)))) {
1093-
* for (String someValue : someCollection) {
1094-
* ApiFuture<Void> entryFuture =
1095-
* batcher.add(
1096-
* RowMutationEntry.create("[ROW KEY]")
1097-
* .setCell("[FAMILY NAME]", "[QUALIFIER]", "[VALUE]"));
1098-
* }
1099-
*
1100-
* // Blocks until mutations are applied on all submitted row entries.
1101-
* batcher.flush();
1102-
* }
1103-
* // Before `batcher` is closed, all remaining(If any) mutations are applied.
1104-
* }
1105-
* }</pre>
1106-
*/
1107-
@BetaApi("This surface is likely to change as the batching surface evolves.")
1108-
public Batcher<RowMutationEntry, Void> newBulkMutationBatcher(
1109-
@Nonnull String tableId, @Nullable GrpcCallContext ctx) {
1110-
return stub.newMutateRowsBatcher(tableId, ctx);
1076+
return stub.newMutateRowsBatcher(tableId);
11111077
}
11121078

11131079
/**
@@ -1193,61 +1159,11 @@ public Batcher<ByteString, Row> newBulkReadRowsBatcher(String tableId) {
11931159
*/
11941160
public Batcher<ByteString, Row> newBulkReadRowsBatcher(
11951161
String tableId, @Nullable Filters.Filter filter) {
1196-
return newBulkReadRowsBatcher(tableId, filter, null);
1197-
}
1198-
1199-
/**
1200-
* Reads rows for given tableId and filter criteria in a batch. If the row does not exist, the
1201-
* value will be null. The returned Batcher instance is not threadsafe, it can only be used from a
1202-
* single thread. This method allows customization of the underlying RPCs by passing in a {@link
1203-
* com.google.api.gax.grpc.GrpcCallContext}. The same context will be reused for all batches. This
1204-
* can be used to customize things like per attempt timeouts.
1205-
*
1206-
* <p>Performance notice: The ReadRows protocol requires that rows are sent in ascending key
1207-
* order, which means that the keys are processed sequentially on the server-side, so batching
1208-
* allows improving throughput but not latency. Lower latencies can be achieved by sending smaller
1209-
* requests concurrently.
1210-
*
1211-
* <p>Sample Code:
1212-
*
1213-
* <pre>{@code
1214-
* try (BigtableDataClient bigtableDataClient = BigtableDataClient.create("[PROJECT]", "[INSTANCE]")) {
1215-
*
1216-
* // Build the filter expression
1217-
* Filter filter = FILTERS.chain()
1218-
* .filter(FILTERS.key().regex("prefix.*"))
1219-
* .filter(FILTERS.limit().cellsPerRow(10));
1220-
*
1221-
* List<ApiFuture<Row>> rows = new ArrayList<>();
1222-
*
1223-
* try (Batcher<ByteString, Row> batcher = bigtableDataClient.newBulkReadRowsBatcher(
1224-
* "[TABLE]", filter, GrpcCallContext.createDefault().withTimeout(Duration.ofSeconds(10)))) {
1225-
* for (String someValue : someCollection) {
1226-
* ApiFuture<Row> rowFuture =
1227-
* batcher.add(ByteString.copyFromUtf8("[ROW KEY]"));
1228-
* rows.add(rowFuture);
1229-
* }
1230-
*
1231-
* // [Optional] Sends collected elements for batching asynchronously.
1232-
* batcher.sendOutstanding();
1233-
*
1234-
* // [Optional] Invokes sendOutstanding() and awaits until all pending entries are resolved.
1235-
* batcher.flush();
1236-
* }
1237-
* // batcher.close() invokes `flush()` which will in turn invoke `sendOutstanding()` with await for
1238-
* pending batches until its resolved.
1239-
*
1240-
* List<Row> actualRows = ApiFutures.allAsList(rows).get();
1241-
* }
1242-
* }</pre>
1243-
*/
1244-
public Batcher<ByteString, Row> newBulkReadRowsBatcher(
1245-
String tableId, @Nullable Filters.Filter filter, @Nullable GrpcCallContext ctx) {
12461162
Query query = Query.create(tableId);
12471163
if (filter != null) {
1248-
query.filter(filter);
1164+
query = query.filter(filter);
12491165
}
1250-
return stub.newBulkReadRowsBatcher(query, ctx);
1166+
return stub.newBulkReadRowsBatcher(query);
12511167
}
12521168

12531169
/**

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java

Lines changed: 4 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import com.google.api.gax.core.BackgroundResource;
2424
import com.google.api.gax.core.FixedCredentialsProvider;
2525
import com.google.api.gax.grpc.GaxGrpcProperties;
26-
import com.google.api.gax.grpc.GrpcCallContext;
2726
import com.google.api.gax.grpc.GrpcCallSettings;
2827
import com.google.api.gax.grpc.GrpcRawCallableFactory;
2928
import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider;
@@ -99,7 +98,6 @@
9998
import java.util.Map;
10099
import java.util.concurrent.TimeUnit;
101100
import javax.annotation.Nonnull;
102-
import javax.annotation.Nullable;
103101

104102
/**
105103
* The core client that converts method calls to RPCs.
@@ -538,15 +536,10 @@ private UnaryCallable<BulkMutation, Void> createBulkMutateRowsCallable() {
538536
* <li>Split the responses using {@link MutateRowsBatchingDescriptor}.
539537
* </ul>
540538
*/
541-
public Batcher<RowMutationEntry, Void> newMutateRowsBatcher(
542-
@Nonnull String tableId, @Nullable GrpcCallContext ctx) {
543-
UnaryCallable<BulkMutation, Void> callable = this.bulkMutateRowsCallable;
544-
if (ctx != null) {
545-
callable = callable.withDefaultCallContext(ctx);
546-
}
539+
public Batcher<RowMutationEntry, Void> newMutateRowsBatcher(@Nonnull String tableId) {
547540
return new BatcherImpl<>(
548541
settings.bulkMutateRowsSettings().getBatchingDescriptor(),
549-
callable,
542+
bulkMutateRowsCallable,
550543
BulkMutation.create(tableId),
551544
settings.bulkMutateRowsSettings().getBatchingSettings(),
552545
clientContext.getExecutor(),
@@ -568,16 +561,11 @@ public Batcher<RowMutationEntry, Void> newMutateRowsBatcher(
568561
* <li>Split the responses using {@link ReadRowsBatchingDescriptor}.
569562
* </ul>
570563
*/
571-
public Batcher<ByteString, Row> newBulkReadRowsBatcher(
572-
@Nonnull Query query, @Nullable GrpcCallContext ctx) {
564+
public Batcher<ByteString, Row> newBulkReadRowsBatcher(@Nonnull Query query) {
573565
Preconditions.checkNotNull(query, "query cannot be null");
574-
UnaryCallable<Query, List<Row>> callable = readRowsCallable().all();
575-
if (ctx != null) {
576-
callable = callable.withDefaultCallContext(ctx);
577-
}
578566
return new BatcherImpl<>(
579567
settings.bulkReadRowsSettings().getBatchingDescriptor(),
580-
callable,
568+
readRowsCallable().all(),
581569
query,
582570
settings.bulkReadRowsSettings().getBatchingSettings(),
583571
clientContext.getExecutor());

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsAttemptCallable.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -176,8 +176,7 @@ public Void call() {
176176

177177
// Configure the deadline
178178
ApiCallContext currentCallContext = callContext;
179-
if (currentCallContext.getTimeout() == null
180-
&& !externalFuture.getAttemptSettings().getRpcTimeout().isZero()) {
179+
if (!externalFuture.getAttemptSettings().getRpcTimeout().isZero()) {
181180
currentCallContext =
182181
currentCallContext.withTimeout(externalFuture.getAttemptSettings().getRpcTimeout());
183182
}

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/PointReadTimeoutCallable.java

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import com.google.api.gax.rpc.ResponseObserver;
2121
import com.google.api.gax.rpc.ServerStreamingCallable;
2222
import com.google.bigtable.v2.ReadRowsRequest;
23+
import javax.annotation.Nullable;
2324
import org.threeten.bp.Duration;
2425

2526
/**
@@ -45,9 +46,9 @@ public PointReadTimeoutCallable(ServerStreamingCallable<ReadRowsRequest, RespT>
4546
@Override
4647
public void call(ReadRowsRequest request, ResponseObserver<RespT> observer, ApiCallContext ctx) {
4748
if (isPointRead(request)) {
48-
Duration streamWaitTimeout = ctx.getStreamWaitTimeout();
49-
if (ctx.getTimeout() == null && streamWaitTimeout != null) {
50-
ctx = ctx.withTimeout(streamWaitTimeout);
49+
Duration effectiveTimeout = getEffectivePointReadTimeout(ctx);
50+
if (effectiveTimeout != null) {
51+
ctx = ctx.withTimeout(effectiveTimeout);
5152
}
5253
}
5354
inner.call(request, observer, ctx);
@@ -62,4 +63,24 @@ private boolean isPointRead(ReadRowsRequest request) {
6263
}
6364
return request.getRows().getRowKeysCount() == 1;
6465
}
66+
67+
/**
68+
* Extracts the effective timeout for a point read.
69+
*
70+
* <p>The effective time is the minimum of a streamWaitTimeout and a user set attempt timeout.
71+
*/
72+
@Nullable
73+
private Duration getEffectivePointReadTimeout(ApiCallContext ctx) {
74+
Duration streamWaitTimeout = ctx.getStreamWaitTimeout();
75+
Duration attemptTimeout = ctx.getTimeout();
76+
77+
if (streamWaitTimeout == null) {
78+
return attemptTimeout;
79+
}
80+
81+
if (attemptTimeout == null) {
82+
return streamWaitTimeout;
83+
}
84+
return (attemptTimeout.compareTo(streamWaitTimeout) <= 0) ? attemptTimeout : streamWaitTimeout;
85+
}
6586
}

google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/BigtableDataClientFactoryTest.java

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,8 @@
3535
import com.google.cloud.bigtable.data.v2.internal.NameUtil;
3636
import com.google.cloud.bigtable.data.v2.models.RowMutation;
3737
import com.google.common.base.Preconditions;
38-
import com.google.common.collect.ImmutableList;
3938
import com.google.protobuf.ByteString;
4039
import io.grpc.Attributes;
41-
import io.grpc.BindableService;
42-
import io.grpc.ServerInterceptor;
4340
import io.grpc.ServerTransportFilter;
4441
import io.grpc.stub.StreamObserver;
4542
import java.io.IOException;
@@ -98,11 +95,7 @@ public void transportTerminated(Attributes transportAttrs) {
9895
terminateAttributes.add(transportAttrs);
9996
}
10097
};
101-
serviceHelper =
102-
new FakeServiceHelper(
103-
ImmutableList.<ServerInterceptor>of(),
104-
transportFilter,
105-
ImmutableList.<BindableService>of(service));
98+
serviceHelper = new FakeServiceHelper(null, transportFilter, service);
10699
port = serviceHelper.getPort();
107100
serviceHelper.start();
108101

google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/BigtableDataClientTest.java

Lines changed: 5 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import com.google.api.core.ApiFuture;
2323
import com.google.api.core.ApiFutures;
2424
import com.google.api.gax.batching.Batcher;
25-
import com.google.api.gax.grpc.GrpcCallContext;
2625
import com.google.api.gax.rpc.ResponseObserver;
2726
import com.google.api.gax.rpc.ServerStreamingCallable;
2827
import com.google.api.gax.rpc.UnaryCallable;
@@ -81,13 +80,9 @@ public void setUp() {
8180
Mockito.when(mockStub.bulkMutateRowsCallable()).thenReturn(mockBulkMutateRowsCallable);
8281
Mockito.when(mockStub.checkAndMutateRowCallable()).thenReturn(mockCheckAndMutateRowCallable);
8382
Mockito.when(mockStub.readModifyWriteRowCallable()).thenReturn(mockReadModifyWriteRowCallable);
84-
Mockito.when(
85-
mockStub.newMutateRowsBatcher(
86-
Mockito.any(String.class), Mockito.any(GrpcCallContext.class)))
83+
Mockito.when(mockStub.newMutateRowsBatcher(Mockito.any(String.class)))
8784
.thenReturn(mockBulkMutationBatcher);
88-
Mockito.when(
89-
mockStub.newBulkReadRowsBatcher(
90-
Mockito.any(Query.class), Mockito.any(GrpcCallContext.class)))
85+
Mockito.when(mockStub.newBulkReadRowsBatcher(Mockito.any(Query.class)))
9186
.thenReturn(mockBulkReadRowsBatcher);
9287
}
9388

@@ -379,8 +374,7 @@ public void proxyNewBulkMutationBatcherTest() {
379374
ApiFuture<Void> actualRes = batcher.add(request);
380375
assertThat(actualRes).isSameInstanceAs(expectedResponse);
381376

382-
Mockito.verify(mockStub)
383-
.newMutateRowsBatcher(Mockito.any(String.class), Mockito.any(GrpcCallContext.class));
377+
Mockito.verify(mockStub).newMutateRowsBatcher(Mockito.any(String.class));
384378
}
385379

386380
@Test
@@ -396,8 +390,7 @@ public void proxyNewBulkReadRowsTest() {
396390
ApiFuture<Row> actualResponse = batcher.add(request);
397391
assertThat(actualResponse).isSameInstanceAs(expectedResponse);
398392

399-
Mockito.verify(mockStub)
400-
.newBulkReadRowsBatcher(Mockito.any(Query.class), Mockito.any(GrpcCallContext.class));
393+
Mockito.verify(mockStub).newBulkReadRowsBatcher(Mockito.any(Query.class));
401394
}
402395

403396
@Test
@@ -414,8 +407,7 @@ public void proxyNewBulkReadRowsWithFilterTest() {
414407
ApiFuture<Row> actualResponse = batcher.add(request);
415408
assertThat(actualResponse).isSameInstanceAs(expectedResponse);
416409

417-
Mockito.verify(mockStub)
418-
.newBulkReadRowsBatcher(Mockito.any(Query.class), Mockito.any(GrpcCallContext.class));
410+
Mockito.verify(mockStub).newBulkReadRowsBatcher(Mockito.any(Query.class));
419411
}
420412

421413
@Test

google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/FakeServiceHelper.java

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,43 +15,40 @@
1515
*/
1616
package com.google.cloud.bigtable.data.v2;
1717

18-
import com.google.common.collect.ImmutableList;
1918
import io.grpc.BindableService;
2019
import io.grpc.Server;
2120
import io.grpc.ServerBuilder;
2221
import io.grpc.ServerInterceptor;
2322
import io.grpc.ServerTransportFilter;
2423
import java.io.IOException;
2524
import java.net.ServerSocket;
26-
import java.util.List;
2725

2826
/** Utility class to setup a fake grpc server on a random port. */
2927
public class FakeServiceHelper {
3028
private final int port;
3129
private final Server server;
3230

3331
public FakeServiceHelper(BindableService... services) throws IOException {
34-
this(ImmutableList.<ServerInterceptor>of(), null, ImmutableList.copyOf(services));
32+
this(null, services);
3533
}
3634

3735
public FakeServiceHelper(ServerInterceptor interceptor, BindableService... services)
3836
throws IOException {
39-
this(ImmutableList.of(interceptor), null, ImmutableList.copyOf(services));
37+
this(interceptor, null, services);
4038
}
4139

4240
public FakeServiceHelper(
43-
List<ServerInterceptor> interceptors,
41+
ServerInterceptor interceptor,
4442
ServerTransportFilter transportFilter,
45-
List<BindableService> services)
43+
BindableService... services)
4644
throws IOException {
4745
try (ServerSocket ss = new ServerSocket(0)) {
4846
port = ss.getLocalPort();
4947
}
5048
ServerBuilder builder = ServerBuilder.forPort(port);
51-
for (ServerInterceptor interceptor : interceptors) {
49+
if (interceptor != null) {
5250
builder = builder.intercept(interceptor);
5351
}
54-
5552
if (transportFilter != null) {
5653
builder = builder.addTransportFilter(transportFilter);
5754
}

0 commit comments

Comments
 (0)