| 
9 | 9 | 
 
  | 
10 | 10 | package org.elasticsearch.repositories.s3;  | 
11 | 11 | 
 
  | 
12 |  | -import com.amazonaws.services.s3.AmazonS3;  | 
13 |  | -import com.amazonaws.services.s3.model.AmazonS3Exception;  | 
14 |  | -import com.amazonaws.services.s3.model.GetObjectRequest;  | 
15 |  | -import com.amazonaws.services.s3.model.S3Object;  | 
16 |  | -import com.amazonaws.services.s3.model.S3ObjectInputStream;  | 
 | 12 | +import software.amazon.awssdk.core.ResponseInputStream;  | 
 | 13 | +import software.amazon.awssdk.http.AbortableInputStream;  | 
 | 14 | +import software.amazon.awssdk.services.s3.model.GetObjectRequest;  | 
17 | 15 | 
 
  | 
18 |  | -import org.apache.http.client.methods.HttpGet;  | 
19 | 16 | import org.elasticsearch.common.io.Streams;  | 
20 | 17 | import org.elasticsearch.core.Nullable;  | 
21 | 18 | import org.elasticsearch.repositories.blobstore.RequestedRangeNotSatisfiedException;  | 
22 | 19 | import org.elasticsearch.rest.RestStatus;  | 
23 | 20 | import org.elasticsearch.test.ESTestCase;  | 
24 | 21 | 
 
  | 
 | 22 | +import software.amazon.awssdk.services.s3.S3Client;  | 
 | 23 | +import software.amazon.awssdk.services.s3.model.GetObjectResponse;  | 
 | 24 | +import software.amazon.awssdk.services.s3.model.S3Exception;  | 
 | 25 | + | 
25 | 26 | import java.io.ByteArrayInputStream;  | 
26 | 27 | import java.io.IOException;  | 
27 | 28 | import java.util.Arrays;  | 
@@ -115,34 +116,51 @@ public void testReadAfterBlobLengthThrowsRequestedRangeNotSatisfiedException() t  | 
115 | 116 |         }  | 
116 | 117 |     }  | 
117 | 118 | 
 
  | 
 | 119 | +    /**  | 
 | 120 | +     * Creates a mock BlobStore that returns a mock S3Client, configured to supply a #getObject response. The blob store is then wrapped in  | 
 | 121 | +     * a {@link S3RetryingInputStream}.  | 
 | 122 | +     *  | 
 | 123 | +     * @param data The data to stream.  | 
 | 124 | +     * @param position The position at which to start reading from the stream.  | 
 | 125 | +     * @param length How much to read from the data stream starting at {@code position}  | 
 | 126 | +     * @return A {@link S3RetryingInputStream} that reads from the data stream.  | 
 | 127 | +     * @throws IOException  | 
 | 128 | +     */  | 
118 | 129 |     private S3RetryingInputStream createInputStream(final byte[] data, @Nullable final Integer position, @Nullable final Integer length)  | 
119 | 130 |         throws IOException {  | 
120 |  | -        final AmazonS3 client = mock(AmazonS3.class);  | 
 | 131 | +        final S3Client client = mock(S3Client.class);  | 
121 | 132 |         final AmazonS3Reference clientReference = mock(AmazonS3Reference.class);  | 
122 | 133 |         when(clientReference.client()).thenReturn(client);  | 
123 | 134 |         final S3BlobStore blobStore = mock(S3BlobStore.class);  | 
124 | 135 |         when(blobStore.clientReference()).thenReturn(clientReference);  | 
125 | 136 | 
 
  | 
126 | 137 |         if (position != null && length != null) {  | 
127 | 138 |             if (data.length <= position) {  | 
128 |  | -                var amazonS3Exception = new AmazonS3Exception("test");  | 
129 |  | -                amazonS3Exception.setStatusCode(RestStatus.REQUESTED_RANGE_NOT_SATISFIED.getStatus());  | 
130 |  | -                when(client.getObject(any(GetObjectRequest.class))).thenThrow(amazonS3Exception);  | 
 | 139 | +                var s3Exception = S3Exception.builder().message("test");  | 
 | 140 | +                s3Exception.statusCode(RestStatus.REQUESTED_RANGE_NOT_SATISFIED.getStatus());  | 
 | 141 | +                when(client.getObject(any(GetObjectRequest.class))).thenThrow(s3Exception.build());  | 
131 | 142 |                 return new S3RetryingInputStream(randomPurpose(), blobStore, "_blob", position, Math.addExact(position, length - 1));  | 
132 | 143 |             }  | 
133 | 144 | 
 
  | 
134 |  | -            final S3Object s3Object = new S3Object();  | 
135 |  | -            s3Object.getObjectMetadata().setContentLength(length);  | 
136 |  | -            s3Object.setObjectContent(new S3ObjectInputStream(new ByteArrayInputStream(data, position, length), new HttpGet()));  | 
137 |  | -            when(client.getObject(any(GetObjectRequest.class))).thenReturn(s3Object);  | 
 | 145 | +            // NOMERGE: I think blobStore.getMetricPublisher(operation, purpose) needs to be defined, to fix the NPE. I didn't get that far.  | 
 | 146 | +            // TODO NOMERGE: revisit AbortableInputStream, I just threw it on to see if that fixed the NPE.  | 
 | 147 | +            ResponseInputStream<GetObjectResponse> objectResponse =  | 
 | 148 | +                new ResponseInputStream<>(  | 
 | 149 | +                    GetObjectResponse.builder().build(),//.contentLength(length.longValue()).build(),  | 
 | 150 | +                    AbortableInputStream.create(new ByteArrayInputStream(data, position, length))  | 
 | 151 | +                );  | 
 | 152 | +            when(client.getObject(any(GetObjectRequest.class))).thenReturn(objectResponse);  | 
138 | 153 |             return new S3RetryingInputStream(randomPurpose(), blobStore, "_blob", position, Math.addExact(position, length - 1));  | 
139 | 154 |         }  | 
140 | 155 | 
 
  | 
141 |  | -        final S3Object s3Object = new S3Object();  | 
142 |  | -        s3Object.getObjectMetadata().setContentLength(data.length);  | 
143 |  | -        s3Object.setObjectContent(new S3ObjectInputStream(new ByteArrayInputStream(data), new HttpGet()));  | 
144 |  | -        when(client.getObject(any(GetObjectRequest.class))).thenReturn(s3Object);  | 
 | 156 | +        // NOMERGE: I think blobStore.getMetricPublisher(operation, purpose) needs to be defined, to fix the NPE. I didn't get that far.  | 
 | 157 | +        // TODO NOMERGE: revisit AbortableInputStream, I just threw it on to see if that fixed the NPE.  | 
 | 158 | +        ResponseInputStream<GetObjectResponse> objectResponse =  | 
 | 159 | +            new ResponseInputStream<>(  | 
 | 160 | +                GetObjectResponse.builder().build(),//.contentLength(Long.valueOf(data.length)).build(),  | 
 | 161 | +                AbortableInputStream.create(new ByteArrayInputStream(data))  | 
 | 162 | +            );  | 
 | 163 | +        when(client.getObject(any(GetObjectRequest.class))).thenReturn(objectResponse);  | 
145 | 164 |         return new S3RetryingInputStream(randomPurpose(), blobStore, "_blob");  | 
146 | 165 |     }  | 
147 | 166 | }  | 
148 |  | -// TODO NOMERGE bring these tests back  | 
 | 
0 commit comments