Skip to content

Commit 6eb3331

Browse files
authored
fix: update retry logic for grpc start resumable upload to properly handle client side deadline_exceeded (googleapis#3354)
1 parent 9f0a93e commit 6eb3331

File tree

7 files changed

+120
-45
lines changed

7 files changed

+120
-45
lines changed

google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUploadSessionBuilder.java

Lines changed: 35 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,14 @@
1717
package com.google.cloud.storage;
1818

1919
import com.google.api.core.ApiFuture;
20-
import com.google.api.core.ApiFutures;
20+
import com.google.api.core.SettableApiFuture;
2121
import com.google.api.gax.rpc.BidiStreamingCallable;
2222
import com.google.api.gax.rpc.ClientStreamingCallable;
2323
import com.google.api.gax.rpc.UnaryCallable;
24+
import com.google.cloud.storage.Conversions.Decoder;
25+
import com.google.cloud.storage.Retrying.RetrierWithAlg;
2426
import com.google.cloud.storage.UnifiedOpts.ObjectTargetOpt;
2527
import com.google.cloud.storage.UnifiedOpts.Opts;
26-
import com.google.common.util.concurrent.MoreExecutors;
2728
import com.google.storage.v2.BidiWriteObjectRequest;
2829
import com.google.storage.v2.BidiWriteObjectResponse;
2930
import com.google.storage.v2.StartResumableWriteRequest;
@@ -53,7 +54,8 @@ GapicBidiWritableByteChannelSessionBuilder bidiByteChannel(
5354
ApiFuture<ResumableWrite> resumableWrite(
5455
UnaryCallable<StartResumableWriteRequest, StartResumableWriteResponse> callable,
5556
WriteObjectRequest writeObjectRequest,
56-
Opts<ObjectTargetOpt> opts) {
57+
Opts<ObjectTargetOpt> opts,
58+
RetrierWithAlg retrier) {
5759
StartResumableWriteRequest.Builder b = StartResumableWriteRequest.newBuilder();
5860
if (writeObjectRequest.hasWriteObjectSpec()) {
5961
b.setWriteObjectSpec(writeObjectRequest.getWriteObjectSpec());
@@ -68,23 +70,27 @@ ApiFuture<ResumableWrite> resumableWrite(
6870
Function<String, WriteObjectRequest> f =
6971
uploadId ->
7072
writeObjectRequest.toBuilder().clearWriteObjectSpec().setUploadId(uploadId).build();
71-
ApiFuture<ResumableWrite> futureResumableWrite =
72-
ApiFutures.transform(
73-
callable.futureCall(req),
74-
(resp) -> new ResumableWrite(req, resp, f),
75-
MoreExecutors.directExecutor());
76-
// make sure we wrap any failure as a storage exception
77-
return ApiFutures.catchingAsync(
78-
futureResumableWrite,
79-
Throwable.class,
80-
throwable -> ApiFutures.immediateFailedFuture(StorageException.coalesce(throwable)),
81-
MoreExecutors.directExecutor());
73+
SettableApiFuture<ResumableWrite> future = SettableApiFuture.create();
74+
try {
75+
ResumableWrite resumableWrite =
76+
retrier.run(
77+
() -> {
78+
StartResumableWriteResponse resp = callable.call(req);
79+
return new ResumableWrite(req, resp, f);
80+
},
81+
Decoder.identity());
82+
future.set(resumableWrite);
83+
} catch (StorageException e) {
84+
future.setException(e);
85+
}
86+
return future;
8287
}
8388

8489
ApiFuture<BidiResumableWrite> bidiResumableWrite(
8590
UnaryCallable<StartResumableWriteRequest, StartResumableWriteResponse> x,
8691
BidiWriteObjectRequest writeObjectRequest,
87-
Opts<ObjectTargetOpt> opts) {
92+
Opts<ObjectTargetOpt> opts,
93+
RetrierWithAlg retrier) {
8894
StartResumableWriteRequest.Builder b = StartResumableWriteRequest.newBuilder();
8995
if (writeObjectRequest.hasWriteObjectSpec()) {
9096
b.setWriteObjectSpec(writeObjectRequest.getWriteObjectSpec());
@@ -99,9 +105,19 @@ ApiFuture<BidiResumableWrite> bidiResumableWrite(
99105
Function<String, BidiWriteObjectRequest> f =
100106
uploadId ->
101107
writeObjectRequest.toBuilder().clearWriteObjectSpec().setUploadId(uploadId).build();
102-
return ApiFutures.transform(
103-
x.futureCall(req),
104-
(resp) -> new BidiResumableWrite(req, resp, f),
105-
MoreExecutors.directExecutor());
108+
SettableApiFuture<BidiResumableWrite> future = SettableApiFuture.create();
109+
try {
110+
BidiResumableWrite resumableWrite =
111+
retrier.run(
112+
() -> {
113+
StartResumableWriteResponse resp = x.call(req);
114+
return new BidiResumableWrite(req, resp, f);
115+
},
116+
Decoder.identity());
117+
future.set(resumableWrite);
118+
} catch (StorageException e) {
119+
future.setException(e);
120+
}
121+
return future;
106122
}
107123
}

google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import static com.google.cloud.storage.ByteSizeConstants._256KiB;
2222
import static com.google.cloud.storage.CrossTransportUtils.fmtMethodName;
2323
import static com.google.cloud.storage.CrossTransportUtils.throwHttpJsonOnly;
24-
import static com.google.cloud.storage.GrpcToHttpStatusCodeTranslation.resultRetryAlgorithmToCodes;
2524
import static com.google.cloud.storage.StorageV2ProtoUtils.bucketAclEntityOrAltEq;
2625
import static com.google.cloud.storage.StorageV2ProtoUtils.objectAclEntityOrAltEq;
2726
import static com.google.cloud.storage.Utils.bucketNameCodec;
@@ -42,7 +41,6 @@
4241
import com.google.api.gax.rpc.ApiExceptions;
4342
import com.google.api.gax.rpc.ClientStreamingCallable;
4443
import com.google.api.gax.rpc.NotFoundException;
45-
import com.google.api.gax.rpc.StatusCode;
4644
import com.google.api.gax.rpc.UnaryCallable;
4745
import com.google.cloud.BaseService;
4846
import com.google.cloud.Policy;
@@ -1831,30 +1829,26 @@ private UnbufferedReadableByteChannelSession<Object> unbufferedReadSession(
18311829
@VisibleForTesting
18321830
ApiFuture<ResumableWrite> startResumableWrite(
18331831
GrpcCallContext grpcCallContext, WriteObjectRequest req, Opts<ObjectTargetOpt> opts) {
1834-
Set<StatusCode.Code> codes = resultRetryAlgorithmToCodes(retryAlgorithmManager.getFor(req));
18351832
GrpcCallContext merge = Utils.merge(grpcCallContext, Retrying.newCallContext());
18361833
return ResumableMedia.gapic()
18371834
.write()
18381835
.resumableWrite(
1839-
storageClient
1840-
.startResumableWriteCallable()
1841-
.withDefaultCallContext(merge.withRetryableCodes(codes)),
1836+
storageClient.startResumableWriteCallable().withDefaultCallContext(merge),
18421837
req,
1843-
opts);
1838+
opts,
1839+
retrier.withAlg(retryAlgorithmManager.getFor(req)));
18441840
}
18451841

18461842
ApiFuture<BidiResumableWrite> startResumableWrite(
18471843
GrpcCallContext grpcCallContext, BidiWriteObjectRequest req, Opts<ObjectTargetOpt> opts) {
1848-
Set<StatusCode.Code> codes = resultRetryAlgorithmToCodes(retryAlgorithmManager.getFor(req));
18491844
GrpcCallContext merge = Utils.merge(grpcCallContext, Retrying.newCallContext());
18501845
return ResumableMedia.gapic()
18511846
.write()
18521847
.bidiResumableWrite(
1853-
storageClient
1854-
.startResumableWriteCallable()
1855-
.withDefaultCallContext(merge.withRetryableCodes(codes)),
1848+
storageClient.startResumableWriteCallable().withDefaultCallContext(merge),
18561849
req,
1857-
opts);
1850+
opts,
1851+
retrier.withAlg(retryAlgorithmManager.getFor(req)));
18581852
}
18591853

18601854
private SourceObject sourceObjectEncode(SourceBlob from) {

google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageOptions.java

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@
4545
import com.google.api.gax.rpc.RequestParamsBuilder;
4646
import com.google.api.gax.rpc.RequestParamsExtractor;
4747
import com.google.api.gax.rpc.ServerStreamingCallable;
48-
import com.google.api.gax.rpc.StatusCode.Code;
4948
import com.google.api.gax.rpc.internal.QuotaProjectIdHidingCredentials;
5049
import com.google.api.gax.tracing.ApiTracerFactory;
5150
import com.google.api.pathtemplate.PathTemplate;
@@ -109,6 +108,7 @@
109108
import java.net.URI;
110109
import java.nio.ByteBuffer;
111110
import java.time.Clock;
111+
import java.time.Duration;
112112
import java.util.ArrayList;
113113
import java.util.Arrays;
114114
import java.util.Collections;
@@ -360,8 +360,6 @@ private Tuple<StorageSettings, Opts<UserProject>> resolveSettingsAndOpts() throw
360360
.setLogicalTimeout(java.time.Duration.ofDays(28))
361361
.build();
362362
java.time.Duration totalTimeout = baseRetrySettings.getTotalTimeoutDuration();
363-
Set<Code> startResumableWriteRetryableCodes =
364-
builder.startResumableWriteSettings().getRetryableCodes();
365363

366364
// retries for unary methods are generally handled at a different level, except
367365
// StartResumableWrite
@@ -372,10 +370,22 @@ private Tuple<StorageSettings, Opts<UserProject>> resolveSettingsAndOpts() throw
372370
});
373371

374372
// configure the settings for StartResumableWrite
373+
Duration startResumableTimeoutDuration;
374+
// the default for initialRpcTimeout is the same as totalTimeout. This is not good, because it
375+
// will prevent our retries from even happening.
376+
// If the default value is used, set our per-rpc timeout to 20 seconds to allow our retries
377+
// a chance.
378+
if (baseRetrySettings
379+
.getInitialRpcTimeoutDuration()
380+
.equals(getDefaultRetrySettings().getInitialRpcTimeoutDuration())) {
381+
startResumableTimeoutDuration = Duration.ofSeconds(20);
382+
} else {
383+
startResumableTimeoutDuration = baseRetrySettings.getInitialRpcTimeoutDuration();
384+
}
375385
builder
376386
.startResumableWriteSettings()
377-
.setRetrySettings(baseRetrySettings)
378-
.setRetryableCodes(startResumableWriteRetryableCodes);
387+
// set this lower, to allow our retries a chance instead of it being totalTimeout
388+
.setSimpleTimeoutNoRetriesDuration(startResumableTimeoutDuration);
379389
// for ReadObject disable retries and move the total timeout to the idle timeout
380390
builder
381391
.readObjectSettings()

google-cloud-storage/src/test/java/com/google/cloud/storage/FakeServer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ static FakeServer of(StorageGrpc.StorageImplBase service) throws IOException {
7575
.setRetryDelayMultiplier(1.2)
7676
.setMaxRetryDelayDuration(Duration.ofSeconds(16))
7777
.setMaxAttempts(6)
78-
.setInitialRpcTimeoutDuration(Duration.ofSeconds(25))
78+
.setInitialRpcTimeoutDuration(Duration.ofSeconds(1))
7979
.setRpcTimeoutMultiplier(1.0)
8080
.setMaxRpcTimeoutDuration(Duration.ofSeconds(25))
8181
.build())

google-cloud-storage/src/test/java/com/google/cloud/storage/GapicUploadSessionBuilderSyntaxTest.java

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,9 @@
2020
import static org.mockito.Mockito.when;
2121

2222
import com.google.api.core.ApiFuture;
23-
import com.google.api.core.ApiFutures;
2423
import com.google.api.gax.rpc.ClientStreamingCallable;
2524
import com.google.api.gax.rpc.UnaryCallable;
25+
import com.google.cloud.storage.Retrying.RetrierWithAlg;
2626
import com.google.cloud.storage.UnifiedOpts.Opts;
2727
import com.google.storage.v2.StartResumableWriteRequest;
2828
import com.google.storage.v2.StartResumableWriteResponse;
@@ -60,8 +60,8 @@ public final class GapicUploadSessionBuilderSyntaxTest {
6060

6161
@Before
6262
public void setUp() throws Exception {
63-
when(startResumableWrite.futureCall(any()))
64-
.thenReturn(ApiFutures.immediateFuture(StartResumableWriteResponse.getDefaultInstance()));
63+
when(startResumableWrite.call(any()))
64+
.thenReturn(StartResumableWriteResponse.getDefaultInstance());
6565
}
6666

6767
@Test
@@ -95,7 +95,9 @@ public void syntax_directBuffered_fluent() {
9595
@Test
9696
public void syntax_resumableUnbuffered_fluent() {
9797
ApiFuture<ResumableWrite> startAsync =
98-
ResumableMedia.gapic().write().resumableWrite(startResumableWrite, req, Opts.empty());
98+
ResumableMedia.gapic()
99+
.write()
100+
.resumableWrite(startResumableWrite, req, Opts.empty(), RetrierWithAlg.attemptOnce());
99101
UnbufferedWritableByteChannelSession<WriteObjectResponse> session =
100102
ResumableMedia.gapic()
101103
.write()
@@ -111,7 +113,9 @@ public void syntax_resumableUnbuffered_fluent() {
111113
@Test
112114
public void syntax_resumableBuffered_fluent() {
113115
ApiFuture<ResumableWrite> startAsync =
114-
ResumableMedia.gapic().write().resumableWrite(startResumableWrite, req, Opts.empty());
116+
ResumableMedia.gapic()
117+
.write()
118+
.resumableWrite(startResumableWrite, req, Opts.empty(), RetrierWithAlg.attemptOnce());
115119
BufferedWritableByteChannelSession<WriteObjectResponse> session =
116120
ResumableMedia.gapic()
117121
.write()
@@ -151,7 +155,9 @@ public void syntax_directBuffered_incremental() {
151155
@Test
152156
public void syntax_resumableUnbuffered_incremental() {
153157
ApiFuture<ResumableWrite> startAsync =
154-
ResumableMedia.gapic().write().resumableWrite(startResumableWrite, req, Opts.empty());
158+
ResumableMedia.gapic()
159+
.write()
160+
.resumableWrite(startResumableWrite, req, Opts.empty(), RetrierWithAlg.attemptOnce());
155161
GapicWritableByteChannelSessionBuilder b1 =
156162
ResumableMedia.gapic()
157163
.write()
@@ -165,7 +171,9 @@ public void syntax_resumableUnbuffered_incremental() {
165171
@Test
166172
public void syntax_resumableBuffered_incremental() {
167173
ApiFuture<ResumableWrite> startAsync =
168-
ResumableMedia.gapic().write().resumableWrite(startResumableWrite, req, Opts.empty());
174+
ResumableMedia.gapic()
175+
.write()
176+
.resumableWrite(startResumableWrite, req, Opts.empty(), RetrierWithAlg.attemptOnce());
169177
GapicWritableByteChannelSessionBuilder b1 =
170178
ResumableMedia.gapic()
171179
.write()

google-cloud-storage/src/test/java/com/google/cloud/storage/ITGapicUnbufferedWritableByteChannelTest.java

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,12 @@
2020
import static com.google.cloud.storage.TestUtils.getChecksummedData;
2121
import static com.google.common.truth.Truth.assertThat;
2222

23+
import com.google.api.core.ApiFuture;
2324
import com.google.api.core.SettableApiFuture;
25+
import com.google.api.gax.grpc.GrpcCallContext;
2426
import com.google.api.gax.rpc.PermissionDeniedException;
2527
import com.google.cloud.storage.Retrying.RetrierWithAlg;
28+
import com.google.cloud.storage.UnifiedOpts.Opts;
2629
import com.google.cloud.storage.WriteCtx.SimpleWriteObjectRequestBuilderFactory;
2730
import com.google.cloud.storage.WriteCtx.WriteObjectRequestBuilderFactory;
2831
import com.google.common.collect.ImmutableList;
@@ -47,7 +50,10 @@
4750
import java.util.List;
4851
import java.util.Locale;
4952
import java.util.Set;
53+
import java.util.UUID;
5054
import java.util.concurrent.ExecutionException;
55+
import java.util.concurrent.TimeUnit;
56+
import java.util.concurrent.TimeoutException;
5157
import java.util.concurrent.atomic.AtomicBoolean;
5258
import java.util.concurrent.atomic.AtomicInteger;
5359
import java.util.function.BiConsumer;
@@ -210,6 +216,45 @@ public void resumableUpload() throws IOException, InterruptedException, Executio
210216
}
211217
}
212218

219+
@Test
220+
public void startResumableUpload_deadlineExceeded_isRetried()
221+
throws IOException, InterruptedException, ExecutionException, TimeoutException {
222+
223+
String uploadId = UUID.randomUUID().toString();
224+
AtomicInteger callCount = new AtomicInteger(0);
225+
StorageImplBase service =
226+
new StorageImplBase() {
227+
@Override
228+
public void startResumableWrite(
229+
StartResumableWriteRequest req, StreamObserver<StartResumableWriteResponse> respond) {
230+
if (callCount.getAndIncrement() > 0) {
231+
respond.onNext(
232+
StartResumableWriteResponse.newBuilder().setUploadId(uploadId).build());
233+
respond.onCompleted();
234+
}
235+
}
236+
};
237+
try (FakeServer fake = FakeServer.of(service)) {
238+
GrpcStorageImpl gsi = (GrpcStorageImpl) fake.getGrpcStorageOptions().getService();
239+
ApiFuture<ResumableWrite> f =
240+
gsi.startResumableWrite(
241+
GrpcCallContext.createDefault(),
242+
WriteObjectRequest.newBuilder()
243+
.setWriteObjectSpec(
244+
WriteObjectSpec.newBuilder()
245+
.setResource(
246+
Object.newBuilder().setBucket("bucket").setName("name").build())
247+
.setIfGenerationMatch(0)
248+
.build())
249+
.build(),
250+
Opts.empty());
251+
252+
ResumableWrite resumableWrite = f.get(2, TimeUnit.MINUTES);
253+
assertThat(callCount.get()).isEqualTo(2);
254+
assertThat(resumableWrite.newBuilder().build().getUploadId()).isEqualTo(uploadId);
255+
}
256+
}
257+
213258
@Test
214259
public void resumableUpload_chunkAutomaticRetry()
215260
throws IOException, InterruptedException, ExecutionException {

google-cloud-storage/src/test/java/com/google/cloud/storage/ITUnbufferedResumableUploadTest.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import com.google.api.services.storage.model.StorageObject;
2525
import com.google.cloud.storage.ITUnbufferedResumableUploadTest.ObjectSizes;
2626
import com.google.cloud.storage.Retrying.Retrier;
27+
import com.google.cloud.storage.Retrying.RetrierWithAlg;
2728
import com.google.cloud.storage.TransportCompatibility.Transport;
2829
import com.google.cloud.storage.UnbufferedWritableByteChannelSession.UnbufferedWritableByteChannel;
2930
import com.google.cloud.storage.UnifiedOpts.ObjectTargetOpt;
@@ -241,7 +242,8 @@ private UnbufferedWritableByteChannelSession<WriteObjectResponse> grpcSession()
241242
.resumableWrite(
242243
storageClient.startResumableWriteCallable().withDefaultCallContext(merge),
243244
request,
244-
opts);
245+
opts,
246+
RetrierWithAlg.attemptOnce());
245247

246248
return ResumableMedia.gapic()
247249
.write()

0 commit comments

Comments
 (0)