44import software .amazon .awssdk .core .async .AsyncRequestBody ;
55import software .amazon .awssdk .services .s3 .model .PutObjectRequest ;
66import software .amazon .awssdk .transfer .s3 .S3TransferManager ;
7- import software .amazon .awssdk .transfer .s3 .model .FileUpload ;
87import software .amazon .awssdk .transfer .s3 .model .Upload ;
98import software .amazon .awssdk .transfer .s3 .model .UploadRequest ;
109import software .amazon .awssdk .transfer .s3 .progress .TransferListener ;
1413import java .io .InputStream ;
1514import java .util .HashMap ;
1615import java .util .Map ;
16+ import java .util .concurrent .ConcurrentHashMap ;
17+ import java .util .concurrent .ExecutionException ;
1718import java .util .concurrent .ExecutorService ;
1819import java .util .concurrent .Executors ;
20+ import java .util .concurrent .LinkedBlockingQueue ;
21+ import java .util .concurrent .ThreadPoolExecutor ;
22+ import java .util .concurrent .TimeUnit ;
23+ import java .util .concurrent .TimeoutException ;
1924import java .util .function .Consumer ;
2025import java .util .logging .Logger ;
2126
@@ -25,9 +30,22 @@ private Uploads() {}
2530 public static final int MULTIPART_UPLOAD_THRESHOLD = 16 *1024 *1024 ; // 16 MB
2631
2732 private static transient volatile Uploads instance ;
28- private final transient HashMap <FilePath , Upload > startedUploads = new HashMap <>();
29- private final ExecutorService executors = Executors .newScheduledThreadPool (1 , new NamedThreadFactory (Executors .defaultThreadFactory (), Uploads .class .getName ()));
30- private final transient HashMap <FilePath , InputStream > openedStreams = new HashMap <>();
33+
34+ private final transient Map <FilePath , Upload > startedUploads = new ConcurrentHashMap <>();
35+ private final ExecutorService executors ;
36+ // This creates a cached thread pool with an upper bound (5) on threads to be spawned on demand.
37+ {
38+ ThreadPoolExecutor pool = new ThreadPoolExecutor (
39+ 5 , 5 ,
40+ 60L , TimeUnit .SECONDS ,
41+ new LinkedBlockingQueue <>(),
42+ new NamedThreadFactory (Executors .defaultThreadFactory (), Uploads .class .getName ())
43+ );
44+ pool .allowCoreThreadTimeOut (true );
45+ executors = pool ;
46+ }
47+
48+ private final transient Map <FilePath , InputStream > openedStreams = new ConcurrentHashMap <>();
3149
3250 public Upload startUploading (S3TransferManager manager , FilePath file , InputStream inputStream , String bucketName , String objectName , Metadata metadata , TransferListener listener ) {
3351 UploadRequest .Builder request = UploadRequest .builder ();
@@ -43,16 +61,24 @@ public Upload startUploading(S3TransferManager manager, FilePath file, InputStre
4361 return upload ;
4462 }
4563
46- public void finishUploading (FilePath filePath ) throws InterruptedException {
64+ public void finishUploading (FilePath filePath ) throws InterruptedException , IOException {
4765 final Upload upload = startedUploads .remove (filePath );
4866 if (upload == null ) {
4967 LOGGER .info ("File: " + filePath .getName () + " already was uploaded" );
5068 return ;
5169 }
5270 try {
53- upload .completionFuture ().join ();
54- }
55- finally {
71+ upload .completionFuture ().get (1 , TimeUnit .HOURS );
72+ } catch (InterruptedException e ) {
73+ Thread .currentThread ().interrupt ();
74+ upload .completionFuture ().cancel (true ); // cancel the upload
75+ throw e ;
76+ } catch (ExecutionException e ) {
77+ throw new IOException ("Upload failed for: " + filePath .getName (), e .getCause ());
78+ } catch (TimeoutException e ) {
79+ upload .completionFuture ().cancel (true );
80+ throw new IOException ("Upload timed out for: " + filePath .getName (), e );
81+ } finally {
5682 closeStream (filePath );
5783 }
5884 }
0 commit comments