Skip to content

Commit 8627f33

Browse files
committed
refactor core wrapper related
1 parent 515793b commit 8627f33

File tree

11 files changed

+136
-269
lines changed

11 files changed

+136
-269
lines changed

common/src/main/java/org/dromara/dynamictp/common/entity/DtpExecutorProps.java

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -55,19 +55,6 @@ public class DtpExecutorProps extends TpExecutorProps {
5555
*/
5656
private String threadNamePrefix = "dtp";
5757

58-
/**
59-
* Whether to wait for scheduled tasks to complete on shutdown,
60-
* not interrupting running tasks and executing all tasks in the queue.
61-
*/
62-
private boolean waitForTasksToCompleteOnShutdown = true;
63-
64-
/**
65-
* The maximum number of seconds that this executor is supposed to block
66-
* on shutdown in order to wait for remaining tasks to complete their execution
67-
* before the rest of the container continues to shut down.
68-
*/
69-
private int awaitTerminationSeconds = 3;
70-
7158
/**
7259
* If pre start all core threads.
7360
*/

common/src/main/java/org/dromara/dynamictp/common/entity/TpExecutorProps.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,19 @@ public class TpExecutorProps {
107107
*/
108108
private boolean notifyEnabled = true;
109109

110+
/**
111+
* Whether to wait for scheduled tasks to complete on shutdown,
112+
* not interrupting running tasks and executing all tasks in the queue.
113+
*/
114+
private boolean waitForTasksToCompleteOnShutdown = true;
115+
116+
/**
117+
* The maximum number of seconds that this executor is supposed to block
118+
* on shutdown in order to wait for remaining tasks to complete their execution
119+
* before the rest of the container continues to shut down.
120+
*/
121+
private int awaitTerminationSeconds = 3;
122+
110123
/**
111124
* Task execute timeout, unit (ms).
112125
*/
@@ -130,7 +143,7 @@ public class TpExecutorProps {
130143
/**
131144
* Aware names.
132145
*/
133-
private List<String> awareNames;
146+
private Set<String> awareNames;
134147

135148
/**
136149
* check core param is inValid

core/src/main/java/org/dromara/dynamictp/core/DtpRegistry.java

Lines changed: 16 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -225,65 +225,40 @@ private static void doRefresh(ExecutorWrapper executorWrapper, DtpExecutorProps
225225
}
226226
// update queue
227227
updateQueueProps(executor, props);
228-
229-
if (executorWrapper.isDtpExecutor()) {
230-
doRefreshDtp(executorWrapper, props);
231-
return;
232-
}
233-
doRefreshCommon(executorWrapper, props);
228+
doRefreshWrapper(executorWrapper, props);
234229
}
235230

236-
private static void doRefreshCommon(ExecutorWrapper executorWrapper, DtpExecutorProps props) {
231+
private static void doRefreshWrapper(ExecutorWrapper executorWrapper, DtpExecutorProps props) {
237232

238233
if (StringUtils.isNotBlank(props.getThreadPoolAliasName())) {
239234
executorWrapper.setThreadPoolAliasName(props.getThreadPoolAliasName());
240235
}
241-
242236
ExecutorAdapter<?> executor = executorWrapper.getExecutor();
243237
// update reject handler
244-
String currentRejectHandlerType = executor.getRejectHandlerType();
245-
if (!Objects.equals(currentRejectHandlerType, props.getRejectedHandlerType())) {
238+
executorWrapper.setRejectEnhanced(props.isRejectEnhanced());
239+
if (!Objects.equals(executor.getRejectHandlerType(), props.getRejectedHandlerType())) {
246240
val rejectHandler = RejectHandlerGetter.buildRejectedHandler(props.getRejectedHandlerType());
247-
executor.setRejectedExecutionHandler(rejectHandler);
241+
executorWrapper.setRejectHandler(rejectHandler);
248242
}
243+
244+
// update task wrappers
249245
List<TaskWrapper> taskWrappers = TaskWrappers.getInstance().getByNames(props.getTaskWrapperNames());
250246
executorWrapper.setTaskWrappers(taskWrappers);
251247

248+
executorWrapper.setThreadPoolAliasName(props.getThreadPoolAliasName());
249+
executorWrapper.setNotifyItems(props.getNotifyItems());
250+
executorWrapper.setPlatformIds(props.getPlatformIds());
251+
executorWrapper.setNotifyEnabled(props.isNotifyEnabled());
252+
executorWrapper.setPreStartAllCoreThreads(props.isPreStartAllCoreThreads());
253+
executorWrapper.setWaitForTasksToCompleteOnShutdown(props.isWaitForTasksToCompleteOnShutdown());
254+
executorWrapper.setAwaitTerminationSeconds(props.getAwaitTerminationSeconds());
255+
executorWrapper.setAwareNames(props.getAwareNames());
256+
252257
// update notify related
253258
NotifyHelper.updateNotifyInfo(executorWrapper, props, dtpProperties.getPlatforms());
254-
// update aware related
255-
AwareManager.refresh(executorWrapper, props);
256-
}
257-
258-
private static void doRefreshDtp(ExecutorWrapper executorWrapper, DtpExecutorProps props) {
259259

260-
DtpExecutor executor = (DtpExecutor) executorWrapper.getExecutor();
261-
if (StringUtils.isNotBlank(props.getThreadPoolAliasName())) {
262-
executor.setThreadPoolAliasName(props.getThreadPoolAliasName());
263-
}
264-
// update reject handler
265-
executor.setRejectEnhanced(props.isRejectEnhanced());
266-
if (!Objects.equals(executor.getRejectHandlerType(), props.getRejectedHandlerType())) {
267-
executor.setRejectHandler(RejectHandlerGetter.buildRejectedHandler(props.getRejectedHandlerType()));
268-
}
269-
executor.setWaitForTasksToCompleteOnShutdown(props.isWaitForTasksToCompleteOnShutdown());
270-
executor.setAwaitTerminationSeconds(props.getAwaitTerminationSeconds());
271-
executor.setPreStartAllCoreThreads(props.isPreStartAllCoreThreads());
272-
List<TaskWrapper> taskWrappers = TaskWrappers.getInstance().getByNames(props.getTaskWrapperNames());
273-
executor.setTaskWrappers(taskWrappers);
274-
275-
// update notify related
276-
NotifyHelper.updateNotifyInfo(executor, props, dtpProperties.getPlatforms());
277260
// update aware related
278261
AwareManager.refresh(executorWrapper, props);
279-
updateWrapper(executorWrapper, executor);
280-
}
281-
282-
private static void updateWrapper(ExecutorWrapper executorWrapper, DtpExecutor executor) {
283-
executorWrapper.setThreadPoolAliasName(executor.getThreadPoolAliasName());
284-
executorWrapper.setNotifyItems(executor.getNotifyItems());
285-
executorWrapper.setPlatformIds(executor.getPlatformIds());
286-
executorWrapper.setNotifyEnabled(executor.isNotifyEnabled());
287262
}
288263

289264
/**

core/src/main/java/org/dromara/dynamictp/core/executor/DtpExecutor.java

Lines changed: 0 additions & 153 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,6 @@
2525
import org.dromara.dynamictp.common.entity.NotifyItem;
2626
import org.dromara.dynamictp.core.aware.AwareManager;
2727
import org.dromara.dynamictp.core.aware.TaskEnhanceAware;
28-
import org.dromara.dynamictp.core.notifier.manager.NotifyHelper;
29-
import org.dromara.dynamictp.core.reject.RejectHandlerGetter;
3028
import org.dromara.dynamictp.core.spring.SpringExecutor;
3129
import org.dromara.dynamictp.core.support.ExecutorAdapter;
3230
import org.dromara.dynamictp.core.support.task.wrapper.TaskWrapper;
@@ -61,16 +59,6 @@ public class DtpExecutor extends ThreadPoolExecutor
6159
*/
6260
protected String threadPoolName;
6361

64-
/**
65-
* Simple Business alias Name of Dynamic ThreadPool. Use for notify.
66-
*/
67-
private String threadPoolAliasName;
68-
69-
/**
70-
* If enable notify.
71-
*/
72-
private boolean notifyEnabled = true;
73-
7462
/**
7563
* Notify items, see {@link NotifyItemEnum}.
7664
*/
@@ -91,54 +79,11 @@ public class DtpExecutor extends ThreadPoolExecutor
9179
*/
9280
private Set<String> pluginNames = Sets.newHashSet();
9381

94-
/**
95-
* Aware names.
96-
*/
97-
private Set<String> awareNames = Sets.newHashSet();
98-
99-
/**
100-
* If pre start all core threads.
101-
*/
102-
private boolean preStartAllCoreThreads;
103-
10482
/**
10583
* RejectHandler type.
10684
*/
10785
private String rejectHandlerType;
10886

109-
/**
110-
* If enhance reject.
111-
*/
112-
private boolean rejectEnhanced = true;
113-
114-
/**
115-
* for manual builder thread pools only
116-
*/
117-
private long runTimeout = 0;
118-
119-
/**
120-
* for manual builder thread pools only
121-
*/
122-
private boolean tryInterrupt = false;
123-
124-
/**
125-
* for manual builder thread pools only
126-
*/
127-
private long queueTimeout = 0;
128-
129-
/**
130-
* Whether to wait for scheduled tasks to complete on shutdown,
131-
* not interrupting running tasks and executing all tasks in the queue.
132-
*/
133-
protected boolean waitForTasksToCompleteOnShutdown = false;
134-
135-
/**
136-
* The maximum number of seconds that this executor is supposed to block
137-
* on shutdown in order to wait for remaining tasks to complete their execution
138-
* before the rest of the container continues to shut down.
139-
*/
140-
protected int awaitTerminationSeconds = 0;
141-
14287
public DtpExecutor(int corePoolSize,
14388
int maximumPoolSize,
14489
long keepAliveTime,
@@ -228,24 +173,6 @@ protected void terminated() {
228173
AwareManager.terminated(this);
229174
}
230175

231-
public void initialize() {
232-
NotifyHelper.initNotify(this);
233-
if (preStartAllCoreThreads) {
234-
prestartAllCoreThreads();
235-
}
236-
// reset reject handler in initialize phase according to rejectEnhanced
237-
setRejectHandler(RejectHandlerGetter.buildRejectedHandler(getRejectHandlerType()));
238-
}
239-
240-
public void setRejectHandler(RejectedExecutionHandler handler) {
241-
this.rejectHandlerType = handler.getClass().getSimpleName();
242-
if (!isRejectEnhanced()) {
243-
setRejectedExecutionHandler(handler);
244-
return;
245-
}
246-
setRejectedExecutionHandler(RejectHandlerGetter.getProxy(handler));
247-
}
248-
249176
private void tryPrintError(Runnable r, Throwable t) {
250177
if (Objects.nonNull(t)) {
251178
log.error("DynamicTp execute, thread {} throw exception, traceId {}",
@@ -277,22 +204,6 @@ public void setThreadPoolName(String threadPoolName) {
277204
this.threadPoolName = threadPoolName;
278205
}
279206

280-
public String getThreadPoolAliasName() {
281-
return threadPoolAliasName;
282-
}
283-
284-
public void setThreadPoolAliasName(String threadPoolAliasName) {
285-
this.threadPoolAliasName = threadPoolAliasName;
286-
}
287-
288-
public boolean isNotifyEnabled() {
289-
return notifyEnabled;
290-
}
291-
292-
public void setNotifyEnabled(boolean notifyEnabled) {
293-
this.notifyEnabled = notifyEnabled;
294-
}
295-
296207
public List<NotifyItem> getNotifyItems() {
297208
return notifyItems;
298209
}
@@ -327,30 +238,6 @@ public void setPluginNames(Set<String> pluginNames) {
327238
this.pluginNames = pluginNames;
328239
}
329240

330-
public Set<String> getAwareNames() {
331-
return awareNames;
332-
}
333-
334-
public void setAwareNames(Set<String> awareNames) {
335-
this.awareNames = awareNames;
336-
}
337-
338-
public boolean isPreStartAllCoreThreads() {
339-
return preStartAllCoreThreads;
340-
}
341-
342-
public void setPreStartAllCoreThreads(boolean preStartAllCoreThreads) {
343-
this.preStartAllCoreThreads = preStartAllCoreThreads;
344-
}
345-
346-
public boolean isRejectEnhanced() {
347-
return rejectEnhanced;
348-
}
349-
350-
public void setRejectEnhanced(boolean rejectEnhanced) {
351-
this.rejectEnhanced = rejectEnhanced;
352-
}
353-
354241
@Override
355242
public String getRejectHandlerType() {
356243
return rejectHandlerType;
@@ -360,46 +247,6 @@ public void setRejectHandlerType(String rejectHandlerType) {
360247
this.rejectHandlerType = rejectHandlerType;
361248
}
362249

363-
public long getRunTimeout() {
364-
return runTimeout;
365-
}
366-
367-
public void setRunTimeout(long runTimeout) {
368-
this.runTimeout = runTimeout;
369-
}
370-
371-
public boolean isTryInterrupt() {
372-
return tryInterrupt;
373-
}
374-
375-
public void setTryInterrupt(boolean tryInterrupt) {
376-
this.tryInterrupt = tryInterrupt;
377-
}
378-
379-
public long getQueueTimeout() {
380-
return queueTimeout;
381-
}
382-
383-
public void setQueueTimeout(long queueTimeout) {
384-
this.queueTimeout = queueTimeout;
385-
}
386-
387-
public boolean isWaitForTasksToCompleteOnShutdown() {
388-
return waitForTasksToCompleteOnShutdown;
389-
}
390-
391-
public void setWaitForTasksToCompleteOnShutdown(boolean waitForTasksToCompleteOnShutdown) {
392-
this.waitForTasksToCompleteOnShutdown = waitForTasksToCompleteOnShutdown;
393-
}
394-
395-
public int getAwaitTerminationSeconds() {
396-
return awaitTerminationSeconds;
397-
}
398-
399-
public void setAwaitTerminationSeconds(int awaitTerminationSeconds) {
400-
this.awaitTerminationSeconds = awaitTerminationSeconds;
401-
}
402-
403250
/**
404251
* In order for the field can be assigned by reflection.
405252
*

core/src/main/java/org/dromara/dynamictp/core/executor/ExecutorType.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,12 @@
1818
package org.dromara.dynamictp.core.executor;
1919

2020
import lombok.AllArgsConstructor;
21-
import org.dromara.dynamictp.core.executor.eager.EagerDtpExecutor;
2221
import lombok.Getter;
22+
import org.dromara.dynamictp.core.executor.eager.EagerDtpExecutor;
2323
import org.dromara.dynamictp.core.executor.priority.PriorityDtpExecutor;
24+
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
25+
26+
import java.util.concurrent.ThreadPoolExecutor;
2427

2528
/**
2629
* ExecutorType related
@@ -37,6 +40,16 @@ public enum ExecutorType {
3740
*/
3841
COMMON("common", DtpExecutor.class),
3942

43+
/**
44+
* Common juc executor type.
45+
*/
46+
COMMON_JUC("common-juc", ThreadPoolExecutor.class),
47+
48+
/**
49+
* Common spring executor type.
50+
*/
51+
COMMON_SPRING("common-spring", ThreadPoolTaskExecutor.class),
52+
4053
/**
4154
* Eager executor type.
4255
*/

core/src/main/java/org/dromara/dynamictp/core/notifier/manager/NotifyHelper.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,6 @@ public static void updateNotifyInfo(DtpExecutor executor, DtpExecutorProps props
185185
allNotifyItems);
186186
executor.setNotifyItems(allNotifyItems);
187187
executor.setPlatformIds(props.getPlatformIds());
188-
executor.setNotifyEnabled(props.isNotifyEnabled());
189188
}
190189

191190
private static void refreshNotify(String poolName,

0 commit comments

Comments
 (0)