1818
1919package org .apache .hadoop .hive .metastore .leader ;
2020
21+ import com .cronutils .utils .VisibleForTesting ;
22+ import com .google .common .util .concurrent .ThreadFactoryBuilder ;
23+
2124import org .apache .hadoop .conf .Configuration ;
2225import org .apache .hadoop .hive .metastore .HiveMetaStore ;
2326import org .apache .hadoop .hive .metastore .MetastoreTaskThread ;
24- import org .apache .hadoop .hive .metastore .ThreadPool ;
2527import org .apache .hadoop .hive .metastore .conf .MetastoreConf ;
2628import org .apache .hadoop .hive .metastore .txn .service .CompactionHouseKeeperService ;
2729import org .apache .hadoop .hive .metastore .utils .JavaUtils ;
2830
2931import java .util .ArrayList ;
3032import java .util .Collection ;
3133import java .util .List ;
34+ import java .util .concurrent .Executors ;
35+ import java .util .concurrent .ScheduledExecutorService ;
36+ import java .util .concurrent .ThreadFactory ;
3237import java .util .concurrent .TimeUnit ;
3338
3439import static java .util .Objects .requireNonNull ;
@@ -38,7 +43,7 @@ public class HouseKeepingTasks implements LeaderElection.LeadershipStateListener
3843 private final Configuration configuration ;
3944
4045 // shut down pool when new leader is selected
41- private ThreadPool metastoreTaskThreadPool ;
46+ private ScheduledExecutorService metastoreTaskThreadPool ;
4247
4348 private boolean runOnlyRemoteTasks ;
4449
@@ -94,30 +99,24 @@ public void takeLeadership(LeaderElection election) throws Exception {
9499 throw new IllegalStateException ("There should be no running tasks before taking the leadership!" );
95100 }
96101 runningTasks = new ArrayList <>();
97- metastoreTaskThreadPool = ThreadPool .initialize (configuration );
102+ ThreadFactory threadFactory = new ThreadFactoryBuilder ().setDaemon (true )
103+ .setNameFormat ("Metastore Scheduled Worker(" + election .getName () + ") %d" ).build ();
104+ final List <MetastoreTaskThread > tasks ;
98105 if (!runOnlyRemoteTasks ) {
99- List <MetastoreTaskThread > alwaysTasks = new ArrayList <>(getAlwaysTasks ());
100- for (MetastoreTaskThread task : alwaysTasks ) {
101- task .setConf (configuration );
102- task .enforceMutex (election .enforceMutex ());
103- long freq = task .runFrequency (TimeUnit .MILLISECONDS );
104- // For backwards compatibility, since some threads used to be hard coded but only run if
105- // frequency was > 0
106- if (freq > 0 ) {
107- runningTasks .add (task );
108- metastoreTaskThreadPool .getPool ().scheduleAtFixedRate (task , freq , freq , TimeUnit .MILLISECONDS );
109- }
110- }
106+ tasks = new ArrayList <>(getAlwaysTasks ());
111107 } else {
112- List <MetastoreTaskThread > remoteOnlyTasks = new ArrayList <>(getRemoteOnlyTasks ());
113- for (MetastoreTaskThread task : remoteOnlyTasks ) {
114- task .setConf (configuration );
115- task .enforceMutex (election .enforceMutex ());
116- long freq = task .runFrequency (TimeUnit .MILLISECONDS );
117- if (freq > 0 ) {
118- runningTasks .add (task );
119- metastoreTaskThreadPool .getPool ().scheduleAtFixedRate (task , freq , freq , TimeUnit .MILLISECONDS );
120- }
108+ tasks = new ArrayList <>(getRemoteOnlyTasks ());
109+ }
110+ int poolSize = Math .min (MetastoreConf .getIntVar (configuration ,
111+ MetastoreConf .ConfVars .THREAD_POOL_SIZE ), tasks .size ());
112+ metastoreTaskThreadPool = Executors .newScheduledThreadPool (poolSize , threadFactory );
113+ for (MetastoreTaskThread task : tasks ) {
114+ task .setConf (configuration );
115+ task .enforceMutex (election .enforceMutex ());
116+ long freq = task .runFrequency (TimeUnit .MILLISECONDS );
117+ if (freq > 0 ) {
118+ runningTasks .add (task );
119+ metastoreTaskThreadPool .scheduleAtFixedRate (task , freq , freq , TimeUnit .MILLISECONDS );
121120 }
122121 }
123122
@@ -141,4 +140,8 @@ public void lossLeadership(LeaderElection election) throws Exception {
141140 }
142141 }
143142
143+ @ VisibleForTesting
144+ public ScheduledExecutorService getExecutorService () {
145+ return metastoreTaskThreadPool ;
146+ }
144147}
0 commit comments