Skip to content

Commit a0312be

Browse files
author
zengqiao
committed
Jmx连接的主机IP支持可选择
1 parent 7da712f commit a0312be

File tree

9 files changed

+76
-14
lines changed

9 files changed

+76
-14
lines changed

docs/install_guide/版本升级手册.md

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,30 @@
11
## 6.2、版本升级手册
22

3-
**`2.x`版本 升级至 `3.0.0`版本**
3+
注意:如果想升级至具体版本,需要将你当前版本至你期望使用版本的变更统统执行一遍,然后才能正常使用。
4+
5+
`master` 版本,需要
6+
7+
### 6.2.0、升级至 `master` 版本
8+
9+
10+
**SQL变更**
11+
12+
1、在`ks_km_broker`表增加了一个监听信息字段。
13+
2、为`logi_security_oplog`表operation_methods字段设置默认值''。
14+
因此需要执行下面的sql对数据库表进行更新。
15+
16+
```sql
17+
ALTER TABLE `ks_km_broker`
18+
ADD COLUMN `endpoint_map` VARCHAR(1024) NOT NULL DEFAULT '' COMMENT '监听信息' AFTER `update_time`;
19+
20+
ALTER TABLE `logi_security_oplog`
21+
ALTER COLUMN `operation_methods` set default '';
22+
23+
```
24+
25+
---
26+
27+
### 6.2.1、`2.x`版本 升级至 `3.0.0`版本
428

529
**升级步骤:**
630

km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/broker/Broker.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,18 @@
11
package com.xiaojukeji.know.streaming.km.common.bean.entity.broker;
22

3+
4+
import com.alibaba.fastjson.TypeReference;
5+
import com.xiaojukeji.know.streaming.km.common.bean.entity.common.IpPortData;
6+
import com.xiaojukeji.know.streaming.km.common.bean.po.broker.BrokerPO;
7+
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
38
import com.xiaojukeji.know.streaming.km.common.zookeeper.znode.brokers.BrokerMetadata;
49
import lombok.AllArgsConstructor;
510
import lombok.Data;
611
import lombok.NoArgsConstructor;
712
import org.apache.kafka.common.Node;
813

914
import java.io.Serializable;
15+
import java.util.Map;
1016

1117
/**
1218
* @author didi
@@ -55,6 +61,11 @@ public class Broker implements Serializable {
5561
*/
5662
private Integer status;
5763

64+
/**
65+
* 监听信息
66+
*/
67+
private Map<String, IpPortData> endpointMap;
68+
5869
public static Broker buildFrom(Long clusterPhyId, Node node, Long startTimestamp) {
5970
Broker metadata = new Broker();
6071
metadata.setClusterPhyId(clusterPhyId);
@@ -78,9 +89,31 @@ public static Broker buildFrom(Long clusterPhyId, Integer brokerId, BrokerMetada
7889
metadata.setStartTimestamp(brokerMetadata.getTimestamp());
7990
metadata.setRack(brokerMetadata.getRack());
8091
metadata.setStatus(1);
92+
metadata.setEndpointMap(brokerMetadata.getEndpointMap());
8193
return metadata;
8294
}
8395

96+
public static Broker buildFrom(BrokerPO brokerPO) {
97+
Broker broker = ConvertUtil.obj2Obj(brokerPO, Broker.class);
98+
String endpointMapStr = brokerPO.getEndpointMap();
99+
if (broker == null || endpointMapStr == null || endpointMapStr.equals("")) {
100+
return broker;
101+
}
102+
103+
// 填充endpoint信息
104+
Map<String, IpPortData> endpointMap = ConvertUtil.str2ObjByJson(endpointMapStr, new TypeReference<Map<String, IpPortData>>(){});
105+
broker.setEndpointMap(endpointMap);
106+
return broker;
107+
}
108+
109+
public String getJmxHost(String endPoint) {
110+
if (endPoint == null || endpointMap == null) {
111+
return host;
112+
}
113+
IpPortData ip = endpointMap.get(endPoint);
114+
return ip == null ? ip.getIp() : host;
115+
}
116+
84117
public boolean alive() {
85118
return status != null && status > 0;
86119
}

km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/config/JmxConfig.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,9 @@ public class JmxConfig implements Serializable {
2727

2828
@ApiModelProperty(value="SSL情况下的token", example = "KsKmCCY19")
2929
private String token;
30+
31+
@ApiModelProperty(value="使用哪个endpoint网络", example = "EXTERNAL")
32+
private String useWhichEndpoint;
3033
}
3134

3235

km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/po/broker/BrokerPO.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,4 +42,9 @@ public class BrokerPO extends BasePO {
4242
* Broker状态
4343
*/
4444
private Integer status;
45+
46+
/**
47+
* 监听信息
48+
*/
49+
private String endpointMap;
4550
}

km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/impl/BrokerServiceImpl.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,9 @@ public void updateAliveBrokers(Long clusterPhyId, List<Broker> presentAliveBroke
130130

131131
// 如果当前Broker还存活,则更新DB信息
132132
BrokerPO newBrokerPO = ConvertUtil.obj2Obj(presentAliveBroker, BrokerPO.class);
133+
if (presentAliveBroker.getEndpointMap() != null) {
134+
newBrokerPO.setEndpointMap(ConvertUtil.obj2Json(presentAliveBroker.getEndpointMap()));
135+
}
133136
newBrokerPO.setId(inDBBrokerPO.getId());
134137
newBrokerPO.setStatus(Constant.ALIVE);
135138
newBrokerPO.setCreateTime(inDBBrokerPO.getCreateTime());
@@ -203,7 +206,7 @@ public Broker getBroker(Long clusterPhyId, Integer brokerId) {
203206
lambdaQueryWrapper.eq(BrokerPO::getClusterPhyId, clusterPhyId);
204207
lambdaQueryWrapper.eq(BrokerPO::getBrokerId, brokerId);
205208

206-
return ConvertUtil.obj2Obj(brokerDAO.selectOne(lambdaQueryWrapper), Broker.class);
209+
return Broker.buildFrom(brokerDAO.selectOne(lambdaQueryWrapper));
207210
}
208211

209212
@Override
@@ -272,9 +275,8 @@ public Integer countAllBrokers() {
272275
/**************************************************** private method ****************************************************/
273276

274277
private List<Broker> listAllBrokersAndUpdateCache(Long clusterPhyId) {
275-
List<Broker> allBrokerList = ConvertUtil.list2List(this.getAllBrokerPOsFromDB(clusterPhyId), Broker.class);
278+
List<Broker> allBrokerList = getAllBrokerPOsFromDB(clusterPhyId).stream().map(elem -> Broker.buildFrom(elem)).collect(Collectors.toList());
276279
brokersCache.put(clusterPhyId, allBrokerList);
277-
278280
return allBrokerList;
279281
}
280282

km-dist/init/sql/ddl-ks-km.sql

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ CREATE TABLE `ks_km_broker` (
1313
`status` int(16) NOT NULL DEFAULT '0' COMMENT '状态: 1存活,0未存活',
1414
`create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
1515
`update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
16+
`endpoint_map` varchar(1024) NOT NULL DEFAULT '' COMMENT '监听信息',
1617
PRIMARY KEY (`id`),
1718
UNIQUE KEY `uniq_cluster_phy_id_broker_id` (`cluster_phy_id`,`broker_id`)
1819
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='Broker信息表';

km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/KafkaJMXClient.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -165,8 +165,8 @@ private JmxConnectorWrap createJmxConnectorWrap(ClusterPhy clusterPhy, Integer b
165165
clusterPhy.getId(),
166166
brokerId,
167167
broker.getStartTimestamp(),
168-
broker.getHost(),
169-
broker.getJmxPort() != null? broker.getJmxPort(): jmxConfig.getJmxPort(),
168+
jmxConfig != null ? broker.getJmxHost(jmxConfig.getUseWhichEndpoint()) : broker.getHost(),
169+
broker.getJmxPort() != null ? broker.getJmxPort() : jmxConfig.getJmxPort(),
170170
jmxConfig
171171
);
172172

@@ -191,6 +191,6 @@ private Broker getBrokerFromDB(Long clusterPhyId, Integer brokerId) {
191191
lambdaQueryWrapper.eq(BrokerPO::getStatus, Constant.ALIVE);
192192

193193
BrokerPO brokerPO = brokerDAO.selectOne(lambdaQueryWrapper);
194-
return ConvertUtil.obj2Obj(brokerPO, Broker.class);
194+
return Broker.buildFrom(brokerPO);
195195
}
196196
}

km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/mysql/broker/BrokerDAO.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,5 +6,4 @@
66

77
@Repository
88
public interface BrokerDAO extends BaseMapper<BrokerPO> {
9-
int replace(BrokerPO brokerPO);
109
}

km-persistence/src/main/resources/mybatis/BrokerMapper.xml

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,7 @@
1414
<result column="jmx_port" property="jmxPort" />
1515
<result column="start_timestamp" property="startTimestamp" />
1616
<result column="status" property="status" />
17+
<result column="endpoint_map" property="endpointMap"/>
1718
</resultMap>
1819

19-
<insert id="replace" parameterType="com.xiaojukeji.know.streaming.km.common.bean.po.broker.BrokerPO">
20-
REPLACE ks_km_broker
21-
(cluster_phy_id, broker_id, host, port, jmx_port, start_timestamp, status, update_time)
22-
VALUES
23-
(#{clusterPhyId}, #{brokerId}, #{host}, #{port}, #{jmxPort}, #{startTimestamp}, #{status}, #{updateTime})
24-
</insert>
2520
</mapper>

0 commit comments

Comments
 (0)