Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@

import com.azure.storage.common.policy.RequestRetryOptions;
import com.azure.storage.common.policy.RetryPolicyType;
import com.sun.net.httpserver.Headers;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;

Expand Down Expand Up @@ -246,7 +245,8 @@ private AzureHTTPStatsCollectorHandler(HttpHandler delegate) {
}

@Override
protected void maybeTrack(String request, Headers headers) {
protected void maybeTrack(HttpExchange exchange) {
final String request = exchange.getRequestMethod() + " " + exchange.getRequestURI().toString();
if (GET_BLOB_PATTERN.test(request)) {
trackRequest("GetBlob");
} else if (Regex.simpleMatch("HEAD /*/*/*", request)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,9 @@ private static class GoogleCloudStorageStatsCollectorHttpHandler extends HttpSta
}

@Override
public void maybeTrack(final String request, Headers requestHeaders) {
public void maybeTrack(HttpExchange exchange) {
final String request = exchange.getRequestMethod() + " " + exchange.getRequestURI().toString();
final Headers requestHeaders = exchange.getRequestHeaders();
if (Regex.simpleMatch("GET */storage/v1/b/*/o/*", request)) {
trackRequest(Operation.GET_OBJECT.key());
} else if (Regex.simpleMatch("GET /storage/v1/b/*/o*", request)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
import com.amazonaws.services.s3.model.ListMultipartUploadsRequest;
import com.amazonaws.services.s3.model.MultipartUpload;
import com.sun.net.httpserver.Headers;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;

Expand Down Expand Up @@ -692,16 +691,17 @@ protected class S3StatsCollectorHttpHandler extends HttpStatsCollectorHandler {
super(delegate);
}

// Parses the incoming exchange into a structured S3Request by delegating to a
// throwaway S3HttpHandler. The bucket name "bucket" presumably matches the test
// fixture's bucket — TODO confirm against the enclosing test setup.
// NOTE(review): a new S3HttpHandler is allocated per call; fine for tests,
// but could be hoisted to a field if this becomes hot.
private S3HttpHandler.S3Request parseRequest(HttpExchange exchange) {
return new S3HttpHandler("bucket").parseRequest(exchange);
}

@Override
public void handle(HttpExchange exchange) throws IOException {
final S3HttpHandler.RequestComponents requestComponents = S3HttpHandler.parseRequestComponents(
S3HttpHandler.getRawRequestString(exchange)
);
if (false == requestComponents.request().startsWith("HEAD ")) {
assertThat(requestComponents.customQueryParameters(), hasKey(S3BlobStore.CUSTOM_QUERY_PARAMETER_PURPOSE));
final S3HttpHandler.S3Request s3Request = parseRequest(exchange);
if ("HEAD".equals(s3Request.method())) {
assertTrue(s3Request.hasQueryParamOnce(S3BlobStore.CUSTOM_QUERY_PARAMETER_PURPOSE));
}
final String request = requestComponents.request();
if (shouldFailCompleteMultipartUploadRequest.get() && Regex.simpleMatch("POST /*/*?uploadId=*", request)) {
if (shouldFailCompleteMultipartUploadRequest.get() && s3Request.isCompleteMultipartUploadRequest()) {
try (exchange) {
drainInputStream(exchange.getRequestBody());
exchange.sendResponseHeaders(
Expand All @@ -715,20 +715,17 @@ public void handle(HttpExchange exchange) throws IOException {
}

@Override
public void maybeTrack(final String rawRequest, Headers requestHeaders) {
final S3HttpHandler.RequestComponents requestComponents = S3HttpHandler.parseRequestComponents(rawRequest);
final String request = requestComponents.request();
final OperationPurpose purpose = OperationPurpose.parse(
requestComponents.customQueryParameters().get(S3BlobStore.CUSTOM_QUERY_PARAMETER_PURPOSE).get(0)
);
if (Regex.simpleMatch("GET /*/?prefix=*", request)) {
public void maybeTrack(HttpExchange exchange) {
final S3HttpHandler.S3Request request = parseRequest(exchange);
final OperationPurpose purpose = OperationPurpose.parse(request.getQueryParamOnce(S3BlobStore.CUSTOM_QUERY_PARAMETER_PURPOSE));
if (request.isListObjectsRequest()) {
trackRequest("ListObjects");
metricsCount.computeIfAbsent(new S3BlobStore.StatsKey(S3BlobStore.Operation.LIST_OBJECTS, purpose), k -> new AtomicLong())
.incrementAndGet();
} else if (Regex.simpleMatch("GET /*/?uploads&*", request)) {
} else if (request.isListMultipartUploadsRequest()) {
// TODO track ListMultipartUploads requests
logger.info("--> ListMultipartUploads not tracked [{}] with parsed purpose [{}]", request, purpose.getKey());
} else if (Regex.simpleMatch("GET /*/*", request)) {
} else if (request.isGetObjectRequest()) {
trackRequest("GetObject");
metricsCount.computeIfAbsent(new S3BlobStore.StatsKey(S3BlobStore.Operation.GET_OBJECT, purpose), k -> new AtomicLong())
.incrementAndGet();
Expand All @@ -738,21 +735,21 @@ public void maybeTrack(final String rawRequest, Headers requestHeaders) {
new S3BlobStore.StatsKey(S3BlobStore.Operation.PUT_MULTIPART_OBJECT, purpose),
k -> new AtomicLong()
).incrementAndGet();
} else if (Regex.simpleMatch("PUT /*/*", request)) {
} else if (request.isPutObjectRequest()) {
trackRequest("PutObject");
metricsCount.computeIfAbsent(new S3BlobStore.StatsKey(S3BlobStore.Operation.PUT_OBJECT, purpose), k -> new AtomicLong())
.incrementAndGet();
} else if (Regex.simpleMatch("POST /*/?delete", request)) {
} else if (request.isMultiObjectDeleteRequest()) {
trackRequest("DeleteObjects");
metricsCount.computeIfAbsent(new S3BlobStore.StatsKey(S3BlobStore.Operation.DELETE_OBJECTS, purpose), k -> new AtomicLong())
.incrementAndGet();
} else if (Regex.simpleMatch("DELETE /*/*?uploadId=*", request)) {
} else if (request.isAbortMultipartUploadRequest()) {
trackRequest("AbortMultipartObject");
metricsCount.computeIfAbsent(
new S3BlobStore.StatsKey(S3BlobStore.Operation.ABORT_MULTIPART_OBJECT, purpose),
k -> new AtomicLong()
).incrementAndGet();
} else if (Regex.simpleMatch("HEAD /*/*", request)) {
} else if (request.isHeadObjectRequest()) {
trackRequest("HeadObject");
metricsCount.computeIfAbsent(new S3BlobStore.StatsKey(S3BlobStore.Operation.HEAD_OBJECT, purpose), k -> new AtomicLong())
.incrementAndGet();
Expand All @@ -765,10 +762,10 @@ Map<S3BlobStore.StatsKey, AtomicLong> getMetricsCount() {
return metricsCount;
}

private boolean isMultiPartUpload(String request) {
return Regex.simpleMatch("POST /*/*?uploads", request)
|| Regex.simpleMatch("POST /*/*?*uploadId=*", request)
|| Regex.simpleMatch("PUT /*/*?*uploadId=*", request);
// Returns true when the request belongs to any phase of a multipart upload:
// initiation (POST ...?uploads), an individual part upload (PUT ...uploadId=...),
// or completion (POST ...uploadId=...). Replaces the previous raw-string
// Regex.simpleMatch checks with the typed S3Request predicates.
private boolean isMultiPartUpload(S3HttpHandler.S3Request s3Request) {
return s3Request.isInitiateMultipartUploadRequest()
|| s3Request.isUploadPartRequest()
|| s3Request.isCompleteMultipartUploadRequest();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,10 @@ protected long getRetryDelayInMillis() {
};
}

// Static variant of the parse helper: converts an HttpExchange into a typed
// S3Request via a throwaway S3HttpHandler. The "bucket" name presumably matches
// the test container endpoint — TODO confirm against downloadStorageEndpoint usage.
private static S3HttpHandler.S3Request parseRequest(HttpExchange exchange) {
return new S3HttpHandler("bucket").parseRequest(exchange);
}

public void testWriteBlobWithRetries() throws Exception {
final int maxRetries = randomInt(5);
final CountDown countDown = new CountDown(maxRetries + 1);
Expand All @@ -265,10 +269,8 @@ public void testWriteBlobWithRetries() throws Exception {

final byte[] bytes = randomBlobContent();
httpServer.createContext(downloadStorageEndpoint(blobContainer, "write_blob_max_retries"), exchange -> {
final S3HttpHandler.RequestComponents requestComponents = S3HttpHandler.parseRequestComponents(
S3HttpHandler.getRawRequestString(exchange)
);
if ("PUT".equals(requestComponents.method()) && requestComponents.query().isEmpty()) {
final S3HttpHandler.S3Request s3Request = parseRequest(exchange);
if (s3Request.isPutObjectRequest()) {
if (countDown.countDown()) {
final BytesReference body = Streams.readFully(exchange.getRequestBody());
if (Objects.deepEquals(bytes, BytesReference.toBytes(body))) {
Expand Down Expand Up @@ -353,8 +355,8 @@ public void testWriteBlobWithExceptionThrownAtClosingTime() throws Exception {

var uploadedBytes = new AtomicReference<BytesReference>();
httpServer.createContext(downloadStorageEndpoint(blobContainer, blobName), exchange -> {
var requestComponents = S3HttpHandler.parseRequestComponents(S3HttpHandler.getRawRequestString(exchange));
if ("PUT".equals(requestComponents.method()) && requestComponents.query().isEmpty()) {
var requestComponents = parseRequest(exchange);
if (requestComponents.isPutObjectRequest()) {
var body = Streams.readFully(exchange.getRequestBody());
if (uploadedBytes.compareAndSet(null, body)) {
exchange.sendResponseHeaders(HttpStatus.SC_OK, -1);
Expand Down Expand Up @@ -408,12 +410,10 @@ public void testWriteLargeBlob() throws Exception {
final CountDown countDownComplete = new CountDown(nbErrors);

httpServer.createContext(downloadStorageEndpoint(blobContainer, "write_large_blob"), exchange -> {
final S3HttpHandler.RequestComponents requestComponents = S3HttpHandler.parseRequestComponents(
S3HttpHandler.getRawRequestString(exchange)
);
final S3HttpHandler.S3Request s3Request = parseRequest(exchange);
final long contentLength = Long.parseLong(exchange.getRequestHeaders().getFirst("Content-Length"));

if ("POST".equals(requestComponents.method()) && requestComponents.query().equals("uploads")) {
if (s3Request.isInitiateMultipartUploadRequest()) {
// initiate multipart upload request
if (countDownInitiate.countDown()) {
byte[] response = ("""
Expand All @@ -429,39 +429,37 @@ public void testWriteLargeBlob() throws Exception {
exchange.close();
return;
}
} else if ("PUT".equals(requestComponents.method())
&& requestComponents.query().contains("uploadId=TEST")
&& requestComponents.query().contains("partNumber=")) {
// upload part request
MD5DigestCalculatingInputStream md5 = new MD5DigestCalculatingInputStream(exchange.getRequestBody());
BytesReference bytes = Streams.readFully(md5);
assertThat((long) bytes.length(), anyOf(equalTo(lastPartSize), equalTo(bufferSize.getBytes())));
assertThat(contentLength, anyOf(equalTo(lastPartSize), equalTo(bufferSize.getBytes())));

if (countDownUploads.decrementAndGet() % 2 == 0) {
exchange.getResponseHeaders().add("ETag", Base16.encodeAsString(md5.getMd5Digest()));
exchange.sendResponseHeaders(HttpStatus.SC_OK, -1);
exchange.close();
return;
}
} else if (s3Request.isUploadPartRequest()) {
// upload part request
MD5DigestCalculatingInputStream md5 = new MD5DigestCalculatingInputStream(exchange.getRequestBody());
BytesReference bytes = Streams.readFully(md5);
assertThat((long) bytes.length(), anyOf(equalTo(lastPartSize), equalTo(bufferSize.getBytes())));
assertThat(contentLength, anyOf(equalTo(lastPartSize), equalTo(bufferSize.getBytes())));

if (countDownUploads.decrementAndGet() % 2 == 0) {
exchange.getResponseHeaders().add("ETag", Base16.encodeAsString(md5.getMd5Digest()));
exchange.sendResponseHeaders(HttpStatus.SC_OK, -1);
exchange.close();
return;
}

} else if ("POST".equals(requestComponents.method()) && requestComponents.query().equals("uploadId=TEST")) {
// complete multipart upload request
if (countDownComplete.countDown()) {
Streams.readFully(exchange.getRequestBody());
byte[] response = ("""
<?xml version="1.0" encoding="UTF-8"?>
<CompleteMultipartUploadResult>
<Bucket>bucket</Bucket>
<Key>write_large_blob</Key>
</CompleteMultipartUploadResult>""").getBytes(StandardCharsets.UTF_8);
exchange.getResponseHeaders().add("Content-Type", "application/xml");
exchange.sendResponseHeaders(HttpStatus.SC_OK, response.length);
exchange.getResponseBody().write(response);
exchange.close();
return;
}
} else if (s3Request.isCompleteMultipartUploadRequest()) {
// complete multipart upload request
if (countDownComplete.countDown()) {
Streams.readFully(exchange.getRequestBody());
byte[] response = ("""
<?xml version="1.0" encoding="UTF-8"?>
<CompleteMultipartUploadResult>
<Bucket>bucket</Bucket>
<Key>write_large_blob</Key>
</CompleteMultipartUploadResult>""").getBytes(StandardCharsets.UTF_8);
exchange.getResponseHeaders().add("Content-Type", "application/xml");
exchange.sendResponseHeaders(HttpStatus.SC_OK, response.length);
exchange.getResponseBody().write(response);
exchange.close();
return;
}
}

// sends an error back or let the request time out
if (useTimeout == false) {
Expand Down Expand Up @@ -510,12 +508,10 @@ public void testWriteLargeBlobStreaming() throws Exception {
final CountDown countDownComplete = new CountDown(nbErrors);

httpServer.createContext(downloadStorageEndpoint(blobContainer, "write_large_blob_streaming"), exchange -> {
final S3HttpHandler.RequestComponents requestComponents = S3HttpHandler.parseRequestComponents(
S3HttpHandler.getRawRequestString(exchange)
);
final S3HttpHandler.S3Request s3Request = parseRequest(exchange);
final long contentLength = Long.parseLong(exchange.getRequestHeaders().getFirst("Content-Length"));

if ("POST".equals(requestComponents.method()) && requestComponents.query().equals("uploads")) {
if (s3Request.isInitiateMultipartUploadRequest()) {
// initiate multipart upload request
if (countDownInitiate.countDown()) {
byte[] response = ("""
Expand All @@ -531,38 +527,36 @@ public void testWriteLargeBlobStreaming() throws Exception {
exchange.close();
return;
}
} else if ("PUT".equals(requestComponents.method())
&& requestComponents.query().contains("uploadId=TEST")
&& requestComponents.query().contains("partNumber=")) {
// upload part request
MD5DigestCalculatingInputStream md5 = new MD5DigestCalculatingInputStream(exchange.getRequestBody());
BytesReference bytes = Streams.readFully(md5);

if (counterUploads.incrementAndGet() % 2 == 0) {
bytesReceived.addAndGet(bytes.length());
exchange.getResponseHeaders().add("ETag", Base16.encodeAsString(md5.getMd5Digest()));
exchange.sendResponseHeaders(HttpStatus.SC_OK, -1);
exchange.close();
return;
}
} else if (s3Request.isUploadPartRequest()) {
// upload part request
MD5DigestCalculatingInputStream md5 = new MD5DigestCalculatingInputStream(exchange.getRequestBody());
BytesReference bytes = Streams.readFully(md5);

if (counterUploads.incrementAndGet() % 2 == 0) {
bytesReceived.addAndGet(bytes.length());
exchange.getResponseHeaders().add("ETag", Base16.encodeAsString(md5.getMd5Digest()));
exchange.sendResponseHeaders(HttpStatus.SC_OK, -1);
exchange.close();
return;
}

} else if ("POST".equals(requestComponents.method()) && requestComponents.query().equals("uploadId=TEST")) {
// complete multipart upload request
if (countDownComplete.countDown()) {
Streams.readFully(exchange.getRequestBody());
byte[] response = ("""
<?xml version="1.0" encoding="UTF-8"?>
<CompleteMultipartUploadResult>
<Bucket>bucket</Bucket>
<Key>write_large_blob_streaming</Key>
</CompleteMultipartUploadResult>""").getBytes(StandardCharsets.UTF_8);
exchange.getResponseHeaders().add("Content-Type", "application/xml");
exchange.sendResponseHeaders(HttpStatus.SC_OK, response.length);
exchange.getResponseBody().write(response);
exchange.close();
return;
}
} else if (s3Request.isCompleteMultipartUploadRequest()) {
// complete multipart upload request
if (countDownComplete.countDown()) {
Streams.readFully(exchange.getRequestBody());
byte[] response = ("""
<?xml version="1.0" encoding="UTF-8"?>
<CompleteMultipartUploadResult>
<Bucket>bucket</Bucket>
<Key>write_large_blob_streaming</Key>
</CompleteMultipartUploadResult>""").getBytes(StandardCharsets.UTF_8);
exchange.getResponseHeaders().add("Content-Type", "application/xml");
exchange.sendResponseHeaders(HttpStatus.SC_OK, response.length);
exchange.getResponseBody().write(response);
exchange.close();
return;
}
}

// sends an error back or let the request time out
if (useTimeout == false) {
Expand Down
Loading