@@ -257,6 +257,10 @@ protected long getRetryDelayInMillis() {
257257 };
258258 }
259259
260+ private static S3HttpHandler .S3Request parseRequest (HttpExchange exchange ) {
261+ return new S3HttpHandler ("bucket" ).parseRequest (exchange );
262+ }
263+
260264 public void testWriteBlobWithRetries () throws Exception {
261265 final int maxRetries = randomInt (5 );
262266 final CountDown countDown = new CountDown (maxRetries + 1 );
@@ -265,10 +269,8 @@ public void testWriteBlobWithRetries() throws Exception {
265269
266270 final byte [] bytes = randomBlobContent ();
267271 httpServer .createContext (downloadStorageEndpoint (blobContainer , "write_blob_max_retries" ), exchange -> {
268- final S3HttpHandler .RequestComponents requestComponents = S3HttpHandler .parseRequestComponents (
269- S3HttpHandler .getRawRequestString (exchange )
270- );
271- if ("PUT" .equals (requestComponents .method ()) && requestComponents .query ().isEmpty ()) {
272+ final S3HttpHandler .S3Request s3Request = parseRequest (exchange );
273+ if (s3Request .isPutObjectRequest ()) {
272274 if (countDown .countDown ()) {
273275 final BytesReference body = Streams .readFully (exchange .getRequestBody ());
274276 if (Objects .deepEquals (bytes , BytesReference .toBytes (body ))) {
@@ -353,8 +355,8 @@ public void testWriteBlobWithExceptionThrownAtClosingTime() throws Exception {
353355
354356 var uploadedBytes = new AtomicReference <BytesReference >();
355357 httpServer .createContext (downloadStorageEndpoint (blobContainer , blobName ), exchange -> {
356- var requestComponents = S3HttpHandler . parseRequestComponents ( S3HttpHandler . getRawRequestString ( exchange ) );
357- if ("PUT" . equals ( requestComponents .method ()) && requestComponents . query (). isEmpty ()) {
358+ var requestComponents = parseRequest ( exchange );
359+ if (requestComponents .isPutObjectRequest ()) {
358360 var body = Streams .readFully (exchange .getRequestBody ());
359361 if (uploadedBytes .compareAndSet (null , body )) {
360362 exchange .sendResponseHeaders (HttpStatus .SC_OK , -1 );
@@ -408,12 +410,10 @@ public void testWriteLargeBlob() throws Exception {
408410 final CountDown countDownComplete = new CountDown (nbErrors );
409411
410412 httpServer .createContext (downloadStorageEndpoint (blobContainer , "write_large_blob" ), exchange -> {
411- final S3HttpHandler .RequestComponents requestComponents = S3HttpHandler .parseRequestComponents (
412- S3HttpHandler .getRawRequestString (exchange )
413- );
413+ final S3HttpHandler .S3Request s3Request = parseRequest (exchange );
414414 final long contentLength = Long .parseLong (exchange .getRequestHeaders ().getFirst ("Content-Length" ));
415415
416- if ("POST" . equals ( requestComponents . method ()) && requestComponents . query (). equals ( "uploads" )) {
416+ if (s3Request . isInitiateMultipartUploadRequest ( )) {
417417 // initiate multipart upload request
418418 if (countDownInitiate .countDown ()) {
419419 byte [] response = ("""
@@ -429,39 +429,37 @@ public void testWriteLargeBlob() throws Exception {
429429 exchange .close ();
430430 return ;
431431 }
432- } else if ("PUT" .equals (requestComponents .method ())
433- && requestComponents .query ().contains ("uploadId=TEST" )
434- && requestComponents .query ().contains ("partNumber=" )) {
435- // upload part request
436- MD5DigestCalculatingInputStream md5 = new MD5DigestCalculatingInputStream (exchange .getRequestBody ());
437- BytesReference bytes = Streams .readFully (md5 );
438- assertThat ((long ) bytes .length (), anyOf (equalTo (lastPartSize ), equalTo (bufferSize .getBytes ())));
439- assertThat (contentLength , anyOf (equalTo (lastPartSize ), equalTo (bufferSize .getBytes ())));
440-
441- if (countDownUploads .decrementAndGet () % 2 == 0 ) {
442- exchange .getResponseHeaders ().add ("ETag" , Base16 .encodeAsString (md5 .getMd5Digest ()));
443- exchange .sendResponseHeaders (HttpStatus .SC_OK , -1 );
444- exchange .close ();
445- return ;
446- }
432+ } else if (s3Request .isUploadPartRequest ()) {
433+ // upload part request
434+ MD5DigestCalculatingInputStream md5 = new MD5DigestCalculatingInputStream (exchange .getRequestBody ());
435+ BytesReference bytes = Streams .readFully (md5 );
436+ assertThat ((long ) bytes .length (), anyOf (equalTo (lastPartSize ), equalTo (bufferSize .getBytes ())));
437+ assertThat (contentLength , anyOf (equalTo (lastPartSize ), equalTo (bufferSize .getBytes ())));
438+
439+ if (countDownUploads .decrementAndGet () % 2 == 0 ) {
440+ exchange .getResponseHeaders ().add ("ETag" , Base16 .encodeAsString (md5 .getMd5Digest ()));
441+ exchange .sendResponseHeaders (HttpStatus .SC_OK , -1 );
442+ exchange .close ();
443+ return ;
444+ }
447445
448- } else if ("POST" .equals (requestComponents .method ()) && requestComponents .query ().equals ("uploadId=TEST" )) {
449- // complete multipart upload request
450- if (countDownComplete .countDown ()) {
451- Streams .readFully (exchange .getRequestBody ());
452- byte [] response = ("""
453- <?xml version="1.0" encoding="UTF-8"?>
454- <CompleteMultipartUploadResult>
455- <Bucket>bucket</Bucket>
456- <Key>write_large_blob</Key>
457- </CompleteMultipartUploadResult>""" ).getBytes (StandardCharsets .UTF_8 );
458- exchange .getResponseHeaders ().add ("Content-Type" , "application/xml" );
459- exchange .sendResponseHeaders (HttpStatus .SC_OK , response .length );
460- exchange .getResponseBody ().write (response );
461- exchange .close ();
462- return ;
463- }
446+ } else if (s3Request .isCompleteMultipartUploadRequest ()) {
447+ // complete multipart upload request
448+ if (countDownComplete .countDown ()) {
449+ Streams .readFully (exchange .getRequestBody ());
450+ byte [] response = ("""
451+ <?xml version="1.0" encoding="UTF-8"?>
452+ <CompleteMultipartUploadResult>
453+ <Bucket>bucket</Bucket>
454+ <Key>write_large_blob</Key>
455+ </CompleteMultipartUploadResult>""" ).getBytes (StandardCharsets .UTF_8 );
456+ exchange .getResponseHeaders ().add ("Content-Type" , "application/xml" );
457+ exchange .sendResponseHeaders (HttpStatus .SC_OK , response .length );
458+ exchange .getResponseBody ().write (response );
459+ exchange .close ();
460+ return ;
464461 }
462+ }
465463
466464 // sends an error back or let the request time out
467465 if (useTimeout == false ) {
@@ -510,12 +508,10 @@ public void testWriteLargeBlobStreaming() throws Exception {
510508 final CountDown countDownComplete = new CountDown (nbErrors );
511509
512510 httpServer .createContext (downloadStorageEndpoint (blobContainer , "write_large_blob_streaming" ), exchange -> {
513- final S3HttpHandler .RequestComponents requestComponents = S3HttpHandler .parseRequestComponents (
514- S3HttpHandler .getRawRequestString (exchange )
515- );
511+ final S3HttpHandler .S3Request s3Request = parseRequest (exchange );
516512 final long contentLength = Long .parseLong (exchange .getRequestHeaders ().getFirst ("Content-Length" ));
517513
518- if ("POST" . equals ( requestComponents . method ()) && requestComponents . query (). equals ( "uploads" )) {
514+ if (s3Request . isInitiateMultipartUploadRequest ( )) {
519515 // initiate multipart upload request
520516 if (countDownInitiate .countDown ()) {
521517 byte [] response = ("""
@@ -531,38 +527,36 @@ public void testWriteLargeBlobStreaming() throws Exception {
531527 exchange .close ();
532528 return ;
533529 }
534- } else if ("PUT" .equals (requestComponents .method ())
535- && requestComponents .query ().contains ("uploadId=TEST" )
536- && requestComponents .query ().contains ("partNumber=" )) {
537- // upload part request
538- MD5DigestCalculatingInputStream md5 = new MD5DigestCalculatingInputStream (exchange .getRequestBody ());
539- BytesReference bytes = Streams .readFully (md5 );
540-
541- if (counterUploads .incrementAndGet () % 2 == 0 ) {
542- bytesReceived .addAndGet (bytes .length ());
543- exchange .getResponseHeaders ().add ("ETag" , Base16 .encodeAsString (md5 .getMd5Digest ()));
544- exchange .sendResponseHeaders (HttpStatus .SC_OK , -1 );
545- exchange .close ();
546- return ;
547- }
530+ } else if (s3Request .isUploadPartRequest ()) {
531+ // upload part request
532+ MD5DigestCalculatingInputStream md5 = new MD5DigestCalculatingInputStream (exchange .getRequestBody ());
533+ BytesReference bytes = Streams .readFully (md5 );
534+
535+ if (counterUploads .incrementAndGet () % 2 == 0 ) {
536+ bytesReceived .addAndGet (bytes .length ());
537+ exchange .getResponseHeaders ().add ("ETag" , Base16 .encodeAsString (md5 .getMd5Digest ()));
538+ exchange .sendResponseHeaders (HttpStatus .SC_OK , -1 );
539+ exchange .close ();
540+ return ;
541+ }
548542
549- } else if ("POST" .equals (requestComponents .method ()) && requestComponents .query ().equals ("uploadId=TEST" )) {
550- // complete multipart upload request
551- if (countDownComplete .countDown ()) {
552- Streams .readFully (exchange .getRequestBody ());
553- byte [] response = ("""
554- <?xml version="1.0" encoding="UTF-8"?>
555- <CompleteMultipartUploadResult>
556- <Bucket>bucket</Bucket>
557- <Key>write_large_blob_streaming</Key>
558- </CompleteMultipartUploadResult>""" ).getBytes (StandardCharsets .UTF_8 );
559- exchange .getResponseHeaders ().add ("Content-Type" , "application/xml" );
560- exchange .sendResponseHeaders (HttpStatus .SC_OK , response .length );
561- exchange .getResponseBody ().write (response );
562- exchange .close ();
563- return ;
564- }
543+ } else if (s3Request .isCompleteMultipartUploadRequest ()) {
544+ // complete multipart upload request
545+ if (countDownComplete .countDown ()) {
546+ Streams .readFully (exchange .getRequestBody ());
547+ byte [] response = ("""
548+ <?xml version="1.0" encoding="UTF-8"?>
549+ <CompleteMultipartUploadResult>
550+ <Bucket>bucket</Bucket>
551+ <Key>write_large_blob_streaming</Key>
552+ </CompleteMultipartUploadResult>""" ).getBytes (StandardCharsets .UTF_8 );
553+ exchange .getResponseHeaders ().add ("Content-Type" , "application/xml" );
554+ exchange .sendResponseHeaders (HttpStatus .SC_OK , response .length );
555+ exchange .getResponseBody ().write (response );
556+ exchange .close ();
557+ return ;
565558 }
559+ }
566560
567561 // sends an error back or let the request time out
568562 if (useTimeout == false ) {
0 commit comments