Skip to content

Commit de977a5

Browse files
author
zengqiao
committed
加快添加集群后的信息获取的速度
1 parent 703d685 commit de977a5

File tree

3 files changed

+95
-0
lines changed

3 files changed

+95
-0
lines changed
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package com.xiaojukeji.know.streaming.km.common.bean.event.cluster;
2+
3+
import lombok.Getter;
4+
5+
/**
6+
* 集群新增事件
7+
* @author zengqiao
8+
* @date 22/02/25
9+
*/
10+
@Getter
11+
public class ClusterPhyAddedEvent extends ClusterPhyBaseEvent {
12+
public ClusterPhyAddedEvent(Object source, Long clusterPhyId) {
13+
super(source, clusterPhyId);
14+
}
15+
}

km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/cluster/impl/ClusterPhyServiceImpl.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@
77
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
88
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
99
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus;
10+
import com.xiaojukeji.know.streaming.km.common.bean.event.cluster.ClusterPhyAddedEvent;
1011
import com.xiaojukeji.know.streaming.km.common.bean.po.cluster.ClusterPhyPO;
12+
import com.xiaojukeji.know.streaming.km.common.component.SpringTool;
1113
import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant;
1214
import com.xiaojukeji.know.streaming.km.common.enums.operaterecord.ModuleEnum;
1315
import com.xiaojukeji.know.streaming.km.common.enums.operaterecord.OperationEnum;
@@ -106,6 +108,8 @@ public Long addClusterPhy(ClusterPhyPO clusterPhyPO, String operator) throws Par
106108

107109
log.info("method=addClusterPhy||clusterPhyId={}||operator={}||msg=add cluster finished", clusterPhyPO.getId(), operator);
108110

111+
// 发布添加集群事件
112+
SpringTool.publish(new ClusterPhyAddedEvent(this, clusterPhyPO.getId()));
109113
return clusterPhyPO.getId();
110114
} catch (DuplicateKeyException dke) {
111115
log.warn("method=addClusterPhy||clusterPhyId={}||operator={}||msg=add cluster failed||errMsg=duplicate data", clusterPhyPO.getId(), operator);
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
package com.xiaojukeji.know.streaming.km.task.service.listener;
2+
3+
import com.didiglobal.logi.log.ILog;
4+
import com.didiglobal.logi.log.LogFactory;
5+
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
6+
import com.xiaojukeji.know.streaming.km.common.bean.event.cluster.ClusterPhyAddedEvent;
7+
import com.xiaojukeji.know.streaming.km.common.component.SpringTool;
8+
import com.xiaojukeji.know.streaming.km.common.utils.BackoffUtils;
9+
import com.xiaojukeji.know.streaming.km.common.utils.FutureUtil;
10+
import com.xiaojukeji.know.streaming.km.persistence.cache.LoadedClusterPhyCache;
11+
import com.xiaojukeji.know.streaming.km.task.metadata.AbstractAsyncMetadataDispatchTask;
12+
import com.xiaojukeji.know.streaming.km.task.metrics.AbstractAsyncMetricsDispatchTask;
13+
import org.springframework.context.ApplicationListener;
14+
import org.springframework.stereotype.Service;
15+
16+
import java.util.ArrayList;
17+
import java.util.List;
18+
19+
@Service
20+
public class TaskClusterAddedListener implements ApplicationListener<ClusterPhyAddedEvent> {
21+
private static final ILog LOGGER = LogFactory.getLog(TaskClusterAddedListener.class);
22+
23+
@Override
24+
public void onApplicationEvent(ClusterPhyAddedEvent event) {
25+
LOGGER.info("class=TaskClusterAddedListener||method=onApplicationEvent||clusterPhyId={}||msg=listened new cluster", event.getClusterPhyId());
26+
Long now = System.currentTimeMillis();
27+
28+
// 交由KS自定义的线程池,异步执行任务
29+
FutureUtil.quickStartupFutureUtil.submitTask(() -> triggerAllTask(event.getClusterPhyId(), now));
30+
}
31+
32+
private void triggerAllTask(Long clusterPhyId, Long startTimeUnitMs) {
33+
ClusterPhy tempClusterPhy = null;
34+
35+
// 120秒内无加载进来,则直接返回退出
36+
while (System.currentTimeMillis() - startTimeUnitMs <= 120L * 1000L) {
37+
tempClusterPhy = LoadedClusterPhyCache.getByPhyId(clusterPhyId);
38+
if (tempClusterPhy != null) {
39+
break;
40+
}
41+
42+
BackoffUtils.backoff(1000);
43+
}
44+
45+
if (tempClusterPhy == null) {
46+
return;
47+
}
48+
49+
// 获取到之后,再延迟5秒,保证相关的集群都被正常加载进来,这里的5秒不固定
50+
BackoffUtils.backoff(5000);
51+
final ClusterPhy clusterPhy = tempClusterPhy;
52+
53+
// 集群执行集群元信息同步
54+
List<AbstractAsyncMetadataDispatchTask> metadataServiceList = new ArrayList<>(SpringTool.getBeansOfType(AbstractAsyncMetadataDispatchTask.class).values());
55+
for (AbstractAsyncMetadataDispatchTask dispatchTask: metadataServiceList) {
56+
try {
57+
dispatchTask.asyncProcessSubTask(clusterPhy, startTimeUnitMs);
58+
} catch (Exception e) {
59+
// ignore
60+
}
61+
}
62+
63+
// 再延迟5秒,保证集群元信息都已被正常同步至DB,这里的5秒不固定
64+
BackoffUtils.backoff(5000);
65+
66+
// 集群集群指标采集
67+
List<AbstractAsyncMetricsDispatchTask> metricsServiceList = new ArrayList<>(SpringTool.getBeansOfType(AbstractAsyncMetricsDispatchTask.class).values());
68+
for (AbstractAsyncMetricsDispatchTask dispatchTask: metricsServiceList) {
69+
try {
70+
dispatchTask.asyncProcessSubTask(clusterPhy, startTimeUnitMs);
71+
} catch (Exception e) {
72+
// ignore
73+
}
74+
}
75+
}
76+
}

0 commit comments

Comments
 (0)