44import com .xiaojukeji .kafka .manager .common .entity .ResultStatus ;
55import com .xiaojukeji .kafka .manager .common .entity .pojo .ClusterDO ;
66import com .xiaojukeji .kafka .manager .common .entity .pojo .TopicDO ;
7+ import com .xiaojukeji .kafka .manager .common .exception .ConfigException ;
8+ import com .xiaojukeji .kafka .manager .common .zookeeper .ZkConfigImpl ;
79import com .xiaojukeji .kafka .manager .service .config .BaseTest ;
810import org .springframework .beans .factory .annotation .Autowired ;
911import org .testng .Assert ;
@@ -24,6 +26,8 @@ public class AdminServiceTest extends BaseTest {
2426 */
2527 private final static String REAL_TOPIC1_IN_ZK = "moduleTest" ;
2628
29+ private final static String REAL_TOPIC1_IN_ZK2 = "expandPartitionTopic" ;
30+
2731 /**
2832 * 集群共包括三个broker:1,2,3, 该topic 2分区 3副本因子,在broker1,2,3上
2933 */
@@ -55,6 +59,23 @@ public class AdminServiceTest extends BaseTest {
5559
5660 private final static String ADMIN = "admin" ;
5761
62+ private final static String REAL_PHYSICAL_CLUSTER_NAME = "LogiKM_moduleTest" ;
63+
64+ // private final static String ZOOKEEPER_ADDRESS = "10.190.46.198:2181,10.190.14.237:2181,10.190.50.65:2181/xg";
65+ private final static String ZOOKEEPER_ADDRESS = "10.190.12.242:2181,10.190.25.160:2181,10.190.25.41:2181/wyc" ;
66+
67+ // private final static String BOOTSTRAP_SERVERS = "10.190.46.198:9093,10.190.14.237:9093,10.190.50.65:9093";
68+ private final static String BOOTSTRAP_SERVERS = "10.190.12.242:9093,10.190.25.160:9093,10.190.25.41:9093" ;
69+
70+ private final static String SECURITY_PROTOCOL = "{ \t \" security.protocol\" : \" SASL_PLAINTEXT\" , \t \" sasl.mechanism\" : \" PLAIN\" , \t \" sasl.jaas.config\" : \" org.apache.kafka.common.security.plain.PlainLoginModule required username=\\ \" dkm_admin\\ \" password=\\ \" km_kMl4N8as1Kp0CCY\\ \" ;\" }" ;
71+
72+ // 优先副本节点在zk上的路径
73+ private final static String ZK_NODE_PATH_PREFERRED = "/admin/preferred_replica_election" ;
74+
75+ // 创建的topic节点在zk上的路径;brokers节点下的
76+ private final static String ZK_NODE_PATH_BROKERS_TOPIC = "/brokers/topics/createTopicTest" ;
77+ // config节点下的
78+ private final static String ZK_NODE_PATH_CONFIG_TOPIC = "/config/topics/createTopicTest" ;
5879
5980 @ Autowired
6081 private AdminService adminService ;
@@ -75,24 +96,32 @@ private TopicDO getTopicDO() {
7596 public ClusterDO getClusterDO () {
7697 ClusterDO clusterDO = new ClusterDO ();
7798 clusterDO .setId (REAL_CLUSTER_ID_IN_MYSQL );
78- clusterDO .setClusterName ("LogiKM_moduleTest" );
79- clusterDO .setZookeeper ("10.190.46.198:2181,10.190.14.237:2181,10.190.50.65:2181/xg" );
80- clusterDO .setBootstrapServers ("10.190.46.198:9093,10.190.14.237:9093,10.190.50.65:9093" );
81- clusterDO .setSecurityProperties ("{ \t \" security.protocol \" : \" SASL_PLAINTEXT \" , \t \" sasl.mechanism \" : \" PLAIN \" , \t \" sasl.jaas.config \" : \" org.apache.kafka.common.security.plain.PlainLoginModule required username= \\ \" dkm_admin \\ \" password= \\ \" km_kMl4N8as1Kp0CCY \\ \" ; \" }" );
99+ clusterDO .setClusterName (REAL_PHYSICAL_CLUSTER_NAME );
100+ clusterDO .setZookeeper (ZOOKEEPER_ADDRESS );
101+ clusterDO .setBootstrapServers (BOOTSTRAP_SERVERS );
102+ clusterDO .setSecurityProperties (SECURITY_PROTOCOL );
82103 clusterDO .setStatus (1 );
83104 clusterDO .setGmtCreate (new Date ());
84105 clusterDO .setGmtModify (new Date ());
85106 return clusterDO ;
86107 }
87108
88109 @ Test (description = "测试创建topic" )
89- public void createTopicTest () {
110+ public void createTopicTest () throws ConfigException {
90111 // broker not exist
91112 createTopic2BrokerNotExistTest ();
92113 // success to create topic
93114 createTopic2SuccessTest ();
94115 // failure to create topic, topic already exists
95116 createTopic2FailureTest ();
117+
118+ // 创建成功后,数据库和zk中会存在该Topic,需要删除防止影响后面测试
119+ // 写入数据库的整个Test结束后回滚,因此只用删除zk上的topic节点
120+ ZkConfigImpl zkConfig = new ZkConfigImpl (ZOOKEEPER_ADDRESS );
121+ zkConfig .delete (ZK_NODE_PATH_BROKERS_TOPIC );
122+ zkConfig .delete (ZK_NODE_PATH_CONFIG_TOPIC );
123+ zkConfig .close ();
124+
96125 }
97126
98127 private void createTopic2BrokerNotExistTest () {
@@ -103,7 +132,7 @@ private void createTopic2BrokerNotExistTest() {
103132 topicDO ,
104133 1 ,
105134 1 ,
106- 1L ,
135+ INVALID_REGION_ID ,
107136 Arrays .asList (INVALID_BROKER_ID ),
108137 new Properties (),
109138 ADMIN ,
@@ -163,9 +192,18 @@ private void deleteTopic2FailureTest() {
163192
164193 private void deleteTopic2SuccessTest () {
165194 TopicDO topicDO = getTopicDO ();
166- topicManagerService .addTopic (topicDO );
167-
168195 ClusterDO clusterDO = getClusterDO ();
196+ ResultStatus result = adminService .createTopic (
197+ clusterDO ,
198+ topicDO ,
199+ 1 ,
200+ 1 ,
201+ INVALID_REGION_ID ,
202+ Arrays .asList (REAL_BROKER_ID_IN_ZK ),
203+ new Properties (),
204+ ADMIN ,
205+ ADMIN );
206+ Assert .assertEquals (result .getCode (), ResultStatus .SUCCESS .getCode ());
169207 ResultStatus resultStatus = adminService .deleteTopic (
170208 clusterDO ,
171209 CREATE_TOPIC_TEST ,
@@ -175,36 +213,52 @@ private void deleteTopic2SuccessTest() {
175213 }
176214
177215 @ Test (description = "测试优先副本选举状态" )
178- public void preferredReplicaElectionStatusTest () {
216+ public void preferredReplicaElectionStatusTest () throws ConfigException {
179217 // running
180218 preferredReplicaElectionStatus2RunningTest ();
181219 // not running
182220 preferredReplicaElectionStatus2NotRunningTest ();
183221 }
184222
185- private void preferredReplicaElectionStatus2RunningTest () {
223+ private void preferredReplicaElectionStatus2RunningTest () throws ConfigException {
186224 // zk上需要创建/admin/preferred_replica_election节点
225+ ZkConfigImpl zkConfig = new ZkConfigImpl (ZOOKEEPER_ADDRESS );
226+ zkConfig .setOrCreatePersistentNodeStat (ZK_NODE_PATH_PREFERRED , "" );
187227 ClusterDO clusterDO = getClusterDO ();
188228 TaskStatusEnum taskStatusEnum = adminService .preferredReplicaElectionStatus (clusterDO );
189229 Assert .assertEquals (taskStatusEnum .getCode (), TaskStatusEnum .RUNNING .getCode ());
230+
231+ // 删除之前创建的节点,防止影响后续测试
232+ zkConfig .delete (ZK_NODE_PATH_PREFERRED );
233+ zkConfig .close ();
190234 }
191235
192- private void preferredReplicaElectionStatus2NotRunningTest () {
236+ private void preferredReplicaElectionStatus2NotRunningTest () throws ConfigException {
193237 ClusterDO clusterDO = getClusterDO ();
194238 // zk上无/admin/preferred_replica_election节点
195239 TaskStatusEnum taskStatusEnum = adminService .preferredReplicaElectionStatus (clusterDO );
196240 Assert .assertEquals (taskStatusEnum .getCode (), TaskStatusEnum .SUCCEED .getCode ());
241+
242+ // 删除创建的节点,防止影响后续测试
243+ ZkConfigImpl zkConfig = new ZkConfigImpl (ZOOKEEPER_ADDRESS );
244+ zkConfig .delete (ZK_NODE_PATH_PREFERRED );
245+ zkConfig .close ();
197246 }
198247
199248 @ Test (description = "测试集群纬度优先副本选举" )
200- public void preferredReplicaElectionOfCluster2Test () {
249+ public void preferredReplicaElectionOfCluster2Test () throws ConfigException {
201250 ClusterDO clusterDO = getClusterDO ();
202251 ResultStatus resultStatus = adminService .preferredReplicaElection (clusterDO , ADMIN );
203252 Assert .assertEquals (resultStatus .getCode (), ResultStatus .SUCCESS .getCode ());
253+
254+ // 删除创建的节点,防止影响后续测试
255+ ZkConfigImpl zkConfig = new ZkConfigImpl (ZOOKEEPER_ADDRESS );
256+ zkConfig .delete (ZK_NODE_PATH_PREFERRED );
257+ zkConfig .close ();
204258 }
205259
206260 @ Test (description = "Broker纬度优先副本选举" )
207- public void preferredReplicaElectionOfBrokerTest () {
261+ public void preferredReplicaElectionOfBrokerTest () throws ConfigException {
208262 // 参数异常
209263 preferredReplicaElectionOfBroker2ParamIllegalTest ();
210264 // success
@@ -221,18 +275,23 @@ private void preferredReplicaElectionOfBroker2ParamIllegalTest() {
221275 Assert .assertEquals (resultStatus .getCode (), ResultStatus .PARAM_ILLEGAL .getCode ());
222276 }
223277
224- private void preferredReplicaElectionOfBroker2SuccessTest () {
278+ private void preferredReplicaElectionOfBroker2SuccessTest () throws ConfigException {
225279 ClusterDO clusterDO = getClusterDO ();
226280 ResultStatus resultStatus = adminService .preferredReplicaElection (
227281 clusterDO ,
228282 REAL_BROKER_ID_IN_ZK ,
229283 ADMIN
230284 );
231285 Assert .assertEquals (resultStatus .getCode (), ResultStatus .SUCCESS .getCode ());
286+
287+ // 删除创建的节点,防止影响后续测试
288+ ZkConfigImpl zkConfig = new ZkConfigImpl (ZOOKEEPER_ADDRESS );
289+ zkConfig .delete (ZK_NODE_PATH_PREFERRED );
290+ zkConfig .close ();
232291 }
233292
234293 @ Test (description = "Topic纬度优先副本选举" )
235- public void preferredReplicaElectionOfTopicTest () {
294+ public void preferredReplicaElectionOfTopicTest () throws ConfigException {
236295 // topic not exist
237296 preferredReplicaElectionOfTopic2TopicNotExistTest ();
238297 // success
@@ -249,18 +308,23 @@ private void preferredReplicaElectionOfTopic2TopicNotExistTest() {
249308 Assert .assertEquals (resultStatus .getCode (), ResultStatus .TOPIC_NOT_EXIST .getCode ());
250309 }
251310
252- private void preferredReplicaElectionOfTopic2SuccessTest () {
311+ private void preferredReplicaElectionOfTopic2SuccessTest () throws ConfigException {
253312 ClusterDO clusterDO = getClusterDO ();
254313 ResultStatus resultStatus = adminService .preferredReplicaElection (
255314 clusterDO ,
256315 REAL_TOPIC1_IN_ZK ,
257316 ADMIN
258317 );
259318 Assert .assertEquals (resultStatus .getCode (), ResultStatus .SUCCESS .getCode ());
319+
320+ // 删除创建的节点,防止影响后续测试
321+ ZkConfigImpl zkConfig = new ZkConfigImpl (ZOOKEEPER_ADDRESS );
322+ zkConfig .delete (ZK_NODE_PATH_PREFERRED );
323+ zkConfig .close ();
260324 }
261325
262- @ Test (description = "Topic纬度优先副本选举 " )
263- public void preferredReplicaElectionOfPartitionTest () {
326+ @ Test (description = "分区纬度优先副本选举 " )
327+ public void preferredReplicaElectionOfPartitionTest () throws ConfigException {
264328 // topic not exist
265329 preferredReplicaElectionOfPartition2TopicNotExistTest ();
266330 // partition Not Exist
@@ -291,7 +355,7 @@ private void preferredReplicaElectionOfPartition2PartitionNotExistTest() {
291355 Assert .assertEquals (resultStatus .getCode (), ResultStatus .PARTITION_NOT_EXIST .getCode ());
292356 }
293357
294- private void preferredReplicaElectionOfPartition2SuccessTest () {
358+ private void preferredReplicaElectionOfPartition2SuccessTest () throws ConfigException {
295359 ClusterDO clusterDO = getClusterDO ();
296360 ResultStatus resultStatus = adminService .preferredReplicaElection (
297361 clusterDO ,
@@ -300,6 +364,11 @@ private void preferredReplicaElectionOfPartition2SuccessTest() {
300364 ADMIN
301365 );
302366 Assert .assertEquals (resultStatus .getCode (), ResultStatus .SUCCESS .getCode ());
367+
368+ // 删除创建的节点,防止影响后续测试
369+ ZkConfigImpl zkConfig = new ZkConfigImpl (ZOOKEEPER_ADDRESS );
370+ zkConfig .delete (ZK_NODE_PATH_PREFERRED );
371+ zkConfig .close ();
303372 }
304373
305374 @ Test (description = "测试获取Topic配置" )
@@ -338,9 +407,10 @@ public void modifyTopicConfigTest() {
338407 }
339408
340409 @ Test (description = "测试扩分区" )
410+ // 该测试会导致真实topic分区发生变化
341411 public void expandPartitionsTest () {
342412 // broker not exist
343- expandPartitions2BrokerNotExistTest ();
413+ // expandPartitions2BrokerNotExistTest();
344414 // success
345415 expandPartitions2SuccessTest ();
346416 }
@@ -363,13 +433,13 @@ private void expandPartitions2SuccessTest() {
363433 ClusterDO clusterDO = getClusterDO ();
364434 ResultStatus resultStatus = adminService .expandPartitions (
365435 clusterDO ,
366- REAL_TOPIC1_IN_ZK ,
436+ REAL_TOPIC1_IN_ZK2 ,
367437 2 ,
368438 INVALID_REGION_ID ,
369439 Arrays .asList (REAL_BROKER_ID_IN_ZK ),
370440 ADMIN
371441 );
372- Assert .assertEquals (resultStatus .getCode (), ResultStatus .BROKER_NOT_EXIST .getCode ());
442+ Assert .assertEquals (resultStatus .getCode (), ResultStatus .SUCCESS .getCode ());
373443 }
374444
375445}
0 commit comments