3737import org .apache .hugegraph .util .Log ;
3838import org .slf4j .Logger ;
3939
40+ /**
41+ * Central task management system that coordinates task scheduling and execution.
42+ * Manages task schedulers for different graphs and handles role-based execution.
43+ * <p>
44+ * Note: The local master-worker mechanism will be deprecated in version 1.7
45+ * (configuration has been removed from config files).
46+ */
4047public final class TaskManager {
4148
4249 private static final Logger LOG = Log .logger (TaskManager .class );
4350
4451 public static final String TASK_WORKER_PREFIX = "task-worker" ;
4552 public static final String TASK_WORKER = TASK_WORKER_PREFIX + "-%d" ;
4653 public static final String TASK_DB_WORKER = "task-db-worker-%d" ;
47- public static final String SERVER_INFO_DB_WORKER =
48- "server-info-db-worker-%d" ;
54+ public static final String SERVER_INFO_DB_WORKER = "server-info-db-worker-%d" ;
4955 public static final String TASK_SCHEDULER = "task-scheduler-%d" ;
5056
5157 public static final String OLAP_TASK_WORKER = "olap-task-worker-%d" ;
5258 public static final String SCHEMA_TASK_WORKER = "schema-task-worker-%d" ;
5359 public static final String EPHEMERAL_TASK_WORKER = "ephemeral-task-worker-%d" ;
5460 public static final String DISTRIBUTED_TASK_SCHEDULER = "distributed-scheduler-%d" ;
5561
56- protected static final long SCHEDULE_PERIOD = 1000L ; // unit ms
62+ static final long SCHEDULE_PERIOD = 1000L ; // unit ms
5763 private static final long TX_CLOSE_TIMEOUT = 30L ; // unit s
5864 private static final int THREADS = 4 ;
5965 private static final TaskManager MANAGER = new TaskManager (THREADS );
@@ -87,17 +93,13 @@ private TaskManager(int pool) {
8793 this .serverInfoDbExecutor = ExecutorUtil .newFixedThreadPool (
8894 1 , SERVER_INFO_DB_WORKER );
8995
90- this .schemaTaskExecutor = ExecutorUtil .newFixedThreadPool (pool ,
91- SCHEMA_TASK_WORKER );
92- this .olapTaskExecutor = ExecutorUtil .newFixedThreadPool (pool ,
93- OLAP_TASK_WORKER );
94- this .ephemeralTaskExecutor = ExecutorUtil .newFixedThreadPool (pool ,
95- EPHEMERAL_TASK_WORKER );
96+ this .schemaTaskExecutor = ExecutorUtil .newFixedThreadPool (pool , SCHEMA_TASK_WORKER );
97+ this .olapTaskExecutor = ExecutorUtil .newFixedThreadPool (pool , OLAP_TASK_WORKER );
98+ this .ephemeralTaskExecutor = ExecutorUtil .newFixedThreadPool (pool , EPHEMERAL_TASK_WORKER );
9699 this .distributedSchedulerExecutor =
97- ExecutorUtil .newPausableScheduledThreadPool (1 ,
98- DISTRIBUTED_TASK_SCHEDULER );
100+ ExecutorUtil .newPausableScheduledThreadPool (1 , DISTRIBUTED_TASK_SCHEDULER );
99101
100- // For schedule task to run, just one thread is ok
102+ // For a schedule task to run, just one thread is ok
101103 this .schedulerExecutor = ExecutorUtil .newPausableScheduledThreadPool (
102104 1 , TASK_SCHEDULER );
103105 // Start after 10x period time waiting for HugeGraphServer startup
@@ -111,7 +113,9 @@ public void addScheduler(HugeGraphParams graph) {
111113 E .checkArgumentNotNull (graph , "The graph can't be null" );
112114 LOG .info ("Use {} as the scheduler of graph ({})" ,
113115 graph .schedulerType (), graph .name ());
114- // TODO: If the current service is bound to a specified non-DEFAULT graph space, the graph outside of the current graph space will no longer create task schedulers (graph space)
116+ // TODO: If the current service is bound to a specified non-DEFAULT graph space, the
117+ // graph outside of the current graph space will no longer create task schedulers (graph
118+ // space)
115119 switch (graph .schedulerType ()) {
116120 case "distributed" : {
117121 TaskScheduler scheduler =
@@ -194,7 +198,7 @@ private void closeTaskTx(HugeGraphParams graph) {
194198
195199 private void closeSchedulerTx (HugeGraphParams graph ) {
196200 final Callable <Void > closeTx = () -> {
197- // Do close-tx for current thread
201+ // Do close-tx for the current thread
198202 graph .closeTx ();
199203 // Let other threads run
200204 Thread .yield ();
@@ -209,7 +213,7 @@ private void closeSchedulerTx(HugeGraphParams graph) {
209213
210214 private void closeDistributedSchedulerTx (HugeGraphParams graph ) {
211215 final Callable <Void > closeTx = () -> {
212- // Do close-tx for current thread
216+ // Do close-tx for the current thread
213217 graph .closeTx ();
214218 // Let other threads run
215219 Thread .yield ();
@@ -252,8 +256,7 @@ public void shutdown(long timeout) {
252256 if (!this .schedulerExecutor .isShutdown ()) {
253257 this .schedulerExecutor .shutdown ();
254258 try {
255- terminated = this .schedulerExecutor .awaitTermination (timeout ,
256- unit );
259+ terminated = this .schedulerExecutor .awaitTermination (timeout , unit );
257260 } catch (Throwable e ) {
258261 ex = e ;
259262 }
@@ -262,8 +265,7 @@ public void shutdown(long timeout) {
262265 if (terminated && !this .distributedSchedulerExecutor .isShutdown ()) {
263266 this .distributedSchedulerExecutor .shutdown ();
264267 try {
265- terminated = this .distributedSchedulerExecutor .awaitTermination (timeout ,
266- unit );
268+ terminated = this .distributedSchedulerExecutor .awaitTermination (timeout , unit );
267269 } catch (Throwable e ) {
268270 ex = e ;
269271 }
@@ -272,8 +274,7 @@ public void shutdown(long timeout) {
272274 if (terminated && !this .taskExecutor .isShutdown ()) {
273275 this .taskExecutor .shutdown ();
274276 try {
275- terminated = this .taskExecutor .awaitTermination (timeout ,
276- unit );
277+ terminated = this .taskExecutor .awaitTermination (timeout , unit );
277278 } catch (Throwable e ) {
278279 ex = e ;
279280 }
@@ -282,8 +283,7 @@ public void shutdown(long timeout) {
282283 if (terminated && !this .serverInfoDbExecutor .isShutdown ()) {
283284 this .serverInfoDbExecutor .shutdown ();
284285 try {
285- terminated = this .serverInfoDbExecutor .awaitTermination (timeout ,
286- unit );
286+ terminated = this .serverInfoDbExecutor .awaitTermination (timeout , unit );
287287 } catch (Throwable e ) {
288288 ex = e ;
289289 }
@@ -292,8 +292,7 @@ public void shutdown(long timeout) {
292292 if (terminated && !this .taskDbExecutor .isShutdown ()) {
293293 this .taskDbExecutor .shutdown ();
294294 try {
295- terminated = this .taskDbExecutor .awaitTermination (timeout ,
296- unit );
295+ terminated = this .taskDbExecutor .awaitTermination (timeout , unit );
297296 } catch (Throwable e ) {
298297 ex = e ;
299298 }
@@ -302,8 +301,7 @@ public void shutdown(long timeout) {
302301 if (terminated && !this .ephemeralTaskExecutor .isShutdown ()) {
303302 this .ephemeralTaskExecutor .shutdown ();
304303 try {
305- terminated = this .ephemeralTaskExecutor .awaitTermination (timeout ,
306- unit );
304+ terminated = this .ephemeralTaskExecutor .awaitTermination (timeout , unit );
307305 } catch (Throwable e ) {
308306 ex = e ;
309307 }
@@ -312,8 +310,7 @@ public void shutdown(long timeout) {
312310 if (terminated && !this .schemaTaskExecutor .isShutdown ()) {
313311 this .schemaTaskExecutor .shutdown ();
314312 try {
315- terminated = this .schemaTaskExecutor .awaitTermination (timeout ,
316- unit );
313+ terminated = this .schemaTaskExecutor .awaitTermination (timeout , unit );
317314 } catch (Throwable e ) {
318315 ex = e ;
319316 }
@@ -322,8 +319,7 @@ public void shutdown(long timeout) {
322319 if (terminated && !this .olapTaskExecutor .isShutdown ()) {
323320 this .olapTaskExecutor .shutdown ();
324321 try {
325- terminated = this .olapTaskExecutor .awaitTermination (timeout ,
326- unit );
322+ terminated = this .olapTaskExecutor .awaitTermination (timeout , unit );
327323 } catch (Throwable e ) {
328324 ex = e ;
329325 }
@@ -360,8 +356,7 @@ public void onAsRoleMaster() {
360356 if (serverInfoManager != null ) {
361357 serverInfoManager .changeServerRole (NodeRole .MASTER );
362358 } else {
363- LOG .warn ("ServerInfoManager is null for graph {}" ,
364- entry .graphName ());
359+ LOG .warn ("ServerInfoManager is null for graph {}" , entry .graphName ());
365360 }
366361 }
367362 } catch (Throwable e ) {
@@ -377,8 +372,7 @@ public void onAsRoleWorker() {
377372 if (serverInfoManager != null ) {
378373 serverInfoManager .changeServerRole (NodeRole .WORKER );
379374 } else {
380- LOG .warn ("ServerInfoManager is null for graph {}" ,
381- entry .graphName ());
375+ LOG .warn ("ServerInfoManager is null for graph {}" , entry .graphName ());
382376 }
383377 }
384378 } catch (Throwable e ) {
@@ -387,8 +381,8 @@ public void onAsRoleWorker() {
387381 }
388382 }
389383
390- protected void notifyNewTask (HugeTask <?> task ) {
391- Queue <Runnable > queue = (( ThreadPoolExecutor ) this .schedulerExecutor )
384+ void notifyNewTask (HugeTask <?> task ) {
385+ Queue <Runnable > queue = this .schedulerExecutor
392386 .getQueue ();
393387 if (queue .size () <= 1 ) {
394388 /*
@@ -406,10 +400,9 @@ private void scheduleOrExecuteJob() {
406400 // Called by scheduler timer
407401 try {
408402 for (TaskScheduler entry : this .schedulers .values ()) {
409- TaskScheduler scheduler = entry ;
410- // Maybe other thread close&remove scheduler at the same time
411- synchronized (scheduler ) {
412- this .scheduleOrExecuteJobForGraph (scheduler );
403+ // Maybe other threads close&remove scheduler at the same time
404+ synchronized (entry ) {
405+ this .scheduleOrExecuteJobForGraph (entry );
413406 }
414407 }
415408 } catch (Throwable e ) {
0 commit comments