Skip to content

Commit 74f8a5d

Browse files
committed
refactor core wrapper related
1 parent 8627f33 commit 74f8a5d

File tree

5 files changed

+83
-132
lines changed

5 files changed

+83
-132
lines changed

core/src/main/java/org/dromara/dynamictp/core/DtpRegistry.java

Lines changed: 49 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,6 @@
5555
import java.util.concurrent.Executor;
5656
import java.util.stream.Collectors;
5757

58-
import static java.util.stream.Collectors.toList;
5958
import static org.dromara.dynamictp.common.constant.DynamicTpConst.M_1;
6059
import static org.dromara.dynamictp.common.constant.DynamicTpConst.PROPERTIES_CHANGE_SHOW_STYLE;
6160

@@ -113,21 +112,6 @@ public static void registerExecutor(ExecutorWrapper wrapper, String source) {
113112
EXECUTOR_REGISTRY.putIfAbsent(wrapper.getThreadPoolName(), wrapper);
114113
}
115114

116-
/**
117-
* Get DtpExecutor by thread pool name.
118-
*
119-
* @param name thread pool name
120-
* @return the managed DtpExecutor instance
121-
*/
122-
public static DtpExecutor getDtpExecutor(String name) {
123-
val executorWrapper = getExecutorWrapper(name);
124-
if (!executorWrapper.isDtpExecutor()) {
125-
log.error("The specified executor is not a DtpExecutor, name: {}", name);
126-
throw new DtpException("The specified executor is not a DtpExecutor, name: " + name);
127-
}
128-
return (DtpExecutor) executorWrapper.getExecutor();
129-
}
130-
131115
/**
132116
* Get executor by thread pool name.
133117
*
@@ -158,6 +142,21 @@ public static ExecutorWrapper getExecutorWrapper(String name) {
158142
return executorWrapper;
159143
}
160144

145+
/**
146+
* Get DtpExecutor by thread pool name.
147+
*
148+
* @param name thread pool name
149+
* @return the managed DtpExecutor instance
150+
*/
151+
public static DtpExecutor getDtpExecutor(String name) {
152+
val executorWrapper = getExecutorWrapper(name);
153+
if (!executorWrapper.isDtpExecutor()) {
154+
log.error("The specified executor is not a DtpExecutor, name: {}", name);
155+
throw new DtpException("The specified executor is not a DtpExecutor, name: " + name);
156+
}
157+
return (DtpExecutor) executorWrapper.getExecutor();
158+
}
159+
161160
/**
162161
* Refresh while the listening configuration changed.
163162
*
@@ -223,44 +222,10 @@ private static void doRefresh(ExecutorWrapper executorWrapper, DtpExecutorProps
223222
if (!Objects.equals(executor.allowsCoreThreadTimeOut(), props.isAllowCoreThreadTimeOut())) {
224223
executor.allowCoreThreadTimeOut(props.isAllowCoreThreadTimeOut());
225224
}
226-
// update queue
227225
updateQueueProps(executor, props);
228226
doRefreshWrapper(executorWrapper, props);
229227
}
230228

231-
private static void doRefreshWrapper(ExecutorWrapper executorWrapper, DtpExecutorProps props) {
232-
233-
if (StringUtils.isNotBlank(props.getThreadPoolAliasName())) {
234-
executorWrapper.setThreadPoolAliasName(props.getThreadPoolAliasName());
235-
}
236-
ExecutorAdapter<?> executor = executorWrapper.getExecutor();
237-
// update reject handler
238-
executorWrapper.setRejectEnhanced(props.isRejectEnhanced());
239-
if (!Objects.equals(executor.getRejectHandlerType(), props.getRejectedHandlerType())) {
240-
val rejectHandler = RejectHandlerGetter.buildRejectedHandler(props.getRejectedHandlerType());
241-
executorWrapper.setRejectHandler(rejectHandler);
242-
}
243-
244-
// update task wrappers
245-
List<TaskWrapper> taskWrappers = TaskWrappers.getInstance().getByNames(props.getTaskWrapperNames());
246-
executorWrapper.setTaskWrappers(taskWrappers);
247-
248-
executorWrapper.setThreadPoolAliasName(props.getThreadPoolAliasName());
249-
executorWrapper.setNotifyItems(props.getNotifyItems());
250-
executorWrapper.setPlatformIds(props.getPlatformIds());
251-
executorWrapper.setNotifyEnabled(props.isNotifyEnabled());
252-
executorWrapper.setPreStartAllCoreThreads(props.isPreStartAllCoreThreads());
253-
executorWrapper.setWaitForTasksToCompleteOnShutdown(props.isWaitForTasksToCompleteOnShutdown());
254-
executorWrapper.setAwaitTerminationSeconds(props.getAwaitTerminationSeconds());
255-
executorWrapper.setAwareNames(props.getAwareNames());
256-
257-
// update notify related
258-
NotifyHelper.updateNotifyInfo(executorWrapper, props, dtpProperties.getPlatforms());
259-
260-
// update aware related
261-
AwareManager.refresh(executorWrapper, props);
262-
}
263-
264229
/**
265230
* Why does it seem so complicated to handle this?
266231
* Although JDK9 solves this bug, we need to ensure that corePoolSize is less than or equal to maximumPoolSize,
@@ -306,6 +271,39 @@ private static void updateQueueProps(ExecutorAdapter<?> executor, DtpExecutorPro
306271
props.getThreadPoolName(), blockingQueue.getClass().getSimpleName());
307272
}
308273

274+
private static void doRefreshWrapper(ExecutorWrapper executorWrapper, DtpExecutorProps props) {
275+
276+
if (StringUtils.isNotBlank(props.getThreadPoolAliasName())) {
277+
executorWrapper.setThreadPoolAliasName(props.getThreadPoolAliasName());
278+
}
279+
ExecutorAdapter<?> executor = executorWrapper.getExecutor();
280+
// update reject handler
281+
executorWrapper.setRejectEnhanced(props.isRejectEnhanced());
282+
if (!Objects.equals(executor.getRejectHandlerType(), props.getRejectedHandlerType())) {
283+
val rejectHandler = RejectHandlerGetter.buildRejectedHandler(props.getRejectedHandlerType());
284+
executorWrapper.setRejectHandler(rejectHandler);
285+
}
286+
287+
// update task wrappers
288+
List<TaskWrapper> taskWrappers = TaskWrappers.getInstance().getByNames(props.getTaskWrapperNames());
289+
executorWrapper.setTaskWrappers(taskWrappers);
290+
291+
executorWrapper.setThreadPoolAliasName(props.getThreadPoolAliasName());
292+
executorWrapper.setNotifyItems(props.getNotifyItems());
293+
executorWrapper.setPlatformIds(props.getPlatformIds());
294+
executorWrapper.setNotifyEnabled(props.isNotifyEnabled());
295+
executorWrapper.setPreStartAllCoreThreads(props.isPreStartAllCoreThreads());
296+
executorWrapper.setWaitForTasksToCompleteOnShutdown(props.isWaitForTasksToCompleteOnShutdown());
297+
executorWrapper.setAwaitTerminationSeconds(props.getAwaitTerminationSeconds());
298+
executorWrapper.setAwareNames(props.getAwareNames());
299+
300+
// update notify related
301+
NotifyHelper.updateNotifyInfo(executorWrapper, props, dtpProperties.getPlatforms());
302+
303+
// update aware related
304+
AwareManager.refresh(executorWrapper, props);
305+
}
306+
309307
@Override
310308
protected void onContextRefreshedEvent(ContextRefreshedEvent event) {
311309
val executors = Optional.ofNullable(dtpProperties.getExecutors()).orElse(Collections.emptyList());
@@ -314,15 +312,10 @@ protected void onContextRefreshedEvent(ContextRefreshedEvent event) {
314312
remoteExecutors = executors.stream()
315313
.map(DtpExecutorProps::getThreadPoolName)
316314
.collect(Collectors.toSet());
315+
executors.forEach(DtpRegistry::refresh);
317316
}
318317
val registeredExecutors = Sets.newHashSet(EXECUTOR_REGISTRY.keySet());
319318
val localExecutors = CollectionUtils.subtract(registeredExecutors, remoteExecutors);
320-
321-
// refresh just for non-dtp executors
322-
val nonDtpExecutors = executors.stream().filter(e -> !e.isDtp()).collect(toList());
323-
if (CollectionUtils.isNotEmpty(nonDtpExecutors)) {
324-
nonDtpExecutors.forEach(DtpRegistry::refresh);
325-
}
326319
log.info("DtpRegistry has been initialized, remote executors: {}, local executors: {}",
327320
remoteExecutors, localExecutors);
328321
}

core/src/main/java/org/dromara/dynamictp/core/notifier/manager/NotifyHelper.java

Lines changed: 20 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,12 @@
2424
import lombok.val;
2525
import org.apache.commons.collections4.CollectionUtils;
2626
import org.dromara.dynamictp.common.em.NotifyItemEnum;
27-
import org.dromara.dynamictp.common.entity.DtpExecutorProps;
2827
import org.dromara.dynamictp.common.entity.NotifyItem;
2928
import org.dromara.dynamictp.common.entity.NotifyPlatform;
3029
import org.dromara.dynamictp.common.entity.TpExecutorProps;
3130
import org.dromara.dynamictp.common.properties.DtpProperties;
3231
import org.dromara.dynamictp.common.spring.ApplicationContextHolder;
3332
import org.dromara.dynamictp.common.util.StreamUtil;
34-
import org.dromara.dynamictp.core.executor.DtpExecutor;
3533
import org.dromara.dynamictp.core.support.ExecutorWrapper;
3634

3735
import java.util.Collection;
@@ -110,26 +108,6 @@ public static Optional<NotifyItem> getNotifyItem(ExecutorWrapper executor, Notif
110108
.findFirst();
111109
}
112110

113-
public static void fillPlatforms(List<String> platformIds,
114-
List<NotifyPlatform> platforms,
115-
List<NotifyItem> notifyItems) {
116-
if (CollectionUtils.isEmpty(platforms) || CollectionUtils.isEmpty(notifyItems)) {
117-
return;
118-
}
119-
List<String> globalPlatformIds = StreamUtil.fetchProperty(platforms, NotifyPlatform::getPlatformId);
120-
// notifyItem > executor > global
121-
notifyItems.forEach(n -> {
122-
if (CollectionUtils.isNotEmpty(n.getPlatformIds())) {
123-
// intersection of notifyItem and global
124-
n.setPlatformIds((List<String>) CollectionUtils.intersection(globalPlatformIds, n.getPlatformIds()));
125-
} else if (CollectionUtils.isNotEmpty(platformIds)) {
126-
n.setPlatformIds((List<String>) CollectionUtils.intersection(globalPlatformIds, platformIds));
127-
} else {
128-
n.setPlatformIds(globalPlatformIds);
129-
}
130-
});
131-
}
132-
133111
public static Optional<NotifyPlatform> getPlatform(String platformId) {
134112
Map<String, NotifyPlatform> platformMap = getAllPlatforms();
135113
return Optional.ofNullable(platformMap.get(platformId));
@@ -143,23 +121,6 @@ public static Map<String, NotifyPlatform> getAllPlatforms() {
143121
return StreamUtil.toMap(dtpProperties.getPlatforms(), NotifyPlatform::getPlatformId);
144122
}
145123

146-
public static void initNotify(DtpExecutor executor) {
147-
val dtpProperties = ApplicationContextHolder.getBean(DtpProperties.class);
148-
val platforms = dtpProperties.getPlatforms();
149-
if (CollectionUtils.isEmpty(platforms)) {
150-
executor.setNotifyItems(Lists.newArrayList());
151-
executor.setPlatformIds(Lists.newArrayList());
152-
log.warn("DynamicTp notify, no notify platforms configured for [{}]", executor.getThreadPoolName());
153-
return;
154-
}
155-
if (CollectionUtils.isEmpty(executor.getNotifyItems())) {
156-
log.warn("DynamicTp notify, no notify items configured for [{}]", executor.getThreadPoolName());
157-
return;
158-
}
159-
fillPlatforms(executor.getPlatformIds(), platforms, executor.getNotifyItems());
160-
AlarmManager.initAlarm(executor.getThreadPoolName(), executor.getNotifyItems());
161-
}
162-
163124
public static void updateNotifyInfo(ExecutorWrapper executorWrapper,
164125
TpExecutorProps props,
165126
List<NotifyPlatform> platforms) {
@@ -175,18 +136,6 @@ public static void updateNotifyInfo(ExecutorWrapper executorWrapper,
175136
executorWrapper.setNotifyEnabled(props.isNotifyEnabled());
176137
}
177138

178-
public static void updateNotifyInfo(DtpExecutor executor, DtpExecutorProps props, List<NotifyPlatform> platforms) {
179-
// update notify items
180-
val allNotifyItems = mergeAllNotifyItems(props.getNotifyItems());
181-
refreshNotify(executor.getThreadPoolName(),
182-
props.getPlatformIds(),
183-
platforms,
184-
executor.getNotifyItems(),
185-
allNotifyItems);
186-
executor.setNotifyItems(allNotifyItems);
187-
executor.setPlatformIds(props.getPlatformIds());
188-
}
189-
190139
private static void refreshNotify(String poolName,
191140
List<String> platformIds,
192141
List<NotifyPlatform> platforms,
@@ -202,4 +151,24 @@ private static void refreshNotify(String poolName,
202151
AlarmManager.initAlarm(poolName, x);
203152
});
204153
}
154+
155+
private static void fillPlatforms(List<String> platformIds,
156+
List<NotifyPlatform> platforms,
157+
List<NotifyItem> notifyItems) {
158+
if (CollectionUtils.isEmpty(platforms) || CollectionUtils.isEmpty(notifyItems)) {
159+
return;
160+
}
161+
List<String> globalPlatformIds = StreamUtil.fetchProperty(platforms, NotifyPlatform::getPlatformId);
162+
// notifyItem > executor > global
163+
notifyItems.forEach(n -> {
164+
if (CollectionUtils.isNotEmpty(n.getPlatformIds())) {
165+
// intersection of notifyItem and global
166+
n.setPlatformIds((List<String>) CollectionUtils.intersection(globalPlatformIds, n.getPlatformIds()));
167+
} else if (CollectionUtils.isNotEmpty(platformIds)) {
168+
n.setPlatformIds((List<String>) CollectionUtils.intersection(globalPlatformIds, platformIds));
169+
} else {
170+
n.setPlatformIds(globalPlatformIds);
171+
}
172+
});
173+
}
205174
}

core/src/main/java/org/dromara/dynamictp/core/support/DtpLifecycleSupport.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
package org.dromara.dynamictp.core.support;
1919

2020
import lombok.extern.slf4j.Slf4j;
21-
import org.dromara.dynamictp.core.executor.DtpExecutor;
2221
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
2322

2423
import java.util.Objects;
@@ -56,10 +55,8 @@ public static void initialize(ExecutorWrapper executorWrapper) {
5655
*/
5756
public static void destroy(ExecutorWrapper executorWrapper) {
5857
ExecutorService executorService = null;
59-
if (executorWrapper.isDtpExecutor()) {
60-
executorService = (DtpExecutor) executorWrapper.getExecutor();
61-
} else if (executorWrapper.isThreadPoolExecutor()) {
62-
executorService = ((ThreadPoolExecutorAdapter) executorWrapper.getExecutor()).getOriginal();
58+
if (executorWrapper.isExecutorService()) {
59+
executorService = (ExecutorService) executorWrapper.getExecutor().getOriginal();
6360
}
6461
if (Objects.isNull(executorService)) {
6562
return;

core/src/main/java/org/dromara/dynamictp/core/support/ExecutorWrapper.java

Lines changed: 12 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -27,14 +27,14 @@
2727
import org.dromara.dynamictp.core.executor.DtpExecutor;
2828
import org.dromara.dynamictp.core.notifier.capture.CapturedExecutor;
2929
import org.dromara.dynamictp.core.notifier.manager.AlarmManager;
30-
import org.dromara.dynamictp.core.notifier.manager.NotifyHelper;
3130
import org.dromara.dynamictp.core.reject.RejectHandlerGetter;
3231
import org.dromara.dynamictp.core.support.task.wrapper.TaskWrapper;
3332
import org.springframework.beans.BeanUtils;
3433

3534
import java.util.List;
3635
import java.util.Set;
3736
import java.util.concurrent.Executor;
37+
import java.util.concurrent.ExecutorService;
3838
import java.util.concurrent.RejectedExecutionHandler;
3939
import java.util.concurrent.ThreadPoolExecutor;
4040

@@ -190,19 +190,15 @@ public ExecutorWrapper capture() {
190190
* Initialize.
191191
*/
192192
public void initialize() {
193-
if (isDtpExecutor()) {
194-
DtpExecutor dtpExecutor = (DtpExecutor) getExecutor();
195-
initialize(dtpExecutor);
196-
AwareManager.register(this);
197-
} else if (isThreadPoolExecutor()) {
198-
AwareManager.register(this);
193+
AwareManager.register(this);
194+
if (isThreadPoolExecutor()) {
195+
initialize((ThreadPoolExecutor) getExecutor().getOriginal());
199196
}
200197
}
201198

202-
public void initialize(DtpExecutor dtpExecutor) {
203-
NotifyHelper.initNotify(dtpExecutor);
199+
private void initialize(ThreadPoolExecutor executor) {
204200
if (preStartAllCoreThreads) {
205-
dtpExecutor.prestartAllCoreThreads();
201+
executor.prestartAllCoreThreads();
206202
}
207203
// reset reject handler in initialize phase according to rejectEnhanced
208204
setRejectHandler(RejectHandlerGetter.buildRejectedHandler(getRejectHandlerType()));
@@ -214,16 +210,20 @@ public void initialize(DtpExecutor dtpExecutor) {
214210
* @return boolean
215211
*/
216212
public boolean isDtpExecutor() {
217-
return this.executor instanceof DtpExecutor;
213+
return this.executor.getOriginal() instanceof DtpExecutor;
218214
}
219215

220216
/**
221217
* whether is ThreadPoolExecutor
222218
*
223219
* @return boolean
224220
*/
221+
public boolean isExecutorService() {
222+
return this.executor.getOriginal() instanceof ExecutorService;
223+
}
224+
225225
public boolean isThreadPoolExecutor() {
226-
return this.executor instanceof ThreadPoolExecutorAdapter;
226+
return this.executor.getOriginal() instanceof ThreadPoolExecutor;
227227
}
228228

229229
/**
@@ -238,13 +238,6 @@ public void setTaskWrappers(List<TaskWrapper> taskWrappers) {
238238
}
239239
}
240240

241-
public void setRejectEnhanced(boolean rejectEnhanced) {
242-
this.rejectEnhanced = rejectEnhanced;
243-
if (isDtpExecutor()) {
244-
((DtpExecutor) executor).setRejectEnhanced(rejectEnhanced);
245-
}
246-
}
247-
248241
public void setRejectHandler(RejectedExecutionHandler handler) {
249242
this.rejectHandlerType = handler.getClass().getSimpleName();
250243
if (!isRejectEnhanced()) {

core/src/main/java/org/dromara/dynamictp/core/support/ThreadPoolBuilder.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -582,7 +582,6 @@ private DtpExecutor buildDtpExecutor(ThreadPoolBuilder builder) {
582582
DtpExecutor dtpExecutor = createInternal(builder);
583583
dtpExecutor.setThreadPoolName(builder.threadPoolName);
584584
dtpExecutor.allowCoreThreadTimeOut(builder.allowCoreThreadTimeOut);
585-
dtpExecutor.setRejectEnhanced(builder.rejectEnhanced);
586585
dtpExecutor.setTaskWrappers(builder.taskWrappers);
587586
dtpExecutor.setNotifyItems(builder.notifyItems);
588587
dtpExecutor.setPlatformIds(builder.platformIds);

0 commit comments

Comments
 (0)