|
18 | 18 | import org.elasticsearch.common.io.Streams; |
19 | 19 | import org.elasticsearch.common.regex.Regex; |
20 | 20 | import org.elasticsearch.core.SuppressForbidden; |
21 | | -import org.elasticsearch.core.Tuple; |
22 | 21 | import org.elasticsearch.rest.RestStatus; |
23 | 22 | import org.elasticsearch.rest.RestUtils; |
24 | 23 | import org.elasticsearch.test.fixture.HttpHeaderParser; |
|
27 | 26 | import org.elasticsearch.xcontent.XContentFactory; |
28 | 27 | import org.elasticsearch.xcontent.XContentType; |
29 | 28 |
|
30 | | -import java.io.BufferedReader; |
31 | 29 | import java.io.IOException; |
32 | | -import java.io.InputStream; |
33 | | -import java.io.InputStreamReader; |
34 | 30 | import java.net.URLDecoder; |
35 | 31 | import java.util.HashMap; |
36 | 32 | import java.util.Locale; |
37 | 33 | import java.util.Map; |
38 | 34 | import java.util.Objects; |
39 | | -import java.util.Optional; |
40 | 35 | import java.util.concurrent.atomic.AtomicInteger; |
41 | | -import java.util.regex.Matcher; |
42 | | -import java.util.regex.Pattern; |
43 | 36 | import java.util.stream.Collectors; |
44 | | -import java.util.zip.GZIPInputStream; |
45 | 37 |
|
46 | 38 | import static fixture.gcs.MockGcsBlobStore.failAndThrow; |
47 | 39 | import static java.nio.charset.StandardCharsets.UTF_8; |
48 | | -import static java.util.stream.Collectors.joining; |
49 | | -import static org.elasticsearch.core.Strings.format; |
50 | 40 |
|
51 | 41 | /** |
52 | 42 | * Minimal HTTP handler that acts as a Google Cloud Storage compliant server |
@@ -183,26 +173,18 @@ public void handle(final HttpExchange exchange) throws IOException { |
183 | 173 | exchange.getResponseBody().write(response); |
184 | 174 |
|
185 | 175 | } else if (Regex.simpleMatch("POST /upload/storage/v1/b/" + bucket + "/*uploadType=multipart*", request)) { |
186 | | - // Multipart upload |
187 | | - Optional<Tuple<String, BytesReference>> content = parseMultipartRequestBody(requestBody.streamInput()); |
188 | | - if (content.isPresent()) { |
| 176 | + try { |
| 177 | + final var multipartUpload = MultipartUpload.parseBody(exchange, requestBody.streamInput()); |
189 | 178 | final Long ifGenerationMatch = parseOptionalLongParameter(exchange, IF_GENERATION_MATCH); |
190 | 179 | final MockGcsBlobStore.BlobVersion newBlobVersion = mockGcsBlobStore.updateBlob( |
191 | | - content.get().v1(), |
| 180 | + multipartUpload.name(), |
192 | 181 | ifGenerationMatch, |
193 | | - content.get().v2() |
| 182 | + multipartUpload.content() |
194 | 183 | ); |
195 | 184 | writeBlobVersionAsJson(exchange, newBlobVersion); |
196 | | - } else { |
197 | | - throw new AssertionError( |
198 | | - "Could not read multi-part request to [" |
199 | | - + request |
200 | | - + "] with headers [" |
201 | | - + new HashMap<>(exchange.getRequestHeaders()) |
202 | | - + "]" |
203 | | - ); |
| 185 | + } catch (IllegalArgumentException e) { |
| 186 | + throw new AssertionError(e); |
204 | 187 | } |
205 | | - |
206 | 188 | } else if (Regex.simpleMatch("POST /upload/storage/v1/b/" + bucket + "/*uploadType=resumable*", request)) { |
207 | 189 | // Resumable upload initialization https://cloud.google.com/storage/docs/json_api/v1/how-tos/resumable-upload |
208 | 190 | final Map<String, String> params = new HashMap<>(); |
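The resumable-upload branch is cut off in this diff right after it starts collecting query parameters, so the fixture's actual response is not shown here. For orientation only: per the linked GCS docs, the initialization POST is answered with a session URI in a Location header, which the client then targets with the data uploads. A hedged sketch of such a response, reusing the httpServerUrl helper visible further down (the method name respondToResumableInit and the upload_id generation are illustrative, not the fixture's code):

    // Sketch only: not the fixture's actual resumable-upload handling, which is truncated in this diff.
    // Per https://cloud.google.com/storage/docs/json_api/v1/how-tos/resumable-upload, the init request
    // is answered with a session URI in the Location header; subsequent uploads target that URI.
    private static void respondToResumableInit(HttpExchange exchange, String bucket, String blobName) throws IOException {
        final String uploadId = java.util.UUID.randomUUID().toString(); // hypothetical session id
        exchange.getResponseHeaders()
            .add(
                "Location",
                httpServerUrl(exchange) + "/upload/storage/v1/b/" + bucket + "/o?uploadType=resumable&upload_id=" + uploadId
            );
        exchange.sendResponseHeaders(RestStatus.OK.getStatus(), -1); // no response body
    }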
@@ -328,81 +310,6 @@ private static String httpServerUrl(final HttpExchange exchange) { |
328 | 310 | return "http://" + exchange.getRequestHeaders().get("HOST").get(0); |
329 | 311 | } |
330 | 312 |
|
331 | | - private static final Pattern NAME_PATTERN = Pattern.compile("\"name\":\"([^\"]*)\""); |
332 | | - |
333 | | - public static Optional<Tuple<String, BytesReference>> parseMultipartRequestBody(final InputStream requestBody) throws IOException { |
334 | | - Tuple<String, BytesReference> content = null; |
335 | | - final BytesReference fullRequestBody; |
336 | | - try (InputStream in = new GZIPInputStream(requestBody)) { |
337 | | - fullRequestBody = Streams.readFully(in); |
338 | | - } |
339 | | - String name = null; |
340 | | - boolean skippedEmptyLine = false; |
341 | | - int startPos = 0; |
342 | | - int endPos = 0; |
343 | | - while (startPos < fullRequestBody.length()) { |
344 | | - do { |
345 | | - endPos = fullRequestBody.indexOf((byte) '\r', endPos + 1); |
346 | | - } while (endPos >= 0 && fullRequestBody.get(endPos + 1) != '\n'); |
347 | | - boolean markAndContinue = false; |
348 | | - final String bucketPrefix = "{\"bucket\":"; |
349 | | - if (startPos > 0) { |
350 | | - startPos += 2; |
351 | | - } |
352 | | - if (name == null || skippedEmptyLine == false) { |
353 | | - if ((skippedEmptyLine == false && endPos == startPos) |
354 | | - || (fullRequestBody.get(startPos) == '-' && fullRequestBody.get(startPos + 1) == '-')) { |
355 | | - markAndContinue = true; |
356 | | - } else { |
357 | | - final String start = fullRequestBody.slice(startPos, Math.min(endPos - startPos, bucketPrefix.length())).utf8ToString(); |
358 | | - if (start.toLowerCase(Locale.ROOT).startsWith("content")) { |
359 | | - markAndContinue = true; |
360 | | - } else if (start.startsWith(bucketPrefix)) { |
361 | | - markAndContinue = true; |
362 | | - final String line = fullRequestBody.slice( |
363 | | - startPos + bucketPrefix.length(), |
364 | | - endPos - startPos - bucketPrefix.length() |
365 | | - ).utf8ToString(); |
366 | | - Matcher matcher = NAME_PATTERN.matcher(line); |
367 | | - if (matcher.find()) { |
368 | | - name = matcher.group(1); |
369 | | - } |
370 | | - } |
371 | | - } |
372 | | - skippedEmptyLine = markAndContinue && endPos == startPos; |
373 | | - startPos = endPos; |
374 | | - } else { |
375 | | - while (isEndOfPart(fullRequestBody, endPos) == false) { |
376 | | - endPos = fullRequestBody.indexOf((byte) '\r', endPos + 1); |
377 | | - } |
378 | | - content = Tuple.tuple(name, fullRequestBody.slice(startPos, endPos - startPos)); |
379 | | - break; |
380 | | - } |
381 | | - } |
382 | | - if (content == null) { |
383 | | - final InputStream stream = fullRequestBody.streamInput(); |
384 | | - logger.warn( |
385 | | - () -> format( |
386 | | - "Failed to find multi-part upload in [%s]", |
387 | | - new BufferedReader(new InputStreamReader(stream)).lines().collect(joining("\n")) |
388 | | - ) |
389 | | - ); |
390 | | - } |
391 | | - return Optional.ofNullable(content); |
392 | | - } |
393 | | - |
394 | | - private static final byte[] END_OF_PARTS_MARKER = "\r\n--__END_OF_PART__".getBytes(UTF_8); |
395 | | - |
396 | | - private static boolean isEndOfPart(BytesReference fullRequestBody, int endPos) { |
397 | | - for (int i = 0; i < END_OF_PARTS_MARKER.length; i++) { |
398 | | - final byte b = END_OF_PARTS_MARKER[i]; |
399 | | - if (fullRequestBody.get(endPos + i) != b) { |
400 | | - return false; |
401 | | - } |
402 | | - } |
403 | | - return true; |
404 | | - } |
405 | | - |
406 | 313 | private static String requireHeader(HttpExchange exchange, String headerName) { |
407 | 314 | final String headerValue = exchange.getRequestHeaders().getFirst(headerName); |
408 | 315 | if (headerValue != null) { |
|
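For reference, the scanner removed above expected a gzip-compressed body using the literal __END_OF_PART__ boundary, with a JSON metadata part (starting with {"bucket": and carrying the blob name) followed by the raw content part. A sketch of assembling such a body on the test side, assuming that wire format; the gzippedMultipartBody helper and the exact per-part headers are illustrative, not part of the fixture:

    // Sketch only: builds a body in the shape the removed parser handled. The boundary, metadata JSON
    // and gzip wrapping match the constants and patterns deleted above (END_OF_PARTS_MARKER,
    // NAME_PATTERN, GZIPInputStream); the Content-Type headers on each part are an assumption.
    private static byte[] gzippedMultipartBody(String bucket, String blobName, byte[] content) throws IOException {
        final java.io.ByteArrayOutputStream out = new java.io.ByteArrayOutputStream();
        try (java.io.OutputStream gzip = new java.util.zip.GZIPOutputStream(out)) {
            gzip.write(
                ("--__END_OF_PART__\r\n"
                    + "Content-Type: application/json; charset=UTF-8\r\n"
                    + "\r\n"
                    + "{\"bucket\":\"" + bucket + "\",\"name\":\"" + blobName + "\"}\r\n"
                    + "--__END_OF_PART__\r\n"
                    + "Content-Type: application/octet-stream\r\n"
                    + "\r\n").getBytes(UTF_8)
            );
            gzip.write(content); // raw blob bytes, sliced back out by the parser up to the closing boundary
            gzip.write("\r\n--__END_OF_PART__--\r\n".getBytes(UTF_8));
        }
        return out.toByteArray();
    }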