3232import org .apache .amoro .exception .IllegalTaskStateException ;
3333import org .apache .amoro .exception .ObjectNotExistsException ;
3434import org .apache .amoro .exception .PluginRetryAuthException ;
35+ import org .apache .amoro .resource .Resource ;
36+ import org .apache .amoro .resource .ResourceContainer ;
3537import org .apache .amoro .resource .ResourceGroup ;
38+ import org .apache .amoro .resource .ResourceType ;
3639import org .apache .amoro .server .catalog .CatalogManager ;
40+ import org .apache .amoro .server .dashboard .model .OptimizerResourceInfo ;
41+ import org .apache .amoro .server .manager .AbstractOptimizerContainer ;
3742import org .apache .amoro .server .optimizing .OptimizingProcess ;
3843import org .apache .amoro .server .optimizing .OptimizingQueue ;
3944import org .apache .amoro .server .optimizing .OptimizingStatus ;
4348import org .apache .amoro .server .persistence .mapper .ResourceMapper ;
4449import org .apache .amoro .server .persistence .mapper .TableProcessMapper ;
4550import org .apache .amoro .server .process .TableProcessMeta ;
51+ import org .apache .amoro .server .resource .Containers ;
4652import org .apache .amoro .server .resource .OptimizerInstance ;
4753import org .apache .amoro .server .resource .OptimizerManager ;
4854import org .apache .amoro .server .resource .OptimizerThread ;
@@ -89,6 +95,8 @@ public class DefaultOptimizingService extends StatedPersistentBase
8995
9096 private static final Logger LOG = LoggerFactory .getLogger (DefaultOptimizingService .class );
9197
98+ private final long groupMinParallelismCheckInterval ;
99+ private final int groupMaxKeepingAttempts ;
92100 private final long optimizerTouchTimeout ;
93101 private final long taskAckTimeout ;
94102 private final long taskExecuteTimeout ;
@@ -99,7 +107,9 @@ public class DefaultOptimizingService extends StatedPersistentBase
99107 private final Map <String , OptimizingQueue > optimizingQueueByGroup = new ConcurrentHashMap <>();
100108 private final Map <String , OptimizingQueue > optimizingQueueByToken = new ConcurrentHashMap <>();
101109 private final Map <String , OptimizerInstance > authOptimizers = new ConcurrentHashMap <>();
102- private final OptimizerKeeper optimizerKeeper = new OptimizerKeeper ();
110+ private final OptimizerKeeper optimizerKeeper = new OptimizerKeeper ("optimizer-keeper-thread" );
111+ private final OptimizerGroupKeeper optimizerGroupKeeper =
112+ new OptimizerGroupKeeper ("optimizer-group-keeper-thread" );
103113 private final OptimizingConfigWatcher optimizingConfigWatcher = new OptimizingConfigWatcher ();
104114 private final CatalogManager catalogManager ;
105115 private final OptimizerManager optimizerManager ;
@@ -126,6 +136,11 @@ public DefaultOptimizingService(
126136 serviceConfig .getDurationInMillis (AmoroManagementConf .OPTIMIZER_POLLING_TIMEOUT );
127137 this .breakQuotaLimit =
128138 serviceConfig .getBoolean (AmoroManagementConf .OPTIMIZING_BREAK_QUOTA_LIMIT_ENABLED );
139+ this .groupMinParallelismCheckInterval =
140+ serviceConfig .getDurationInMillis (
141+ AmoroManagementConf .OPTIMIZER_GROUP_MIN_PARALLELISM_CHECK_INTERVAL );
142+ this .groupMaxKeepingAttempts =
143+ serviceConfig .getInteger (AmoroManagementConf .OPTIMIZER_GROUP_MAX_KEEPING_ATTEMPTS );
129144 this .tableService = tableService ;
130145 this .catalogManager = catalogManager ;
131146 this .optimizerManager = optimizerManager ;
@@ -161,6 +176,7 @@ private void loadOptimizingQueues(List<DefaultTableRuntime> tableRuntimeList) {
161176 Optional .ofNullable (tableRuntimes ).orElseGet (ArrayList ::new ),
162177 maxPlanningParallelism );
163178 optimizingQueueByGroup .put (groupName , optimizingQueue );
179+ optimizerGroupKeeper .keepInTouch (groupName , 1 );
164180 });
165181 optimizers .forEach (optimizer -> registerOptimizer (optimizer , false ));
166182 // Avoid keeping the tables in processing/pending status forever in below cases:
@@ -348,7 +364,9 @@ public void createResourceGroup(ResourceGroup resourceGroup) {
348364 planExecutor ,
349365 new ArrayList <>(),
350366 maxPlanningParallelism );
351- optimizingQueueByGroup .put (resourceGroup .getName (), optimizingQueue );
367+ String groupName = resourceGroup .getName ();
368+ optimizingQueueByGroup .put (groupName , optimizingQueue );
369+ optimizerGroupKeeper .keepInTouch (groupName , 1 );
352370 });
353371 }
354372
@@ -369,6 +387,7 @@ public void dispose() {
369387 // dispose all queues
370388 optimizingQueueByGroup .values ().forEach (OptimizingQueue ::dispose );
371389 optimizerKeeper .dispose ();
390+ optimizerGroupKeeper .dispose ();
372391 tableHandlerChain .dispose ();
373392 optimizingQueueByGroup .clear ();
374393 optimizingQueueByToken .clear ();
@@ -439,6 +458,7 @@ protected void initHandler(List<TableRuntime> tableRuntimeList) {
439458 .map (t -> (DefaultTableRuntime ) t )
440459 .collect (Collectors .toList ()));
441460 optimizerKeeper .start ();
461+ optimizerGroupKeeper .start ();
442462 optimizingConfigWatcher .start ();
443463 LOG .info ("SuspendingDetector for Optimizer has been started." );
444464 LOG .info ("OptimizerManagementService initializing has completed" );
@@ -489,21 +509,16 @@ public OptimizerInstance getOptimizer() {
489509 }
490510 }
491511
492- private class OptimizerKeeper implements Runnable {
493-
494- private volatile boolean stopped = false ;
495- private final Thread thread = new Thread (this , "optimizer-keeper-thread" );
496- private final DelayQueue <OptimizerKeepingTask > suspendingQueue = new DelayQueue <>();
512+ protected abstract class AbstractKeeper <T extends Delayed > implements Runnable {
513+ protected volatile boolean stopped = false ;
514+ protected final Thread thread = new Thread (this );
515+ protected final DelayQueue <T > suspendingQueue = new DelayQueue <>();
497516
498- public OptimizerKeeper () {
517+ public AbstractKeeper (String threadName ) {
518+ thread .setName (threadName );
499519 thread .setDaemon (true );
500520 }
501521
502- public void keepInTouch (OptimizerInstance optimizerInstance ) {
503- Preconditions .checkNotNull (optimizerInstance , "token can not be null" );
504- suspendingQueue .add (new OptimizerKeepingTask (optimizerInstance ));
505- }
506-
507522 public void start () {
508523 thread .start ();
509524 }
@@ -517,30 +532,49 @@ public void dispose() {
517532 public void run () {
518533 while (!stopped ) {
519534 try {
520- OptimizerKeepingTask keepingTask = suspendingQueue .take ();
521- String token = keepingTask .getToken ();
522- boolean isExpired = !keepingTask .tryKeeping ();
523- if (isExpired ) {
524- LOG .info ("Optimizer {} has been expired, unregister it" , keepingTask .getOptimizer ());
525- unregisterOptimizer (token );
526- }
527- Optional .ofNullable (keepingTask .getQueue ())
528- .ifPresent (
529- queue ->
530- queue
531- .collectTasks (buildSuspendingPredication (authOptimizers .keySet ()))
532- .forEach (task -> retryTask (task , queue )));
533- if (!isExpired ) {
534- LOG .debug ("Optimizer {} is being touched, keep it" , keepingTask .getOptimizer ());
535- keepInTouch (keepingTask .getOptimizer ());
536- }
535+ T keepingTask = suspendingQueue .take ();
536+ this .processTask (keepingTask );
537537 } catch (InterruptedException ignored ) {
538538 } catch (Throwable t ) {
539- LOG .error ("OptimizerKeeper has encountered a problem." , t );
539+ LOG .error ("{} has encountered a problem." , this . getClass (). getSimpleName () , t );
540540 }
541541 }
542542 }
543543
544+ protected abstract void processTask (T task ) throws Exception ;
545+ }
546+
547+ private class OptimizerKeeper extends AbstractKeeper <OptimizerKeepingTask > {
548+
549+ public OptimizerKeeper (String threadName ) {
550+ super (threadName );
551+ }
552+
553+ public void keepInTouch (OptimizerInstance optimizerInstance ) {
554+ Preconditions .checkNotNull (optimizerInstance , "token can not be null" );
555+ suspendingQueue .add (new OptimizerKeepingTask (optimizerInstance ));
556+ }
557+
558+ @ Override
559+ protected void processTask (OptimizerKeepingTask keepingTask ) {
560+ String token = keepingTask .getToken ();
561+ boolean isExpired = !keepingTask .tryKeeping ();
562+ if (isExpired ) {
563+ LOG .info ("Optimizer {} has been expired, unregister it" , keepingTask .getOptimizer ());
564+ unregisterOptimizer (token );
565+ }
566+ Optional .ofNullable (keepingTask .getQueue ())
567+ .ifPresent (
568+ queue ->
569+ queue
570+ .collectTasks (buildSuspendingPredication (authOptimizers .keySet ()))
571+ .forEach (task -> retryTask (task , queue )));
572+ if (!isExpired ) {
573+ LOG .debug ("Optimizer {} is being touched, keep it" , keepingTask .getOptimizer ());
574+ keepInTouch (keepingTask .getOptimizer ());
575+ }
576+ }
577+
544578 private void retryTask (TaskRuntime <?> task , OptimizingQueue queue ) {
545579 if (isTaskExecTimeout (task )) {
546580 LOG .warn (
@@ -629,4 +663,148 @@ void dispose() {
629663 scheduler .shutdown ();
630664 }
631665 }
666+
667+ private class OptimizerGroupKeepingTask implements Delayed {
668+
669+ private final String groupName ;
670+ private final long lastCheckTime ;
671+ private final int attempts ;
672+
673+ public OptimizerGroupKeepingTask (String groupName , int attempts ) {
674+ this .groupName = groupName ;
675+ this .lastCheckTime = System .currentTimeMillis ();
676+ this .attempts = attempts ;
677+ }
678+
679+ @ Override
680+ public long getDelay (@ NotNull TimeUnit unit ) {
681+ return unit .convert (
682+ lastCheckTime + groupMinParallelismCheckInterval * attempts - System .currentTimeMillis (),
683+ TimeUnit .MILLISECONDS );
684+ }
685+
686+ @ Override
687+ public int compareTo (@ NotNull Delayed o ) {
688+ OptimizerGroupKeepingTask another = (OptimizerGroupKeepingTask ) o ;
689+ return Long .compare (lastCheckTime , another .lastCheckTime );
690+ }
691+
692+ public int getMinParallelism (ResourceGroup resourceGroup ) {
693+ if (!resourceGroup
694+ .getProperties ()
695+ .containsKey (OptimizerProperties .OPTIMIZER_GROUP_MIN_PARALLELISM )) {
696+ return 0 ;
697+ }
698+ String minParallelism =
699+ resourceGroup .getProperties ().get (OptimizerProperties .OPTIMIZER_GROUP_MIN_PARALLELISM );
700+ try {
701+ return Integer .parseInt (minParallelism );
702+ } catch (Throwable t ) {
703+ LOG .warn ("Illegal minParallelism : {}, will use default value 0" , minParallelism , t );
704+ return 0 ;
705+ }
706+ }
707+
708+ public int tryKeeping (ResourceGroup resourceGroup ) {
709+ List <OptimizerInstance > optimizers = optimizerManager .listOptimizers (groupName );
710+ OptimizerResourceInfo optimizerResourceInfo = new OptimizerResourceInfo ();
711+ optimizers .forEach (
712+ e -> {
713+ optimizerResourceInfo .addOccupationCore (e .getThreadCount ());
714+ optimizerResourceInfo .addOccupationMemory (e .getMemoryMb ());
715+ });
716+ return getMinParallelism (resourceGroup ) - optimizerResourceInfo .getOccupationCore ();
717+ }
718+
719+ public ResourceGroup getResourceGroup () {
720+ OptimizingQueue optimizingQueue = optimizingQueueByGroup .get (groupName );
721+ if (optimizingQueue == null ) {
722+ return null ;
723+ }
724+ return optimizingQueue .getOptimizerGroup ();
725+ }
726+
727+ public String getGroupName () {
728+ return groupName ;
729+ }
730+
731+ public int getAttempts () {
732+ return attempts ;
733+ }
734+ }
735+
736+ /**
737+ * Optimizer group keeper thread responsible for monitoring resource group status and
738+ * automatically maintaining optimizer resources.
739+ */
740+ private class OptimizerGroupKeeper extends AbstractKeeper <OptimizerGroupKeepingTask > {
741+
742+ public OptimizerGroupKeeper (String threadName ) {
743+ super (threadName );
744+ }
745+
746+ public void keepInTouch (String groupName , int attempts ) {
747+ Preconditions .checkNotNull (groupName , "groupName can not be null" );
748+ Preconditions .checkArgument (attempts > 0 , "attempts must be greater than 0" );
749+ if (this .stopped ) {
750+ return ;
751+ }
752+ suspendingQueue .add (new OptimizerGroupKeepingTask (groupName , attempts ));
753+ }
754+
755+ @ Override
756+ protected void processTask (OptimizerGroupKeepingTask keepingTask ) {
757+ ResourceGroup resourceGroup = keepingTask .getResourceGroup ();
758+ if (resourceGroup == null ) {
759+ LOG .warn (
760+ "ResourceGroup:{} may have been deleted, stop keeping it" , keepingTask .getGroupName ());
761+ return ;
762+ }
763+
764+ int requiredCores = keepingTask .tryKeeping (resourceGroup );
765+ if (requiredCores <= 0 ) {
766+ LOG .debug (
767+ "The Resource Group:{} has sufficient resources, keep it" , resourceGroup .getName ());
768+ keepInTouch (resourceGroup .getName (), 1 );
769+ return ;
770+ }
771+
772+ if (keepingTask .getAttempts () > groupMaxKeepingAttempts ) {
773+ int minParallelism = keepingTask .getMinParallelism (resourceGroup );
774+ LOG .warn (
775+ "Resource Group:{}, creating optimizer {} times in a row, optimizers still below min-parallel:{}, will reset min-parallel to {}" ,
776+ resourceGroup .getName (),
777+ keepingTask .getAttempts (),
778+ minParallelism ,
779+ minParallelism - requiredCores );
780+ resourceGroup
781+ .getProperties ()
782+ .put (
783+ OptimizerProperties .OPTIMIZER_GROUP_MIN_PARALLELISM ,
784+ String .valueOf (minParallelism - requiredCores ));
785+ updateResourceGroup (resourceGroup );
786+ optimizerManager .updateResourceGroup (resourceGroup );
787+ keepInTouch (resourceGroup .getName (), 1 );
788+ return ;
789+ }
790+
791+ Resource resource =
792+ new Resource .Builder (
793+ resourceGroup .getContainer (), resourceGroup .getName (), ResourceType .OPTIMIZER )
794+ .setProperties (resourceGroup .getProperties ())
795+ .setThreadCount (requiredCores )
796+ .build ();
797+ ResourceContainer rc = Containers .get (resource .getContainerName ());
798+ try {
799+ ((AbstractOptimizerContainer ) rc ).requestResource (resource );
800+ optimizerManager .createResource (resource );
801+ } finally {
802+ keepInTouch (resourceGroup .getName (), keepingTask .getAttempts () + 1 );
803+ }
804+ LOG .info (
805+ "Resource Group:{} has insufficient resources, created an optimizer with parallelism of {}" ,
806+ resourceGroup .getName (),
807+ requiredCores );
808+ }
809+ }
632810}
0 commit comments