2424import java .lang .invoke .MethodHandles ;
2525import java .util .ArrayList ;
2626import java .util .Collections ;
27- import java .util .Iterator ;
27+ import java .util .HashMap ;
2828import java .util .LinkedHashMap ;
2929import java .util .List ;
3030import java .util .Map ;
3131import java .util .Set ;
32- import java .util .concurrent .ConcurrentHashMap ;
3332import java .util .concurrent .ExecutorService ;
33+ import java .util .concurrent .Future ;
3434import java .util .concurrent .SynchronousQueue ;
3535import java .util .concurrent .TimeUnit ;
3636import java .util .function .Predicate ;
@@ -70,35 +70,27 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
7070
7171 private static final Logger log = LoggerFactory .getLogger (MethodHandles .lookup ().lookupClass ());
7272
73- private OverseerTaskQueue workQueue ;
74- private DistributedMap runningMap ;
75- private DistributedMap completedMap ;
76- private DistributedMap failureMap ;
73+ private final OverseerTaskQueue workQueue ;
74+ private final DistributedMap runningMap ;
75+ private final DistributedMap completedMap ;
76+ private final DistributedMap failureMap ;
7777
7878 /**
79- * Set that maintains a list of all the tasks that are running. This is keyed on zk id of the
80- * task.
79+ * All the tasks that have been submitted to the runner thread pool. This is keyed on zk id of the
80+ * task. It may contain tasks that have completed execution, have been entered into the
81+ * completed/failed map in zk but not deleted from the work-queue as that is a batched operation.
82+ * This collection is accessed only in the main loop and does not need to be synchronized.
8183 */
82- private final Set <String > runningTasks ;
84+ private final Map <String , Future <?> > runningTasks ;
8385
84- /** List of completed tasks. This is used to clean up workQueue in zk. */
85- private final ConcurrentHashMap <String , QueueEvent > completedTasks ;
86+ private final String myId ;
8687
87- private volatile String myId ;
88-
89- private volatile ZkStateReader zkStateReader ;
88+ private final ZkStateReader zkStateReader ;
9089
9190 private boolean isClosed ;
9291
93- private volatile Stats stats ;
94- private SolrMetricsContext overseerTaskProcessorMetricsContext ;
95-
96- /**
97- * Set of tasks that have been picked up for processing but not cleaned up from zk work-queue. It
98- * may contain tasks that have completed execution, have been entered into the completed/failed
99- * map in zk but not deleted from the work-queue as that is a batched operation.
100- */
101- private final Set <String > runningZKTasks ;
92+ private final Stats stats ;
93+ private final SolrMetricsContext overseerTaskProcessorMetricsContext ;
10294
10395 /**
10496 * This map may contain tasks which are read from work queue but could not be executed because
@@ -116,7 +108,7 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
116108 new Predicate <>() {
117109 @ Override
118110 public boolean test (String s ) {
119- return runningTasks .contains (s ) || blockedTasks .containsKey (s );
111+ return runningTasks .containsKey (s ) || blockedTasks .containsKey (s );
120112 }
121113
122114 @ Override
@@ -129,9 +121,9 @@ public String toString() {
129121
130122 protected OverseerMessageHandlerSelector selector ;
131123
132- private OverseerNodePrioritizer prioritizer ;
124+ private final OverseerNodePrioritizer prioritizer ;
133125
134- private String thisNode ;
126+ private final String thisNode ;
135127
136128 public OverseerTaskProcessor (
137129 ZkStateReader zkStateReader ,
@@ -153,9 +145,7 @@ public OverseerTaskProcessor(
153145 this .runningMap = runningMap ;
154146 this .completedMap = completedMap ;
155147 this .failureMap = failureMap ;
156- this .runningZKTasks = ConcurrentHashMap .newKeySet ();
157- this .runningTasks = ConcurrentHashMap .newKeySet ();
158- this .completedTasks = new ConcurrentHashMap <>();
148+ this .runningTasks = new HashMap <>();
159149 thisNode = MDCLoggingContext .getNodeName ();
160150
161151 overseerTaskProcessorMetricsContext = solrMetricsContext .getChildContext (this );
@@ -235,13 +225,7 @@ public void run() {
235225 continue ; // not a no, not a yes, try asking again
236226 }
237227
238- if (log .isDebugEnabled ()) {
239- log .debug (
240- "Cleaning up work-queue. #Running tasks: {} #Completed tasks: {}" ,
241- runningTasks .size (),
242- completedTasks .size ());
243- }
244- cleanUpWorkQueue ();
228+ cleanUpRunningTasks ();
245229
246230 printTrackingMaps ();
247231
@@ -254,7 +238,9 @@ public void run() {
254238 waited = true ;
255239 }
256240
257- if (waited ) cleanUpWorkQueue ();
241+ if (waited ) {
242+ cleanUpRunningTasks ();
243+ }
258244
259245 ArrayList <QueueEvent > heads = new ArrayList <>(blockedTasks .size () + MAX_PARALLEL_TASKS );
260246 heads .addAll (blockedTasks .values ());
@@ -309,17 +295,16 @@ public void run() {
309295 // on the OverseerCollectionMessageHandler
310296 batchSessionId ++;
311297
312- boolean tooManyTasks = false ;
313- for (QueueEvent head : heads ) {
314- if (!tooManyTasks ) {
315- tooManyTasks = runningTasks .size () >= MAX_PARALLEL_TASKS ;
316- }
317- if (tooManyTasks ) {
298+ for (int i = 0 ; i < heads .size (); i ++) {
299+ QueueEvent head = heads .get (i );
300+
301+ if (runningTasks .size () >= MAX_PARALLEL_TASKS ) {
318302 // Too many tasks are running, just shove the rest into the "blocked" queue.
319- blockedTasks .put (head .getId (), head );
320- continue ;
303+ heads . subList ( i , heads . size ()). forEach ( h -> blockedTasks .put (h .getId (), h ) );
304+ break ;
321305 }
322- if (runningZKTasks .contains (head .getId ())) continue ;
306+
307+ if (runningTasks .containsKey (head .getId ())) continue ;
323308 final ZkNodeProps message = ZkNodeProps .load (head .getBytes ());
324309 final String asyncId = message .getStr (ASYNC );
325310 if (hasLeftOverItems ) {
@@ -329,14 +314,14 @@ public void run() {
329314 log .debug (
330315 "Found already processed task in workQueue, cleaning up. AsyncId [{}]" ,
331316 asyncId );
332- workQueue .remove (head );
317+ workQueue .remove (head , false );
333318 continue ;
334319 }
335320 }
336321 String operation = message .getStr (Overseer .QUEUE_OPERATION );
337322 if (operation == null ) {
338323 log .error ("Msg does not have required {} : {}" , Overseer .QUEUE_OPERATION , message );
339- workQueue .remove (head );
324+ workQueue .remove (head , asyncId == null );
340325 continue ;
341326 }
342327 OverseerMessageHandler messageHandler = selector .selectOverseerMessageHandler (message );
@@ -349,7 +334,11 @@ public void run() {
349334 continue ;
350335 }
351336 try {
352- markTaskAsRunning (head , asyncId );
337+ if (asyncId != null ) {
338+ // Store the async ID in ZK distributed map before trying to execute the task
339+ runningMap .put (asyncId , null );
340+ }
341+
353342 if (log .isDebugEnabled ()) {
354343 log .debug ("Marked task [{}] as running" , head .getId ());
355344 }
@@ -372,8 +361,9 @@ public void run() {
372361 head .getId (),
373362 message );
374363 }
375- Runner runner = new Runner (messageHandler , message , operation , head , lock );
376- tpe .execute (runner );
364+ Runner runner = createRunner (messageHandler , message , operation , head , lock );
365+ Future <?> future = tpe .submit (runner );
366+ runningTasks .put (head .getId (), future );
377367 }
378368
379369 } catch (KeeperException e ) {
@@ -405,14 +395,12 @@ public void run() {
405395 }
406396 }
407397
408- private void cleanUpWorkQueue () throws KeeperException , InterruptedException {
409- Iterator <Map .Entry <String , QueueEvent >> it = completedTasks .entrySet ().iterator ();
410- while (it .hasNext ()) {
411- Map .Entry <String , QueueEvent > entry = it .next ();
412- workQueue .remove (entry .getValue ());
413- runningZKTasks .remove (entry .getKey ());
414- it .remove ();
398+ /** Remove all entries from {@link #runningTasks} that are done. */
399+ private void cleanUpRunningTasks () {
400+ if (log .isDebugEnabled ()) {
401+ log .debug ("Cleaning up running tasks. #Running tasks: {}" , runningTasks .size ());
415402 }
403+ runningTasks .entrySet ().removeIf (e -> e .getValue ().isDone ());
416404 }
417405
418406 @ Override
@@ -429,29 +417,19 @@ public void close() {
429417
430418 public static List <String > getSortedOverseerNodeNames (SolrZkClient zk )
431419 throws KeeperException , InterruptedException {
432- List <String > children = null ;
433- try {
434- children = zk .getChildren (Overseer .OVERSEER_ELECT + LeaderElector .ELECTION_NODE , null , true );
435- } catch (Exception e ) {
436- log .warn ("error " , e );
437- return new ArrayList <>();
438- }
439- LeaderElector .sortSeqs (children );
420+ List <String > children =
421+ getSortedElectionNodes (zk , Overseer .OVERSEER_ELECT + LeaderElector .ELECTION_NODE );
422+
440423 ArrayList <String > nodeNames = new ArrayList <>(children .size ());
441424 for (String c : children ) nodeNames .add (LeaderElector .getNodeName (c ));
442425 return nodeNames ;
443426 }
444427
445428 public static List <String > getSortedElectionNodes (SolrZkClient zk , String path )
446429 throws KeeperException , InterruptedException {
447- List <String > children = null ;
448- try {
449- children = zk .getChildren (path , null , true );
450- LeaderElector .sortSeqs (children );
451- return children ;
452- } catch (Exception e ) {
453- throw e ;
454- }
430+ List <String > children = zk .getChildren (path , null , true );
431+ LeaderElector .sortSeqs (children );
432+ return children ;
455433 }
456434
457435 public static String getLeaderNode (SolrZkClient zkClient )
@@ -518,12 +496,17 @@ public boolean isClosed() {
518496 return isClosed ;
519497 }
520498
521- private void markTaskAsRunning (QueueEvent head , String asyncId )
522- throws KeeperException , InterruptedException {
523- runningZKTasks .add (head .getId ());
524- runningTasks .add (head .getId ());
525-
526- if (asyncId != null ) runningMap .put (asyncId , null );
499+ /**
500+ * Create a runner instance to execute a single task. This method mostly exists to provide an
501+ * extension point for tests.
502+ */
503+ protected Runner createRunner (
504+ OverseerMessageHandler messageHandler ,
505+ ZkNodeProps message ,
506+ String operation ,
507+ QueueEvent head ,
508+ OverseerMessageHandler .Lock lock ) {
509+ return new Runner (messageHandler , message , operation , head , lock );
527510 }
528511
529512 protected class Runner implements Runnable {
@@ -589,7 +572,7 @@ public void run() {
589572 }
590573 }
591574
592- markTaskComplete (head . getId (), asyncId );
575+ markTaskComplete (asyncId );
593576 if (log .isDebugEnabled ()) {
594577 log .debug ("Marked task [{}] as completed." , head .getId ());
595578 }
@@ -609,15 +592,15 @@ public void run() {
609592 log .error ("KeeperException" , e );
610593 } catch (InterruptedException e ) {
611594 // Reset task from tracking data structures so that it can be retried.
612- resetTaskWithException (messageHandler , head .getId (), asyncId , taskKey , message );
595+ resetTaskWithException (head .getId (), asyncId , taskKey );
613596 log .warn ("Resetting task {} as the thread was interrupted." , head .getId ());
614597 Thread .currentThread ().interrupt ();
615598 } finally {
616599 lock .unlock ();
617600 if (!success ) {
618601 // Reset task from tracking data structures so that it can be retried.
619602 try {
620- resetTaskWithException (messageHandler , head .getId (), asyncId , taskKey , message );
603+ resetTaskWithException (head .getId (), asyncId , taskKey );
621604 } catch (IllegalStateException ignore ) {
622605
623606 }
@@ -628,26 +611,26 @@ public void run() {
628611 }
629612 }
630613
631- private void markTaskComplete (String id , String asyncId )
632- throws KeeperException , InterruptedException {
633- completedTasks .put (id , head );
634- runningTasks .remove (id );
614+ /**
615+ * Invoked by the runner thread once the task is successfully completed (can be with error
616+ * status). We remove the task from the work queue.
617+ */
618+ private void markTaskComplete (String asyncId ) throws KeeperException , InterruptedException {
635619
636620 if (asyncId != null ) {
637621 if (!runningMap .remove (asyncId )) {
638622 log .warn ("Could not find and remove async call [{}] from the running map." , asyncId );
639623 }
640624 }
641625
642- workQueue .remove (head );
626+ workQueue .remove (head , asyncId == null );
643627 }
644628
645- private void resetTaskWithException (
646- OverseerMessageHandler messageHandler ,
647- String id ,
648- String asyncId ,
649- String taskKey ,
650- ZkNodeProps message ) {
629+ /**
630+ * Reset the task so it will be retried by a future iteration of the main loop. We remove the
631+ * async ID from the running map, but we keep the task in the work queue.
632+ */
633+ private void resetTaskWithException (String id , String asyncId , String taskKey ) {
651634 log .warn ("Resetting task: {}, requestid: {}, taskKey: {}" , id , asyncId , taskKey );
652635 try {
653636 if (asyncId != null ) {
@@ -656,7 +639,6 @@ private void resetTaskWithException(
656639 }
657640 }
658641
659- runningTasks .remove (id );
660642 } catch (KeeperException e ) {
661643 log .error ("KeeperException" , e );
662644 } catch (InterruptedException e ) {
@@ -682,10 +664,8 @@ private boolean isSuccessful() {
682664
683665 private void printTrackingMaps () {
684666 if (log .isDebugEnabled ()) {
685- log .debug ("RunningTasks: {}" , runningTasks );
667+ log .debug ("RunningTasks: {}" , runningTasks . keySet () );
686668 log .debug ("BlockedTasks: {}" , blockedTasks .keySet ()); // nowarn
687- log .debug ("CompletedTasks: {}" , completedTasks .keySet ()); // nowarn
688- log .debug ("RunningZKTasks: {}" , runningZKTasks ); // nowarn
689669 }
690670 }
691671
0 commit comments