3232import java .io .IOException ;
3333import java .util .NoSuchElementException ;
3434import java .util .UUID ;
35- import java .util .concurrent .Callable ;
3635import java .util .concurrent .CountDownLatch ;
3736import java .util .concurrent .ExecutorService ;
37+ import java .util .concurrent .TimeUnit ;
3838import java .util .concurrent .atomic .AtomicInteger ;
3939import java .util .concurrent .atomic .AtomicReference ;
4040import java .util .function .BooleanSupplier ;
4343import org .slf4j .Logger ;
4444import org .slf4j .LoggerFactory ;
4545
46-
4746/**
4847 * An implementation of {@link TransferStrategy} that provides a default implementation {@code transfer}
4948 * implementation.
@@ -127,25 +126,22 @@ public void transfer() throws IOException {
127126 }
128127
129128 private void transferAllJobBlobs () throws IOException {
130- final BooleanSupplier cancelationCheck = () -> ! canceled ;
129+ final BooleanSupplier cancellationCheck = () -> !canceled ;
131130 final AtomicInteger numBlobsRemaining = new AtomicInteger (jobState .numBlobsInJob ());
132131
133- while ( ! Thread .currentThread ().isInterrupted () && cancelationCheck .getAsBoolean () && numBlobsRemaining .get () > 0 ) {
132+ while (! Thread .currentThread ().isInterrupted () && cancellationCheck .getAsBoolean () && numBlobsRemaining .get () > 0 ) {
134133 try {
135- final Iterable <JobPart > jobParts = jobPartsNotAlreadyTransferred (cancelationCheck );
134+ final Iterable <JobPart > jobParts = jobPartsNotAlreadyTransferred (cancellationCheck );
136135
137136 final int numJobParts = Iterables .size (jobParts );
138137
139138 if (numJobParts == 0 ) {
140139 break ;
141- } else if (numJobParts < 0 ) {
142- LOG .error ("Had negative number of job parts" );
143- return ;
144140 }
145141
146- final CountDownLatch jobCompletionWait = new CountDownLatch (1 );
147- transferJobParts (jobParts , jobCompletionWait , numBlobsRemaining , cancelationCheck );
148- jobCompletionWait .await ();
142+ final CountDownLatch countDownLatch = new CountDownLatch (numJobParts );
143+ transferJobParts (jobParts , countDownLatch , numBlobsRemaining , cancellationCheck );
144+ countDownLatch .await ();
149145 } catch (final Ds3NoMoreRetriesException | FailedRequestException e ) {
150146 emitFailureEvent (makeFailureEvent (failureActivity , e , firstChunk ()));
151147 throw e ;
@@ -158,15 +154,15 @@ private void transferAllJobBlobs() throws IOException {
158154 }
159155
160156
161- private Iterable <JobPart > jobPartsNotAlreadyTransferred (final BooleanSupplier transferPredicate ) throws IOException , InterruptedException {
157+ private Iterable <JobPart > jobPartsNotAlreadyTransferred (final BooleanSupplier cancellationCheck ) throws IOException , InterruptedException {
162158 // If we've been canceled, bail
163- if ( ! transferPredicate .getAsBoolean ()) {
159+ if ( ! cancellationCheck .getAsBoolean ()) {
164160 return FluentIterable .of ();
165161 }
166162
167163 return FluentIterable
168164 .from (blobStrategy .getWork ())
169- .filter (jobPart -> jobState .contains (jobPart .getBlob ()) && transferPredicate .getAsBoolean ());
165+ .filter (jobPart -> jobState .contains (jobPart .getBlob ()) && cancellationCheck .getAsBoolean ());
170166 }
171167
172168 private void emitFailureAndSetCachedException (final Throwable t ) {
@@ -187,46 +183,33 @@ private synchronized void maybeSetCachedException(final Throwable t) {
187183 private void transferJobParts (final Iterable <JobPart > jobParts ,
188184 final CountDownLatch countDownLatch ,
189185 final AtomicInteger numBlobsTransferred ,
190- final BooleanSupplier transferPredicate )
191- {
192- final AtomicInteger numJobPartsToTransfer = new AtomicInteger (Iterables .size (jobParts ));
193-
186+ final BooleanSupplier cancellationCheck ) {
194187 for (final JobPart jobPart : jobParts ) {
195188 if (executorService .isShutdown ()) {
196189 LOG .debug ("Executor service is shut down, decrementing countdown latch" );
197190 countDownLatch .countDown ();
198191 } else {
199- maybeSubmitJobPartTransfer (countDownLatch , numBlobsTransferred , transferPredicate , numJobPartsToTransfer , jobPart );
192+ executorService .execute (() -> {
193+ try {
194+ if (cancellationCheck .getAsBoolean ()) {
195+ transferMethod .transferJobPart (jobPart );
196+ }
197+ } catch (final RuntimeException e ) {
198+ emitFailureAndSetCachedException (e );
199+ throw e ;
200+ } catch (final Exception e ) {
201+ emitFailureAndSetCachedException (e );
202+ throw new RuntimeException (e );
203+ } finally {
204+ jobState .blobTransferredOrFailed (jobPart .getBlob ());
205+ numBlobsTransferred .decrementAndGet ();
206+ countDownLatch .countDown ();
207+ }
208+ });
200209 }
201210 }
202211 }
203212
204- private void maybeSubmitJobPartTransfer (final CountDownLatch countDownLatch , final AtomicInteger numBlobsTransferred , final BooleanSupplier transferPredicate , final AtomicInteger numJobPartsToTransfer , final JobPart jobPart ) {
205- executorService .submit ((Callable <Void >) () -> {
206- try {
207- if ( ! transferPredicate .getAsBoolean ()) {
208- countDownLatch .countDown ();
209- } else {
210- transferMethod .transferJobPart (jobPart );
211- }
212- } catch (final RuntimeException e ) {
213- emitFailureAndSetCachedException (e );
214- throw e ;
215- } catch (final Exception e ) {
216- emitFailureAndSetCachedException (e );
217- throw new RuntimeException (e );
218- } finally {
219- jobState .blobTransferredOrFailed (jobPart .getBlob ());
220- numBlobsTransferred .decrementAndGet ();
221- if (numJobPartsToTransfer .decrementAndGet () <= 0 ) {
222- countDownLatch .countDown ();
223- }
224- }
225-
226- return null ;
227- });
228- }
229-
230213 private void emitFailureEvent (final FailureEvent failureEvent ) {
231214 eventDispatcher .emitFailureEvent (failureEvent );
232215 }
@@ -261,6 +244,11 @@ private Objects firstChunk() {
261244 public void close () {
262245 canceled = true ;
263246 executorService .shutdown ();
247+ try {
248+ executorService .awaitTermination (10000 , TimeUnit .MILLISECONDS );
249+ } catch (final InterruptedException e ) {
250+ executorService .shutdownNow ();
251+ }
264252 }
265253
266254 @ Override
0 commit comments