1515
1616package com .amazonaws .mobileconnectors .s3 .transferutility ;
1717
18+ import com .amazonaws .AbortedException ;
1819import com .amazonaws .event .ProgressEvent ;
1920import com .amazonaws .event .ProgressListener ;
2021import com .amazonaws .services .s3 .AmazonS3 ;
2526import com .amazonaws .logging .LogFactory ;
2627
2728import java .util .concurrent .Callable ;
29+ import java .util .concurrent .TimeUnit ;
2830
2931class UploadPartTask implements Callable <Boolean > {
3032 private static final Log LOGGER = LogFactory .getLog (UploadPartTask .class );
31-
33+ private static final int RETRY_COUNT = 3 ;
3234
3335 private final UploadTask .UploadPartTaskMetadata uploadPartTaskMetadata ;
34- private final UploadTask . UploadTaskProgressListener uploadTaskProgressListener ;
36+ private final UploadPartTaskProgressListener uploadPartTaskProgressListener ;
3537 private final UploadPartRequest uploadPartRequest ;
3638 private final AmazonS3 s3 ;
3739 private final TransferDBUtil dbUtil ;
@@ -42,7 +44,7 @@ public UploadPartTask(UploadTask.UploadPartTaskMetadata uploadPartTaskMetadata,
4244 AmazonS3 s3 ,
4345 TransferDBUtil dbUtil ) {
4446 this .uploadPartTaskMetadata = uploadPartTaskMetadata ;
45- this .uploadTaskProgressListener = uploadTaskProgressListener ;
47+ this .uploadPartTaskProgressListener = new UploadPartTaskProgressListener ( uploadTaskProgressListener ) ;
4648 this .uploadPartRequest = uploadPartRequest ;
4749 this .s3 = s3 ;
4850 this .dbUtil = dbUtil ;
@@ -53,53 +55,75 @@ public UploadPartTask(UploadTask.UploadPartTaskMetadata uploadPartTaskMetadata,
5355 */
5456 @ Override
5557 public Boolean call () throws Exception {
56- try {
57- uploadPartTaskMetadata .state = TransferState .IN_PROGRESS ;
58- uploadPartRequest .setGeneralProgressListener (new UploadPartTaskProgressListener (uploadTaskProgressListener ));
59- final UploadPartResult putPartResult = s3 .uploadPart (uploadPartRequest );
60- uploadPartTaskMetadata .state = TransferState .PART_COMPLETED ;
61- dbUtil .updateState (uploadPartRequest .getId (), TransferState .PART_COMPLETED );
62- dbUtil .updateETag (uploadPartRequest .getId (), putPartResult .getETag ());
63- return true ;
64- } catch (final Exception e ) {
65- LOGGER .error ("Upload part interrupted: " + e );
66- ProgressEvent resetEvent = new ProgressEvent (0 );
67- resetEvent .setEventCode (ProgressEvent .RESET_EVENT_CODE );
68- uploadTaskProgressListener .progressChanged (new ProgressEvent (0 ));
69-
70- // Check if network is not connected, set the state to WAITING_FOR_NETWORK.
58+ uploadPartTaskMetadata .state = TransferState .IN_PROGRESS ;
59+ uploadPartRequest .setGeneralProgressListener (uploadPartTaskProgressListener );
60+ int retried = 1 ;
61+ while (true ) {
7162 try {
72- if (TransferNetworkLossHandler .getInstance () != null &&
73- !TransferNetworkLossHandler .getInstance ().isNetworkConnected ()) {
74- LOGGER .info ("Thread: [" + Thread .currentThread ().getId () + "]: Network wasn't available." );
75- /*
76- * Network connection is being interrupted. Moving the TransferState
77- * to WAITING_FOR_NETWORK till the network availability resumes.
78- */
79- uploadPartTaskMetadata .state = TransferState .WAITING_FOR_NETWORK ;
80- dbUtil .updateState (uploadPartRequest .getId (), TransferState .WAITING_FOR_NETWORK );
81- LOGGER .info ("Network Connection Interrupted: " +
82- "Moving the TransferState to WAITING_FOR_NETWORK" );
83- return false ;
63+ final UploadPartResult putPartResult = s3 .uploadPart (uploadPartRequest );
64+ setTaskState (TransferState .PART_COMPLETED );
65+ dbUtil .updateETag (uploadPartRequest .getId (), putPartResult .getETag ());
66+ return true ;
67+ } catch (AbortedException e ) {
68+ // If request got aborted, operation was paused or canceled. do not retry.
69+ LOGGER .debug ("Upload part aborted." );
70+ resetProgress ();
71+ return false ;
72+ } catch (final Exception e ) {
73+ LOGGER .error ("Unexpected error occurred: " + e );
74+ resetProgress ();
75+
76+ // Check if network is not connected, set the state to WAITING_FOR_NETWORK.
77+ try {
78+ if (TransferNetworkLossHandler .getInstance () != null &&
79+ !TransferNetworkLossHandler .getInstance ().isNetworkConnected ()) {
80+ LOGGER .info ("Thread: [" + Thread .currentThread ().getId () + "]: Network wasn't available." );
81+ /*
82+ * Network connection is being interrupted. Moving the TransferState
83+ * to WAITING_FOR_NETWORK till the network availability resumes.
84+ */
85+ uploadPartTaskMetadata .state = TransferState .WAITING_FOR_NETWORK ;
86+ dbUtil .updateState (uploadPartRequest .getId (), TransferState .WAITING_FOR_NETWORK );
87+ LOGGER .info ("Network Connection Interrupted: " +
88+ "Moving the TransferState to WAITING_FOR_NETWORK" );
89+ return false ;
90+ }
91+ } catch (TransferUtilityException transferUtilityException ) {
92+ LOGGER .error ("TransferUtilityException: [" + transferUtilityException + "]" );
93+ }
94+
95+ if (retried >= RETRY_COUNT ) {
96+ setTaskState (TransferState .FAILED );
97+ LOGGER .error ("Encountered error uploading part " , e );
98+ throw e ;
8499 }
85- } catch (TransferUtilityException transferUtilityException ) {
86- LOGGER .error ("TransferUtilityException: [" + transferUtilityException + "]" );
87- }
88100
89- // In other cases, set the transfer state to FAILED.
90- uploadPartTaskMetadata .state = TransferState .FAILED ;
91- dbUtil .updateState (uploadPartRequest .getId (), TransferState .FAILED );
92- LOGGER .error ("Encountered error uploading part " , e );
93- throw e ;
101+ // Sleep before retrying
102+ long delayMs = exponentialBackoffWithJitter (retried );
103+ LOGGER .info ("Retrying in " + delayMs + " ms." );
104+ TimeUnit .MILLISECONDS .sleep (delayMs );
105+ LOGGER .debug ("Retry attempt: " + retried ++, e );
106+ }
94107 }
95108 }
96109
110+ private void setTaskState (TransferState newState ) {
111+ uploadPartTaskMetadata .state = newState ;
112+ dbUtil .updateState (uploadPartRequest .getId (),newState );
113+ }
114+
115+ private void resetProgress () {
116+ ProgressEvent resetEvent = new ProgressEvent (0 );
117+ resetEvent .setEventCode (ProgressEvent .RESET_EVENT_CODE );
118+ uploadPartTaskProgressListener .progressChanged (resetEvent );
119+ }
120+
97121 /**
98122 * Progress Listener for a part
99123 */
100124 private class UploadPartTaskProgressListener implements ProgressListener {
101125
102- private UploadTask .UploadTaskProgressListener uploadTaskProgressListener ;
126+ private final UploadTask .UploadTaskProgressListener uploadTaskProgressListener ;
103127
104128 private long bytesTransferredSoFar ;
105129
@@ -111,15 +135,24 @@ public UploadPartTaskProgressListener(UploadTask.UploadTaskProgressListener prog
111135 public void progressChanged (ProgressEvent progressEvent ) {
112136 if (ProgressEvent .RESET_EVENT_CODE == progressEvent .getEventCode ()) {
113137 // Reset will discard what's been transferred
114- LOGGER .info ("Reset Event triggered. Resetting the bytesCurrent to 0." );
138+ LOGGER .debug ("Reset Event triggered. Resetting the bytesCurrent to 0." );
115139 // Reset the local counter to 0.
116140 bytesTransferredSoFar = 0 ;
117141 } else {
118142 bytesTransferredSoFar += progressEvent .getBytesTransferred ();
119143 }
120- this .uploadTaskProgressListener
121- .onProgressChanged (UploadPartTask .this .uploadPartRequest .getPartNumber (),
122- bytesTransferredSoFar );
144+ this .uploadTaskProgressListener .onProgressChanged (
145+ uploadPartRequest .getPartNumber (),
146+ bytesTransferredSoFar
147+ );
123148 }
124149 }
150+
151+ private long exponentialBackoffWithJitter (int retryAttempt ) {
152+ final long baseTimeMs = 1000L ;
153+ final long jitterFactor = 1000L ;
154+ long delay = baseTimeMs * (1 << retryAttempt );
155+ long jitter = (long ) (jitterFactor * Math .random ());
156+ return delay + jitter ;
157+ }
125158}
0 commit comments