Skip to content
This repository was archived by the owner on Sep 26, 2023. It is now read-only.

Commit e5ef54d

Browse files
igorbernstein2garrettjonesgoogle
authored andcommitted
Expose retryableCodes on ServerStreamingCallables. (#461)
This allows configuration of the ApiException callable on in ServerStreamingCallable chain. Eventually this will be paired with RetrySettings and a RetryingServerStreamingCallable to provide retries for streaming rpcs. For now, it unblocks the development of a retry solution outside of the gax repo.
1 parent 33a77fc commit e5ef54d

File tree

4 files changed

+222
-3
lines changed

4 files changed

+222
-3
lines changed

gax-grpc/src/main/java/com/google/api/gax/grpc/GrpcCallableFactory.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,8 @@ ServerStreamingCallable<RequestT, ResponseT> createServerStreamingCallable(
235235
callable, grpcCallSettings.getParamsExtractor());
236236
}
237237
callable =
238-
new GrpcExceptionServerStreamingCallable<>(callable, ImmutableSet.<StatusCode.Code>of());
238+
new GrpcExceptionServerStreamingCallable<>(
239+
callable, streamingCallSettings.getRetryableCodes());
239240

240241
return callable.withDefaultCallContext(clientContext.getDefaultCallContext());
241242
}
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
/*
2+
* Copyright 2018, Google LLC All rights reserved.
3+
*
4+
* Redistribution and use in source and binary forms, with or without
5+
* modification, are permitted provided that the following conditions are
6+
* met:
7+
*
8+
* * Redistributions of source code must retain the above copyright
9+
* notice, this list of conditions and the following disclaimer.
10+
* * Redistributions in binary form must reproduce the above
11+
* copyright notice, this list of conditions and the following disclaimer
12+
* in the documentation and/or other materials provided with the
13+
* distribution.
14+
* * Neither the name of Google LLC nor the names of its
15+
* contributors may be used to endorse or promote products derived from
16+
* this software without specific prior written permission.
17+
*
18+
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
19+
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
20+
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
21+
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
22+
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23+
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24+
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
25+
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
26+
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27+
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28+
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29+
*/
30+
package com.google.api.gax.grpc;
31+
32+
import com.google.api.gax.grpc.testing.FakeServiceGrpc;
33+
import com.google.api.gax.grpc.testing.FakeServiceImpl;
34+
import com.google.api.gax.grpc.testing.InProcessServer;
35+
import com.google.api.gax.rpc.ClientContext;
36+
import com.google.api.gax.rpc.InvalidArgumentException;
37+
import com.google.api.gax.rpc.ServerStreamingCallSettings;
38+
import com.google.api.gax.rpc.ServerStreamingCallable;
39+
import com.google.api.gax.rpc.StatusCode.Code;
40+
import com.google.common.truth.Truth;
41+
import com.google.type.Color;
42+
import com.google.type.Money;
43+
import io.grpc.CallOptions;
44+
import io.grpc.ManagedChannel;
45+
import io.grpc.inprocess.InProcessChannelBuilder;
46+
import org.junit.After;
47+
import org.junit.Before;
48+
import org.junit.Test;
49+
import org.junit.runner.RunWith;
50+
import org.junit.runners.JUnit4;
51+
52+
@RunWith(JUnit4.class)
53+
public class GrpcCallableFactoryTest {
54+
private InProcessServer<FakeServiceImpl> inprocessServer;
55+
private ManagedChannel channel;
56+
private ClientContext clientContext;
57+
58+
@Before
59+
public void setUp() throws Exception {
60+
String serverName = "fakeservice";
61+
FakeServiceImpl serviceImpl = new FakeServiceImpl();
62+
inprocessServer = new InProcessServer<>(serviceImpl, serverName);
63+
inprocessServer.start();
64+
65+
channel =
66+
InProcessChannelBuilder.forName(serverName).directExecutor().usePlaintext(true).build();
67+
clientContext =
68+
ClientContext.newBuilder()
69+
.setTransportChannel(GrpcTransportChannel.create(channel))
70+
.setDefaultCallContext(GrpcCallContext.of(channel, CallOptions.DEFAULT))
71+
.build();
72+
}
73+
74+
@After
75+
public void tearDown() {
76+
channel.shutdown();
77+
inprocessServer.stop();
78+
}
79+
80+
@Test
81+
public void createServerStreamingCallableRetryableExceptions() throws Exception {
82+
GrpcCallSettings<Color, Money> grpcCallSettings =
83+
GrpcCallSettings.create(FakeServiceGrpc.METHOD_STREAMING_RECOGNIZE_ERROR);
84+
85+
// Base case: without config, invalid argument errors are not retryable.
86+
ServerStreamingCallSettings<Color, Money> nonRetryableSettings =
87+
ServerStreamingCallSettings.<Color, Money>newBuilder().build();
88+
89+
ServerStreamingCallable<Color, Money> nonRetryableCallable =
90+
GrpcCallableFactory.createServerStreamingCallable(
91+
grpcCallSettings, nonRetryableSettings, clientContext);
92+
93+
Throwable actualError = null;
94+
try {
95+
nonRetryableCallable
96+
.first()
97+
.call(Color.getDefaultInstance(), clientContext.getDefaultCallContext());
98+
} catch (Throwable e) {
99+
actualError = e;
100+
}
101+
Truth.assertThat(actualError).isInstanceOf(InvalidArgumentException.class);
102+
Truth.assertThat(((InvalidArgumentException) actualError).isRetryable()).isFalse();
103+
104+
// Actual test: with config, invalid argument errors are retryable.
105+
ServerStreamingCallSettings<Color, Money> retryableSettings =
106+
ServerStreamingCallSettings.<Color, Money>newBuilder()
107+
.setRetryableCodes(Code.INVALID_ARGUMENT)
108+
.build();
109+
110+
ServerStreamingCallable<Color, Money> retryableCallable =
111+
GrpcCallableFactory.createServerStreamingCallable(
112+
grpcCallSettings, retryableSettings, clientContext);
113+
114+
Throwable actualError2 = null;
115+
try {
116+
retryableCallable
117+
.first()
118+
.call(Color.getDefaultInstance(), clientContext.getDefaultCallContext());
119+
} catch (Throwable e) {
120+
actualError2 = e;
121+
}
122+
Truth.assertThat(actualError2).isInstanceOf(InvalidArgumentException.class);
123+
Truth.assertThat(((InvalidArgumentException) actualError2).isRetryable()).isTrue();
124+
}
125+
}

gax/src/main/java/com/google/api/gax/rpc/ServerStreamingCallSettings.java

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,10 @@
3030
package com.google.api.gax.rpc;
3131

3232
import com.google.api.core.BetaApi;
33+
import com.google.api.gax.rpc.StatusCode.Code;
34+
import com.google.common.collect.ImmutableSet;
35+
import com.google.common.collect.Sets;
36+
import java.util.Set;
3337

3438
/**
3539
* A settings class to configure a {@link ServerStreamingCallable}.
@@ -40,7 +44,15 @@
4044
public final class ServerStreamingCallSettings<RequestT, ResponseT>
4145
extends StreamingCallSettings<RequestT, ResponseT> {
4246

43-
private ServerStreamingCallSettings(Builder<RequestT, ResponseT> builder) {}
47+
private final Set<Code> retryableCodes;
48+
49+
private ServerStreamingCallSettings(Builder<RequestT, ResponseT> builder) {
50+
this.retryableCodes = ImmutableSet.copyOf(builder.retryableCodes);
51+
}
52+
53+
public Set<Code> getRetryableCodes() {
54+
return retryableCodes;
55+
}
4456

4557
public Builder<RequestT, ResponseT> toBuilder() {
4658
return new Builder<>(this);
@@ -53,10 +65,29 @@ public static <RequestT, ResponseT> Builder<RequestT, ResponseT> newBuilder() {
5365
public static class Builder<RequestT, ResponseT>
5466
extends StreamingCallSettings.Builder<RequestT, ResponseT> {
5567

56-
private Builder() {}
68+
private Set<StatusCode.Code> retryableCodes;
69+
70+
private Builder() {
71+
this.retryableCodes = ImmutableSet.of();
72+
}
5773

5874
private Builder(ServerStreamingCallSettings<RequestT, ResponseT> settings) {
5975
super(settings);
76+
this.retryableCodes = settings.retryableCodes;
77+
}
78+
79+
public Builder<RequestT, ResponseT> setRetryableCodes(StatusCode.Code... codes) {
80+
this.setRetryableCodes(Sets.newHashSet(codes));
81+
return this;
82+
}
83+
84+
public Builder<RequestT, ResponseT> setRetryableCodes(Set<Code> retryableCodes) {
85+
this.retryableCodes = Sets.newHashSet(retryableCodes);
86+
return this;
87+
}
88+
89+
public Set<Code> getRetryableCodes() {
90+
return retryableCodes;
6091
}
6192

6293
@Override
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Copyright 2018, Google LLC All rights reserved.
3+
*
4+
* Redistribution and use in source and binary forms, with or without
5+
* modification, are permitted provided that the following conditions are
6+
* met:
7+
*
8+
* * Redistributions of source code must retain the above copyright
9+
* notice, this list of conditions and the following disclaimer.
10+
* * Redistributions in binary form must reproduce the above
11+
* copyright notice, this list of conditions and the following disclaimer
12+
* in the documentation and/or other materials provided with the
13+
* distribution.
14+
* * Neither the name of Google LLC nor the names of its
15+
* contributors may be used to endorse or promote products derived from
16+
* this software without specific prior written permission.
17+
*
18+
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
19+
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
20+
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
21+
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
22+
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23+
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24+
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
25+
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
26+
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27+
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28+
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29+
*/
30+
package com.google.api.gax.rpc;
31+
32+
import com.google.api.gax.rpc.StatusCode.Code;
33+
import com.google.common.collect.ImmutableSet;
34+
import com.google.common.truth.Truth;
35+
import java.util.Set;
36+
import org.junit.Test;
37+
import org.junit.runner.RunWith;
38+
import org.junit.runners.JUnit4;
39+
40+
@RunWith(JUnit4.class)
41+
public class ServerStreamingCallSettingsTest {
42+
@Test
43+
public void retryableCodesAreNotLost() {
44+
Set<Code> codes = ImmutableSet.of(Code.UNAVAILABLE, Code.RESOURCE_EXHAUSTED);
45+
ServerStreamingCallSettings.Builder<Object, Object> builder =
46+
ServerStreamingCallSettings.newBuilder();
47+
builder.setRetryableCodes(codes);
48+
49+
Truth.assertThat(builder.getRetryableCodes()).containsExactlyElementsIn(codes);
50+
Truth.assertThat(builder.build().getRetryableCodes()).containsExactlyElementsIn(codes);
51+
Truth.assertThat(builder.build().toBuilder().getRetryableCodes())
52+
.containsExactlyElementsIn(codes);
53+
}
54+
55+
@Test
56+
public void retryableCodesVarArgs() {
57+
ServerStreamingCallSettings.Builder<Object, Object> builder =
58+
ServerStreamingCallSettings.newBuilder().setRetryableCodes(Code.UNKNOWN, Code.ABORTED);
59+
60+
Truth.assertThat(builder.getRetryableCodes()).containsExactly(Code.UNKNOWN, Code.ABORTED);
61+
}
62+
}

0 commit comments

Comments
 (0)