Skip to content

Commit 8328b07

Browse files
committed
feat: add set-replication-clusters command for topics
1 parent d9dd3db commit 8328b07

File tree

3 files changed

+122
-0
lines changed

3 files changed

+122
-0
lines changed
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
package topic
2+
3+
import (
4+
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
5+
"github.com/pkg/errors"
6+
"github.com/spf13/cobra"
7+
"github.com/spf13/pflag"
8+
9+
"github.com/streamnative/pulsarctl/pkg/cmdutils"
10+
)
11+
12+
func SetReplicationClustersCmd(vc *cmdutils.VerbCmd) {
13+
desc := cmdutils.LongDescription{}
14+
desc.CommandUsedFor = "Set the replication clusters for a topic"
15+
desc.CommandPermission = "This command requires tenant admin permissions."
16+
17+
var examples []cmdutils.Example
18+
setReplication := cmdutils.Example{
19+
Desc: "Set the replication clusters for a topic",
20+
Command: "pulsarctl topics set-replication-clusters tenant/namespace/topic --clusters cluster1,cluster2",
21+
}
22+
examples = append(examples, setReplication)
23+
desc.CommandExamples = examples
24+
25+
var out []cmdutils.Output
26+
successOut := cmdutils.Output{
27+
Desc: "normal output",
28+
Out: "Set the replication clusters for [topic] successfully",
29+
}
30+
31+
noTopicName := cmdutils.Output{
32+
Desc: "you must specify a tenant/namespace/topic name, please check if the tenant/namespace/topic name is provided",
33+
Out: "[✖] the topic name is not specified or the topic name is specified more than one",
34+
}
35+
36+
tenantNotExistError := cmdutils.Output{
37+
Desc: "the tenant does not exist",
38+
Out: "[✖] code: 404 reason: Tenant does not exist",
39+
}
40+
41+
nsNotExistError := cmdutils.Output{
42+
Desc: "the namespace does not exist",
43+
Out: "[✖] code: 404 reason: Namespace (tenant/namespace) does not exist",
44+
}
45+
46+
out = append(out, successOut, noTopicName, tenantNotExistError, nsNotExistError)
47+
desc.CommandOutput = out
48+
49+
vc.SetDescription(
50+
"set-replication-clusters",
51+
"Set the replication clusters for a topic",
52+
desc.ToString(),
53+
desc.ExampleToString(),
54+
"set-replication-clusters",
55+
)
56+
57+
var clusters []string
58+
59+
vc.FlagSetGroup.InFlagSet("Set replication clusters", func(flagSet *pflag.FlagSet) {
60+
flagSet.StringSliceVarP(&clusters, "clusters", "c", nil,
61+
"Replication cluster ids.")
62+
_ = cobra.MarkFlagRequired(flagSet, "clusters")
63+
})
64+
vc.EnableOutputFlagSet()
65+
66+
vc.SetRunFuncWithNameArg(func() error {
67+
return doSetReplicationClusters(vc, clusters)
68+
}, "the topic name is not specified or the topic name is specified more than one")
69+
}
70+
71+
func doSetReplicationClusters(vc *cmdutils.VerbCmd, clusters []string) error {
72+
topic := vc.NameArg
73+
74+
if len(clusters) == 0 {
75+
return errors.New("clusters cannot be empty")
76+
}
77+
78+
admin := cmdutils.NewPulsarClient()
79+
topicName, err := utils.GetTopicName(topic)
80+
if err != nil {
81+
return err
82+
}
83+
84+
err = admin.Topics().SetReplicationClusters(*topicName, clusters)
85+
if err == nil {
86+
vc.Command.Printf("Set the replication clusters successfully on [%s]\n", topic)
87+
}
88+
89+
return err
90+
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package topic
2+
3+
import (
4+
"fmt"
5+
"testing"
6+
7+
"github.com/onsi/gomega"
8+
"github.com/streamnative/pulsarctl/pkg/test"
9+
)
10+
11+
func TestSetReplicationClustersCmd(t *testing.T) {
12+
g := gomega.NewWithT(t)
13+
14+
topic := fmt.Sprintf("test-replication-clusters-topic-%s", test.RandomSuffix())
15+
16+
args := []string{"create", topic, "0"}
17+
_, execErr, _, _ := TestTopicCommands(CreateTopicCmd, args)
18+
g.Expect(execErr).Should(gomega.BeNil())
19+
20+
args = []string{"set-replication-clusters", topic, "--clusters", "standalone"}
21+
out, execErr, nameErr, cmdErr := TestTopicCommands(SetReplicationClustersCmd, args)
22+
g.Expect(execErr).Should(gomega.BeNil())
23+
g.Expect(nameErr).Should(gomega.BeNil())
24+
g.Expect(cmdErr).Should(gomega.BeNil())
25+
g.Expect(out).ShouldNot(gomega.BeNil())
26+
g.Expect(out.String()).ShouldNot(gomega.BeEmpty())
27+
28+
// Since there is no get-replication-clusters command in this PR, we only test the set command success.
29+
// In a real scenario, we might want to verify using the client or adding a get command.
30+
// The set command output verification implies the call was successful.
31+
}

pkg/ctl/topic/topic.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ func Command(flagGrouping *cmdutils.FlagGrouping) *cobra.Command {
9595
RemoveInactiveTopicCmd,
9696
SetDispatchRateCmd,
9797
RemoveDispatchRateCmd,
98+
SetReplicationClustersCmd,
9899
}
99100

100101
cmdutils.AddVerbCmds(flagGrouping, resourceCmd, commands...)

0 commit comments

Comments
 (0)