Skip to content

Commit 1c4fbef

Browse files
zengqiaoZQKC
authored andcommitted
[Feature]支持拆分API服务和Job服务部署(#829)
1、JMX检查功能是每一个KS都必须要有的,因此从Task模块移动到Core模块; 2、application.yml中补充Task模块任务的整体开关字段;
1 parent b2f0f69 commit 1c4fbef

File tree

3 files changed

+46
-61
lines changed

3 files changed

+46
-61
lines changed
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
package com.xiaojukeji.know.streaming.km.core.flusher;
2+
3+
import com.didiglobal.logi.log.ILog;
4+
import com.didiglobal.logi.log.LogFactory;
5+
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
6+
import com.xiaojukeji.know.streaming.km.common.utils.FutureUtil;
7+
import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerService;
8+
import com.xiaojukeji.know.streaming.km.persistence.cache.LoadedClusterPhyCache;
9+
import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaJMXClient;
10+
import org.springframework.beans.factory.annotation.Autowired;
11+
import org.springframework.scheduling.annotation.Scheduled;
12+
import org.springframework.stereotype.Service;
13+
14+
/**
15+
* JMX连接检查
16+
*/
17+
@Service
18+
public class JmxClientLegalFlusher {
19+
private static final ILog LOGGER = LogFactory.getLog(JmxClientLegalFlusher.class);
20+
21+
@Autowired
22+
private BrokerService brokerService;
23+
24+
@Autowired
25+
private KafkaJMXClient kafkaJMXClient;
26+
27+
@Scheduled(cron="0 0/1 * * * ?")
28+
public void checkJmxClient() {
29+
for (ClusterPhy clusterPhy: LoadedClusterPhyCache.listAll().values()) {
30+
FutureUtil.quickStartupFutureUtil.submitTask(
31+
() -> {
32+
try {
33+
kafkaJMXClient.checkAndRemoveIfIllegal(
34+
clusterPhy.getId(),
35+
brokerService.listAliveBrokersFromDB(clusterPhy.getId())
36+
);
37+
} catch (Exception e) {
38+
LOGGER.error("method=checkJmxClient||clusterPhyId={}||errMsg=exception", clusterPhy.getId(), e);
39+
}
40+
}
41+
);
42+
}
43+
}
44+
}

km-rest/src/main/resources/application.yml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,9 @@ spring:
3131
init-sql: true
3232
init-thread-num: 20
3333
max-thread-num: 50
34-
log-expire: 3 # 日志保存天数,以天为单位
34+
log-expire: 3 # 日志保存天数,以天为单位
3535
app-name: know-streaming
36+
enable: true # true表示开启job任务, false表关闭。KS在部署上可以考虑部署两套服务,一套处理前端请求,一套执行job任务,此时可以通过该字段进行控制
3637
claim-strategy: com.didiglobal.logi.job.core.consensual.RandomConsensual
3738
logi-security: # know-streaming 依赖的 logi-security 模块的数据库的配置,默认与 know-streaming 的数据库配置保持一致即可
3839
jdbc-url: jdbc:mariadb://127.0.0.1:3306/know_streaming?useUnicode=true&characterEncoding=utf8&jdbcCompliantTruncation=true&allowMultiQueries=true&useSSL=false&alwaysAutoGeneratedKeys=true&serverTimezone=GMT%2B8&allowPublicKeyRetrieval=true

km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/kafka/client/CheckJmxClientTask.java

Lines changed: 0 additions & 60 deletions
This file was deleted.

0 commit comments

Comments
 (0)