@@ -160,75 +160,77 @@ public void writeMetadataBlob(
160160 ) throws IOException {
161161 assert purpose != OperationPurpose .SNAPSHOT_DATA && BlobContainer .assertPurposeConsistency (purpose , blobName ) : purpose ;
162162 final String absoluteBlobKey = buildKey (blobName );
163- try (
164- AmazonS3Reference clientReference = blobStore .clientReference ();
165- ChunkedBlobOutputStream <PartETag > out = new ChunkedBlobOutputStream <>(blobStore .bigArrays (), blobStore .bufferSizeInBytes ()) {
163+ try (ChunkedBlobOutputStream <PartETag > out = new ChunkedBlobOutputStream <>(blobStore .bigArrays (), blobStore .bufferSizeInBytes ()) {
166164
167- private final SetOnce <String > uploadId = new SetOnce <>();
165+ private final SetOnce <String > uploadId = new SetOnce <>();
168166
169- @ Override
170- protected void flushBuffer () throws IOException {
171- flushBuffer (false );
172- }
167+ @ Override
168+ protected void flushBuffer () throws IOException {
169+ flushBuffer (false );
170+ }
173171
174- private void flushBuffer (boolean lastPart ) throws IOException {
175- if (buffer .size () == 0 ) {
176- return ;
177- }
178- if (flushedBytes == 0L ) {
179- assert lastPart == false : "use single part upload if there's only a single part" ;
172+ private void flushBuffer (boolean lastPart ) throws IOException {
173+ if (buffer .size () == 0 ) {
174+ return ;
175+ }
176+ if (flushedBytes == 0L ) {
177+ assert lastPart == false : "use single part upload if there's only a single part" ;
178+ try (AmazonS3Reference clientReference = blobStore .clientReference ()) {
180179 uploadId .set (
181180 SocketAccess .doPrivileged (
182181 () -> clientReference .client ()
183182 .initiateMultipartUpload (initiateMultiPartUpload (purpose , absoluteBlobKey ))
184183 .getUploadId ()
185184 )
186185 );
187- if (Strings .isEmpty (uploadId .get ())) {
188- throw new IOException ("Failed to initialize multipart upload " + absoluteBlobKey );
189- }
190186 }
191- assert lastPart == false || successful : "must only write last part if successful" ;
192- final UploadPartRequest uploadRequest = createPartUploadRequest (
193- purpose ,
194- buffer .bytes ().streamInput (),
195- uploadId .get (),
196- parts .size () + 1 ,
197- absoluteBlobKey ,
198- buffer .size (),
199- lastPart
200- );
201- final UploadPartResult uploadResponse = SocketAccess .doPrivileged (
202- () -> clientReference .client ().uploadPart (uploadRequest )
203- );
204- finishPart (uploadResponse .getPartETag ());
187+ if (Strings .isEmpty (uploadId .get ())) {
188+ throw new IOException ("Failed to initialize multipart upload " + absoluteBlobKey );
189+ }
205190 }
191+ assert lastPart == false || successful : "must only write last part if successful" ;
192+ final UploadPartRequest uploadRequest = createPartUploadRequest (
193+ purpose ,
194+ buffer .bytes ().streamInput (),
195+ uploadId .get (),
196+ parts .size () + 1 ,
197+ absoluteBlobKey ,
198+ buffer .size (),
199+ lastPart
200+ );
201+ final UploadPartResult uploadResponse ;
202+ try (AmazonS3Reference clientReference = blobStore .clientReference ()) {
203+ uploadResponse = SocketAccess .doPrivileged (() -> clientReference .client ().uploadPart (uploadRequest ));
204+ }
205+ finishPart (uploadResponse .getPartETag ());
206+ }
206207
207- @ Override
208- protected void onCompletion () throws IOException {
209- if (flushedBytes == 0L ) {
210- writeBlob (purpose , blobName , buffer .bytes (), failIfAlreadyExists );
211- } else {
212- flushBuffer (true );
213- final CompleteMultipartUploadRequest complRequest = new CompleteMultipartUploadRequest (
214- blobStore .bucket (),
215- absoluteBlobKey ,
216- uploadId .get (),
217- parts
218- );
219- S3BlobStore .configureRequestForMetrics (complRequest , blobStore , Operation .PUT_MULTIPART_OBJECT , purpose );
208+ @ Override
209+ protected void onCompletion () throws IOException {
210+ if (flushedBytes == 0L ) {
211+ writeBlob (purpose , blobName , buffer .bytes (), failIfAlreadyExists );
212+ } else {
213+ flushBuffer (true );
214+ final CompleteMultipartUploadRequest complRequest = new CompleteMultipartUploadRequest (
215+ blobStore .bucket (),
216+ absoluteBlobKey ,
217+ uploadId .get (),
218+ parts
219+ );
220+ S3BlobStore .configureRequestForMetrics (complRequest , blobStore , Operation .PUT_MULTIPART_OBJECT , purpose );
221+ try (AmazonS3Reference clientReference = blobStore .clientReference ()) {
220222 SocketAccess .doPrivilegedVoid (() -> clientReference .client ().completeMultipartUpload (complRequest ));
221223 }
222224 }
225+ }
223226
224- @ Override
225- protected void onFailure () {
226- if (Strings .hasText (uploadId .get ())) {
227- abortMultiPartUpload (purpose , uploadId .get (), absoluteBlobKey );
228- }
227+ @ Override
228+ protected void onFailure () {
229+ if (Strings .hasText (uploadId .get ())) {
230+ abortMultiPartUpload (purpose , uploadId .get (), absoluteBlobKey );
229231 }
230232 }
231- ) {
233+ } ) {
232234 writer .accept (out );
233235 out .markSuccess ();
234236 }
@@ -360,12 +362,9 @@ public void deleteBlobsIgnoringIfNotExists(OperationPurpose purpose, Iterator<St
360362
361363 @ Override
362364 public Map <String , BlobMetadata > listBlobsByPrefix (OperationPurpose purpose , @ Nullable String blobNamePrefix ) throws IOException {
363- try (AmazonS3Reference clientReference = blobStore .clientReference ()) {
364- return executeListing (
365- purpose ,
366- clientReference ,
367- listObjectsRequest (purpose , blobNamePrefix == null ? keyPath : buildKey (blobNamePrefix ))
368- ).stream ()
365+ try {
366+ return executeListing (purpose , listObjectsRequest (purpose , blobNamePrefix == null ? keyPath : buildKey (blobNamePrefix )))
367+ .stream ()
369368 .flatMap (listing -> listing .getObjectSummaries ().stream ())
370369 .map (summary -> new BlobMetadata (summary .getKey ().substring (keyPath .length ()), summary .getSize ()))
371370 .collect (Collectors .toMap (BlobMetadata ::name , Function .identity ()));
@@ -381,8 +380,8 @@ public Map<String, BlobMetadata> listBlobs(OperationPurpose purpose) throws IOEx
381380
382381 @ Override
383382 public Map <String , BlobContainer > children (OperationPurpose purpose ) throws IOException {
384- try ( AmazonS3Reference clientReference = blobStore . clientReference ()) {
385- return executeListing (purpose , clientReference , listObjectsRequest (purpose , keyPath )).stream ().flatMap (listing -> {
383+ try {
384+ return executeListing (purpose , listObjectsRequest (purpose , keyPath )).stream ().flatMap (listing -> {
386385 assert listing .getObjectSummaries ().stream ().noneMatch (s -> {
387386 for (String commonPrefix : listing .getCommonPrefixes ()) {
388387 if (s .getKey ().substring (keyPath .length ()).startsWith (commonPrefix )) {
@@ -403,21 +402,19 @@ public Map<String, BlobContainer> children(OperationPurpose purpose) throws IOEx
403402 }
404403 }
405404
406- private List <ObjectListing > executeListing (
407- OperationPurpose purpose ,
408- AmazonS3Reference clientReference ,
409- ListObjectsRequest listObjectsRequest
410- ) {
405+ private List <ObjectListing > executeListing (OperationPurpose purpose , ListObjectsRequest listObjectsRequest ) {
411406 final List <ObjectListing > results = new ArrayList <>();
412407 ObjectListing prevListing = null ;
413408 while (true ) {
414409 ObjectListing list ;
415- if (prevListing != null ) {
416- final var listNextBatchOfObjectsRequest = new ListNextBatchOfObjectsRequest (prevListing );
417- S3BlobStore .configureRequestForMetrics (listNextBatchOfObjectsRequest , blobStore , Operation .LIST_OBJECTS , purpose );
418- list = SocketAccess .doPrivileged (() -> clientReference .client ().listNextBatchOfObjects (listNextBatchOfObjectsRequest ));
419- } else {
420- list = SocketAccess .doPrivileged (() -> clientReference .client ().listObjects (listObjectsRequest ));
410+ try (AmazonS3Reference clientReference = blobStore .clientReference ()) {
411+ if (prevListing != null ) {
412+ final var listNextBatchOfObjectsRequest = new ListNextBatchOfObjectsRequest (prevListing );
413+ S3BlobStore .configureRequestForMetrics (listNextBatchOfObjectsRequest , blobStore , Operation .LIST_OBJECTS , purpose );
414+ list = SocketAccess .doPrivileged (() -> clientReference .client ().listNextBatchOfObjects (listNextBatchOfObjectsRequest ));
415+ } else {
416+ list = SocketAccess .doPrivileged (() -> clientReference .client ().listObjects (listObjectsRequest ));
417+ }
421418 }
422419 results .add (list );
423420 if (list .isTruncated ()) {
@@ -504,13 +501,14 @@ void executeMultipartUpload(
504501 final SetOnce <String > uploadId = new SetOnce <>();
505502 final String bucketName = s3BlobStore .bucket ();
506503 boolean success = false ;
507- try (AmazonS3Reference clientReference = s3BlobStore .clientReference ()) {
508-
509- uploadId .set (
510- SocketAccess .doPrivileged (
511- () -> clientReference .client ().initiateMultipartUpload (initiateMultiPartUpload (purpose , blobName )).getUploadId ()
512- )
513- );
504+ try {
505+ try (AmazonS3Reference clientReference = s3BlobStore .clientReference ()) {
506+ uploadId .set (
507+ SocketAccess .doPrivileged (
508+ () -> clientReference .client ().initiateMultipartUpload (initiateMultiPartUpload (purpose , blobName )).getUploadId ()
509+ )
510+ );
511+ }
514512 if (Strings .isEmpty (uploadId .get ())) {
515513 throw new IOException ("Failed to initialize multipart upload " + blobName );
516514 }
@@ -531,8 +529,12 @@ void executeMultipartUpload(
531529 );
532530 bytesCount += uploadRequest .getPartSize ();
533531
534- final UploadPartResult uploadResponse = SocketAccess .doPrivileged (() -> clientReference .client ().uploadPart (uploadRequest ));
535- parts .add (uploadResponse .getPartETag ());
532+ try (AmazonS3Reference clientReference = s3BlobStore .clientReference ()) {
533+ final UploadPartResult uploadResponse = SocketAccess .doPrivileged (
534+ () -> clientReference .client ().uploadPart (uploadRequest )
535+ );
536+ parts .add (uploadResponse .getPartETag ());
537+ }
536538 }
537539
538540 if (bytesCount != blobSize ) {
@@ -548,7 +550,9 @@ void executeMultipartUpload(
548550 parts
549551 );
550552 S3BlobStore .configureRequestForMetrics (complRequest , blobStore , Operation .PUT_MULTIPART_OBJECT , purpose );
551- SocketAccess .doPrivilegedVoid (() -> clientReference .client ().completeMultipartUpload (complRequest ));
553+ try (AmazonS3Reference clientReference = s3BlobStore .clientReference ()) {
554+ SocketAccess .doPrivilegedVoid (() -> clientReference .client ().completeMultipartUpload (complRequest ));
555+ }
552556 success = true ;
553557
554558 } catch (final AmazonClientException e ) {
0 commit comments