Skip to content

Commit 0330d77

Browse files
authored
fix: add RetryCallable to the callable chain (#2348)
* fix: add StreamingAttemptCallable to the callable chain * add a test * add a comment
1 parent 377437f commit 0330d77

File tree

2 files changed

+76
-5
lines changed

2 files changed

+76
-5
lines changed

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1335,17 +1335,16 @@ public Map<String, String> extract(ExecuteQueryRequest executeQueryRequest) {
13351335
ServerStreamingCallable<ExecuteQueryRequest, ExecuteQueryResponse> withStatsHeaders =
13361336
new StatsHeadersServerStreamingCallable<>(base);
13371337

1338-
ServerStreamingCallSettings<ExecuteQueryRequest, ExecuteQueryResponse> innerSettings =
1338+
ServerStreamingCallSettings<ExecuteQueryRequest, ExecuteQueryResponse> watchdogSettings =
13391339
ServerStreamingCallSettings.<ExecuteQueryRequest, ExecuteQueryResponse>newBuilder()
1340-
// TODO resumption strategy and retry settings
13411340
.setIdleTimeout(settings.executeQuerySettings().getIdleTimeout())
13421341
.setWaitTimeout(settings.executeQuerySettings().getWaitTimeout())
13431342
.build();
13441343

13451344
// Watchdog needs to stay above the metadata observer so that watchdog errors
13461345
// are passed through to the metadata future.
13471346
ServerStreamingCallable<ExecuteQueryRequest, ExecuteQueryResponse> watched =
1348-
Callables.watched(withStatsHeaders, innerSettings, clientContext);
1347+
Callables.watched(withStatsHeaders, watchdogSettings, clientContext);
13491348

13501349
ServerStreamingCallable<ExecuteQueryCallContext, ExecuteQueryResponse> withMetadataObserver =
13511350
new MetadataResolvingCallable(watched);
@@ -1356,10 +1355,19 @@ public Map<String, String> extract(ExecuteQueryRequest executeQueryRequest) {
13561355
ServerStreamingCallable<ExecuteQueryCallContext, SqlRow> withBigtableTracer =
13571356
new BigtableTracerStreamingCallable<>(merging);
13581357

1358+
ServerStreamingCallSettings<ExecuteQueryCallContext, SqlRow> retrySettings =
1359+
ServerStreamingCallSettings.<ExecuteQueryCallContext, SqlRow>newBuilder()
1360+
// TODO resumption strategy and retry settings
1361+
.build();
1362+
1363+
// Adding RetryingCallable to the callable chain so that client side metrics can be
1364+
// measured correctly. Retries are currently disabled.
1365+
ServerStreamingCallable<ExecuteQueryCallContext, SqlRow> retries =
1366+
withRetries(withBigtableTracer, retrySettings);
1367+
13591368
SpanName span = getSpanName("ExecuteQuery");
13601369
ServerStreamingCallable<ExecuteQueryCallContext, SqlRow> traced =
1361-
new TracedServerStreamingCallable<>(
1362-
withBigtableTracer, clientContext.getTracerFactory(), span);
1370+
new TracedServerStreamingCallable<>(retries, clientContext.getTracerFactory(), span);
13631371

13641372
return new ExecuteQueryCallable(
13651373
traced.withDefaultCallContext(clientContext.getDefaultCallContext()), requestContext);

google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/sql/ExecuteQueryCallableTest.java

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,30 @@
2020
import static com.google.cloud.bigtable.data.v2.stub.sql.SqlProtoFactory.stringType;
2121
import static com.google.cloud.bigtable.data.v2.stub.sql.SqlProtoFactory.stringValue;
2222
import static com.google.common.truth.Truth.assertThat;
23+
import static org.junit.Assert.assertThrows;
2324

25+
import com.google.api.gax.rpc.UnavailableException;
26+
import com.google.bigtable.v2.BigtableGrpc;
27+
import com.google.bigtable.v2.ExecuteQueryRequest;
28+
import com.google.bigtable.v2.ExecuteQueryResponse;
29+
import com.google.cloud.bigtable.data.v2.BigtableDataSettings;
30+
import com.google.cloud.bigtable.data.v2.FakeServiceBuilder;
2431
import com.google.cloud.bigtable.data.v2.internal.ProtoResultSetMetadata;
2532
import com.google.cloud.bigtable.data.v2.internal.ProtoSqlRow;
2633
import com.google.cloud.bigtable.data.v2.internal.RequestContext;
2734
import com.google.cloud.bigtable.data.v2.internal.SqlRow;
2835
import com.google.cloud.bigtable.data.v2.models.sql.Statement;
36+
import com.google.cloud.bigtable.data.v2.stub.EnhancedBigtableStub;
2937
import com.google.cloud.bigtable.gaxx.testing.FakeStreamingApi.ServerStreamingStashCallable;
38+
import io.grpc.Server;
39+
import io.grpc.Status;
40+
import io.grpc.StatusRuntimeException;
41+
import io.grpc.stub.StreamObserver;
42+
import java.io.IOException;
3043
import java.util.Collections;
3144
import java.util.Iterator;
45+
import org.junit.After;
46+
import org.junit.Before;
3247
import org.junit.Test;
3348
import org.junit.runner.RunWith;
3449
import org.junit.runners.JUnit4;
@@ -39,6 +54,29 @@ public class ExecuteQueryCallableTest {
3954
private static final RequestContext REQUEST_CONTEXT =
4055
RequestContext.create("fake-project", "fake-instance", "fake-profile");
4156

57+
private Server server;
58+
private FakeService fakeService = new FakeService();
59+
private EnhancedBigtableStub stub;
60+
61+
@Before
62+
public void setup() throws IOException {
63+
server = FakeServiceBuilder.create(fakeService).start();
64+
65+
BigtableDataSettings settings =
66+
BigtableDataSettings.newBuilderForEmulator(server.getPort())
67+
.setProjectId("fake-project")
68+
.setInstanceId("fake-instance")
69+
.build();
70+
71+
stub = EnhancedBigtableStub.create(settings.getStubSettings());
72+
}
73+
74+
@After
75+
public void tearDown() {
76+
stub.close();
77+
server.shutdown();
78+
}
79+
4280
@Test
4381
public void testCallContextAndServerStreamSetup() {
4482
SqlRow row =
@@ -57,4 +95,29 @@ public void testCallContextAndServerStreamSetup() {
5795
assertThat(responseIterator.next()).isEqualTo(row);
5896
assertThat(responseIterator.hasNext()).isFalse();
5997
}
98+
99+
@Test
100+
public void testExecuteQueryRequestsAreNotRetried() {
101+
// TODO: retries for execute query is currently disabled. This test should be
102+
// updated once resumption token is in place.
103+
SqlServerStream stream = stub.executeQueryCallable().call(Statement.of("SELECT * FROM table"));
104+
105+
Iterator<SqlRow> iterator = stream.rows().iterator();
106+
107+
assertThrows(UnavailableException.class, iterator::next).getCause();
108+
assertThat(fakeService.attempts).isEqualTo(1);
109+
}
110+
111+
private static class FakeService extends BigtableGrpc.BigtableImplBase {
112+
113+
private int attempts = 0;
114+
115+
@Override
116+
public void executeQuery(
117+
ExecuteQueryRequest request, StreamObserver<ExecuteQueryResponse> responseObserver) {
118+
attempts++;
119+
responseObserver.onNext(metadata(columnMetadata("test", stringType())));
120+
responseObserver.onError(new StatusRuntimeException(Status.UNAVAILABLE));
121+
}
122+
}
60123
}

0 commit comments

Comments
 (0)