Skip to content
This repository was archived by the owner on Sep 26, 2023. It is now read-only.

Commit 956a513

Browse files
igorbernstein2garrettjonesgoogle
authored andcommitted
Implementing server streaming retries (#463)
1 parent 4739e38 commit 956a513

13 files changed

+1952
-2
lines changed

gax-grpc/src/main/java/com/google/api/gax/grpc/GrpcCallableFactory.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ private static <RequestT, ResponseT> UnaryCallable<RequestT, ResponseT> createBa
7171
&& callSettings.getRetrySettings().getMaxAttempts() > 1) {
7272
callable = Callables.retrying(callable, callSettings, clientContext);
7373
}
74+
7475
return callable;
7576
}
7677

@@ -238,6 +239,8 @@ ServerStreamingCallable<RequestT, ResponseT> createServerStreamingCallable(
238239
new GrpcExceptionServerStreamingCallable<>(
239240
callable, streamingCallSettings.getRetryableCodes());
240241

242+
callable = Callables.retrying(callable, streamingCallSettings, clientContext);
243+
241244
return callable.withDefaultCallContext(clientContext.getDefaultCallContext());
242245
}
243246

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
* Copyright 2018, Google LLC All rights reserved.
3+
*
4+
* Redistribution and use in source and binary forms, with or without
5+
* modification, are permitted provided that the following conditions are
6+
* met:
7+
*
8+
* * Redistributions of source code must retain the above copyright
9+
* notice, this list of conditions and the following disclaimer.
10+
* * Redistributions in binary form must reproduce the above
11+
* copyright notice, this list of conditions and the following disclaimer
12+
* in the documentation and/or other materials provided with the
13+
* distribution.
14+
* * Neither the name of Google LLC nor the names of its
15+
* contributors may be used to endorse or promote products derived from
16+
* this software without specific prior written permission.
17+
*
18+
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
19+
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
20+
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
21+
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
22+
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23+
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24+
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
25+
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
26+
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27+
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28+
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29+
*/
30+
package com.google.api.gax.retrying;
31+
32+
import com.google.api.core.InternalApi;
33+
34+
/**
35+
* A wrapper exception thrown by {@code ServerStreamingAttemptCallable} to communicate additional
36+
* context to the {@link StreamingRetryAlgorithm} and to pass the original cancellation stack trace
37+
* to {@code RetryingServerStreamingCallable}.
38+
*
39+
* <p>For internal use only - public for technical reasons.
40+
*/
41+
@InternalApi
42+
public class ServerStreamingAttemptException extends RuntimeException {
43+
private final boolean canResume;
44+
private final boolean seenResponses;
45+
46+
public ServerStreamingAttemptException(
47+
Throwable cause, boolean canResume, boolean seenResponses) {
48+
super(cause);
49+
this.canResume = canResume;
50+
this.seenResponses = seenResponses;
51+
}
52+
53+
/** If the {@link StreamResumptionStrategy} supports resuming after this error. */
54+
public boolean canResume() {
55+
return canResume;
56+
}
57+
58+
/**
59+
* If the current RPC attempt has seen any streamed messages. This is used as a signal by {@link
60+
* StreamingRetryAlgorithm} to reset timers.
61+
*/
62+
public boolean hasSeenResponses() {
63+
return seenResponses;
64+
}
65+
}
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Copyright 2018, Google LLC All rights reserved.
3+
*
4+
* Redistribution and use in source and binary forms, with or without
5+
* modification, are permitted provided that the following conditions are
6+
* met:
7+
*
8+
* * Redistributions of source code must retain the above copyright
9+
* notice, this list of conditions and the following disclaimer.
10+
* * Redistributions in binary form must reproduce the above
11+
* copyright notice, this list of conditions and the following disclaimer
12+
* in the documentation and/or other materials provided with the
13+
* distribution.
14+
* * Neither the name of Google LLC nor the names of its
15+
* contributors may be used to endorse or promote products derived from
16+
* this software without specific prior written permission.
17+
*
18+
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
19+
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
20+
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
21+
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
22+
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23+
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24+
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
25+
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
26+
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27+
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28+
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29+
*/
30+
package com.google.api.gax.retrying;
31+
32+
import com.google.api.core.BetaApi;
33+
import com.google.common.base.Preconditions;
34+
35+
/**
36+
* Simplest implementation of a {@link StreamResumptionStrategy} which returns the initial request
37+
* for unstarted streams.
38+
*/
39+
@BetaApi("The surface for streaming is not stable yet and may change in the future.")
40+
public final class SimpleStreamResumptionStrategy<RequestT, ResponseT>
41+
implements StreamResumptionStrategy<RequestT, ResponseT> {
42+
private boolean seenFirstResponse;
43+
44+
@Override
45+
public StreamResumptionStrategy<RequestT, ResponseT> createNew() {
46+
return new SimpleStreamResumptionStrategy<>();
47+
}
48+
49+
@Override
50+
public void onProgress(ResponseT response) {
51+
seenFirstResponse = true;
52+
}
53+
54+
@Override
55+
public RequestT getResumeRequest(RequestT originalRequest) {
56+
Preconditions.checkState(!seenFirstResponse, "Tried to resume an unresumeable stream.");
57+
return originalRequest;
58+
}
59+
60+
@Override
61+
public boolean canResume() {
62+
return !seenFirstResponse;
63+
}
64+
}
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
* Copyright 2018, Google LLC All rights reserved.
3+
*
4+
* Redistribution and use in source and binary forms, with or without
5+
* modification, are permitted provided that the following conditions are
6+
* met:
7+
*
8+
* * Redistributions of source code must retain the above copyright
9+
* notice, this list of conditions and the following disclaimer.
10+
* * Redistributions in binary form must reproduce the above
11+
* copyright notice, this list of conditions and the following disclaimer
12+
* in the documentation and/or other materials provided with the
13+
* distribution.
14+
* * Neither the name of Google LLC nor the names of its
15+
* contributors may be used to endorse or promote products derived from
16+
* this software without specific prior written permission.
17+
*
18+
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
19+
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
20+
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
21+
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
22+
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23+
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24+
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
25+
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
26+
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27+
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28+
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29+
*/
30+
package com.google.api.gax.retrying;
31+
32+
import com.google.api.core.BetaApi;
33+
34+
/**
35+
* This is part of the server streaming retry api. Its implementers are responsible for tracking the
36+
* progress of the stream and calculating a request to resume it in case of an error.
37+
*
38+
* <p>Implementations don't have to be threadsafe because all of the calls will be serialized.
39+
*/
40+
@BetaApi("The surface for streaming is not stable yet and may change in the future.")
41+
public interface StreamResumptionStrategy<RequestT, ResponseT> {
42+
43+
/** Creates a new instance of this StreamResumptionStrategy without accumulated state */
44+
StreamResumptionStrategy<RequestT, ResponseT> createNew();
45+
46+
/**
47+
* Called by the {@code ServerStreamingAttemptCallable} when a response has been successfully
48+
* received.
49+
*/
50+
void onProgress(ResponseT response);
51+
52+
/**
53+
* Called when a stream needs to be restarted, the implementation should generate a request that
54+
* will yield a new stream whose first response would come right after the last response received
55+
* by onProgress.
56+
*
57+
* @return A request that can be used to resume the stream.
58+
*/
59+
RequestT getResumeRequest(RequestT originalRequest);
60+
61+
/** If a resume request can be created. */
62+
boolean canResume();
63+
}
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
/*
2+
* Copyright 2018, Google LLC All rights reserved.
3+
*
4+
* Redistribution and use in source and binary forms, with or without
5+
* modification, are permitted provided that the following conditions are
6+
* met:
7+
*
8+
* * Redistributions of source code must retain the above copyright
9+
* notice, this list of conditions and the following disclaimer.
10+
* * Redistributions in binary form must reproduce the above
11+
* copyright notice, this list of conditions and the following disclaimer
12+
* in the documentation and/or other materials provided with the
13+
* distribution.
14+
* * Neither the name of Google LLC nor the names of its
15+
* contributors may be used to endorse or promote products derived from
16+
* this software without specific prior written permission.
17+
*
18+
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
19+
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
20+
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
21+
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
22+
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23+
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24+
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
25+
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
26+
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27+
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28+
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29+
*/
30+
package com.google.api.gax.retrying;
31+
32+
import com.google.api.core.InternalApi;
33+
import java.util.concurrent.CancellationException;
34+
35+
/**
36+
* The streaming retry algorithm, which makes decision based either on the thrown exception and the
37+
* execution time settings of the previous attempt. This extends {@link RetryAlgorithm} to take
38+
* additional information (provided by {@code ServerStreamingAttemptCallable}) into account.
39+
*
40+
* <p>This class is thread-safe.
41+
*
42+
* <p>Internal use only - public for technical reasons.
43+
*/
44+
@InternalApi("For internal use only")
45+
public final class StreamingRetryAlgorithm<ResponseT> extends RetryAlgorithm<ResponseT> {
46+
public StreamingRetryAlgorithm(
47+
ResultRetryAlgorithm<ResponseT> resultAlgorithm, TimedRetryAlgorithm timedAlgorithm) {
48+
super(resultAlgorithm, timedAlgorithm);
49+
}
50+
51+
/**
52+
* {@inheritDoc}
53+
*
54+
* <p>The attempt settings will be reset if the stream attempt produced any messages.
55+
*/
56+
@Override
57+
public TimedAttemptSettings createNextAttempt(
58+
Throwable prevThrowable, ResponseT prevResponse, TimedAttemptSettings prevSettings) {
59+
60+
if (prevThrowable instanceof ServerStreamingAttemptException) {
61+
ServerStreamingAttemptException attemptException =
62+
(ServerStreamingAttemptException) prevThrowable;
63+
prevThrowable = prevThrowable.getCause();
64+
65+
// If we have made progress in the last attempt, then reset the delays
66+
if (attemptException.hasSeenResponses()) {
67+
prevSettings =
68+
createFirstAttempt()
69+
.toBuilder()
70+
.setFirstAttemptStartTimeNanos(prevSettings.getFirstAttemptStartTimeNanos())
71+
.build();
72+
}
73+
}
74+
75+
return super.createNextAttempt(prevThrowable, prevResponse, prevSettings);
76+
}
77+
78+
/**
79+
* {@inheritDoc}
80+
*
81+
* <p>Ensures retries are only scheduled if the {@link StreamResumptionStrategy} in the {@code
82+
* ServerStreamingAttemptCallable} supports it.
83+
*/
84+
@Override
85+
public boolean shouldRetry(
86+
Throwable prevThrowable, ResponseT prevResponse, TimedAttemptSettings nextAttemptSettings)
87+
throws CancellationException {
88+
89+
// Unwrap
90+
if (prevThrowable instanceof ServerStreamingAttemptException) {
91+
ServerStreamingAttemptException attemptExceptino =
92+
(ServerStreamingAttemptException) prevThrowable;
93+
prevThrowable = prevThrowable.getCause();
94+
95+
if (!attemptExceptino.canResume()) {
96+
return false;
97+
}
98+
}
99+
100+
return super.shouldRetry(prevThrowable, prevResponse, nextAttemptSettings);
101+
}
102+
}

gax/src/main/java/com/google/api/gax/rpc/Callables.java

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import com.google.api.gax.retrying.RetryAlgorithm;
3838
import com.google.api.gax.retrying.RetryingExecutor;
3939
import com.google.api.gax.retrying.ScheduledRetryingExecutor;
40+
import com.google.api.gax.retrying.StreamingRetryAlgorithm;
4041

4142
/**
4243
* Class with utility methods to create callable objects using provided settings.
@@ -65,6 +66,43 @@ public static <RequestT, ResponseT> UnaryCallable<RequestT, ResponseT> retrying(
6566
clientContext.getDefaultCallContext(), innerCallable, retryingExecutor);
6667
}
6768

69+
@BetaApi("The surface for streaming is not stable yet and may change in the future.")
70+
public static <RequestT, ResponseT> ServerStreamingCallable<RequestT, ResponseT> retrying(
71+
ServerStreamingCallable<RequestT, ResponseT> innerCallable,
72+
ServerStreamingCallSettings<RequestT, ResponseT> callSettings,
73+
ClientContext clientContext) {
74+
75+
if (callSettings.getRetryableCodes().isEmpty()
76+
|| callSettings.getRetrySettings().getMaxAttempts() <= 1) {
77+
78+
return innerCallable;
79+
}
80+
81+
StreamingRetryAlgorithm<Void> retryAlgorithm =
82+
new StreamingRetryAlgorithm<>(
83+
new ApiResultRetryAlgorithm<Void>(),
84+
new ExponentialRetryAlgorithm(
85+
callSettings.getRetrySettings(), clientContext.getClock()));
86+
87+
ScheduledRetryingExecutor<Void> retryingExecutor =
88+
new ScheduledRetryingExecutor<>(retryAlgorithm, clientContext.getExecutor());
89+
90+
// NOTE: This creates a Watchdog per streaming API method. Ideally, there should only be a
91+
// single Watchdog for the entire process, however that change would be fairly invasive and
92+
// the cost of multiple Watchdogs is fairly small, since they all use the same executor. If this
93+
// becomes an issue, the watchdog can be moved to ClientContext.
94+
Watchdog<ResponseT> watchdog =
95+
new Watchdog<>(
96+
clientContext.getExecutor(),
97+
clientContext.getClock(),
98+
callSettings.getTimeoutCheckInterval(),
99+
callSettings.getIdleTimeout());
100+
watchdog.start();
101+
102+
return new RetryingServerStreamingCallable<>(
103+
watchdog, innerCallable, retryingExecutor, callSettings.getResumptionStrategy());
104+
}
105+
68106
/**
69107
* Create a callable object that represents a batching API method. Designed for use by generated
70108
* code.

0 commit comments

Comments
 (0)