Skip to content

Commit 5a7a425

Browse files
authored
Refactor GCS fixture multipart parser (#125828)
1 parent 299bf44 commit 5a7a425

File tree

5 files changed

+347
-129
lines changed

5 files changed

+347
-129
lines changed

modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010

1111
import fixture.gcs.FakeOAuth2HttpHandler;
1212
import fixture.gcs.GoogleCloudStorageHttpHandler;
13+
import fixture.gcs.MultipartUpload;
1314

1415
import com.google.api.client.http.HttpExecuteInterceptor;
1516
import com.google.api.client.http.HttpRequestInitializer;
@@ -43,7 +44,6 @@
4344
import org.elasticsearch.core.Nullable;
4445
import org.elasticsearch.core.SuppressForbidden;
4546
import org.elasticsearch.core.TimeValue;
46-
import org.elasticsearch.core.Tuple;
4747
import org.elasticsearch.http.ResponseInjectingHttpHandler;
4848
import org.elasticsearch.repositories.blobstore.AbstractBlobContainerRetriesTestCase;
4949
import org.elasticsearch.repositories.blobstore.ESMockAPIBasedRepositoryIntegTestCase;
@@ -62,15 +62,13 @@
6262
import java.util.Locale;
6363
import java.util.Map;
6464
import java.util.Objects;
65-
import java.util.Optional;
6665
import java.util.Queue;
6766
import java.util.concurrent.ConcurrentHashMap;
6867
import java.util.concurrent.ConcurrentLinkedQueue;
6968
import java.util.concurrent.atomic.AtomicBoolean;
7069
import java.util.concurrent.atomic.AtomicInteger;
7170
import java.util.concurrent.atomic.AtomicReference;
7271

73-
import static fixture.gcs.GoogleCloudStorageHttpHandler.parseMultipartRequestBody;
7472
import static fixture.gcs.TestUtils.createServiceAccount;
7573
import static java.nio.charset.StandardCharsets.UTF_8;
7674
import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomPurpose;
@@ -80,7 +78,6 @@
8078
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.ENDPOINT_SETTING;
8179
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.READ_TIMEOUT_SETTING;
8280
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.TOKEN_URI_SETTING;
83-
import static org.elasticsearch.test.hamcrest.OptionalMatchers.isPresent;
8481
import static org.hamcrest.Matchers.anyOf;
8582
import static org.hamcrest.Matchers.containsString;
8683
import static org.hamcrest.Matchers.equalTo;
@@ -268,17 +265,16 @@ public void testWriteBlobWithRetries() throws Exception {
268265
final CountDown countDown = new CountDown(maxRetries);
269266

270267
final BlobContainer blobContainer = blobContainerBuilder().maxRetries(maxRetries).build();
271-
final byte[] bytes = randomBlobContent();
268+
final byte[] bytes = randomBlobContent(0);
272269
httpServer.createContext("/upload/storage/v1/b/bucket/o", safeHandler(exchange -> {
273270
assertThat(exchange.getRequestURI().getQuery(), containsString("uploadType=multipart"));
274271
if (countDown.countDown()) {
275-
Optional<Tuple<String, BytesReference>> content = parseMultipartRequestBody(exchange.getRequestBody());
276-
assertThat(content, isPresent());
277-
assertThat(content.get().v1(), equalTo(blobContainer.path().buildAsString() + "write_blob_max_retries"));
278-
if (Objects.deepEquals(bytes, BytesReference.toBytes(content.get().v2()))) {
272+
MultipartUpload multipartUpload = MultipartUpload.parseBody(exchange, exchange.getRequestBody());
273+
assertEquals(multipartUpload.name(), blobContainer.path().buildAsString() + "write_blob_max_retries");
274+
if (multipartUpload.content().equals(new BytesArray(bytes))) {
279275
byte[] response = Strings.format("""
280276
{"bucket":"bucket","name":"%s"}
281-
""", content.get().v1()).getBytes(UTF_8);
277+
""", multipartUpload.name()).getBytes(UTF_8);
282278
exchange.getResponseHeaders().add("Content-Type", "application/json");
283279
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length);
284280
exchange.getResponseBody().write(response);

test/fixtures/gcs-fixture/src/main/java/fixture/gcs/GoogleCloudStorageHttpHandler.java

Lines changed: 6 additions & 99 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
import org.elasticsearch.common.io.Streams;
1919
import org.elasticsearch.common.regex.Regex;
2020
import org.elasticsearch.core.SuppressForbidden;
21-
import org.elasticsearch.core.Tuple;
2221
import org.elasticsearch.rest.RestStatus;
2322
import org.elasticsearch.rest.RestUtils;
2423
import org.elasticsearch.test.fixture.HttpHeaderParser;
@@ -27,26 +26,17 @@
2726
import org.elasticsearch.xcontent.XContentFactory;
2827
import org.elasticsearch.xcontent.XContentType;
2928

30-
import java.io.BufferedReader;
3129
import java.io.IOException;
32-
import java.io.InputStream;
33-
import java.io.InputStreamReader;
3430
import java.net.URLDecoder;
3531
import java.util.HashMap;
3632
import java.util.Locale;
3733
import java.util.Map;
3834
import java.util.Objects;
39-
import java.util.Optional;
4035
import java.util.concurrent.atomic.AtomicInteger;
41-
import java.util.regex.Matcher;
42-
import java.util.regex.Pattern;
4336
import java.util.stream.Collectors;
44-
import java.util.zip.GZIPInputStream;
4537

4638
import static fixture.gcs.MockGcsBlobStore.failAndThrow;
4739
import static java.nio.charset.StandardCharsets.UTF_8;
48-
import static java.util.stream.Collectors.joining;
49-
import static org.elasticsearch.core.Strings.format;
5040

5141
/**
5242
* Minimal HTTP handler that acts as a Google Cloud Storage compliant server
@@ -183,26 +173,18 @@ public void handle(final HttpExchange exchange) throws IOException {
183173
exchange.getResponseBody().write(response);
184174

185175
} else if (Regex.simpleMatch("POST /upload/storage/v1/b/" + bucket + "/*uploadType=multipart*", request)) {
186-
// Multipart upload
187-
Optional<Tuple<String, BytesReference>> content = parseMultipartRequestBody(requestBody.streamInput());
188-
if (content.isPresent()) {
176+
try {
177+
final var multipartUpload = MultipartUpload.parseBody(exchange, requestBody.streamInput());
189178
final Long ifGenerationMatch = parseOptionalLongParameter(exchange, IF_GENERATION_MATCH);
190179
final MockGcsBlobStore.BlobVersion newBlobVersion = mockGcsBlobStore.updateBlob(
191-
content.get().v1(),
180+
multipartUpload.name(),
192181
ifGenerationMatch,
193-
content.get().v2()
182+
multipartUpload.content()
194183
);
195184
writeBlobVersionAsJson(exchange, newBlobVersion);
196-
} else {
197-
throw new AssertionError(
198-
"Could not read multi-part request to ["
199-
+ request
200-
+ "] with headers ["
201-
+ new HashMap<>(exchange.getRequestHeaders())
202-
+ "]"
203-
);
185+
} catch (IllegalArgumentException e) {
186+
throw new AssertionError(e);
204187
}
205-
206188
} else if (Regex.simpleMatch("POST /upload/storage/v1/b/" + bucket + "/*uploadType=resumable*", request)) {
207189
// Resumable upload initialization https://cloud.google.com/storage/docs/json_api/v1/how-tos/resumable-upload
208190
final Map<String, String> params = new HashMap<>();
@@ -328,81 +310,6 @@ private static String httpServerUrl(final HttpExchange exchange) {
328310
return "http://" + exchange.getRequestHeaders().get("HOST").get(0);
329311
}
330312

331-
private static final Pattern NAME_PATTERN = Pattern.compile("\"name\":\"([^\"]*)\"");
332-
333-
public static Optional<Tuple<String, BytesReference>> parseMultipartRequestBody(final InputStream requestBody) throws IOException {
334-
Tuple<String, BytesReference> content = null;
335-
final BytesReference fullRequestBody;
336-
try (InputStream in = new GZIPInputStream(requestBody)) {
337-
fullRequestBody = Streams.readFully(in);
338-
}
339-
String name = null;
340-
boolean skippedEmptyLine = false;
341-
int startPos = 0;
342-
int endPos = 0;
343-
while (startPos < fullRequestBody.length()) {
344-
do {
345-
endPos = fullRequestBody.indexOf((byte) '\r', endPos + 1);
346-
} while (endPos >= 0 && fullRequestBody.get(endPos + 1) != '\n');
347-
boolean markAndContinue = false;
348-
final String bucketPrefix = "{\"bucket\":";
349-
if (startPos > 0) {
350-
startPos += 2;
351-
}
352-
if (name == null || skippedEmptyLine == false) {
353-
if ((skippedEmptyLine == false && endPos == startPos)
354-
|| (fullRequestBody.get(startPos) == '-' && fullRequestBody.get(startPos + 1) == '-')) {
355-
markAndContinue = true;
356-
} else {
357-
final String start = fullRequestBody.slice(startPos, Math.min(endPos - startPos, bucketPrefix.length())).utf8ToString();
358-
if (start.toLowerCase(Locale.ROOT).startsWith("content")) {
359-
markAndContinue = true;
360-
} else if (start.startsWith(bucketPrefix)) {
361-
markAndContinue = true;
362-
final String line = fullRequestBody.slice(
363-
startPos + bucketPrefix.length(),
364-
endPos - startPos - bucketPrefix.length()
365-
).utf8ToString();
366-
Matcher matcher = NAME_PATTERN.matcher(line);
367-
if (matcher.find()) {
368-
name = matcher.group(1);
369-
}
370-
}
371-
}
372-
skippedEmptyLine = markAndContinue && endPos == startPos;
373-
startPos = endPos;
374-
} else {
375-
while (isEndOfPart(fullRequestBody, endPos) == false) {
376-
endPos = fullRequestBody.indexOf((byte) '\r', endPos + 1);
377-
}
378-
content = Tuple.tuple(name, fullRequestBody.slice(startPos, endPos - startPos));
379-
break;
380-
}
381-
}
382-
if (content == null) {
383-
final InputStream stream = fullRequestBody.streamInput();
384-
logger.warn(
385-
() -> format(
386-
"Failed to find multi-part upload in [%s]",
387-
new BufferedReader(new InputStreamReader(stream)).lines().collect(joining("\n"))
388-
)
389-
);
390-
}
391-
return Optional.ofNullable(content);
392-
}
393-
394-
private static final byte[] END_OF_PARTS_MARKER = "\r\n--__END_OF_PART__".getBytes(UTF_8);
395-
396-
private static boolean isEndOfPart(BytesReference fullRequestBody, int endPos) {
397-
for (int i = 0; i < END_OF_PARTS_MARKER.length; i++) {
398-
final byte b = END_OF_PARTS_MARKER[i];
399-
if (fullRequestBody.get(endPos + i) != b) {
400-
return false;
401-
}
402-
}
403-
return true;
404-
}
405-
406313
private static String requireHeader(HttpExchange exchange, String headerName) {
407314
final String headerValue = exchange.getRequestHeaders().getFirst(headerName);
408315
if (headerValue != null) {

0 commit comments

Comments
 (0)