Skip to content

Commit 33fb0ac

Browse files
author
zengqiao
committed
补充FutureUtil类
1 parent 3d74e60 commit 33fb0ac

File tree

2 files changed

+190
-0
lines changed

2 files changed

+190
-0
lines changed
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
package com.xiaojukeji.kafka.manager.common.entity.ao.common;
2+
3+
import lombok.Getter;
4+
5+
import java.util.concurrent.Delayed;
6+
import java.util.concurrent.Future;
7+
import java.util.concurrent.TimeUnit;
8+
9+
@Getter
10+
public class FutureTaskDelayQueueData<T> implements Delayed {
11+
private final String taskName;
12+
13+
private final Future<T> futureTask;
14+
15+
private final long timeoutTimeUnitMs;
16+
17+
private final long createTimeUnitMs;
18+
19+
public FutureTaskDelayQueueData(String taskName, Future<T> futureTask, long timeoutTimeUnitMs) {
20+
this.taskName = taskName;
21+
this.futureTask = futureTask;
22+
this.timeoutTimeUnitMs = timeoutTimeUnitMs;
23+
this.createTimeUnitMs = System.currentTimeMillis();
24+
}
25+
26+
@Override
27+
public long getDelay(TimeUnit unit) {
28+
return unit.convert(timeoutTimeUnitMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
29+
}
30+
31+
@Override
32+
public int compareTo(Delayed delayed) {
33+
FutureTaskDelayQueueData<T> other = (FutureTaskDelayQueueData<T>) delayed;
34+
if (this.timeoutTimeUnitMs == other.timeoutTimeUnitMs) {
35+
return (this.timeoutTimeUnitMs + "_" + this.createTimeUnitMs).compareTo((other.timeoutTimeUnitMs + "_" + other.createTimeUnitMs));
36+
}
37+
38+
return (this.timeoutTimeUnitMs - other.timeoutTimeUnitMs) <= 0 ? -1: 1;
39+
}
40+
}
Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
package com.xiaojukeji.kafka.manager.common.utils;
2+
3+
import com.xiaojukeji.kafka.manager.common.entity.ao.common.FutureTaskDelayQueueData;
4+
import com.xiaojukeji.kafka.manager.common.utils.factory.DefaultThreadFactory;
5+
import org.slf4j.Logger;
6+
import org.slf4j.LoggerFactory;
7+
8+
import java.util.ArrayList;
9+
import java.util.List;
10+
import java.util.Map;
11+
import java.util.concurrent.*;
12+
13+
/**
14+
* Future工具类
15+
*/
16+
public class FutureUtil<T> {
17+
private static final Logger LOGGER = LoggerFactory.getLogger(FutureUtil.class);
18+
19+
private ThreadPoolExecutor executor;
20+
21+
private Map<Long/*currentThreadId*/, DelayQueue<FutureTaskDelayQueueData<T>>> futuresMap;
22+
23+
private FutureUtil() {
24+
}
25+
26+
public static <T> FutureUtil<T> init(String name, int corePoolSize, int maxPoolSize, int queueSize) {
27+
FutureUtil<T> futureUtil = new FutureUtil<>();
28+
29+
futureUtil.executor = new ThreadPoolExecutor(
30+
corePoolSize,
31+
maxPoolSize,
32+
3000,
33+
TimeUnit.MILLISECONDS,
34+
new LinkedBlockingDeque<>(queueSize),
35+
new DefaultThreadFactory("KM-FutureUtil-" + name),
36+
new ThreadPoolExecutor.DiscardOldestPolicy() //对拒绝任务不抛弃,而是抛弃队列里面等待最久的一个线程,然后把拒绝任务加到队列。
37+
);
38+
39+
futureUtil.futuresMap = new ConcurrentHashMap<>();
40+
return futureUtil;
41+
}
42+
43+
/**
44+
* 必须配合 waitExecute使用 否则容易会撑爆内存
45+
*/
46+
public FutureUtil<T> runnableTask(String taskName, Integer timeoutUnisMs, Callable<T> callable) {
47+
Long currentThreadId = Thread.currentThread().getId();
48+
49+
futuresMap.putIfAbsent(currentThreadId, new DelayQueue<>());
50+
51+
DelayQueue<FutureTaskDelayQueueData<T>> delayQueueData = futuresMap.get(currentThreadId);
52+
53+
delayQueueData.put(new FutureTaskDelayQueueData<>(taskName, executor.submit(callable), timeoutUnisMs + System.currentTimeMillis()));
54+
55+
return this;
56+
}
57+
58+
public FutureUtil<T> runnableTask(String taskName, Integer timeoutUnisMs, Runnable runnable) {
59+
Long currentThreadId = Thread.currentThread().getId();
60+
61+
futuresMap.putIfAbsent(currentThreadId, new DelayQueue<>());
62+
63+
DelayQueue<FutureTaskDelayQueueData<T>> delayQueueData = futuresMap.get(currentThreadId);
64+
65+
delayQueueData.put(new FutureTaskDelayQueueData<T>(taskName, (Future<T>) executor.submit(runnable), timeoutUnisMs + System.currentTimeMillis()));
66+
67+
return this;
68+
}
69+
70+
public void waitExecute() {
71+
this.waitResult();
72+
}
73+
74+
public void waitExecute(Integer stepWaitTimeUnitMs) {
75+
this.waitResult(stepWaitTimeUnitMs);
76+
}
77+
78+
public List<T> waitResult() {
79+
return waitResult(null);
80+
}
81+
82+
/**
83+
* 等待结果
84+
* @param stepWaitTimeUnitMs 超时时间达到后,没有完成时,继续等待的时间
85+
*/
86+
public List<T> waitResult(Integer stepWaitTimeUnitMs) {
87+
Long currentThreadId = Thread.currentThread().getId();
88+
89+
DelayQueue<FutureTaskDelayQueueData<T>> delayQueueData = futuresMap.remove(currentThreadId);
90+
if(delayQueueData == null || delayQueueData.isEmpty()) {
91+
return new ArrayList<>();
92+
}
93+
94+
List<T> resultList = new ArrayList<>();
95+
while (!delayQueueData.isEmpty()) {
96+
try {
97+
// 不进行阻塞,直接获取第一个任务
98+
FutureTaskDelayQueueData<T> queueData = delayQueueData.peek();
99+
if (queueData.getFutureTask().isDone()) {
100+
// 如果第一个已经完成了,则移除掉第一个,然后获取其result
101+
delayQueueData.remove(queueData);
102+
resultList.add(queueData.getFutureTask().get());
103+
continue;
104+
}
105+
106+
// 如果第一个未完成,则阻塞10ms,判断是否达到超时时间了。
107+
// 这里的10ms不建议设置较大,因为任务可能在这段时间内完成了,此时如果设置的较大,会导致迟迟不能返回,从而影响接口调用的性能
108+
queueData = delayQueueData.poll(10, TimeUnit.MILLISECONDS);
109+
if (queueData == null) {
110+
continue;
111+
}
112+
113+
// 在到达超时时间后,任务没有完成,但是没有完成的原因可能是因为任务一直处于等待状态导致的。
114+
// 因此这里再给一段补充时间,看这段时间内是否可以完成任务。
115+
stepWaitResult(queueData, stepWaitTimeUnitMs);
116+
117+
// 达到超时时间
118+
if (queueData.getFutureTask().isDone()) {
119+
// 任务已经完成
120+
resultList.add(queueData.getFutureTask().get());
121+
continue;
122+
}
123+
124+
// 达到超时时间,但是任务未完成,则打印日志并强制取消
125+
LOGGER.error("class=FutureUtil||method=waitExecute||taskName={}||msg=cancel task", queueData.getTaskName());
126+
127+
queueData.getFutureTask().cancel(true);
128+
} catch (Exception e) {
129+
LOGGER.error("class=FutureUtil||method=waitExecute||msg=exception", e);
130+
}
131+
}
132+
133+
return resultList;
134+
}
135+
136+
private T stepWaitResult(FutureTaskDelayQueueData<T> queueData, Integer stepWaitTimeUnitMs) {
137+
if (stepWaitTimeUnitMs == null) {
138+
return null;
139+
}
140+
141+
try {
142+
return queueData.getFutureTask().get(stepWaitTimeUnitMs, TimeUnit.MILLISECONDS);
143+
} catch (Exception e) {
144+
// 达到超时时间,但是任务未完成,则打印日志并强制取消
145+
LOGGER.error("class=FutureUtil||method=stepWaitResult||taskName={}||errMsg=exception", queueData.getTaskName(), e);
146+
}
147+
148+
return null;
149+
}
150+
}

0 commit comments

Comments
 (0)