@@ -166,8 +166,8 @@ func (r *ReconcileTopicTransfer) Reconcile(ctx context.Context, request reconcil
166166 for i , consumerGroup := range consumerGroups {
167167 log .Info ("Processing consumer group" + consumerGroup + " " + strconv .Itoa (i + 1 ) + "/" + strconv .Itoa (len (consumerGroups )))
168168 addConsumerGroupToTargetClusterCommand := buildAddConsumerGroupToClusterCommand (consumerGroup , targetCluster , nameServer )
169- reqLogger .Info ("AddConsumerGroupToTargetClusterCommand: " + addConsumerGroupToTargetClusterCommand )
170- cmd := exec .Command (cons .BasicCommand , cons . AdminToolDir , addConsumerGroupToTargetClusterCommand )
169+ reqLogger .Info ("AddConsumerGroupToTargetClusterCommand: " + strings . Join ( addConsumerGroupToTargetClusterCommand , " " ) )
170+ cmd := exec .Command (cons .BasicCommand , addConsumerGroupToTargetClusterCommand ... )
171171 output , err := cmd .Output ()
172172 // validate command output
173173 if err != nil || ! isUpdateConsumerGroupSuccess (string (output )) {
@@ -182,8 +182,8 @@ func (r *ReconcileTopicTransfer) Reconcile(ctx context.Context, request reconcil
182182 // step2: add consumer group to target cluster
183183 status = 2
184184 addTopicToTargetClusterCommand := buildAddTopicToClusterCommand (topic , targetCluster , nameServer )
185- reqLogger .Info ("addTopicToTargetClusterCommand: " + addTopicToTargetClusterCommand )
186- cmd := exec .Command (cons .BasicCommand , cons . AdminToolDir , addTopicToTargetClusterCommand )
185+ reqLogger .Info ("addTopicToTargetClusterCommand: " + strings . Join ( addTopicToTargetClusterCommand , " " ) )
186+ cmd := exec .Command (cons .BasicCommand , addTopicToTargetClusterCommand ... )
187187 output , err := cmd .Output ()
188188 // validate command output
189189 if err != nil || ! isUpdateTopicCommandSuccess (string (output )) {
@@ -197,8 +197,8 @@ func (r *ReconcileTopicTransfer) Reconcile(ctx context.Context, request reconcil
197197 // step3: stop write in source cluster topic
198198 status = 3
199199 stopSourceClusterTopicWriteCommand := buildStopClusterTopicWriteCommand (topic , sourceCluster , nameServer )
200- reqLogger .Info ("stopSourceClusterTopicWriteCommand: " + stopSourceClusterTopicWriteCommand )
201- cmd = exec .Command (cons .BasicCommand , cons . AdminToolDir , stopSourceClusterTopicWriteCommand )
200+ reqLogger .Info ("stopSourceClusterTopicWriteCommand: " + strings . Join ( stopSourceClusterTopicWriteCommand , " " ) )
201+ cmd = exec .Command (cons .BasicCommand , stopSourceClusterTopicWriteCommand ... )
202202 output , err = cmd .Output ()
203203 // validate command output
204204 if err != nil || ! isUpdateTopicCommandSuccess (string (output )) {
@@ -215,8 +215,8 @@ func (r *ReconcileTopicTransfer) Reconcile(ctx context.Context, request reconcil
215215 log .Info ("Processing consumer group" + consumerGroup + " " + strconv .Itoa (i + 1 ) + "/" + strconv .Itoa (len (consumerGroups )))
216216 for {
217217 checkConsumeProgressCommand := buildCheckConsumeProgressCommand (consumerGroup , nameServer )
218- reqLogger .Info ("checkConsumeProgressCommand: " + checkConsumeProgressCommand )
219- cmd = exec .Command (cons .BasicCommand , cons . AdminToolDir , checkConsumeProgressCommand )
218+ reqLogger .Info ("checkConsumeProgressCommand: " + strings . Join ( checkConsumeProgressCommand , " " ) )
219+ cmd = exec .Command (cons .BasicCommand , checkConsumeProgressCommand ... )
220220 output , err = cmd .Output ()
221221 if err != nil || ! isCheckConsumeProcessCommandSuccess (string (output )) {
222222 reqLogger .Error (err , "Failed to check consumerGroup " + consumerGroup + " with output: " + string (output ))
@@ -237,8 +237,8 @@ func (r *ReconcileTopicTransfer) Reconcile(ctx context.Context, request reconcil
237237 // step5: delete topic in source cluster
238238 status = 5
239239 deleteSourceClusterTopicCommand := buildDeleteSourceClusterTopicCommand (topic , sourceCluster , nameServer )
240- reqLogger .Info ("deleteSourceClusterTopicCommand: " + deleteSourceClusterTopicCommand )
241- cmd = exec .Command (cons .BasicCommand , cons . AdminToolDir , deleteSourceClusterTopicCommand )
240+ reqLogger .Info ("deleteSourceClusterTopicCommand: " + strings . Join ( deleteSourceClusterTopicCommand , " " ) )
241+ cmd = exec .Command (cons .BasicCommand , deleteSourceClusterTopicCommand ... )
242242 output , err = cmd .Output ()
243243 if err != nil || ! isDeleteTopicCommandSuccess (string (output )) {
244244 reqLogger .Error (err , "Failed to delete Topic " + topic + " in SourceCluster " + sourceCluster + " with output: " + string (output ))
@@ -253,8 +253,8 @@ func (r *ReconcileTopicTransfer) Reconcile(ctx context.Context, request reconcil
253253 for i , consumerGroup := range consumerGroups {
254254 log .Info ("Processing consumer group" + consumerGroup + " " + strconv .Itoa (i + 1 ) + "/" + strconv .Itoa (len (consumerGroups )))
255255 deleteConsumerGroupCommand := buildDeleteConsumeGroupCommand (consumerGroup , sourceCluster , nameServer )
256- reqLogger .Info ("deleteConsumerGroupCommand: " + deleteConsumerGroupCommand )
257- cmd = exec .Command (cons .BasicCommand , cons . AdminToolDir , deleteConsumerGroupCommand )
256+ reqLogger .Info ("deleteConsumerGroupCommand: " + strings . Join ( deleteConsumerGroupCommand , " " ) )
257+ cmd = exec .Command (cons .BasicCommand , deleteConsumerGroupCommand ... )
258258 output , err = cmd .Output ()
259259 if err != nil || ! isDeleteConsumerGroupSuccess (string (output )) {
260260 reqLogger .Error (err , "Failed to delete consumer group " + consumerGroup + " in SourceCluster " + sourceCluster + " with output: " + string (output ))
@@ -270,8 +270,8 @@ func (r *ReconcileTopicTransfer) Reconcile(ctx context.Context, request reconcil
270270 for i , consumerGroup := range consumerGroups {
271271 log .Info ("Processing consumer group" + consumerGroup + " " + strconv .Itoa (i + 1 ) + "/" + strconv .Itoa (len (consumerGroups )))
272272 createRetryTopicCommand := buildAddRetryTopicToClusterCommand (consumerGroup , targetCluster , nameServer )
273- reqLogger .Info ("createRetryTopicCommand: " + createRetryTopicCommand )
274- cmd = exec .Command (cons .BasicCommand , cons . AdminToolDir , createRetryTopicCommand )
273+ reqLogger .Info ("createRetryTopicCommand: " + strings . Join ( createRetryTopicCommand , " " ) )
274+ cmd = exec .Command (cons .BasicCommand , createRetryTopicCommand ... )
275275 output , err = cmd .Output ()
276276 if err != nil || ! isUpdateTopicCommandSuccess (string (output )) {
277277 reqLogger .Error (err , "Failed to create retry topic of consumer group " + consumerGroup + " in TargetCluster " + targetCluster + " with output: " + string (output ))
@@ -290,8 +290,8 @@ func (r *ReconcileTopicTransfer) Reconcile(ctx context.Context, request reconcil
290290
291291func undoStopWrite (topic string , cluster string , nameServer string ) {
292292 addTopicToClusterCommand := buildUndoStopWriteCommand (topic , cluster , nameServer )
293- log .Info ("undoStopWrite: " + addTopicToClusterCommand )
294- cmd := exec .Command (cons .BasicCommand , cons . AdminToolDir , addTopicToClusterCommand )
293+ log .Info ("undoStopWrite: " + strings . Join ( addTopicToClusterCommand , " " ) )
294+ cmd := exec .Command (cons .BasicCommand , addTopicToClusterCommand ... )
295295 output , err := cmd .Output ()
296296 if err != nil || ! isUpdateTopicCommandSuccess (string (output )) {
297297 log .Error (err , "Failed to undo stop write topic with output: " + string (output ))
@@ -301,8 +301,8 @@ func undoStopWrite(topic string, cluster string, nameServer string) {
301301
302302func undoDeleteTopic (topic string , cluster string , nameServer string ) {
303303 addTopicToClusterCommand := buildAddTopicToClusterCommand (topic , cluster , nameServer )
304- log .Info ("undoDeleteTopic: " + addTopicToClusterCommand )
305- cmd := exec .Command (cons .BasicCommand , cons . AdminToolDir , addTopicToClusterCommand )
304+ log .Info ("undoDeleteTopic: " + strings . Join ( addTopicToClusterCommand , " " ) )
305+ cmd := exec .Command (cons .BasicCommand , addTopicToClusterCommand ... )
306306 output , err := cmd .Output ()
307307 if err != nil || ! isUpdateTopicCommandSuccess (string (output )) {
308308 log .Error (err , "Failed to undo delete topic with output: " + string (output ))
@@ -313,8 +313,8 @@ func undoDeleteTopic(topic string, cluster string, nameServer string) {
313313func undoDeleteConsumeGroup (consumerGroups []string , cluster string , nameServer string ) {
314314 for _ , consumerGroup := range consumerGroups {
315315 addConsumerGroupToTargetClusterCommand := buildAddConsumerGroupToClusterCommand (consumerGroup , cluster , nameServer )
316- log .Info ("undoDeleteConsumeGroup: " + addConsumerGroupToTargetClusterCommand )
317- cmd := exec .Command (cons .BasicCommand , cons . AdminToolDir , addConsumerGroupToTargetClusterCommand )
316+ log .Info ("undoDeleteConsumeGroup: " + strings . Join ( addConsumerGroupToTargetClusterCommand , " " ) )
317+ cmd := exec .Command (cons .BasicCommand , addConsumerGroupToTargetClusterCommand ... )
318318 output , err := cmd .Output ()
319319 if err != nil || ! isUpdateConsumerGroupSuccess (string (output )) {
320320 log .Error (err , "Failed to undo delete consume group with output: " + string (output ))
@@ -326,7 +326,7 @@ func undoDeleteConsumeGroup(consumerGroups []string, cluster string, nameServer
326326func getConsumerGroupByTopic (topic string , nameServer string ) []string {
327327 var consumerGroups []string
328328 topicListCmd := buildTopicListCommand (nameServer )
329- cmd := exec .Command (cons .BasicCommand , cons . AdminToolDir , topicListCmd )
329+ cmd := exec .Command (cons .BasicCommand , topicListCmd ... )
330330 output , err := cmd .Output ()
331331 if err != nil || ! isTopicListSuccess (string (output )) {
332332 log .Error (err , "Failed to list topic with output: " + string (output ))
@@ -371,8 +371,9 @@ func isUpdateConsumerGroupSuccess(s string) bool {
371371 return strings .Contains (s , "groupName" )
372372}
373373
374- func buildUndoStopWriteCommand (topic string , cluster string , nameServer string ) string {
374+ func buildUndoStopWriteCommand (topic string , cluster string , nameServer string ) [] string {
375375 cmdOpts := []string {
376+ cons .AdminToolDir ,
376377 "updatetopic" ,
377378 "-t" ,
378379 topic ,
@@ -387,21 +388,23 @@ func buildUndoStopWriteCommand(topic string, cluster string, nameServer string)
387388 "-n" ,
388389 nameServer ,
389390 }
390- return strings . Join ( cmdOpts , " " )
391+ return cmdOpts
391392}
392393
393- func buildTopicListCommand (nameServer string ) string {
394+ func buildTopicListCommand (nameServer string ) [] string {
394395 cmdOpts := []string {
396+ cons .AdminToolDir ,
395397 "topiclist" ,
396398 "-c" ,
397399 "-n" ,
398400 nameServer ,
399401 }
400- return strings . Join ( cmdOpts , " " )
402+ return cmdOpts
401403}
402404
403- func buildAddRetryTopicToClusterCommand (consumerGroup string , cluster string , nameServer string ) string {
405+ func buildAddRetryTopicToClusterCommand (consumerGroup string , cluster string , nameServer string ) [] string {
404406 cmdOpts := []string {
407+ cons .AdminToolDir ,
405408 "updatetopic" ,
406409 "-t" ,
407410 "%RETRY%" + consumerGroup ,
@@ -416,7 +419,7 @@ func buildAddRetryTopicToClusterCommand(consumerGroup string, cluster string, na
416419 "-n" ,
417420 nameServer ,
418421 }
419- return strings . Join ( cmdOpts , " " )
422+ return cmdOpts
420423}
421424
422425func getClusterBrokerNames (cluster string ) []string {
@@ -446,8 +449,9 @@ func isConsumeFinished(output string, topic string, cluster string) bool {
446449 return true
447450}
448451
449- func buildDeleteConsumeGroupCommand (consumerGroup string , cluster string , nameServer string ) string {
452+ func buildDeleteConsumeGroupCommand (consumerGroup string , cluster string , nameServer string ) [] string {
450453 cmdOpts := []string {
454+ cons .AdminToolDir ,
451455 "deleteSubGroup" ,
452456 "-g" ,
453457 consumerGroup ,
@@ -456,11 +460,12 @@ func buildDeleteConsumeGroupCommand(consumerGroup string, cluster string, nameSe
456460 "-n" ,
457461 nameServer ,
458462 }
459- return strings . Join ( cmdOpts , " " )
463+ return cmdOpts
460464}
461465
462- func buildDeleteSourceClusterTopicCommand (topic string , sourceCluster string , nameServer string ) string {
466+ func buildDeleteSourceClusterTopicCommand (topic string , sourceCluster string , nameServer string ) [] string {
463467 cmdOpts := []string {
468+ cons .AdminToolDir ,
464469 "deletetopic" ,
465470 "-t" ,
466471 topic ,
@@ -469,22 +474,24 @@ func buildDeleteSourceClusterTopicCommand(topic string, sourceCluster string, na
469474 "-n" ,
470475 nameServer ,
471476 }
472- return strings . Join ( cmdOpts , " " )
477+ return cmdOpts
473478}
474479
475- func buildCheckConsumeProgressCommand (consumerGroup string , nameServer string ) string {
480+ func buildCheckConsumeProgressCommand (consumerGroup string , nameServer string ) [] string {
476481 cmdOpts := []string {
482+ cons .AdminToolDir ,
477483 "consumerprogress" ,
478484 "-g" ,
479485 consumerGroup ,
480486 "-n" ,
481487 nameServer ,
482488 }
483- return strings . Join ( cmdOpts , " " )
489+ return cmdOpts
484490}
485491
486- func buildStopClusterTopicWriteCommand (topic string , cluster string , nameServer string ) string {
492+ func buildStopClusterTopicWriteCommand (topic string , cluster string , nameServer string ) [] string {
487493 cmdOpts := []string {
494+ cons .AdminToolDir ,
488495 "updatetopic" ,
489496 "-t" ,
490497 topic ,
@@ -497,11 +504,12 @@ func buildStopClusterTopicWriteCommand(topic string, cluster string, nameServer
497504 "-n" ,
498505 nameServer ,
499506 }
500- return strings . Join ( cmdOpts , " " )
507+ return cmdOpts
501508}
502509
503- func buildAddConsumerGroupToClusterCommand (consumerGroup string , cluster string , nameServer string ) string {
510+ func buildAddConsumerGroupToClusterCommand (consumerGroup string , cluster string , nameServer string ) [] string {
504511 cmdOpts := []string {
512+ cons .AdminToolDir ,
505513 "updatesubgroup" ,
506514 "-g" ,
507515 consumerGroup ,
@@ -514,11 +522,12 @@ func buildAddConsumerGroupToClusterCommand(consumerGroup string, cluster string,
514522 "-n" ,
515523 nameServer ,
516524 }
517- return strings . Join ( cmdOpts , " " )
525+ return cmdOpts
518526}
519527
520- func buildAddTopicToClusterCommand (topic string , cluster string , nameServer string ) string {
528+ func buildAddTopicToClusterCommand (topic string , cluster string , nameServer string ) [] string {
521529 cmdOpts := []string {
530+ cons .AdminToolDir ,
522531 "updatetopic" ,
523532 "-t" ,
524533 topic ,
@@ -527,5 +536,5 @@ func buildAddTopicToClusterCommand(topic string, cluster string, nameServer stri
527536 "-n" ,
528537 nameServer ,
529538 }
530- return strings . Join ( cmdOpts , " " )
539+ return cmdOpts
531540}
0 commit comments