Skip to content

Commit 793c780

Browse files
WYAOBOzengqiao
authored andcommitted
[Bugfix]修复mm2列表请求超时(#949)
调整代码结构
1 parent ec6f063 commit 793c780

File tree

2 files changed

+72
-9
lines changed

2 files changed

+72
-9
lines changed

km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/connect/mm2/impl/MirrorMakerManagerImpl.java

Lines changed: 29 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import com.xiaojukeji.know.streaming.km.core.service.connect.plugin.PluginService;
4242
import com.xiaojukeji.know.streaming.km.core.service.connect.worker.WorkerConnectorService;
4343
import com.xiaojukeji.know.streaming.km.core.service.connect.worker.WorkerService;
44+
import com.xiaojukeji.know.streaming.km.core.utils.ApiCallThreadPoolService;
4445
import com.xiaojukeji.know.streaming.km.persistence.cache.LoadedClusterPhyCache;
4546
import org.apache.commons.lang.StringUtils;
4647
import org.springframework.beans.factory.annotation.Autowired;
@@ -296,18 +297,16 @@ public PaginationResult<ClusterMirrorMakerOverviewVO> getClusterMirrorMakersOver
296297

297298
List<ClusterMirrorMakerOverviewVO> mirrorMakerOverviewVOList = this.convert2ClusterMirrorMakerOverviewVO(mirrorMakerList, connectClusterList, latestMetricsResult.getData());
298299

299-
PaginationResult<ClusterMirrorMakerOverviewVO> voPaginationResult = this.pagingMirrorMakerInLocal(mirrorMakerOverviewVOList, dto);
300+
List<ClusterMirrorMakerOverviewVO> mirrorMakerVOList = this.completeClusterInfo(mirrorMakerOverviewVOList);
301+
302+
PaginationResult<ClusterMirrorMakerOverviewVO> voPaginationResult = this.pagingMirrorMakerInLocal(mirrorMakerVOList, dto);
300303

301304
if (voPaginationResult.failed()) {
302305
LOGGER.error("method=ClusterMirrorMakerOverviewVO||clusterPhyId={}||result={}||errMsg=pagination in local failed", clusterPhyId, voPaginationResult);
303306

304307
return PaginationResult.buildFailure(voPaginationResult, dto);
305308
}
306309

307-
//这里再补充源集群和目的集群信息,减少网络请求。
308-
this.completeClusterInfo(voPaginationResult.getData().getBizData());
309-
310-
311310
// 查询历史指标
312311
Result<List<MetricMultiLinesVO>> lineMetricsResult = mirrorMakerMetricService.listMirrorMakerClusterMetricsFromES(
313312
clusterPhyId,
@@ -596,14 +595,31 @@ public static List<ClusterMirrorMakerOverviewVO> supplyData2ClusterMirrorMakerOv
596595
return voList;
597596
}
598597

599-
private void completeClusterInfo(List<ClusterMirrorMakerOverviewVO> mirrorMakerVOList) {
598+
private List<ClusterMirrorMakerOverviewVO> completeClusterInfo(List<ClusterMirrorMakerOverviewVO> mirrorMakerVOList) {
599+
600+
Map<String, KSConnectorInfo> connectorInfoMap = new HashMap<>();
600601

601602
for (ClusterMirrorMakerOverviewVO mirrorMakerVO : mirrorMakerVOList) {
602-
Result<KSConnectorInfo> connectorInfoRet = connectorService.getConnectorInfoFromCluster(mirrorMakerVO.getConnectClusterId(), mirrorMakerVO.getConnectorName());
603-
if (!connectorInfoRet.hasData()) {
603+
ApiCallThreadPoolService.runnableTask(String.format("method=completeClusterInfo||connectClusterId=%d||connectorName=%s||getMirrorMakerInfo", mirrorMakerVO.getConnectClusterId(), mirrorMakerVO.getConnectorName()),
604+
3000
605+
, () -> {
606+
Result<KSConnectorInfo> connectorInfoRet = connectorService.getConnectorInfoFromCluster(mirrorMakerVO.getConnectClusterId(), mirrorMakerVO.getConnectorName());
607+
if (connectorInfoRet.hasData()) {
608+
connectorInfoMap.put(mirrorMakerVO.getConnectClusterId() + mirrorMakerVO.getConnectorName(), connectorInfoRet.getData());
609+
}
610+
611+
return connectorInfoRet.getData();
612+
});
613+
}
614+
615+
ApiCallThreadPoolService.waitResult(1000);
616+
617+
List<ClusterMirrorMakerOverviewVO> newMirrorMakerVOList = new ArrayList<>();
618+
for (ClusterMirrorMakerOverviewVO mirrorMakerVO : mirrorMakerVOList) {
619+
KSConnectorInfo connectorInfo = connectorInfoMap.get(mirrorMakerVO.getConnectClusterId() + mirrorMakerVO.getConnectorName());
620+
if (connectorInfo == null) {
604621
continue;
605622
}
606-
KSConnectorInfo connectorInfo = connectorInfoRet.getData();
607623

608624
String sourceClusterAlias = connectorInfo.getConfig().get(MIRROR_MAKER_SOURCE_CLUSTER_ALIAS_FIELD_NAME);
609625
String targetClusterAlias = connectorInfo.getConfig().get(MIRROR_MAKER_TARGET_CLUSTER_ALIAS_FIELD_NAME);
@@ -627,6 +643,10 @@ private void completeClusterInfo(List<ClusterMirrorMakerOverviewVO> mirrorMakerV
627643
}
628644
}
629645

646+
newMirrorMakerVOList.add(mirrorMakerVO);
647+
630648
}
649+
650+
return newMirrorMakerVOList;
631651
}
632652
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package com.xiaojukeji.know.streaming.km.core.utils;
2+
3+
import com.xiaojukeji.know.streaming.km.common.utils.FutureWaitUtil;
4+
import lombok.NoArgsConstructor;
5+
import org.springframework.beans.factory.annotation.Value;
6+
import org.springframework.stereotype.Service;
7+
8+
import javax.annotation.PostConstruct;
9+
import java.util.concurrent.Callable;
10+
11+
/**
12+
* @author wyb
13+
* @date 2023/2/22
14+
*/
15+
@Service
16+
@NoArgsConstructor
17+
public class ApiCallThreadPoolService {
18+
@Value(value = "${thread-pool.api.thread-num:2}")
19+
private Integer threadNum;
20+
21+
@Value(value = "${thread-pool.api.queue-size:500}")
22+
private Integer queueSize;
23+
24+
private static FutureWaitUtil<Object> apiFutureUtil;
25+
26+
@PostConstruct
27+
private void init() {
28+
apiFutureUtil = FutureWaitUtil.init(
29+
"ApiCallTP",
30+
threadNum,
31+
threadNum,
32+
queueSize
33+
);
34+
}
35+
36+
public static void runnableTask(String taskName, Integer timeoutUnisMs, Callable<Object> callable) {
37+
apiFutureUtil.runnableTask(taskName, timeoutUnisMs, callable);
38+
}
39+
40+
public static void waitResult(Integer stepWaitTimeUnitMs) {
41+
apiFutureUtil.waitResult(stepWaitTimeUnitMs);
42+
}
43+
}

0 commit comments

Comments
 (0)