Skip to content

Commit 673b58b

Browse files
DaveCTurner authored and georgewallace committed
Reduce scope of AmazonS3Reference (elastic#114989)
It's possible that the client config, particularly its credentials, might change in the middle of a long-running operation such as a large multipart upload. Prior to this commit we would hold onto the same `AmazonS3` instance for the entire operation, but really there's no need to do so, we can obtain a potentially-fresher instance for each API call.
1 parent 78fcec0 commit 673b58b

File tree

2 files changed

+108
-85
lines changed

2 files changed

+108
-85
lines changed

modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java

Lines changed: 83 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -160,75 +160,77 @@ public void writeMetadataBlob(
160160
) throws IOException {
161161
assert purpose != OperationPurpose.SNAPSHOT_DATA && BlobContainer.assertPurposeConsistency(purpose, blobName) : purpose;
162162
final String absoluteBlobKey = buildKey(blobName);
163-
try (
164-
AmazonS3Reference clientReference = blobStore.clientReference();
165-
ChunkedBlobOutputStream<PartETag> out = new ChunkedBlobOutputStream<>(blobStore.bigArrays(), blobStore.bufferSizeInBytes()) {
163+
try (ChunkedBlobOutputStream<PartETag> out = new ChunkedBlobOutputStream<>(blobStore.bigArrays(), blobStore.bufferSizeInBytes()) {
166164

167-
private final SetOnce<String> uploadId = new SetOnce<>();
165+
private final SetOnce<String> uploadId = new SetOnce<>();
168166

169-
@Override
170-
protected void flushBuffer() throws IOException {
171-
flushBuffer(false);
172-
}
167+
@Override
168+
protected void flushBuffer() throws IOException {
169+
flushBuffer(false);
170+
}
173171

174-
private void flushBuffer(boolean lastPart) throws IOException {
175-
if (buffer.size() == 0) {
176-
return;
177-
}
178-
if (flushedBytes == 0L) {
179-
assert lastPart == false : "use single part upload if there's only a single part";
172+
private void flushBuffer(boolean lastPart) throws IOException {
173+
if (buffer.size() == 0) {
174+
return;
175+
}
176+
if (flushedBytes == 0L) {
177+
assert lastPart == false : "use single part upload if there's only a single part";
178+
try (AmazonS3Reference clientReference = blobStore.clientReference()) {
180179
uploadId.set(
181180
SocketAccess.doPrivileged(
182181
() -> clientReference.client()
183182
.initiateMultipartUpload(initiateMultiPartUpload(purpose, absoluteBlobKey))
184183
.getUploadId()
185184
)
186185
);
187-
if (Strings.isEmpty(uploadId.get())) {
188-
throw new IOException("Failed to initialize multipart upload " + absoluteBlobKey);
189-
}
190186
}
191-
assert lastPart == false || successful : "must only write last part if successful";
192-
final UploadPartRequest uploadRequest = createPartUploadRequest(
193-
purpose,
194-
buffer.bytes().streamInput(),
195-
uploadId.get(),
196-
parts.size() + 1,
197-
absoluteBlobKey,
198-
buffer.size(),
199-
lastPart
200-
);
201-
final UploadPartResult uploadResponse = SocketAccess.doPrivileged(
202-
() -> clientReference.client().uploadPart(uploadRequest)
203-
);
204-
finishPart(uploadResponse.getPartETag());
187+
if (Strings.isEmpty(uploadId.get())) {
188+
throw new IOException("Failed to initialize multipart upload " + absoluteBlobKey);
189+
}
205190
}
191+
assert lastPart == false || successful : "must only write last part if successful";
192+
final UploadPartRequest uploadRequest = createPartUploadRequest(
193+
purpose,
194+
buffer.bytes().streamInput(),
195+
uploadId.get(),
196+
parts.size() + 1,
197+
absoluteBlobKey,
198+
buffer.size(),
199+
lastPart
200+
);
201+
final UploadPartResult uploadResponse;
202+
try (AmazonS3Reference clientReference = blobStore.clientReference()) {
203+
uploadResponse = SocketAccess.doPrivileged(() -> clientReference.client().uploadPart(uploadRequest));
204+
}
205+
finishPart(uploadResponse.getPartETag());
206+
}
206207

207-
@Override
208-
protected void onCompletion() throws IOException {
209-
if (flushedBytes == 0L) {
210-
writeBlob(purpose, blobName, buffer.bytes(), failIfAlreadyExists);
211-
} else {
212-
flushBuffer(true);
213-
final CompleteMultipartUploadRequest complRequest = new CompleteMultipartUploadRequest(
214-
blobStore.bucket(),
215-
absoluteBlobKey,
216-
uploadId.get(),
217-
parts
218-
);
219-
S3BlobStore.configureRequestForMetrics(complRequest, blobStore, Operation.PUT_MULTIPART_OBJECT, purpose);
208+
@Override
209+
protected void onCompletion() throws IOException {
210+
if (flushedBytes == 0L) {
211+
writeBlob(purpose, blobName, buffer.bytes(), failIfAlreadyExists);
212+
} else {
213+
flushBuffer(true);
214+
final CompleteMultipartUploadRequest complRequest = new CompleteMultipartUploadRequest(
215+
blobStore.bucket(),
216+
absoluteBlobKey,
217+
uploadId.get(),
218+
parts
219+
);
220+
S3BlobStore.configureRequestForMetrics(complRequest, blobStore, Operation.PUT_MULTIPART_OBJECT, purpose);
221+
try (AmazonS3Reference clientReference = blobStore.clientReference()) {
220222
SocketAccess.doPrivilegedVoid(() -> clientReference.client().completeMultipartUpload(complRequest));
221223
}
222224
}
225+
}
223226

224-
@Override
225-
protected void onFailure() {
226-
if (Strings.hasText(uploadId.get())) {
227-
abortMultiPartUpload(purpose, uploadId.get(), absoluteBlobKey);
228-
}
227+
@Override
228+
protected void onFailure() {
229+
if (Strings.hasText(uploadId.get())) {
230+
abortMultiPartUpload(purpose, uploadId.get(), absoluteBlobKey);
229231
}
230232
}
231-
) {
233+
}) {
232234
writer.accept(out);
233235
out.markSuccess();
234236
}
@@ -360,12 +362,9 @@ public void deleteBlobsIgnoringIfNotExists(OperationPurpose purpose, Iterator<St
360362

361363
@Override
362364
public Map<String, BlobMetadata> listBlobsByPrefix(OperationPurpose purpose, @Nullable String blobNamePrefix) throws IOException {
363-
try (AmazonS3Reference clientReference = blobStore.clientReference()) {
364-
return executeListing(
365-
purpose,
366-
clientReference,
367-
listObjectsRequest(purpose, blobNamePrefix == null ? keyPath : buildKey(blobNamePrefix))
368-
).stream()
365+
try {
366+
return executeListing(purpose, listObjectsRequest(purpose, blobNamePrefix == null ? keyPath : buildKey(blobNamePrefix)))
367+
.stream()
369368
.flatMap(listing -> listing.getObjectSummaries().stream())
370369
.map(summary -> new BlobMetadata(summary.getKey().substring(keyPath.length()), summary.getSize()))
371370
.collect(Collectors.toMap(BlobMetadata::name, Function.identity()));
@@ -381,8 +380,8 @@ public Map<String, BlobMetadata> listBlobs(OperationPurpose purpose) throws IOEx
381380

382381
@Override
383382
public Map<String, BlobContainer> children(OperationPurpose purpose) throws IOException {
384-
try (AmazonS3Reference clientReference = blobStore.clientReference()) {
385-
return executeListing(purpose, clientReference, listObjectsRequest(purpose, keyPath)).stream().flatMap(listing -> {
383+
try {
384+
return executeListing(purpose, listObjectsRequest(purpose, keyPath)).stream().flatMap(listing -> {
386385
assert listing.getObjectSummaries().stream().noneMatch(s -> {
387386
for (String commonPrefix : listing.getCommonPrefixes()) {
388387
if (s.getKey().substring(keyPath.length()).startsWith(commonPrefix)) {
@@ -403,21 +402,19 @@ public Map<String, BlobContainer> children(OperationPurpose purpose) throws IOEx
403402
}
404403
}
405404

406-
private List<ObjectListing> executeListing(
407-
OperationPurpose purpose,
408-
AmazonS3Reference clientReference,
409-
ListObjectsRequest listObjectsRequest
410-
) {
405+
private List<ObjectListing> executeListing(OperationPurpose purpose, ListObjectsRequest listObjectsRequest) {
411406
final List<ObjectListing> results = new ArrayList<>();
412407
ObjectListing prevListing = null;
413408
while (true) {
414409
ObjectListing list;
415-
if (prevListing != null) {
416-
final var listNextBatchOfObjectsRequest = new ListNextBatchOfObjectsRequest(prevListing);
417-
S3BlobStore.configureRequestForMetrics(listNextBatchOfObjectsRequest, blobStore, Operation.LIST_OBJECTS, purpose);
418-
list = SocketAccess.doPrivileged(() -> clientReference.client().listNextBatchOfObjects(listNextBatchOfObjectsRequest));
419-
} else {
420-
list = SocketAccess.doPrivileged(() -> clientReference.client().listObjects(listObjectsRequest));
410+
try (AmazonS3Reference clientReference = blobStore.clientReference()) {
411+
if (prevListing != null) {
412+
final var listNextBatchOfObjectsRequest = new ListNextBatchOfObjectsRequest(prevListing);
413+
S3BlobStore.configureRequestForMetrics(listNextBatchOfObjectsRequest, blobStore, Operation.LIST_OBJECTS, purpose);
414+
list = SocketAccess.doPrivileged(() -> clientReference.client().listNextBatchOfObjects(listNextBatchOfObjectsRequest));
415+
} else {
416+
list = SocketAccess.doPrivileged(() -> clientReference.client().listObjects(listObjectsRequest));
417+
}
421418
}
422419
results.add(list);
423420
if (list.isTruncated()) {
@@ -504,13 +501,14 @@ void executeMultipartUpload(
504501
final SetOnce<String> uploadId = new SetOnce<>();
505502
final String bucketName = s3BlobStore.bucket();
506503
boolean success = false;
507-
try (AmazonS3Reference clientReference = s3BlobStore.clientReference()) {
508-
509-
uploadId.set(
510-
SocketAccess.doPrivileged(
511-
() -> clientReference.client().initiateMultipartUpload(initiateMultiPartUpload(purpose, blobName)).getUploadId()
512-
)
513-
);
504+
try {
505+
try (AmazonS3Reference clientReference = s3BlobStore.clientReference()) {
506+
uploadId.set(
507+
SocketAccess.doPrivileged(
508+
() -> clientReference.client().initiateMultipartUpload(initiateMultiPartUpload(purpose, blobName)).getUploadId()
509+
)
510+
);
511+
}
514512
if (Strings.isEmpty(uploadId.get())) {
515513
throw new IOException("Failed to initialize multipart upload " + blobName);
516514
}
@@ -531,8 +529,12 @@ void executeMultipartUpload(
531529
);
532530
bytesCount += uploadRequest.getPartSize();
533531

534-
final UploadPartResult uploadResponse = SocketAccess.doPrivileged(() -> clientReference.client().uploadPart(uploadRequest));
535-
parts.add(uploadResponse.getPartETag());
532+
try (AmazonS3Reference clientReference = s3BlobStore.clientReference()) {
533+
final UploadPartResult uploadResponse = SocketAccess.doPrivileged(
534+
() -> clientReference.client().uploadPart(uploadRequest)
535+
);
536+
parts.add(uploadResponse.getPartETag());
537+
}
536538
}
537539

538540
if (bytesCount != blobSize) {
@@ -548,7 +550,9 @@ void executeMultipartUpload(
548550
parts
549551
);
550552
S3BlobStore.configureRequestForMetrics(complRequest, blobStore, Operation.PUT_MULTIPART_OBJECT, purpose);
551-
SocketAccess.doPrivilegedVoid(() -> clientReference.client().completeMultipartUpload(complRequest));
553+
try (AmazonS3Reference clientReference = s3BlobStore.clientReference()) {
554+
SocketAccess.doPrivilegedVoid(() -> clientReference.client().completeMultipartUpload(complRequest));
555+
}
552556
success = true;
553557

554558
} catch (final AmazonClientException e) {

modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreContainerTests.java

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -114,9 +114,7 @@ public void testExecuteSingleUpload() throws IOException {
114114
when(blobStore.getCannedACL()).thenReturn(cannedAccessControlList);
115115
}
116116

117-
final AmazonS3 client = mock(AmazonS3.class);
118-
final AmazonS3Reference clientReference = new AmazonS3Reference(client);
119-
when(blobStore.clientReference()).thenReturn(clientReference);
117+
final AmazonS3 client = configureMockClient(blobStore);
120118

121119
final ArgumentCaptor<PutObjectRequest> argumentCaptor = ArgumentCaptor.forClass(PutObjectRequest.class);
122120
when(client.putObject(argumentCaptor.capture())).thenReturn(new PutObjectResult());
@@ -187,9 +185,7 @@ public void testExecuteMultipartUpload() throws IOException {
187185
when(blobStore.getCannedACL()).thenReturn(cannedAccessControlList);
188186
}
189187

190-
final AmazonS3 client = mock(AmazonS3.class);
191-
final AmazonS3Reference clientReference = new AmazonS3Reference(client);
192-
when(blobStore.clientReference()).thenReturn(clientReference);
188+
final AmazonS3 client = configureMockClient(blobStore);
193189

194190
final ArgumentCaptor<InitiateMultipartUploadRequest> initArgCaptor = ArgumentCaptor.forClass(InitiateMultipartUploadRequest.class);
195191
final InitiateMultipartUploadResult initResult = new InitiateMultipartUploadResult();
@@ -260,6 +256,8 @@ public void testExecuteMultipartUpload() throws IOException {
260256

261257
final List<String> actualETags = compRequest.getPartETags().stream().map(PartETag::getETag).collect(Collectors.toList());
262258
assertEquals(expectedEtags, actualETags);
259+
260+
closeMockClient(blobStore);
263261
}
264262

265263
public void testExecuteMultipartUploadAborted() {
@@ -356,6 +354,27 @@ public void testExecuteMultipartUploadAborted() {
356354
assertEquals(blobName, abortRequest.getKey());
357355
assertEquals(uploadId, abortRequest.getUploadId());
358356
}
357+
358+
closeMockClient(blobStore);
359+
}
360+
361+
private static AmazonS3 configureMockClient(S3BlobStore blobStore) {
362+
final AmazonS3 client = mock(AmazonS3.class);
363+
try (AmazonS3Reference clientReference = new AmazonS3Reference(client)) {
364+
clientReference.mustIncRef(); // held by the mock, ultimately released in closeMockClient
365+
when(blobStore.clientReference()).then(invocation -> {
366+
clientReference.mustIncRef();
367+
return clientReference;
368+
});
369+
}
370+
return client;
371+
}
372+
373+
private static void closeMockClient(S3BlobStore blobStore) {
374+
final var finalClientReference = blobStore.clientReference();
375+
assertFalse(finalClientReference.decRef());
376+
assertTrue(finalClientReference.decRef());
377+
assertFalse(finalClientReference.hasReferences());
359378
}
360379

361380
public void testNumberOfMultipartsWithZeroPartSize() {

0 commit comments

Comments (0)