@@ -90,7 +90,6 @@ public class QueryBatcherImpl extends BatcherImpl implements QueryBatcher {
9090 private Calendar jobStartTime ;
9191 private Calendar jobEndTime ;
9292 private long maxUris = Long .MAX_VALUE ;
93- private AtomicBoolean jobDone = new AtomicBoolean (false );
9493 private long maxBatches = Long .MAX_VALUE ;
9594
9695 public QueryBatcherImpl (QueryDefinition query , DataMovementManager moveMgr , ForestConfiguration forestConfig ) {
@@ -617,10 +616,7 @@ public void run() {
617616 "for that forest has already been retrieved" , forest .getForestName (), forestBatchNum , start );
618617 return ;
619618 }
620- if (jobDone .get ()) {
621- logger .info ("The number of uris collected has reached the maximum limit." );
622- return ;
623- }
619+
624620 // don't proceed if this job is stopped (because dataMovementManager.stopJob was called)
625621 if ( stopped .get () == true ) {
626622 logger .warn ("Cancelling task to query forest '{}' forestBatchNum {} with start {} after the job is stopped" ,
@@ -662,21 +658,20 @@ public void run() {
662658 for ( String uri : results ) {
663659 uris .add ( uri );
664660 }
665- if ( uris .size () == getBatchSize () ) {
666- nextAfterUri = uris .get (getBatchSize () - 1 );
667- // this is a full batch
668- launchNextTask ();
669- }
661+
670662 batch = batch
671663 .withItems (uris .toArray (new String [uris .size ()]))
672664 .withServerTimestamp (serverTimestamp .get ())
673665 .withJobResultsSoFar (resultsSoFar .addAndGet (uris .size ()))
674666 .withForestResultsSoFar (forestResults .get (forest ).addAndGet (uris .size ()));
675667
676668 if (maxUris <= (resultsSoFar .longValue ())) {
677- jobDone .set (true );
678669 isDone .set (true );
679- }
670+ } else if ( uris .size () == getBatchSize () ) {
671+ nextAfterUri = uris .get (getBatchSize () - 1 );
672+ // this is a full batch
673+ launchNextTask ();
674+ }
680675
681676 logger .trace ("batch size={}, jobBatchNumber={}, jobResultsSoFar={}, forest={}" , uris .size (),
682677 batch .getJobBatchNumber (), batch .getJobResultsSoFar (), forest .getForestName ());
@@ -734,7 +729,10 @@ private void launchNextTask() {
734729 }
735730 AtomicBoolean isDone = forestIsDone .get (forest );
736731 // we made it to the end, so don't launch anymore tasks
737- if ( isDone .get () == true ) return ;
732+ if ( isDone .get () == true ) {
733+ shutdownIfAllForestsAreDone ();
734+ return ;
735+ }
738736 long nextStart = start + getBatchSize ();
739737 threadPool .execute (new QueryTask (moveMgr , batcher , forest , query , forestBatchNum + 1 , nextStart , nextAfterUri ));
740738 }
0 commit comments