Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import com.google.cloud.storage.multipartupload.model.ListPartsResponse;
import com.google.cloud.storage.multipartupload.model.UploadPartRequest;
import com.google.cloud.storage.multipartupload.model.UploadPartResponse;
import java.net.URI;

/**
* A client for interacting with Google Cloud Storage's Multipart Upload API.
Expand Down Expand Up @@ -111,7 +110,6 @@ public abstract CompleteMultipartUploadResponse completeMultipartUpload(
public static MultipartUploadClient create(MultipartUploadSettings config) {
HttpStorageOptions options = config.getOptions();
return new MultipartUploadClientImpl(
URI.create(options.getHost()),
options.createRetrier(),
MultipartUploadHttpRequestManager.createFrom(options),
options.getRetryAlgorithmManager());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import com.google.cloud.storage.multipartupload.model.ListPartsResponse;
import com.google.cloud.storage.multipartupload.model.UploadPartRequest;
import com.google.cloud.storage.multipartupload.model.UploadPartResponse;
import java.net.URI;
import java.util.concurrent.atomic.AtomicBoolean;

/**
Expand All @@ -38,25 +37,22 @@ final class MultipartUploadClientImpl extends MultipartUploadClient {

private final MultipartUploadHttpRequestManager httpRequestManager;
private final Retrier retrier;
private final URI uri;
private final HttpRetryAlgorithmManager retryAlgorithmManager;

MultipartUploadClientImpl(
URI uri,
Retrier retrier,
MultipartUploadHttpRequestManager multipartUploadHttpRequestManager,
HttpRetryAlgorithmManager retryAlgorithmManager) {
this.httpRequestManager = multipartUploadHttpRequestManager;
this.retrier = retrier;
this.uri = uri;
this.retryAlgorithmManager = retryAlgorithmManager;
}

@Override
public CreateMultipartUploadResponse createMultipartUpload(CreateMultipartUploadRequest request) {
return retrier.run(
retryAlgorithmManager.nonIdempotent(),
() -> httpRequestManager.sendCreateMultipartUploadRequest(uri, request),
() -> httpRequestManager.sendCreateMultipartUploadRequest(request),
Decoder.identity());
}

Expand All @@ -65,7 +61,7 @@ public ListPartsResponse listParts(ListPartsRequest request) {

return retrier.run(
retryAlgorithmManager.idempotent(),
() -> httpRequestManager.sendListPartsRequest(uri, request),
() -> httpRequestManager.sendListPartsRequest(request),
Decoder.identity());
}

Expand All @@ -74,7 +70,7 @@ public AbortMultipartUploadResponse abortMultipartUpload(AbortMultipartUploadReq

return retrier.run(
retryAlgorithmManager.idempotent(),
() -> httpRequestManager.sendAbortMultipartUploadRequest(uri, request),
() -> httpRequestManager.sendAbortMultipartUploadRequest(request),
Decoder.identity());
}

Expand All @@ -83,7 +79,7 @@ public CompleteMultipartUploadResponse completeMultipartUpload(
CompleteMultipartUploadRequest request) {
return retrier.run(
retryAlgorithmManager.idempotent(),
() -> httpRequestManager.sendCompleteMultipartUploadRequest(uri, request),
() -> httpRequestManager.sendCompleteMultipartUploadRequest(request),
Decoder.identity());
}

Expand All @@ -96,7 +92,7 @@ public UploadPartResponse uploadPart(UploadPartRequest request, RequestBody requ
if (dirty.getAndSet(true)) {
requestBody.getContent().rewindTo(0);
}
return httpRequestManager.sendUploadPartRequest(uri, request, requestBody.getContent());
return httpRequestManager.sendUploadPartRequest(request, requestBody.getContent());
},
Decoder.identity());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,20 +58,26 @@ final class MultipartUploadHttpRequestManager {
private final HttpRequestFactory requestFactory;
private final ObjectParser objectParser;
private final HeaderProvider headerProvider;
private final URI uri;

MultipartUploadHttpRequestManager(
HttpRequestFactory requestFactory, ObjectParser objectParser, HeaderProvider headerProvider) {
HttpRequestFactory requestFactory,
ObjectParser objectParser,
HeaderProvider headerProvider,
URI uri) {
this.requestFactory = requestFactory;
this.objectParser = objectParser;
this.headerProvider = headerProvider;
this.uri = uri;
}

CreateMultipartUploadResponse sendCreateMultipartUploadRequest(
URI uri, CreateMultipartUploadRequest request) throws IOException {
CreateMultipartUploadRequest request) throws IOException {

String createUri =
UriTemplate.expand(
uri.toString() + "{bucket}/{key}?uploads",
uri.toString(),
"{bucket}/{key}?uploads",
ImmutableMap.of("bucket", request.bucket(), "key", request.key()),
false);

Expand All @@ -85,7 +91,7 @@ CreateMultipartUploadResponse sendCreateMultipartUploadRequest(
return httpRequest.execute().parseAs(CreateMultipartUploadResponse.class);
}

ListPartsResponse sendListPartsRequest(URI uri, ListPartsRequest request) throws IOException {
ListPartsResponse sendListPartsRequest(ListPartsRequest request) throws IOException {

ImmutableMap.Builder<String, Object> params =
ImmutableMap.<String, Object>builder()
Expand All @@ -101,7 +107,8 @@ ListPartsResponse sendListPartsRequest(URI uri, ListPartsRequest request) throws

String listUri =
UriTemplate.expand(
uri.toString() + "{bucket}/{key}{?uploadId,max-parts,part-number-marker}",
uri.toString(),
"{bucket}/{key}{?uploadId,max-parts,part-number-marker}",
params.build(),
false);
HttpRequest httpRequest = requestFactory.buildGetRequest(new GenericUrl(listUri));
Expand All @@ -111,12 +118,13 @@ ListPartsResponse sendListPartsRequest(URI uri, ListPartsRequest request) throws
return httpRequest.execute().parseAs(ListPartsResponse.class);
}

AbortMultipartUploadResponse sendAbortMultipartUploadRequest(
URI uri, AbortMultipartUploadRequest request) throws IOException {
AbortMultipartUploadResponse sendAbortMultipartUploadRequest(AbortMultipartUploadRequest request)
throws IOException {

String abortUri =
UriTemplate.expand(
uri.toString() + "{bucket}/{key}{?uploadId}",
uri.toString(),
"{bucket}/{key}{?uploadId}",
ImmutableMap.of(
"bucket", request.bucket(), "key", request.key(), "uploadId", request.uploadId()),
false);
Expand All @@ -129,7 +137,7 @@ AbortMultipartUploadResponse sendAbortMultipartUploadRequest(
}

CompleteMultipartUploadResponse sendCompleteMultipartUploadRequest(
URI uri, CompleteMultipartUploadRequest request) throws IOException {
CompleteMultipartUploadRequest request) throws IOException {
String completeUri =
UriTemplate.expand(
uri.toString() + "{bucket}/{key}{?uploadId}",
Expand All @@ -149,7 +157,7 @@ CompleteMultipartUploadResponse sendCompleteMultipartUploadRequest(
}

UploadPartResponse sendUploadPartRequest(
URI uri, UploadPartRequest request, RewindableContent rewindableContent) throws IOException {
UploadPartRequest request, RewindableContent rewindableContent) throws IOException {
String uploadUri =
UriTemplate.expand(
uri.toString() + "{bucket}/{key}{?partNumber,uploadId}",
Expand Down Expand Up @@ -190,7 +198,8 @@ static MultipartUploadHttpRequestManager createFrom(HttpStorageOptions options)
return new MultipartUploadHttpRequestManager(
storage.getRequestFactory(),
new XmlObjectParser(new XmlMapper()),
options.getMergedHeaderProvider(FixedHeaderProvider.create(stableHeaders.build())));
options.getMergedHeaderProvider(FixedHeaderProvider.create(stableHeaders.build())),
URI.create(options.getHost()));
}

private void addChecksumHeader(@Nullable Crc32cLengthKnown crc32c, HttpHeaders headers) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
import static io.grpc.netty.shaded.io.netty.handler.codec.http.HttpHeaderNames.CONTENT_LENGTH;
import static io.grpc.netty.shaded.io.netty.handler.codec.http.HttpHeaderValues.CLOSE;

import com.google.api.gax.retrying.RetrySettings;
import com.google.cloud.NoCredentials;
import com.google.cloud.storage.it.runner.registry.Registry;
import io.grpc.netty.shaded.io.netty.bootstrap.ServerBootstrap;
import io.grpc.netty.shaded.io.netty.buffer.ByteBuf;
import io.grpc.netty.shaded.io.netty.channel.Channel;
Expand All @@ -44,17 +47,25 @@
import io.grpc.netty.shaded.io.netty.handler.logging.LoggingHandler;
import java.net.InetSocketAddress;
import java.net.URI;
import java.time.Duration;

final class FakeHttpServer implements AutoCloseable {

private final URI endpoint;
private final Channel channel;
private final Runnable shutdown;
private final HttpStorageOptions httpStorageOptions;

private FakeHttpServer(URI endpoint, Channel channel, Runnable shutdown) {
private FakeHttpServer(
URI endpoint, Channel channel, Runnable shutdown, HttpStorageOptions httpStorageOptions) {
this.endpoint = endpoint;
this.channel = channel;
this.shutdown = shutdown;
this.httpStorageOptions = httpStorageOptions;
}

public HttpStorageOptions getHttpStorageOptions() {
return httpStorageOptions;
}

public URI getEndpoint() {
Expand Down Expand Up @@ -99,13 +110,34 @@ protected void initChannel(SocketChannel ch) {
Channel channel = b.bind(address).syncUninterruptibly().channel();

InetSocketAddress socketAddress = (InetSocketAddress) channel.localAddress();
URI endpoint = URI.create("http://localhost:" + socketAddress.getPort() + "/");
HttpStorageOptions httpStorageOptions =
HttpStorageOptions.http()
.setHost(endpoint.toString())
.setProjectId("test-proj")
.setCredentials(NoCredentials.getInstance())
.setOpenTelemetry(Registry.getInstance().otelSdk.get().get())
// cut most retry settings by half. we're hitting an in process server.
.setRetrySettings(
RetrySettings.newBuilder()
.setTotalTimeoutDuration(Duration.ofSeconds(25))
.setInitialRetryDelayDuration(Duration.ofMillis(250))
.setRetryDelayMultiplier(1.2)
.setMaxRetryDelayDuration(Duration.ofSeconds(16))
.setMaxAttempts(6)
.setInitialRpcTimeoutDuration(Duration.ofSeconds(25))
.setRpcTimeoutMultiplier(1.0)
.setMaxRpcTimeoutDuration(Duration.ofSeconds(25))
.build())
.build();
return new FakeHttpServer(
URI.create("http://localhost:" + socketAddress.getPort()),
endpoint,
channel,
() -> {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
});
},
httpStorageOptions);
}

interface HttpRequestHandler {
Expand Down
Loading