diff --git a/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java b/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java
index fee728933c343..2d6590a17f1db 100644
--- a/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java
+++ b/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java
@@ -10,6 +10,7 @@
 import fixture.gcs.FakeOAuth2HttpHandler;
 import fixture.gcs.GoogleCloudStorageHttpHandler;
+import fixture.gcs.MultipartUpload;
 
 import com.google.api.client.http.HttpExecuteInterceptor;
 import com.google.api.client.http.HttpRequestInitializer;
@@ -43,7 +44,6 @@
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.core.SuppressForbidden;
 import org.elasticsearch.core.TimeValue;
-import org.elasticsearch.core.Tuple;
 import org.elasticsearch.http.ResponseInjectingHttpHandler;
 import org.elasticsearch.repositories.blobstore.AbstractBlobContainerRetriesTestCase;
 import org.elasticsearch.repositories.blobstore.ESMockAPIBasedRepositoryIntegTestCase;
@@ -62,7 +62,6 @@
 import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Optional;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -70,7 +69,6 @@
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
-import static fixture.gcs.GoogleCloudStorageHttpHandler.parseMultipartRequestBody;
 import static fixture.gcs.TestUtils.createServiceAccount;
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomPurpose;
@@ -80,7 +78,6 @@
 import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.ENDPOINT_SETTING;
 import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.READ_TIMEOUT_SETTING;
 import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.TOKEN_URI_SETTING;
-import static org.elasticsearch.test.hamcrest.OptionalMatchers.isPresent;
 import static org.hamcrest.Matchers.anyOf;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
@@ -268,17 +265,16 @@ public void testWriteBlobWithRetries() throws Exception {
         final CountDown countDown = new CountDown(maxRetries);
 
         final BlobContainer blobContainer = blobContainerBuilder().maxRetries(maxRetries).build();
-        final byte[] bytes = randomBlobContent();
+        final byte[] bytes = randomBlobContent(0);
         httpServer.createContext("/upload/storage/v1/b/bucket/o", safeHandler(exchange -> {
             assertThat(exchange.getRequestURI().getQuery(), containsString("uploadType=multipart"));
             if (countDown.countDown()) {
-                Optional<Tuple<String, BytesReference>> content = parseMultipartRequestBody(exchange.getRequestBody());
-                assertThat(content, isPresent());
-                assertThat(content.get().v1(), equalTo(blobContainer.path().buildAsString() + "write_blob_max_retries"));
-                if (Objects.deepEquals(bytes, BytesReference.toBytes(content.get().v2()))) {
+                MultipartUpload multipartUpload = MultipartUpload.parseBody(exchange, exchange.getRequestBody());
+                assertEquals(blobContainer.path().buildAsString() + "write_blob_max_retries", multipartUpload.name());
+                if (multipartUpload.content().equals(new BytesArray(bytes))) {
                    byte[] response = Strings.format("""
{"bucket":"bucket","name":"%s"} - """, content.get().v1()).getBytes(UTF_8); + """, multipartUpload.name()).getBytes(UTF_8); exchange.getResponseHeaders().add("Content-Type", "application/json"); exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length); exchange.getResponseBody().write(response); diff --git a/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/GoogleCloudStorageHttpHandler.java b/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/GoogleCloudStorageHttpHandler.java index c9a52f3c2ffb5..b83d3381b6f96 100644 --- a/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/GoogleCloudStorageHttpHandler.java +++ b/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/GoogleCloudStorageHttpHandler.java @@ -18,7 +18,6 @@ import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.regex.Regex; import org.elasticsearch.core.SuppressForbidden; -import org.elasticsearch.core.Tuple; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.RestUtils; import org.elasticsearch.test.fixture.HttpHeaderParser; @@ -27,26 +26,17 @@ import org.elasticsearch.xcontent.XContentFactory; import org.elasticsearch.xcontent.XContentType; -import java.io.BufferedReader; import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; import java.net.URLDecoder; import java.util.HashMap; import java.util.Locale; import java.util.Map; import java.util.Objects; -import java.util.Optional; import java.util.concurrent.atomic.AtomicInteger; -import java.util.regex.Matcher; -import java.util.regex.Pattern; import java.util.stream.Collectors; -import java.util.zip.GZIPInputStream; import static fixture.gcs.MockGcsBlobStore.failAndThrow; import static java.nio.charset.StandardCharsets.UTF_8; -import static java.util.stream.Collectors.joining; -import static org.elasticsearch.core.Strings.format; /** * Minimal HTTP handler that acts as a Google Cloud Storage compliant server @@ -183,26 +173,18 @@ public void handle(final HttpExchange exchange) throws IOException { exchange.getResponseBody().write(response); } else if (Regex.simpleMatch("POST /upload/storage/v1/b/" + bucket + "/*uploadType=multipart*", request)) { - // Multipart upload - Optional> content = parseMultipartRequestBody(requestBody.streamInput()); - if (content.isPresent()) { + try { + final var multipartUpload = MultipartUpload.parseBody(exchange, requestBody.streamInput()); final Long ifGenerationMatch = parseOptionalLongParameter(exchange, IF_GENERATION_MATCH); final MockGcsBlobStore.BlobVersion newBlobVersion = mockGcsBlobStore.updateBlob( - content.get().v1(), + multipartUpload.name(), ifGenerationMatch, - content.get().v2() + multipartUpload.content() ); writeBlobVersionAsJson(exchange, newBlobVersion); - } else { - throw new AssertionError( - "Could not read multi-part request to [" - + request - + "] with headers [" - + new HashMap<>(exchange.getRequestHeaders()) - + "]" - ); + } catch (IllegalArgumentException e) { + throw new AssertionError(e); } - } else if (Regex.simpleMatch("POST /upload/storage/v1/b/" + bucket + "/*uploadType=resumable*", request)) { // Resumable upload initialization https://cloud.google.com/storage/docs/json_api/v1/how-tos/resumable-upload final Map params = new HashMap<>(); @@ -328,81 +310,6 @@ private static String httpServerUrl(final HttpExchange exchange) { return "http://" + exchange.getRequestHeaders().get("HOST").get(0); } - private static final Pattern NAME_PATTERN = Pattern.compile("\"name\":\"([^\"]*)\""); - - public static Optional> 
parseMultipartRequestBody(final InputStream requestBody) throws IOException { - Tuple content = null; - final BytesReference fullRequestBody; - try (InputStream in = new GZIPInputStream(requestBody)) { - fullRequestBody = Streams.readFully(in); - } - String name = null; - boolean skippedEmptyLine = false; - int startPos = 0; - int endPos = 0; - while (startPos < fullRequestBody.length()) { - do { - endPos = fullRequestBody.indexOf((byte) '\r', endPos + 1); - } while (endPos >= 0 && fullRequestBody.get(endPos + 1) != '\n'); - boolean markAndContinue = false; - final String bucketPrefix = "{\"bucket\":"; - if (startPos > 0) { - startPos += 2; - } - if (name == null || skippedEmptyLine == false) { - if ((skippedEmptyLine == false && endPos == startPos) - || (fullRequestBody.get(startPos) == '-' && fullRequestBody.get(startPos + 1) == '-')) { - markAndContinue = true; - } else { - final String start = fullRequestBody.slice(startPos, Math.min(endPos - startPos, bucketPrefix.length())).utf8ToString(); - if (start.toLowerCase(Locale.ROOT).startsWith("content")) { - markAndContinue = true; - } else if (start.startsWith(bucketPrefix)) { - markAndContinue = true; - final String line = fullRequestBody.slice( - startPos + bucketPrefix.length(), - endPos - startPos - bucketPrefix.length() - ).utf8ToString(); - Matcher matcher = NAME_PATTERN.matcher(line); - if (matcher.find()) { - name = matcher.group(1); - } - } - } - skippedEmptyLine = markAndContinue && endPos == startPos; - startPos = endPos; - } else { - while (isEndOfPart(fullRequestBody, endPos) == false) { - endPos = fullRequestBody.indexOf((byte) '\r', endPos + 1); - } - content = Tuple.tuple(name, fullRequestBody.slice(startPos, endPos - startPos)); - break; - } - } - if (content == null) { - final InputStream stream = fullRequestBody.streamInput(); - logger.warn( - () -> format( - "Failed to find multi-part upload in [%s]", - new BufferedReader(new InputStreamReader(stream)).lines().collect(joining("\n")) - ) - ); - } - return Optional.ofNullable(content); - } - - private static final byte[] END_OF_PARTS_MARKER = "\r\n--__END_OF_PART__".getBytes(UTF_8); - - private static boolean isEndOfPart(BytesReference fullRequestBody, int endPos) { - for (int i = 0; i < END_OF_PARTS_MARKER.length; i++) { - final byte b = END_OF_PARTS_MARKER[i]; - if (fullRequestBody.get(endPos + i) != b) { - return false; - } - } - return true; - } - private static String requireHeader(HttpExchange exchange, String headerName) { final String headerValue = exchange.getRequestHeaders().getFirst(headerName); if (headerValue != null) { diff --git a/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/MultipartUpload.java b/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/MultipartUpload.java new file mode 100644 index 0000000000000..9dba250a207b5 --- /dev/null +++ b/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/MultipartUpload.java @@ -0,0 +1,200 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". 
+ */
+
+package fixture.gcs;
+
+import com.sun.net.httpserver.HttpExchange;
+
+import org.elasticsearch.common.bytes.BytesArray;
+import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.bytes.CompositeBytesReference;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.regex.Pattern;
+import java.util.zip.GZIPInputStream;
+
+public record MultipartUpload(String bucket, String name, String generation, String crc32, String md5, BytesReference content) {
+
+    static final byte[] BODY_PART_HEADERS_DELIMITER = new byte[] { '\r', '\n', '\r', '\n' };
+    static final Pattern METADATA_PATTERN = Pattern.compile("\"(bucket|name|generation|crc32c|md5Hash)\":\"([^\"]*)\"");
+    static final Pattern BOUNDARY_HEADER_PATTERN = Pattern.compile("multipart/\\w+; boundary=\\\"?(.*)\\\"?");
+
+    /**
+     * Reads the HTTP content of a multipart upload. The first part is always the blob metadata json,
+     * followed by binary parts. Every part has its own headers and content. Parts are separated by a
+     * dash-boundary delimiter ({@code --boundary}), where the boundary is defined in the Content-Type
+     * HTTP header, like {@code multipart/related; boundary=__END_OF_PART__4914cd49-4065-44f6-9846-ce805fe1e77f__}.
+     * The last part, the close-delimiter, is dashed on both sides: {@code --boundary--}.
+     * Part headers are separated from the part content by a double CRLF.
+     * See rfc2046 for more details.
+     *
+     * <pre>
+     *     {@code
+     * --boundary CRLF
+     * headers CRLF
+     * CRLF
+     * content CRLF
+     * --boundary CRLF
+     * headers CRLF
+     * CRLF
+     * content CRLF
+     * --boundary--
+     *     }
+     * </pre>
+     */
+    public static MultipartUpload parseBody(HttpExchange exchange, InputStream gzipInput) throws IOException {
+        var m = BOUNDARY_HEADER_PATTERN.matcher(exchange.getRequestHeaders().getFirst("Content-Type"));
+        if (m.matches() == false) {
+            throw new IllegalStateException("boundary header is not present");
+        }
+        var boundary = m.group(1);
+        try (var input = new GZIPInputStream(gzipInput)) {
+            return parseBody(boundary, input);
+        }
+    }
+
+    // visible for tests
+    static MultipartUpload parseBody(String boundary, InputStream input) throws IOException {
+        var reader = new MultipartContentReader(boundary, input);
+
+        // read the first body-part, which is always the blob metadata json
+        var metadataBytes = reader.next();
+        var match = METADATA_PATTERN.matcher(metadataBytes.utf8ToString());
+        String bucket = "", name = "", gen = "", crc = "", md5 = "";
+        while (match.find()) {
+            switch (match.group(1)) {
+                case "bucket" -> bucket = match.group(2);
+                case "name" -> name = match.group(2);
+                case "generation" -> gen = match.group(2);
+                case "crc32c" -> crc = match.group(2);
+                case "md5Hash" -> md5 = match.group(2);
+            }
+        }
+
+        // read and combine the remaining parts
+        var blobParts = new ArrayList<BytesReference>();
+        while (reader.hasNext()) {
+            blobParts.add(reader.next());
+        }
+        var compositeBuf = CompositeBytesReference.of(blobParts.toArray(new BytesReference[0]));
+
+        return new MultipartUpload(bucket, name, gen, crc, md5, compositeBuf);
+    }
+
+    /**
+     * Must be called after reading a body-part delimiter to check whether there are more parts.
+     * If there are no more parts, a closing double dash follows; otherwise CRLF.
+     */
+    static boolean readCloseDelimiterOrCRLF(InputStream is) throws IOException {
+        var d1 = is.read();
+        var d2 = is.read();
+        if (d1 == '-' && d2 == '-') {
+            return true;
+        } else if (d1 == '\r' && d2 == '\n') {
+            return false;
+        } else {
+            throw new IllegalStateException("expected '--' or CRLF, got " + d1 + " " + d2);
+        }
+    }
+
+    /**
+     * Reads bytes from the stream into a buffer until the given delimiter is reached. The delimiter is consumed as well.
+     */
+    static BytesReference readUntilDelimiter(InputStream is, byte[] delimiter) throws IOException {
+        assert delimiter.length > 0;
+        var out = new ByteArrayOutputStream(1024);
+        var delimiterMatchLen = 0;
+        while (true) {
+            var c = is.read();
+            if (c == -1) {
+                throw new IllegalStateException("expected delimiter, but reached end of stream");
+            }
+            var b = (byte) c;
+            out.write(b);
+            if (delimiter[delimiterMatchLen] == b) {
+                delimiterMatchLen++;
+                if (delimiterMatchLen >= delimiter.length) {
+                    var bytes = out.toByteArray();
+                    return new BytesArray(bytes, 0, bytes.length - delimiter.length);
+                }
+            } else {
+                if (delimiter[0] == b) {
+                    delimiterMatchLen = 1;
+                } else {
+                    delimiterMatchLen = 0;
+                }
+            }
+        }
+    }
+
+    /**
+     * Discards bytes from the stream until the given delimiter is reached. The delimiter is consumed as well.
+     */
+    static void skipUntilDelimiter(InputStream is, byte[] delimiter) throws IOException {
+        assert delimiter.length > 0;
+        var delimiterMatchLen = 0;
+        while (true) {
+            var c = is.read();
+            if (c == -1) {
+                throw new IllegalStateException("expected delimiter, but reached end of stream");
+            }
+            var b = (byte) c;
+            if (delimiter[delimiterMatchLen] == b) {
+                delimiterMatchLen++;
+                if (delimiterMatchLen >= delimiter.length) {
+                    return;
+                }
+            } else {
+                if (delimiter[0] == b) {
+                    delimiterMatchLen = 1;
+                } else {
+                    delimiterMatchLen = 0;
+                }
+            }
+        }
+    }
+
+    /**
+     * Iterator over the content of the individual body-parts.
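+     *
+     * Typical usage, mirroring what {@code parseBody} above does with the body-parts:
+     * <pre>
+     *     {@code
+     * var reader = new MultipartContentReader(boundary, input);
+     * var metadata = reader.next();   // first body-part: the blob metadata json
+     * var parts = new ArrayList<BytesReference>();
+     * while (reader.hasNext()) {
+     *     parts.add(reader.next());   // remaining body-parts: the blob content
+     * }
+     *     }
+     * </pre>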
+     */
+    static class MultipartContentReader implements Iterator<BytesReference> {
+        private final InputStream input;
+        private final byte[] bodyPartDelimiter;
+        private boolean done;
+
+        MultipartContentReader(String boundary, InputStream input) throws IOException {
+            this.input = input;
+            this.bodyPartDelimiter = ("\r\n--" + boundary).getBytes();
+            byte[] dashBoundary = ("--" + boundary).getBytes();
+            skipUntilDelimiter(input, dashBoundary);
+            readCloseDelimiterOrCRLF(input);
+        }
+
+        @Override
+        public boolean hasNext() {
+            return done == false;
+        }
+
+        @Override
+        public BytesReference next() {
+            try {
+                skipUntilDelimiter(input, BODY_PART_HEADERS_DELIMITER);
+                BytesReference buf = readUntilDelimiter(input, bodyPartDelimiter);
+                done = readCloseDelimiterOrCRLF(input);
+                return buf;
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+}
diff --git a/test/fixtures/gcs-fixture/src/test/java/fixture/gcs/GoogleCloudStorageHttpHandlerTests.java b/test/fixtures/gcs-fixture/src/test/java/fixture/gcs/GoogleCloudStorageHttpHandlerTests.java
index 3eade77fe1046..2185f495ea81f 100644
--- a/test/fixtures/gcs-fixture/src/test/java/fixture/gcs/GoogleCloudStorageHttpHandlerTests.java
+++ b/test/fixtures/gcs-fixture/src/test/java/fixture/gcs/GoogleCloudStorageHttpHandlerTests.java
@@ -102,9 +102,10 @@ public void testSimpleObjectOperations() {
         assertEquals(new TestHttpResponse(RestStatus.OK, """
             {"kind":"storage#objects","items":[],"prefixes":[]}"""), listBlobs(handler, bucket, "some/other/path", null));
 
+        var boundary = newMultipartBoundary();
         assertEquals(
             new TestHttpResponse(RestStatus.OK, """
-                --__END_OF_PART__d8b50acb-87dc-4630-a3d3-17d187132ebc__
+                --$boundary
                 Content-Length: 168
                 Content-Type: application/http
                 content-id: 1
                 content-transfer-encoding: binary
 
                 HTTP/1.1 200 OK
                 Content-Length: 0
 
 
-                --__END_OF_PART__d8b50acb-87dc-4630-a3d3-17d187132ebc__
-                """.replaceAll("\n", "\r\n")),
+                --$boundary--
+                """.replace("\n", "\r\n").replace("$boundary", boundary)),
             handleRequest(
                 handler,
                 "POST",
                 "/batch/storage/v1",
-                createBatchDeleteRequest(bucket, blobName),
+                createBatchDeleteRequest(bucket, boundary, blobName),
                 Headers.of("Content-Type", "mixed/multipart")
             )
         );
@@ -131,7 +132,7 @@
                 handler,
                 "POST",
                 "/batch/storage/v1",
-                createBatchDeleteRequest(bucket, blobName),
+                createBatchDeleteRequest(bucket, boundary, blobName),
                 Headers.of("Content-Type", "mixed/multipart")
             ).restStatus()
         );
@@ -615,11 +616,16 @@ private static TestHttpResponse executeMultipartUpload(
         BytesReference bytes,
         Long ifGenerationMatch
     ) {
+        var headers = new Headers();
+        // a multipart upload request is required to provide the boundary in the Content-Type header
+        var boundary = newMultipartBoundary();
+        headers.put("Content-Type", List.of("multipart/related; boundary=" + boundary));
         return handleRequest(
             handler,
             "POST",
             "/upload/storage/v1/b/" + bucket + "/" + generateQueryString("uploadType", "multipart", "ifGenerationMatch", ifGenerationMatch),
-            createGzipCompressedMultipartUploadBody(bucket, blobName, bytes)
+            createGzipCompressedMultipartUploadBody(bucket, blobName, bytes, boundary),
+            headers
         );
     }
@@ -785,25 +791,37 @@ private static Headers rangeHeader(long start, long end) {
         return Headers.of("Range", Strings.format("bytes=%d-%d", start, end));
     }
 
-    private static BytesReference createGzipCompressedMultipartUploadBody(String bucketName, String path, BytesReference content) {
+    private static String newMultipartBoundary() {
+        return "__END_OF_PART__" + randomUUID();
+    }
+
+    private static BytesReference createGzipCompressedMultipartUploadBody(
+        String bucketName,
+        String path,
+        BytesReference content,
+        String boundary
+    ) {
         final String metadataString = Strings.format("{\"bucket\":\"%s\", \"name\":\"%s\"}", bucketName, path);
-        final BytesReference header = new BytesArray(Strings.format("""
-            --__END_OF_PART__a607a67c-6df7-4b87-b8a1-81f639a75a97__
-            Content-Length: %d
+        final String headerStr = """
+            --$boundary
+            Content-Length: $metadata-length
             Content-Type: application/json; charset=UTF-8
             content-transfer-encoding: binary
 
-            %s
-            --__END_OF_PART__a607a67c-6df7-4b87-b8a1-81f639a75a97__
+            $metadata
+            --$boundary
             Content-Type: application/octet-stream
             content-transfer-encoding: binary
 
-            """.replaceAll("\n", "\r\n"), metadataString.length(), metadataString).getBytes(StandardCharsets.UTF_8));
-
+            """.replace("\n", "\r\n")
+            .replace("$boundary", boundary)
+            .replace("$metadata-length", Integer.toString(metadataString.length()))
+            .replace("$metadata", metadataString);
+        final BytesReference header = new BytesArray(headerStr.getBytes(StandardCharsets.UTF_8));
         final BytesReference footer = new BytesArray("""
-            --__END_OF_PART__a607a67c-6df7-4b87-b8a1-81f639a75a97__--
-            """.replaceAll("\n", "\r\n"));
+            --$boundary--
+            """.replace("\n", "\r\n").replace("$boundary", boundary));
         final ByteArrayOutputStream out = new ByteArrayOutputStream();
         try (GZIPOutputStream gzipOutputStream = new GZIPOutputStream(out)) {
             gzipOutputStream.write(BytesReference.toBytes(CompositeBytesReference.of(header, content, footer)));
@@ -813,7 +831,7 @@ private static BytesReference createGzipCompressedMultipartUploadBody(String buc
         return new BytesArray(out.toByteArray());
     }
 
-    private static String createBatchDeleteRequest(String bucketName, String... paths) {
+    private static String createBatchDeleteRequest(String bucketName, String boundary, String... paths) {
         final String deleteRequestTemplate = """
             DELETE %s/storage/v1/b/%s/o/%s HTTP/1.1
             Authorization: Bearer foo
             x-goog-api-version: 2
 
             """;
         final String partTemplate = """
-            --__END_OF_PART__d8b50acb-87dc-4630-a3d3-17d187132ebc__
+            --$boundary
             Content-Length: %d
             Content-Type: application/http
             content-id: %d
             content-transfer-encoding: binary
 
             %s
-            """;
+            """.replace("$boundary", boundary);
         StringBuilder builder = new StringBuilder();
         AtomicInteger contentId = new AtomicInteger();
         Arrays.stream(paths).forEach(p -> {
@@ -837,7 +855,7 @@ private static String createBatchDeleteRequest(String bucketName, String... path
             final String part = Strings.format(partTemplate, deleteRequest.length(), contentId.incrementAndGet(), deleteRequest);
             builder.append(part);
         });
-        builder.append("--__END_OF_PART__d8b50acb-87dc-4630-a3d3-17d187132ebc__");
+        builder.append("--").append(boundary).append("--");
         return builder.toString();
     }
 
diff --git a/test/fixtures/gcs-fixture/src/test/java/fixture/gcs/MultipartUploadTests.java b/test/fixtures/gcs-fixture/src/test/java/fixture/gcs/MultipartUploadTests.java
new file mode 100644
index 0000000000000..6b9b8a201bfb7
--- /dev/null
+++ b/test/fixtures/gcs-fixture/src/test/java/fixture/gcs/MultipartUploadTests.java
@@ -0,0 +1,96 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package fixture.gcs;
+
+import org.elasticsearch.common.bytes.BytesArray;
+import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.bytes.CompositeBytesReference;
+import org.elasticsearch.common.io.stream.ByteArrayStreamInput;
+import org.elasticsearch.test.ESTestCase;
+
+import java.io.IOException;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+public class MultipartUploadTests extends ESTestCase {
+
+    // produces content that is guaranteed not to contain the boundary
+    static String randomPartContent(int len, String boundary) {
+        assert len > 0 && boundary.isEmpty() == false;
+        var content = randomAlphanumericOfLength(len);
+        var replacement = boundary.getBytes(UTF_8);
+        replacement[0]++; // change a single char so the replacement differs from the boundary
+        return content.replace(boundary, new String(replacement, UTF_8));
+    }
+
+    public void testGenericMultipart() throws IOException {
+        var boundary = randomAlphanumericOfLength(between(1, 70));
+        var part1 = "plain text\nwith line break";
+        var part2 = "";
+        var part3 = randomPartContent(between(1, 1024), boundary);
+        var strInput = """
+            --$boundary\r
+            \r
+            \r
+            $part1\r
+            --$boundary\r
+            X-Header: x-man\r
+            \r
+            $part2\r
+            --$boundary\r
+            Content-Type: application/octet-stream\r
+            \r
+            $part3\r
+            --$boundary--""".replace("$boundary", boundary).replace("$part1", part1).replace("$part2", part2).replace("$part3", part3);
+
+        var reader = new MultipartUpload.MultipartContentReader(boundary, new ByteArrayStreamInput(strInput.getBytes(UTF_8)));
+        assertEquals(part1, reader.next().utf8ToString());
+        assertEquals(part2, reader.next().utf8ToString());
+        assertEquals(part3, reader.next().utf8ToString());
+        assertFalse(reader.hasNext());
+    }
+
+    public void testReadUntilDelimiter() throws IOException {
+        for (int run = 0; run < 100; run++) {
+            var delimitedContent = DelimitedContent.randomContent();
+            var inputStream = delimitedContent.toBytesReference().streamInput();
+            var readBytes = MultipartUpload.readUntilDelimiter(inputStream, delimitedContent.delimiter);
+            assertEquals(new BytesArray(delimitedContent.before), readBytes);
+            var readRemaining = inputStream.readAllBytes();
+            assertArrayEquals(delimitedContent.after, readRemaining);
+        }
+    }
+
+    public void testSkipUntilDelimiter() throws IOException {
+        for (int run = 0; run < 100; run++) {
+            var delimitedContent = DelimitedContent.randomContent();
+            var inputStream = delimitedContent.toBytesReference().streamInput();
+            MultipartUpload.skipUntilDelimiter(inputStream, delimitedContent.delimiter);
+            var readRemaining = inputStream.readAllBytes();
+            assertArrayEquals(delimitedContent.after, readRemaining);
+        }
+    }
+
+    record DelimitedContent(byte[] before, byte[] delimiter, byte[] after) {
+
+        static DelimitedContent randomContent() {
+            var before = randomAlphanumericOfLength(between(0, 1024 * 1024)).getBytes(UTF_8);
+            var delimiter = randomByteArrayOfLength(between(1, 70));
+            delimiter[0] = '\r'; // alphanumeric content never contains '\r', so the delimiter cannot occur in the initial bytes
+            var after = randomAlphanumericOfLength(between(0, 1024 * 1024)).getBytes(UTF_8);
+            return new DelimitedContent(before, delimiter, after);
+        }
+
+        BytesReference toBytesReference() {
+            return CompositeBytesReference.of(new BytesArray(before), new BytesArray(delimiter), new BytesArray(after));
+        }
+    }
+
+}
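Reviewer note (not part of the diff): below is a minimal sketch of how the new parser consumes a multipart/related body end to end, for anyone validating the parsing logic by hand. It assumes a test class placed in the same fixture.gcs package, since the parseBody(String, InputStream) overload used here is package-private; the class name and the blob values are made up. The HttpExchange overload additionally expects the stream to be gzip-compressed and takes the boundary from the Content-Type request header.

```java
package fixture.gcs;

import org.elasticsearch.test.ESTestCase;

import java.io.ByteArrayInputStream;

import static java.nio.charset.StandardCharsets.UTF_8;

// Hypothetical example test, not part of this change.
public class MultipartUploadExampleTests extends ESTestCase {

    public void testParseTwoPartBody() throws Exception {
        var boundary = "__END_OF_PART__example__"; // normally generated per request
        // First body-part carries the blob metadata json, the second one the blob bytes;
        // the close-delimiter is dashed on both sides.
        var body = """
            --$b\r
            Content-Type: application/json; charset=UTF-8\r
            \r
            {"bucket":"bucket","name":"path/blob"}\r
            --$b\r
            Content-Type: application/octet-stream\r
            \r
            hello world\r
            --$b--""".replace("$b", boundary);

        var upload = MultipartUpload.parseBody(boundary, new ByteArrayInputStream(body.getBytes(UTF_8)));

        assertEquals("bucket", upload.bucket());
        assertEquals("path/blob", upload.name());
        assertEquals("hello world", upload.content().utf8ToString());
    }
}
```

The trailing close-delimiter `--boundary--` is what flips `hasNext()` to false; a body ending in a bare `--boundary` would instead fail in `readCloseDelimiterOrCRLF` with "expected '--' or CRLF".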